aboutsummaryrefslogtreecommitdiff
path: root/src/api/s3/api_server.rs
diff options
context:
space:
mode:
authorAlex <alex@adnab.me>2022-05-10 13:16:57 +0200
committerAlex <alex@adnab.me>2022-05-10 13:16:57 +0200
commit5768bf362262f78376af14517c4921941986192e (patch)
treeb4baf3051eade0f63649443278bb3a3f4c38ec25 /src/api/s3/api_server.rs
parentdef78c5e6f5da37a0d17b5652c525fbeccbc2e86 (diff)
downloadgarage-5768bf362262f78376af14517c4921941986192e.tar.gz
garage-5768bf362262f78376af14517c4921941986192e.zip
First implementation of K2V (#293)
**Specification:** View spec at [this URL](https://git.deuxfleurs.fr/Deuxfleurs/garage/src/branch/k2v/doc/drafts/k2v-spec.md) - [x] Specify the structure of K2V triples - [x] Specify the DVVS format used for causality detection - [x] Specify the K2V index (just a counter of number of values per partition key) - [x] Specify single-item endpoints: ReadItem, InsertItem, DeleteItem - [x] Specify index endpoint: ReadIndex - [x] Specify multi-item endpoints: InsertBatch, ReadBatch, DeleteBatch - [x] Move to JSON objects instead of tuples - [x] Specify endpoints for polling for updates on single values (PollItem) **Implementation:** - [x] Table for K2V items, causal contexts - [x] Indexing mechanism and table for K2V index - [x] Make API handlers a bit more generic - [x] K2V API endpoint - [x] K2V API router - [x] ReadItem - [x] InsertItem - [x] DeleteItem - [x] PollItem - [x] ReadIndex - [x] InsertBatch - [x] ReadBatch - [x] DeleteBatch **Testing:** - [x] Just a simple Python script that does some requests to check visually that things are going right (does not contain parsing of results or assertions on returned values) - [x] Actual tests: - [x] Adapt testing framework - [x] Simple test with InsertItem + ReadItem - [x] Test with several Insert/Read/DeleteItem + ReadIndex - [x] Test all combinations of return formats for ReadItem - [x] Test with ReadBatch, InsertBatch, DeleteBatch - [x] Test with PollItem - [x] Test error codes - [ ] Fix most broken stuff - [x] test PollItem broken randomly - [x] when invalid causality tokens are given, errors should be 4xx not 5xx **Improvements:** - [x] Descending range queries - [x] Specify - [x] Implement - [x] Add test - [x] Batch updates to index counter - [x] Put K2V behind `k2v` feature flag Co-authored-by: Alex Auvolat <alex@adnab.me> Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/293 Co-authored-by: Alex <alex@adnab.me> Co-committed-by: Alex <alex@adnab.me>
Diffstat (limited to 'src/api/s3/api_server.rs')
-rw-r--r--src/api/s3/api_server.rs401
1 files changed, 401 insertions, 0 deletions
diff --git a/src/api/s3/api_server.rs b/src/api/s3/api_server.rs
new file mode 100644
index 00000000..78a69d53
--- /dev/null
+++ b/src/api/s3/api_server.rs
@@ -0,0 +1,401 @@
+use std::sync::Arc;
+
+use async_trait::async_trait;
+
+use futures::future::Future;
+use hyper::header;
+use hyper::{Body, Method, Request, Response};
+
+use opentelemetry::{trace::SpanRef, KeyValue};
+
+use garage_table::util::*;
+use garage_util::error::Error as GarageError;
+
+use garage_model::garage::Garage;
+use garage_model::key_table::Key;
+
+use crate::error::*;
+use crate::generic_server::*;
+
+use crate::signature::payload::check_payload_signature;
+use crate::signature::streaming::*;
+
+use crate::helpers::*;
+use crate::s3::bucket::*;
+use crate::s3::copy::*;
+use crate::s3::cors::*;
+use crate::s3::delete::*;
+use crate::s3::get::*;
+use crate::s3::list::*;
+use crate::s3::post_object::handle_post_object;
+use crate::s3::put::*;
+use crate::s3::router::Endpoint;
+use crate::s3::website::*;
+
+pub struct S3ApiServer {
+ garage: Arc<Garage>,
+}
+
+pub(crate) struct S3ApiEndpoint {
+ bucket_name: Option<String>,
+ endpoint: Endpoint,
+}
+
+impl S3ApiServer {
+ pub async fn run(
+ garage: Arc<Garage>,
+ shutdown_signal: impl Future<Output = ()>,
+ ) -> Result<(), GarageError> {
+ let addr = garage.config.s3_api.api_bind_addr;
+
+ ApiServer::new(
+ garage.config.s3_api.s3_region.clone(),
+ S3ApiServer { garage },
+ )
+ .run_server(addr, shutdown_signal)
+ .await
+ }
+
+ async fn handle_request_without_bucket(
+ &self,
+ _req: Request<Body>,
+ api_key: Key,
+ endpoint: Endpoint,
+ ) -> Result<Response<Body>, Error> {
+ match endpoint {
+ Endpoint::ListBuckets => handle_list_buckets(&self.garage, &api_key).await,
+ endpoint => Err(Error::NotImplemented(endpoint.name().to_owned())),
+ }
+ }
+}
+
+#[async_trait]
+impl ApiHandler for S3ApiServer {
+ const API_NAME: &'static str = "s3";
+ const API_NAME_DISPLAY: &'static str = "S3";
+
+ type Endpoint = S3ApiEndpoint;
+
+ fn parse_endpoint(&self, req: &Request<Body>) -> Result<S3ApiEndpoint, Error> {
+ let authority = req
+ .headers()
+ .get(header::HOST)
+ .ok_or_bad_request("Host header required")?
+ .to_str()?;
+
+ let host = authority_to_host(authority)?;
+
+ let bucket_name = self
+ .garage
+ .config
+ .s3_api
+ .root_domain
+ .as_ref()
+ .and_then(|root_domain| host_to_bucket(&host, root_domain));
+
+ let (endpoint, bucket_name) =
+ Endpoint::from_request(req, bucket_name.map(ToOwned::to_owned))?;
+
+ Ok(S3ApiEndpoint {
+ bucket_name,
+ endpoint,
+ })
+ }
+
+ async fn handle(
+ &self,
+ req: Request<Body>,
+ endpoint: S3ApiEndpoint,
+ ) -> Result<Response<Body>, Error> {
+ let S3ApiEndpoint {
+ bucket_name,
+ endpoint,
+ } = endpoint;
+ let garage = self.garage.clone();
+
+ // Some endpoints are processed early, before we even check for an API key
+ if let Endpoint::PostObject = endpoint {
+ return handle_post_object(garage, req, bucket_name.unwrap()).await;
+ }
+ if let Endpoint::Options = endpoint {
+ return handle_options_s3api(garage, &req, bucket_name).await;
+ }
+
+ let (api_key, mut content_sha256) = check_payload_signature(&garage, "s3", &req).await?;
+ let api_key = api_key.ok_or_else(|| {
+ Error::Forbidden("Garage does not support anonymous access yet".to_string())
+ })?;
+
+ let req = parse_streaming_body(
+ &api_key,
+ req,
+ &mut content_sha256,
+ &garage.config.s3_api.s3_region,
+ "s3",
+ )?;
+
+ let bucket_name = match bucket_name {
+ None => {
+ return self
+ .handle_request_without_bucket(req, api_key, endpoint)
+ .await
+ }
+ Some(bucket) => bucket.to_string(),
+ };
+
+ // Special code path for CreateBucket API endpoint
+ if let Endpoint::CreateBucket {} = endpoint {
+ return handle_create_bucket(&garage, req, content_sha256, api_key, bucket_name).await;
+ }
+
+ let bucket_id = resolve_bucket(&garage, &bucket_name, &api_key).await?;
+ let bucket = garage
+ .bucket_table
+ .get(&EmptyKey, &bucket_id)
+ .await?
+ .filter(|b| !b.state.is_deleted())
+ .ok_or(Error::NoSuchBucket)?;
+
+ let allowed = match endpoint.authorization_type() {
+ Authorization::Read => api_key.allow_read(&bucket_id),
+ Authorization::Write => api_key.allow_write(&bucket_id),
+ Authorization::Owner => api_key.allow_owner(&bucket_id),
+ _ => unreachable!(),
+ };
+
+ if !allowed {
+ return Err(Error::Forbidden(
+ "Operation is not allowed for this key.".to_string(),
+ ));
+ }
+
+ // Look up what CORS rule might apply to response.
+ // Requests for methods different than GET, HEAD or POST
+ // are always preflighted, i.e. the browser should make
+ // an OPTIONS call before to check it is allowed
+ let matching_cors_rule = match *req.method() {
+ Method::GET | Method::HEAD | Method::POST => find_matching_cors_rule(&bucket, &req)?,
+ _ => None,
+ };
+
+ let resp = match endpoint {
+ Endpoint::HeadObject {
+ key, part_number, ..
+ } => handle_head(garage, &req, bucket_id, &key, part_number).await,
+ Endpoint::GetObject {
+ key, part_number, ..
+ } => handle_get(garage, &req, bucket_id, &key, part_number).await,
+ Endpoint::UploadPart {
+ key,
+ part_number,
+ upload_id,
+ } => {
+ handle_put_part(
+ garage,
+ req,
+ bucket_id,
+ &key,
+ part_number,
+ &upload_id,
+ content_sha256,
+ )
+ .await
+ }
+ Endpoint::CopyObject { key } => {
+ handle_copy(garage, &api_key, &req, bucket_id, &key).await
+ }
+ Endpoint::UploadPartCopy {
+ key,
+ part_number,
+ upload_id,
+ } => {
+ handle_upload_part_copy(
+ garage,
+ &api_key,
+ &req,
+ bucket_id,
+ &key,
+ part_number,
+ &upload_id,
+ )
+ .await
+ }
+ Endpoint::PutObject { key } => {
+ handle_put(garage, req, bucket_id, &key, content_sha256).await
+ }
+ Endpoint::AbortMultipartUpload { key, upload_id } => {
+ handle_abort_multipart_upload(garage, bucket_id, &key, &upload_id).await
+ }
+ Endpoint::DeleteObject { key, .. } => handle_delete(garage, bucket_id, &key).await,
+ Endpoint::CreateMultipartUpload { key } => {
+ handle_create_multipart_upload(garage, &req, &bucket_name, bucket_id, &key).await
+ }
+ Endpoint::CompleteMultipartUpload { key, upload_id } => {
+ handle_complete_multipart_upload(
+ garage,
+ req,
+ &bucket_name,
+ bucket_id,
+ &key,
+ &upload_id,
+ content_sha256,
+ )
+ .await
+ }
+ Endpoint::CreateBucket {} => unreachable!(),
+ Endpoint::HeadBucket {} => {
+ let empty_body: Body = Body::from(vec![]);
+ let response = Response::builder().body(empty_body).unwrap();
+ Ok(response)
+ }
+ Endpoint::DeleteBucket {} => {
+ handle_delete_bucket(&garage, bucket_id, bucket_name, api_key).await
+ }
+ Endpoint::GetBucketLocation {} => handle_get_bucket_location(garage),
+ Endpoint::GetBucketVersioning {} => handle_get_bucket_versioning(),
+ Endpoint::ListObjects {
+ delimiter,
+ encoding_type,
+ marker,
+ max_keys,
+ prefix,
+ } => {
+ handle_list(
+ garage,
+ &ListObjectsQuery {
+ common: ListQueryCommon {
+ bucket_name,
+ bucket_id,
+ delimiter: delimiter.map(|d| d.to_string()),
+ page_size: max_keys.map(|p| p.clamp(1, 1000)).unwrap_or(1000),
+ prefix: prefix.unwrap_or_default(),
+ urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false),
+ },
+ is_v2: false,
+ marker,
+ continuation_token: None,
+ start_after: None,
+ },
+ )
+ .await
+ }
+ Endpoint::ListObjectsV2 {
+ delimiter,
+ encoding_type,
+ max_keys,
+ prefix,
+ continuation_token,
+ start_after,
+ list_type,
+ ..
+ } => {
+ if list_type == "2" {
+ handle_list(
+ garage,
+ &ListObjectsQuery {
+ common: ListQueryCommon {
+ bucket_name,
+ bucket_id,
+ delimiter: delimiter.map(|d| d.to_string()),
+ page_size: max_keys.map(|p| p.clamp(1, 1000)).unwrap_or(1000),
+ urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false),
+ prefix: prefix.unwrap_or_default(),
+ },
+ is_v2: true,
+ marker: None,
+ continuation_token,
+ start_after,
+ },
+ )
+ .await
+ } else {
+ Err(Error::BadRequest(format!(
+ "Invalid endpoint: list-type={}",
+ list_type
+ )))
+ }
+ }
+ Endpoint::ListMultipartUploads {
+ delimiter,
+ encoding_type,
+ key_marker,
+ max_uploads,
+ prefix,
+ upload_id_marker,
+ } => {
+ handle_list_multipart_upload(
+ garage,
+ &ListMultipartUploadsQuery {
+ common: ListQueryCommon {
+ bucket_name,
+ bucket_id,
+ delimiter: delimiter.map(|d| d.to_string()),
+ page_size: max_uploads.map(|p| p.clamp(1, 1000)).unwrap_or(1000),
+ prefix: prefix.unwrap_or_default(),
+ urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false),
+ },
+ key_marker,
+ upload_id_marker,
+ },
+ )
+ .await
+ }
+ Endpoint::ListParts {
+ key,
+ max_parts,
+ part_number_marker,
+ upload_id,
+ } => {
+ handle_list_parts(
+ garage,
+ &ListPartsQuery {
+ bucket_name,
+ bucket_id,
+ key,
+ upload_id,
+ part_number_marker: part_number_marker.map(|p| p.clamp(1, 10000)),
+ max_parts: max_parts.map(|p| p.clamp(1, 1000)).unwrap_or(1000),
+ },
+ )
+ .await
+ }
+ Endpoint::DeleteObjects {} => {
+ handle_delete_objects(garage, bucket_id, req, content_sha256).await
+ }
+ Endpoint::GetBucketWebsite {} => handle_get_website(&bucket).await,
+ Endpoint::PutBucketWebsite {} => {
+ handle_put_website(garage, bucket_id, req, content_sha256).await
+ }
+ Endpoint::DeleteBucketWebsite {} => handle_delete_website(garage, bucket_id).await,
+ Endpoint::GetBucketCors {} => handle_get_cors(&bucket).await,
+ Endpoint::PutBucketCors {} => {
+ handle_put_cors(garage, bucket_id, req, content_sha256).await
+ }
+ Endpoint::DeleteBucketCors {} => handle_delete_cors(garage, bucket_id).await,
+ endpoint => Err(Error::NotImplemented(endpoint.name().to_owned())),
+ };
+
+ // If request was a success and we have a CORS rule that applies to it,
+ // add the corresponding CORS headers to the response
+ let mut resp_ok = resp?;
+ if let Some(rule) = matching_cors_rule {
+ add_cors_headers(&mut resp_ok, rule)
+ .ok_or_internal_error("Invalid bucket CORS configuration")?;
+ }
+
+ Ok(resp_ok)
+ }
+}
+
+impl ApiEndpoint for S3ApiEndpoint {
+ fn name(&self) -> &'static str {
+ self.endpoint.name()
+ }
+
+ fn add_span_attributes(&self, span: SpanRef<'_>) {
+ span.set_attribute(KeyValue::new(
+ "bucket",
+ self.bucket_name.clone().unwrap_or_default(),
+ ));
+ }
+}