Diffstat (limited to 'src')
78 files changed, 5520 insertions, 1141 deletions
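For orientation, this is roughly the wire protocol that the new K2V endpoints introduced below implement (paths, parameter names and the causality-token header follow src/api/k2v/router.rs and src/api/k2v/item.rs in this patch; the bucket, keys and values are made-up examples, and real requests must additionally carry AWS signature v4 authentication):

  PUT /mybucket/mypartition?sort_key=k1       InsertItem: request body is an opaque value blob
  GET /mybucket/mypartition?sort_key=k1       ReadItem: responds with an X-Garage-Causality-Token
                                              header and, for Accept: application/json, a JSON
                                              array of base64-encoded values (null = tombstone)
  DELETE /mybucket/mypartition?sort_key=k1    DeleteItem: pass the last seen causality token
                                              back in the X-Garage-Causality-Token header
  GET /mybucket?prefix=p&limit=n              ReadIndex: lists partition keys with entry counts
  POST /mybucket?search (or SEARCH /mybucket) ReadBatch: JSON list of per-partition queries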
diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml index 5e96b081..29b26e5e 100644 --- a/src/api/Cargo.toml +++ b/src/api/Cargo.toml @@ -18,7 +18,9 @@ garage_model = { version = "0.7.0", path = "../model" } garage_table = { version = "0.7.0", path = "../table" } garage_block = { version = "0.7.0", path = "../block" } garage_util = { version = "0.7.0", path = "../util" } +garage_rpc = { version = "0.7.0", path = "../rpc" } +async-trait = "0.1.7" base64 = "0.13" bytes = "1.0" chrono = "0.4" @@ -52,3 +54,6 @@ quick-xml = { version = "0.21", features = [ "serialize" ] } url = "2.1" opentelemetry = "0.17" + +[features] +k2v = [ "garage_util/k2v", "garage_model/k2v" ] diff --git a/src/api/api_server.rs b/src/api/api_server.rs deleted file mode 100644 index e7b86d9e..00000000 --- a/src/api/api_server.rs +++ /dev/null @@ -1,645 +0,0 @@ -use std::net::SocketAddr; -use std::sync::Arc; - -use chrono::{DateTime, NaiveDateTime, Utc}; -use futures::future::Future; -use futures::prelude::*; -use hyper::header; -use hyper::server::conn::AddrStream; -use hyper::service::{make_service_fn, service_fn}; -use hyper::{Body, Method, Request, Response, Server}; - -use opentelemetry::{ - global, - metrics::{Counter, ValueRecorder}, - trace::{FutureExt, TraceContextExt, Tracer}, - Context, KeyValue, -}; - -use garage_util::data::*; -use garage_util::error::Error as GarageError; -use garage_util::metrics::{gen_trace_id, RecordDuration}; - -use garage_model::garage::Garage; -use garage_model::key_table::Key; - -use garage_table::util::*; - -use crate::error::*; -use crate::signature::compute_scope; -use crate::signature::payload::check_payload_signature; -use crate::signature::streaming::SignedPayloadStream; -use crate::signature::LONG_DATETIME; - -use crate::helpers::*; -use crate::s3_bucket::*; -use crate::s3_copy::*; -use crate::s3_cors::*; -use crate::s3_delete::*; -use crate::s3_get::*; -use crate::s3_list::*; -use crate::s3_post_object::handle_post_object; -use crate::s3_put::*; -use crate::s3_router::{Authorization, Endpoint}; -use crate::s3_website::*; - -struct ApiMetrics { - request_counter: Counter<u64>, - error_counter: Counter<u64>, - request_duration: ValueRecorder<f64>, -} - -impl ApiMetrics { - fn new() -> Self { - let meter = global::meter("garage/api"); - Self { - request_counter: meter - .u64_counter("api.request_counter") - .with_description("Number of API calls to the various S3 API endpoints") - .init(), - error_counter: meter - .u64_counter("api.error_counter") - .with_description( - "Number of API calls to the various S3 API endpoints that resulted in errors", - ) - .init(), - request_duration: meter - .f64_value_recorder("api.request_duration") - .with_description("Duration of API calls to the various S3 API endpoints") - .init(), - } - } -} - -/// Run the S3 API server -pub async fn run_api_server( - garage: Arc<Garage>, - shutdown_signal: impl Future<Output = ()>, -) -> Result<(), GarageError> { - let addr = &garage.config.s3_api.api_bind_addr; - - let metrics = Arc::new(ApiMetrics::new()); - - let service = make_service_fn(|conn: &AddrStream| { - let garage = garage.clone(); - let metrics = metrics.clone(); - - let client_addr = conn.remote_addr(); - async move { - Ok::<_, GarageError>(service_fn(move |req: Request<Body>| { - let garage = garage.clone(); - let metrics = metrics.clone(); - - handler(garage, metrics, req, client_addr) - })) - } - }); - - let server = Server::bind(addr).serve(service); - - let graceful = server.with_graceful_shutdown(shutdown_signal); - info!("API server 
listening on http://{}", addr); - - graceful.await?; - Ok(()) -} - -async fn handler( - garage: Arc<Garage>, - metrics: Arc<ApiMetrics>, - req: Request<Body>, - addr: SocketAddr, -) -> Result<Response<Body>, GarageError> { - let uri = req.uri().clone(); - info!("{} {} {}", addr, req.method(), uri); - debug!("{:?}", req); - - let tracer = opentelemetry::global::tracer("garage"); - let span = tracer - .span_builder("S3 API call (unknown)") - .with_trace_id(gen_trace_id()) - .with_attributes(vec![ - KeyValue::new("method", format!("{}", req.method())), - KeyValue::new("uri", req.uri().to_string()), - ]) - .start(&tracer); - - let res = handler_stage2(garage.clone(), metrics, req) - .with_context(Context::current_with_span(span)) - .await; - - match res { - Ok(x) => { - debug!("{} {:?}", x.status(), x.headers()); - Ok(x) - } - Err(e) => { - let body: Body = Body::from(e.aws_xml(&garage.config.s3_api.s3_region, uri.path())); - let mut http_error_builder = Response::builder() - .status(e.http_status_code()) - .header("Content-Type", "application/xml"); - - if let Some(header_map) = http_error_builder.headers_mut() { - e.add_headers(header_map) - } - - let http_error = http_error_builder.body(body)?; - - if e.http_status_code().is_server_error() { - warn!("Response: error {}, {}", e.http_status_code(), e); - } else { - info!("Response: error {}, {}", e.http_status_code(), e); - } - Ok(http_error) - } - } -} - -async fn handler_stage2( - garage: Arc<Garage>, - metrics: Arc<ApiMetrics>, - req: Request<Body>, -) -> Result<Response<Body>, Error> { - let authority = req - .headers() - .get(header::HOST) - .ok_or_bad_request("Host header required")? - .to_str()?; - - let host = authority_to_host(authority)?; - - let bucket_name = garage - .config - .s3_api - .root_domain - .as_ref() - .and_then(|root_domain| host_to_bucket(&host, root_domain)); - - let (endpoint, bucket_name) = Endpoint::from_request(&req, bucket_name.map(ToOwned::to_owned))?; - debug!("Endpoint: {:?}", endpoint); - - let current_context = Context::current(); - let current_span = current_context.span(); - current_span.update_name::<String>(format!("S3 API {}", endpoint.name())); - current_span.set_attribute(KeyValue::new("endpoint", endpoint.name())); - current_span.set_attribute(KeyValue::new( - "bucket", - bucket_name.clone().unwrap_or_default(), - )); - - let metrics_tags = &[KeyValue::new("api_endpoint", endpoint.name())]; - - let res = handler_stage3(garage, req, endpoint, bucket_name) - .record_duration(&metrics.request_duration, &metrics_tags[..]) - .await; - - metrics.request_counter.add(1, &metrics_tags[..]); - - let status_code = match &res { - Ok(r) => r.status(), - Err(e) => e.http_status_code(), - }; - if status_code.is_client_error() || status_code.is_server_error() { - metrics.error_counter.add( - 1, - &[ - metrics_tags[0].clone(), - KeyValue::new("status_code", status_code.as_str().to_string()), - ], - ); - } - - res -} - -async fn handler_stage3( - garage: Arc<Garage>, - req: Request<Body>, - endpoint: Endpoint, - bucket_name: Option<String>, -) -> Result<Response<Body>, Error> { - // Some endpoints are processed early, before we even check for an API key - if let Endpoint::PostObject = endpoint { - return handle_post_object(garage, req, bucket_name.unwrap()).await; - } - if let Endpoint::Options = endpoint { - return handle_options_s3api(garage, &req, bucket_name).await; - } - - let (api_key, mut content_sha256) = check_payload_signature(&garage, &req).await?; - let api_key = api_key.ok_or_else(|| { - 
Error::Forbidden("Garage does not support anonymous access yet".to_string()) - })?; - - let req = match req.headers().get("x-amz-content-sha256") { - Some(header) if header == "STREAMING-AWS4-HMAC-SHA256-PAYLOAD" => { - let signature = content_sha256 - .take() - .ok_or_bad_request("No signature provided")?; - - let secret_key = &api_key - .state - .as_option() - .ok_or_internal_error("Deleted key state")? - .secret_key; - - let date = req - .headers() - .get("x-amz-date") - .ok_or_bad_request("Missing X-Amz-Date field")? - .to_str()?; - let date: NaiveDateTime = NaiveDateTime::parse_from_str(date, LONG_DATETIME) - .ok_or_bad_request("Invalid date")?; - let date: DateTime<Utc> = DateTime::from_utc(date, Utc); - - let scope = compute_scope(&date, &garage.config.s3_api.s3_region); - let signing_hmac = crate::signature::signing_hmac( - &date, - secret_key, - &garage.config.s3_api.s3_region, - "s3", - ) - .ok_or_internal_error("Unable to build signing HMAC")?; - - req.map(move |body| { - Body::wrap_stream( - SignedPayloadStream::new( - body.map_err(Error::from), - signing_hmac, - date, - &scope, - signature, - ) - .map_err(Error::from), - ) - }) - } - _ => req, - }; - - let bucket_name = match bucket_name { - None => return handle_request_without_bucket(garage, req, api_key, endpoint).await, - Some(bucket) => bucket.to_string(), - }; - - // Special code path for CreateBucket API endpoint - if let Endpoint::CreateBucket {} = endpoint { - return handle_create_bucket(&garage, req, content_sha256, api_key, bucket_name).await; - } - - let bucket_id = resolve_bucket(&garage, &bucket_name, &api_key).await?; - let bucket = garage - .bucket_table - .get(&EmptyKey, &bucket_id) - .await? - .filter(|b| !b.state.is_deleted()) - .ok_or(Error::NoSuchBucket)?; - - let allowed = match endpoint.authorization_type() { - Authorization::Read => api_key.allow_read(&bucket_id), - Authorization::Write => api_key.allow_write(&bucket_id), - Authorization::Owner => api_key.allow_owner(&bucket_id), - _ => unreachable!(), - }; - - if !allowed { - return Err(Error::Forbidden( - "Operation is not allowed for this key.".to_string(), - )); - } - - // Look up what CORS rule might apply to response. - // Requests for methods different than GET, HEAD or POST - // are always preflighted, i.e. the browser should make - // an OPTIONS call before to check it is allowed - let matching_cors_rule = match *req.method() { - Method::GET | Method::HEAD | Method::POST => find_matching_cors_rule(&bucket, &req)?, - _ => None, - }; - - let resp = match endpoint { - Endpoint::HeadObject { - key, part_number, .. - } => handle_head(garage, &req, bucket_id, &key, part_number).await, - Endpoint::GetObject { - key, part_number, .. - } => handle_get(garage, &req, bucket_id, &key, part_number).await, - Endpoint::UploadPart { - key, - part_number, - upload_id, - } => { - handle_put_part( - garage, - req, - bucket_id, - &key, - part_number, - &upload_id, - content_sha256, - ) - .await - } - Endpoint::CopyObject { key } => handle_copy(garage, &api_key, &req, bucket_id, &key).await, - Endpoint::UploadPartCopy { - key, - part_number, - upload_id, - } => { - handle_upload_part_copy( - garage, - &api_key, - &req, - bucket_id, - &key, - part_number, - &upload_id, - ) - .await - } - Endpoint::PutObject { key } => { - handle_put(garage, req, bucket_id, &key, content_sha256).await - } - Endpoint::AbortMultipartUpload { key, upload_id } => { - handle_abort_multipart_upload(garage, bucket_id, &key, &upload_id).await - } - Endpoint::DeleteObject { key, .. 
} => handle_delete(garage, bucket_id, &key).await, - Endpoint::CreateMultipartUpload { key } => { - handle_create_multipart_upload(garage, &req, &bucket_name, bucket_id, &key).await - } - Endpoint::CompleteMultipartUpload { key, upload_id } => { - handle_complete_multipart_upload( - garage, - req, - &bucket_name, - bucket_id, - &key, - &upload_id, - content_sha256, - ) - .await - } - Endpoint::CreateBucket {} => unreachable!(), - Endpoint::HeadBucket {} => { - let empty_body: Body = Body::from(vec![]); - let response = Response::builder().body(empty_body).unwrap(); - Ok(response) - } - Endpoint::DeleteBucket {} => { - handle_delete_bucket(&garage, bucket_id, bucket_name, api_key).await - } - Endpoint::GetBucketLocation {} => handle_get_bucket_location(garage), - Endpoint::GetBucketVersioning {} => handle_get_bucket_versioning(), - Endpoint::ListObjects { - delimiter, - encoding_type, - marker, - max_keys, - prefix, - } => { - handle_list( - garage, - &ListObjectsQuery { - common: ListQueryCommon { - bucket_name, - bucket_id, - delimiter: delimiter.map(|d| d.to_string()), - page_size: max_keys.map(|p| p.clamp(1, 1000)).unwrap_or(1000), - prefix: prefix.unwrap_or_default(), - urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false), - }, - is_v2: false, - marker, - continuation_token: None, - start_after: None, - }, - ) - .await - } - Endpoint::ListObjectsV2 { - delimiter, - encoding_type, - max_keys, - prefix, - continuation_token, - start_after, - list_type, - .. - } => { - if list_type == "2" { - handle_list( - garage, - &ListObjectsQuery { - common: ListQueryCommon { - bucket_name, - bucket_id, - delimiter: delimiter.map(|d| d.to_string()), - page_size: max_keys.map(|p| p.clamp(1, 1000)).unwrap_or(1000), - urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false), - prefix: prefix.unwrap_or_default(), - }, - is_v2: true, - marker: None, - continuation_token, - start_after, - }, - ) - .await - } else { - Err(Error::BadRequest(format!( - "Invalid endpoint: list-type={}", - list_type - ))) - } - } - Endpoint::ListMultipartUploads { - delimiter, - encoding_type, - key_marker, - max_uploads, - prefix, - upload_id_marker, - } => { - handle_list_multipart_upload( - garage, - &ListMultipartUploadsQuery { - common: ListQueryCommon { - bucket_name, - bucket_id, - delimiter: delimiter.map(|d| d.to_string()), - page_size: max_uploads.map(|p| p.clamp(1, 1000)).unwrap_or(1000), - prefix: prefix.unwrap_or_default(), - urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false), - }, - key_marker, - upload_id_marker, - }, - ) - .await - } - Endpoint::ListParts { - key, - max_parts, - part_number_marker, - upload_id, - } => { - handle_list_parts( - garage, - &ListPartsQuery { - bucket_name, - bucket_id, - key, - upload_id, - part_number_marker: part_number_marker.map(|p| p.clamp(1, 10000)), - max_parts: max_parts.map(|p| p.clamp(1, 1000)).unwrap_or(1000), - }, - ) - .await - } - Endpoint::DeleteObjects {} => { - handle_delete_objects(garage, bucket_id, req, content_sha256).await - } - Endpoint::GetBucketWebsite {} => handle_get_website(&bucket).await, - Endpoint::PutBucketWebsite {} => { - handle_put_website(garage, bucket_id, req, content_sha256).await - } - Endpoint::DeleteBucketWebsite {} => handle_delete_website(garage, bucket_id).await, - Endpoint::GetBucketCors {} => handle_get_cors(&bucket).await, - Endpoint::PutBucketCors {} => handle_put_cors(garage, bucket_id, req, content_sha256).await, - Endpoint::DeleteBucketCors {} => handle_delete_cors(garage, bucket_id).await, - 
endpoint => Err(Error::NotImplemented(endpoint.name().to_owned())), - }; - - // If request was a success and we have a CORS rule that applies to it, - // add the corresponding CORS headers to the response - let mut resp_ok = resp?; - if let Some(rule) = matching_cors_rule { - add_cors_headers(&mut resp_ok, rule) - .ok_or_internal_error("Invalid bucket CORS configuration")?; - } - - Ok(resp_ok) -} - -async fn handle_request_without_bucket( - garage: Arc<Garage>, - _req: Request<Body>, - api_key: Key, - endpoint: Endpoint, -) -> Result<Response<Body>, Error> { - match endpoint { - Endpoint::ListBuckets => handle_list_buckets(&garage, &api_key).await, - endpoint => Err(Error::NotImplemented(endpoint.name().to_owned())), - } -} - -#[allow(clippy::ptr_arg)] -pub async fn resolve_bucket( - garage: &Garage, - bucket_name: &String, - api_key: &Key, -) -> Result<Uuid, Error> { - let api_key_params = api_key - .state - .as_option() - .ok_or_internal_error("Key should not be deleted at this point")?; - - if let Some(Some(bucket_id)) = api_key_params.local_aliases.get(bucket_name) { - Ok(*bucket_id) - } else { - Ok(garage - .bucket_helper() - .resolve_global_bucket_name(bucket_name) - .await? - .ok_or(Error::NoSuchBucket)?) - } -} - -/// Extract the bucket name and the key name from an HTTP path and possibly a bucket provided in -/// the host header of the request -/// -/// S3 internally manages only buckets and keys. This function splits -/// an HTTP path to get the corresponding bucket name and key. -pub fn parse_bucket_key<'a>( - path: &'a str, - host_bucket: Option<&'a str>, -) -> Result<(&'a str, Option<&'a str>), Error> { - let path = path.trim_start_matches('/'); - - if let Some(bucket) = host_bucket { - if !path.is_empty() { - return Ok((bucket, Some(path))); - } else { - return Ok((bucket, None)); - } - } - - let (bucket, key) = match path.find('/') { - Some(i) => { - let key = &path[i + 1..]; - if !key.is_empty() { - (&path[..i], Some(key)) - } else { - (&path[..i], None) - } - } - None => (path, None), - }; - if bucket.is_empty() { - return Err(Error::BadRequest("No bucket specified".to_string())); - } - Ok((bucket, key)) -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn parse_bucket_containing_a_key() -> Result<(), Error> { - let (bucket, key) = parse_bucket_key("/my_bucket/a/super/file.jpg", None)?; - assert_eq!(bucket, "my_bucket"); - assert_eq!(key.expect("key must be set"), "a/super/file.jpg"); - Ok(()) - } - - #[test] - fn parse_bucket_containing_no_key() -> Result<(), Error> { - let (bucket, key) = parse_bucket_key("/my_bucket/", None)?; - assert_eq!(bucket, "my_bucket"); - assert!(key.is_none()); - let (bucket, key) = parse_bucket_key("/my_bucket", None)?; - assert_eq!(bucket, "my_bucket"); - assert!(key.is_none()); - Ok(()) - } - - #[test] - fn parse_bucket_containing_no_bucket() { - let parsed = parse_bucket_key("", None); - assert!(parsed.is_err()); - let parsed = parse_bucket_key("/", None); - assert!(parsed.is_err()); - let parsed = parse_bucket_key("////", None); - assert!(parsed.is_err()); - } - - #[test] - fn parse_bucket_with_vhost_and_key() -> Result<(), Error> { - let (bucket, key) = parse_bucket_key("/a/super/file.jpg", Some("my-bucket"))?; - assert_eq!(bucket, "my-bucket"); - assert_eq!(key.expect("key must be set"), "a/super/file.jpg"); - Ok(()) - } - - #[test] - fn parse_bucket_with_vhost_no_key() -> Result<(), Error> { - let (bucket, key) = parse_bucket_key("", Some("my-bucket"))?; - assert_eq!(bucket, "my-bucket"); - assert!(key.is_none()); - let 
(bucket, key) = parse_bucket_key("/", Some("my-bucket"))?; - assert_eq!(bucket, "my-bucket"); - assert!(key.is_none()); - Ok(()) - } -} diff --git a/src/api/error.rs b/src/api/error.rs index f53ed1fd..4b7254d2 100644 --- a/src/api/error.rs +++ b/src/api/error.rs @@ -7,7 +7,7 @@ use hyper::{HeaderMap, StatusCode}; use garage_model::helper::error::Error as HelperError; use garage_util::error::Error as GarageError; -use crate::s3_xml; +use crate::s3::xml as s3_xml; /// Errors of this crate #[derive(Debug, Error)] @@ -100,6 +100,10 @@ pub enum Error { #[error(display = "Bad request: {}", _0)] BadRequest(String), + /// The client asked for an invalid return format (invalid Accept header) + #[error(display = "Not acceptable: {}", _0)] + NotAcceptable(String), + /// The client sent a request for an action not supported by garage #[error(display = "Unimplemented action: {}", _0)] NotImplemented(String), @@ -140,6 +144,7 @@ impl Error { Error::BucketNotEmpty | Error::BucketAlreadyExists => StatusCode::CONFLICT, Error::PreconditionFailed => StatusCode::PRECONDITION_FAILED, Error::Forbidden(_) => StatusCode::FORBIDDEN, + Error::NotAcceptable(_) => StatusCode::NOT_ACCEPTABLE, Error::InternalError( GarageError::Timeout | GarageError::RemoteError(_) diff --git a/src/api/generic_server.rs b/src/api/generic_server.rs new file mode 100644 index 00000000..9281e596 --- /dev/null +++ b/src/api/generic_server.rs @@ -0,0 +1,202 @@ +use std::net::SocketAddr; +use std::sync::Arc; + +use async_trait::async_trait; + +use futures::future::Future; + +use hyper::server::conn::AddrStream; +use hyper::service::{make_service_fn, service_fn}; +use hyper::{Body, Request, Response, Server}; + +use opentelemetry::{ + global, + metrics::{Counter, ValueRecorder}, + trace::{FutureExt, SpanRef, TraceContextExt, Tracer}, + Context, KeyValue, +}; + +use garage_util::error::Error as GarageError; +use garage_util::metrics::{gen_trace_id, RecordDuration}; + +use crate::error::*; + +pub(crate) trait ApiEndpoint: Send + Sync + 'static { + fn name(&self) -> &'static str; + fn add_span_attributes(&self, span: SpanRef<'_>); +} + +#[async_trait] +pub(crate) trait ApiHandler: Send + Sync + 'static { + const API_NAME: &'static str; + const API_NAME_DISPLAY: &'static str; + + type Endpoint: ApiEndpoint; + + fn parse_endpoint(&self, r: &Request<Body>) -> Result<Self::Endpoint, Error>; + async fn handle( + &self, + req: Request<Body>, + endpoint: Self::Endpoint, + ) -> Result<Response<Body>, Error>; +} + +pub(crate) struct ApiServer<A: ApiHandler> { + region: String, + api_handler: A, + + // Metrics + request_counter: Counter<u64>, + error_counter: Counter<u64>, + request_duration: ValueRecorder<f64>, +} + +impl<A: ApiHandler> ApiServer<A> { + pub fn new(region: String, api_handler: A) -> Arc<Self> { + let meter = global::meter("garage/api"); + Arc::new(Self { + region, + api_handler, + request_counter: meter + .u64_counter(format!("api.{}.request_counter", A::API_NAME)) + .with_description(format!( + "Number of API calls to the various {} API endpoints", + A::API_NAME_DISPLAY + )) + .init(), + error_counter: meter + .u64_counter(format!("api.{}.error_counter", A::API_NAME)) + .with_description(format!( + "Number of API calls to the various {} API endpoints that resulted in errors", + A::API_NAME_DISPLAY + )) + .init(), + request_duration: meter + .f64_value_recorder(format!("api.{}.request_duration", A::API_NAME)) + .with_description(format!( + "Duration of API calls to the various {} API endpoints", + A::API_NAME_DISPLAY + )) + .init(), + }) 
+ } + + pub async fn run_server( + self: Arc<Self>, + bind_addr: SocketAddr, + shutdown_signal: impl Future<Output = ()>, + ) -> Result<(), GarageError> { + let service = make_service_fn(|conn: &AddrStream| { + let this = self.clone(); + + let client_addr = conn.remote_addr(); + async move { + Ok::<_, GarageError>(service_fn(move |req: Request<Body>| { + let this = this.clone(); + + this.handler(req, client_addr) + })) + } + }); + + let server = Server::bind(&bind_addr).serve(service); + + let graceful = server.with_graceful_shutdown(shutdown_signal); + info!( + "{} API server listening on http://{}", + A::API_NAME_DISPLAY, + bind_addr + ); + + graceful.await?; + Ok(()) + } + + async fn handler( + self: Arc<Self>, + req: Request<Body>, + addr: SocketAddr, + ) -> Result<Response<Body>, GarageError> { + let uri = req.uri().clone(); + info!("{} {} {}", addr, req.method(), uri); + debug!("{:?}", req); + + let tracer = opentelemetry::global::tracer("garage"); + let span = tracer + .span_builder(format!("{} API call (unknown)", A::API_NAME_DISPLAY)) + .with_trace_id(gen_trace_id()) + .with_attributes(vec![ + KeyValue::new("method", format!("{}", req.method())), + KeyValue::new("uri", req.uri().to_string()), + ]) + .start(&tracer); + + let res = self + .handler_stage2(req) + .with_context(Context::current_with_span(span)) + .await; + + match res { + Ok(x) => { + debug!("{} {:?}", x.status(), x.headers()); + Ok(x) + } + Err(e) => { + let body: Body = Body::from(e.aws_xml(&self.region, uri.path())); + let mut http_error_builder = Response::builder() + .status(e.http_status_code()) + .header("Content-Type", "application/xml"); + + if let Some(header_map) = http_error_builder.headers_mut() { + e.add_headers(header_map) + } + + let http_error = http_error_builder.body(body)?; + + if e.http_status_code().is_server_error() { + warn!("Response: error {}, {}", e.http_status_code(), e); + } else { + info!("Response: error {}, {}", e.http_status_code(), e); + } + Ok(http_error) + } + } + } + + async fn handler_stage2(&self, req: Request<Body>) -> Result<Response<Body>, Error> { + let endpoint = self.api_handler.parse_endpoint(&req)?; + debug!("Endpoint: {}", endpoint.name()); + + let current_context = Context::current(); + let current_span = current_context.span(); + current_span.update_name::<String>(format!("{} API {}", A::API_NAME_DISPLAY, endpoint.name())); + current_span.set_attribute(KeyValue::new("endpoint", endpoint.name())); + endpoint.add_span_attributes(current_span); + + let metrics_tags = &[KeyValue::new("api_endpoint", endpoint.name())]; + + let res = self + .api_handler + .handle(req, endpoint) + .record_duration(&self.request_duration, &metrics_tags[..]) + .await; + + self.request_counter.add(1, &metrics_tags[..]); + + let status_code = match &res { + Ok(r) => r.status(), + Err(e) => e.http_status_code(), + }; + if status_code.is_client_error() || status_code.is_server_error() { + self.error_counter.add( + 1, + &[ + metrics_tags[0].clone(), + KeyValue::new("status_code", status_code.as_str().to_string()), + ], + ); + } + + res + } +} diff --git a/src/api/helpers.rs b/src/api/helpers.rs index c2709bb3..a994b82f 100644 --- a/src/api/helpers.rs +++ b/src/api/helpers.rs @@ -1,6 +1,25 @@ -use crate::Error; use idna::domain_to_unicode; +use garage_util::data::*; + +use garage_model::garage::Garage; +use garage_model::key_table::Key; + +use crate::error::*; + +/// What kind of authorization is required to perform a given action +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum Authorization { + /// No authorization is
required + None, + /// Having Read permission on bucket + Read, + /// Having Write permission on bucket + Write, + /// Having Owner permission on bucket + Owner, +} + /// Host to bucket /// /// Convert a host, like "bucket.garage-site.tld" to the corresponding bucket "bucket", @@ -60,11 +79,143 @@ pub fn authority_to_host(authority: &str) -> Result<String, Error> { authority.map(|h| domain_to_unicode(h).0) } +#[allow(clippy::ptr_arg)] +pub async fn resolve_bucket( + garage: &Garage, + bucket_name: &String, + api_key: &Key, +) -> Result<Uuid, Error> { + let api_key_params = api_key + .state + .as_option() + .ok_or_internal_error("Key should not be deleted at this point")?; + + if let Some(Some(bucket_id)) = api_key_params.local_aliases.get(bucket_name) { + Ok(*bucket_id) + } else { + Ok(garage + .bucket_helper() + .resolve_global_bucket_name(bucket_name) + .await? + .ok_or(Error::NoSuchBucket)?) + } +} + +/// Extract the bucket name and the key name from an HTTP path and possibly a bucket provided in +/// the host header of the request +/// +/// S3 internally manages only buckets and keys. This function splits +/// an HTTP path to get the corresponding bucket name and key. +pub fn parse_bucket_key<'a>( + path: &'a str, + host_bucket: Option<&'a str>, +) -> Result<(&'a str, Option<&'a str>), Error> { + let path = path.trim_start_matches('/'); + + if let Some(bucket) = host_bucket { + if !path.is_empty() { + return Ok((bucket, Some(path))); + } else { + return Ok((bucket, None)); + } + } + + let (bucket, key) = match path.find('/') { + Some(i) => { + let key = &path[i + 1..]; + if !key.is_empty() { + (&path[..i], Some(key)) + } else { + (&path[..i], None) + } + } + None => (path, None), + }; + if bucket.is_empty() { + return Err(Error::BadRequest("No bucket specified".to_string())); + } + Ok((bucket, key)) +} + +const UTF8_BEFORE_LAST_CHAR: char = '\u{10FFFE}'; + +/// Compute the key after the prefix +pub fn key_after_prefix(pfx: &str) -> Option<String> { + let mut next = pfx.to_string(); + while !next.is_empty() { + let tail = next.pop().unwrap(); + if tail >= char::MAX { + continue; + } + + // Circumvent a limitation of RangeFrom that overflow earlier than needed + // See: https://doc.rust-lang.org/core/ops/struct.RangeFrom.html + let new_tail = if tail == UTF8_BEFORE_LAST_CHAR { + char::MAX + } else { + (tail..).nth(1).unwrap() + }; + + next.push(new_tail); + return Some(next); + } + + None +} + #[cfg(test)] mod tests { use super::*; #[test] + fn parse_bucket_containing_a_key() -> Result<(), Error> { + let (bucket, key) = parse_bucket_key("/my_bucket/a/super/file.jpg", None)?; + assert_eq!(bucket, "my_bucket"); + assert_eq!(key.expect("key must be set"), "a/super/file.jpg"); + Ok(()) + } + + #[test] + fn parse_bucket_containing_no_key() -> Result<(), Error> { + let (bucket, key) = parse_bucket_key("/my_bucket/", None)?; + assert_eq!(bucket, "my_bucket"); + assert!(key.is_none()); + let (bucket, key) = parse_bucket_key("/my_bucket", None)?; + assert_eq!(bucket, "my_bucket"); + assert!(key.is_none()); + Ok(()) + } + + #[test] + fn parse_bucket_containing_no_bucket() { + let parsed = parse_bucket_key("", None); + assert!(parsed.is_err()); + let parsed = parse_bucket_key("/", None); + assert!(parsed.is_err()); + let parsed = parse_bucket_key("////", None); + assert!(parsed.is_err()); + } + + #[test] + fn parse_bucket_with_vhost_and_key() -> Result<(), Error> { + let (bucket, key) = parse_bucket_key("/a/super/file.jpg", Some("my-bucket"))?; + assert_eq!(bucket, "my-bucket"); + 
assert_eq!(key.expect("key must be set"), "a/super/file.jpg"); + Ok(()) + } + + #[test] + fn parse_bucket_with_vhost_no_key() -> Result<(), Error> { + let (bucket, key) = parse_bucket_key("", Some("my-bucket"))?; + assert_eq!(bucket, "my-bucket"); + assert!(key.is_none()); + let (bucket, key) = parse_bucket_key("/", Some("my-bucket"))?; + assert_eq!(bucket, "my-bucket"); + assert!(key.is_none()); + Ok(()) + } + + #[test] fn authority_to_host_with_port() -> Result<(), Error> { let domain = authority_to_host("[::1]:3902")?; assert_eq!(domain, "[::1]"); @@ -111,4 +262,39 @@ mod tests { assert_eq!(host_to_bucket("not-garage.tld", "garage.tld"), None); assert_eq!(host_to_bucket("not-garage.tld", ".garage.tld"), None); } + + #[test] + fn test_key_after_prefix() { + use std::iter::FromIterator; + + assert_eq!(UTF8_BEFORE_LAST_CHAR as u32, (char::MAX as u32) - 1); + assert_eq!(key_after_prefix("a/b/").unwrap().as_str(), "a/b0"); + assert_eq!(key_after_prefix("€").unwrap().as_str(), "₭"); + assert_eq!( + key_after_prefix("").unwrap().as_str(), + String::from(char::from_u32(0x10FFFE).unwrap()) + ); + + // When the last character is the biggest UTF8 char + let a = String::from_iter(['a', char::MAX].iter()); + assert_eq!(key_after_prefix(a.as_str()).unwrap().as_str(), "b"); + + // When all characters are the biggest UTF8 char + let b = String::from_iter([char::MAX; 3].iter()); + assert!(key_after_prefix(b.as_str()).is_none()); + + // Check utf8 surrogates + let c = String::from('\u{D7FF}'); + assert_eq!( + key_after_prefix(c.as_str()).unwrap().as_str(), + String::from('\u{E000}') + ); + + // Check the character before the biggest one + let d = String::from('\u{10FFFE}'); + assert_eq!( + key_after_prefix(d.as_str()).unwrap().as_str(), + String::from(char::MAX) + ); + } } diff --git a/src/api/k2v/api_server.rs b/src/api/k2v/api_server.rs new file mode 100644 index 00000000..5f5e9030 --- /dev/null +++ b/src/api/k2v/api_server.rs @@ -0,0 +1,195 @@ +use std::sync::Arc; + +use async_trait::async_trait; + +use futures::future::Future; +use hyper::{Body, Method, Request, Response}; + +use opentelemetry::{trace::SpanRef, KeyValue}; + +use garage_table::util::*; +use garage_util::error::Error as GarageError; + +use garage_model::garage::Garage; + +use crate::error::*; +use crate::generic_server::*; + +use crate::signature::payload::check_payload_signature; +use crate::signature::streaming::*; + +use crate::helpers::*; +use crate::k2v::batch::*; +use crate::k2v::index::*; +use crate::k2v::item::*; +use crate::k2v::router::Endpoint; +use crate::s3::cors::*; + +pub struct K2VApiServer { + garage: Arc<Garage>, +} + +pub(crate) struct K2VApiEndpoint { + bucket_name: String, + endpoint: Endpoint, +} + +impl K2VApiServer { + pub async fn run( + garage: Arc<Garage>, + shutdown_signal: impl Future<Output = ()>, + ) -> Result<(), GarageError> { + if let Some(cfg) = &garage.config.k2v_api { + let bind_addr = cfg.api_bind_addr; + + ApiServer::new( + garage.config.s3_api.s3_region.clone(), + K2VApiServer { garage }, + ) + .run_server(bind_addr, shutdown_signal) + .await + } else { + Ok(()) + } + } +} + +#[async_trait] +impl ApiHandler for K2VApiServer { + const API_NAME: &'static str = "k2v"; + const API_NAME_DISPLAY: &'static str = "K2V"; + + type Endpoint = K2VApiEndpoint; + + fn parse_endpoint(&self, req: &Request<Body>) -> Result<K2VApiEndpoint, Error> { + let (endpoint, bucket_name) = Endpoint::from_request(req)?; + + Ok(K2VApiEndpoint { + bucket_name, + endpoint, + }) + } + + async fn handle( + &self, + req: 
Request<Body>, + endpoint: K2VApiEndpoint, + ) -> Result<Response<Body>, Error> { + let K2VApiEndpoint { + bucket_name, + endpoint, + } = endpoint; + let garage = self.garage.clone(); + + // The OPTIONS method is processed early, before we even check for an API key + if let Endpoint::Options = endpoint { + return handle_options_s3api(garage, &req, Some(bucket_name)).await; + } + + let (api_key, mut content_sha256) = check_payload_signature(&garage, "k2v", &req).await?; + let api_key = api_key.ok_or_else(|| { + Error::Forbidden("Garage does not support anonymous access yet".to_string()) + })?; + + let req = parse_streaming_body( + &api_key, + req, + &mut content_sha256, + &garage.config.s3_api.s3_region, + "k2v", + )?; + + let bucket_id = resolve_bucket(&garage, &bucket_name, &api_key).await?; + let bucket = garage + .bucket_table + .get(&EmptyKey, &bucket_id) + .await? + .filter(|b| !b.state.is_deleted()) + .ok_or(Error::NoSuchBucket)?; + + let allowed = match endpoint.authorization_type() { + Authorization::Read => api_key.allow_read(&bucket_id), + Authorization::Write => api_key.allow_write(&bucket_id), + Authorization::Owner => api_key.allow_owner(&bucket_id), + _ => unreachable!(), + }; + + if !allowed { + return Err(Error::Forbidden( + "Operation is not allowed for this key.".to_string(), + )); + } + + // Look up what CORS rule might apply to response. + // Requests for methods other than GET, HEAD or POST + // are always preflighted, i.e. the browser should make + // an OPTIONS call beforehand to check that it is allowed + let matching_cors_rule = match *req.method() { + Method::GET | Method::HEAD | Method::POST => find_matching_cors_rule(&bucket, &req)?, + _ => None, + }; + + let resp = match endpoint { + Endpoint::DeleteItem { + partition_key, + sort_key, + } => handle_delete_item(garage, req, bucket_id, &partition_key, &sort_key).await, + Endpoint::InsertItem { + partition_key, + sort_key, + } => handle_insert_item(garage, req, bucket_id, &partition_key, &sort_key).await, + Endpoint::ReadItem { + partition_key, + sort_key, + } => handle_read_item(garage, &req, bucket_id, &partition_key, &sort_key).await, + Endpoint::PollItem { + partition_key, + sort_key, + causality_token, + timeout, + } => { + handle_poll_item( + garage, + &req, + bucket_id, + partition_key, + sort_key, + causality_token, + timeout, + ) + .await + } + Endpoint::ReadIndex { + prefix, + start, + end, + limit, + reverse, + } => handle_read_index(garage, bucket_id, prefix, start, end, limit, reverse).await, + Endpoint::InsertBatch {} => handle_insert_batch(garage, bucket_id, req).await, + Endpoint::ReadBatch {} => handle_read_batch(garage, bucket_id, req).await, + Endpoint::DeleteBatch {} => handle_delete_batch(garage, bucket_id, req).await, + Endpoint::Options => unreachable!(), + }; + + // If request was a success and we have a CORS rule that applies to it, + // add the corresponding CORS headers to the response + let mut resp_ok = resp?; + if let Some(rule) = matching_cors_rule { + add_cors_headers(&mut resp_ok, rule) + .ok_or_internal_error("Invalid bucket CORS configuration")?; + } + + Ok(resp_ok) + } +} + +impl ApiEndpoint for K2VApiEndpoint { + fn name(&self) -> &'static str { + self.endpoint.name() + } + + fn add_span_attributes(&self, span: SpanRef<'_>) { + span.set_attribute(KeyValue::new("bucket", self.bucket_name.clone())); + } +} diff --git a/src/api/k2v/batch.rs b/src/api/k2v/batch.rs new file mode 100644 index 00000000..4ecddeb9 --- /dev/null +++ b/src/api/k2v/batch.rs @@ -0,0 +1,368 @@ +use std::sync::Arc;
+ +use hyper::{Body, Request, Response, StatusCode}; +use serde::{Deserialize, Serialize}; + +use garage_util::data::*; +use garage_util::error::Error as GarageError; + +use garage_table::{EnumerationOrder, TableSchema}; + +use garage_model::garage::Garage; +use garage_model::k2v::causality::*; +use garage_model::k2v::item_table::*; + +use crate::error::*; +use crate::k2v::range::read_range; + +pub async fn handle_insert_batch( + garage: Arc<Garage>, + bucket_id: Uuid, + req: Request<Body>, +) -> Result<Response<Body>, Error> { + let body = hyper::body::to_bytes(req.into_body()).await?; + let items: Vec<InsertBatchItem> = + serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?; + + let mut items2 = vec![]; + for it in items { + let ct = it + .ct + .map(|s| CausalContext::parse(&s)) + .transpose() + .ok_or_bad_request("Invalid causality token")?; + let v = match it.v { + Some(vs) => { + DvvsValue::Value(base64::decode(vs).ok_or_bad_request("Invalid base64 value")?) + } + None => DvvsValue::Deleted, + }; + items2.push((it.pk, it.sk, ct, v)); + } + + garage.k2v.rpc.insert_batch(bucket_id, items2).await?; + + Ok(Response::builder() + .status(StatusCode::OK) + .body(Body::empty())?) +} + +pub async fn handle_read_batch( + garage: Arc<Garage>, + bucket_id: Uuid, + req: Request<Body>, +) -> Result<Response<Body>, Error> { + let body = hyper::body::to_bytes(req.into_body()).await?; + let queries: Vec<ReadBatchQuery> = + serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?; + + let resp_results = futures::future::join_all( + queries + .into_iter() + .map(|q| handle_read_batch_query(&garage, bucket_id, q)), + ) + .await; + + let mut resps: Vec<ReadBatchResponse> = vec![]; + for resp in resp_results { + resps.push(resp?); + } + + let resp_json = serde_json::to_string_pretty(&resps).map_err(GarageError::from)?; + Ok(Response::builder() + .status(StatusCode::OK) + .body(Body::from(resp_json))?) +} + +async fn handle_read_batch_query( + garage: &Arc<Garage>, + bucket_id: Uuid, + query: ReadBatchQuery, +) -> Result<ReadBatchResponse, Error> { + let partition = K2VItemPartition { + bucket_id, + partition_key: query.partition_key.clone(), + }; + + let filter = ItemFilter { + exclude_only_tombstones: !query.tombstones, + conflicts_only: query.conflicts_only, + }; + + let (items, more, next_start) = if query.single_item { + if query.prefix.is_some() || query.end.is_some() || query.limit.is_some() || query.reverse { + return Err(Error::BadRequest("Batch query parameters 'prefix', 'end', 'limit' and 'reverse' must not be set when singleItem is true.".into())); + } + let sk = query + .start + .as_ref() + .ok_or_bad_request("start should be specified if single_item is set")?; + let item = garage + .k2v + .item_table + .get(&partition, sk) + .await? 
+ .filter(|e| K2VItemTable::matches_filter(e, &filter)); + match item { + Some(i) => (vec![ReadBatchResponseItem::from(i)], false, None), + None => (vec![], false, None), + } + } else { + let (items, more, next_start) = read_range( + &garage.k2v.item_table, + &partition, + &query.prefix, + &query.start, + &query.end, + query.limit, + Some(filter), + EnumerationOrder::from_reverse(query.reverse), + ) + .await?; + + let items = items + .into_iter() + .map(ReadBatchResponseItem::from) + .collect::<Vec<_>>(); + + (items, more, next_start) + }; + + Ok(ReadBatchResponse { + partition_key: query.partition_key, + prefix: query.prefix, + start: query.start, + end: query.end, + limit: query.limit, + reverse: query.reverse, + single_item: query.single_item, + conflicts_only: query.conflicts_only, + tombstones: query.tombstones, + items, + more, + next_start, + }) +} + +pub async fn handle_delete_batch( + garage: Arc<Garage>, + bucket_id: Uuid, + req: Request<Body>, +) -> Result<Response<Body>, Error> { + let body = hyper::body::to_bytes(req.into_body()).await?; + let queries: Vec<DeleteBatchQuery> = + serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?; + + let resp_results = futures::future::join_all( + queries + .into_iter() + .map(|q| handle_delete_batch_query(&garage, bucket_id, q)), + ) + .await; + + let mut resps: Vec<DeleteBatchResponse> = vec![]; + for resp in resp_results { + resps.push(resp?); + } + + let resp_json = serde_json::to_string_pretty(&resps).map_err(GarageError::from)?; + Ok(Response::builder() + .status(StatusCode::OK) + .body(Body::from(resp_json))?) +} + +async fn handle_delete_batch_query( + garage: &Arc<Garage>, + bucket_id: Uuid, + query: DeleteBatchQuery, +) -> Result<DeleteBatchResponse, Error> { + let partition = K2VItemPartition { + bucket_id, + partition_key: query.partition_key.clone(), + }; + + let filter = ItemFilter { + exclude_only_tombstones: true, + conflicts_only: false, + }; + + let deleted_items = if query.single_item { + if query.prefix.is_some() || query.end.is_some() { + return Err(Error::BadRequest("Batch query parameters 'prefix' and 'end' must not be set when singleItem is true.".into())); + } + let sk = query + .start + .as_ref() + .ok_or_bad_request("start should be specified if single_item is set")?; + let item = garage + .k2v + .item_table + .get(&partition, sk) + .await? 
+ .filter(|e| K2VItemTable::matches_filter(e, &filter)); + match item { + Some(i) => { + let cc = i.causal_context(); + garage + .k2v + .rpc + .insert( + bucket_id, + i.partition.partition_key, + i.sort_key, + Some(cc), + DvvsValue::Deleted, + ) + .await?; + 1 + } + None => 0, + } + } else { + let (items, more, _next_start) = read_range( + &garage.k2v.item_table, + &partition, + &query.prefix, + &query.start, + &query.end, + None, + Some(filter), + EnumerationOrder::Forward, + ) + .await?; + assert!(!more); + + // TODO delete items + let items = items + .into_iter() + .map(|i| { + let cc = i.causal_context(); + ( + i.partition.partition_key, + i.sort_key, + Some(cc), + DvvsValue::Deleted, + ) + }) + .collect::<Vec<_>>(); + let n = items.len(); + + garage.k2v.rpc.insert_batch(bucket_id, items).await?; + + n + }; + + Ok(DeleteBatchResponse { + partition_key: query.partition_key, + prefix: query.prefix, + start: query.start, + end: query.end, + single_item: query.single_item, + deleted_items, + }) +} + +#[derive(Deserialize)] +struct InsertBatchItem { + pk: String, + sk: String, + ct: Option<String>, + v: Option<String>, +} + +#[derive(Deserialize)] +struct ReadBatchQuery { + #[serde(rename = "partitionKey")] + partition_key: String, + #[serde(default)] + prefix: Option<String>, + #[serde(default)] + start: Option<String>, + #[serde(default)] + end: Option<String>, + #[serde(default)] + limit: Option<u64>, + #[serde(default)] + reverse: bool, + #[serde(default, rename = "singleItem")] + single_item: bool, + #[serde(default, rename = "conflictsOnly")] + conflicts_only: bool, + #[serde(default)] + tombstones: bool, +} + +#[derive(Serialize)] +struct ReadBatchResponse { + #[serde(rename = "partitionKey")] + partition_key: String, + prefix: Option<String>, + start: Option<String>, + end: Option<String>, + limit: Option<u64>, + reverse: bool, + #[serde(rename = "singleItem")] + single_item: bool, + #[serde(rename = "conflictsOnly")] + conflicts_only: bool, + tombstones: bool, + + items: Vec<ReadBatchResponseItem>, + more: bool, + #[serde(rename = "nextStart")] + next_start: Option<String>, +} + +#[derive(Serialize)] +struct ReadBatchResponseItem { + sk: String, + ct: String, + v: Vec<Option<String>>, +} + +impl ReadBatchResponseItem { + fn from(i: K2VItem) -> Self { + let ct = i.causal_context().serialize(); + let v = i + .values() + .iter() + .map(|v| match v { + DvvsValue::Value(x) => Some(base64::encode(x)), + DvvsValue::Deleted => None, + }) + .collect::<Vec<_>>(); + Self { + sk: i.sort_key, + ct, + v, + } + } +} + +#[derive(Deserialize)] +struct DeleteBatchQuery { + #[serde(rename = "partitionKey")] + partition_key: String, + #[serde(default)] + prefix: Option<String>, + #[serde(default)] + start: Option<String>, + #[serde(default)] + end: Option<String>, + #[serde(default, rename = "singleItem")] + single_item: bool, +} + +#[derive(Serialize)] +struct DeleteBatchResponse { + #[serde(rename = "partitionKey")] + partition_key: String, + prefix: Option<String>, + start: Option<String>, + end: Option<String>, + #[serde(rename = "singleItem")] + single_item: bool, + + #[serde(rename = "deletedItems")] + deleted_items: usize, +} diff --git a/src/api/k2v/index.rs b/src/api/k2v/index.rs new file mode 100644 index 00000000..896dbcf0 --- /dev/null +++ b/src/api/k2v/index.rs @@ -0,0 +1,100 @@ +use std::sync::Arc; + +use hyper::{Body, Response, StatusCode}; +use serde::Serialize; + +use garage_util::data::*; +use garage_util::error::Error as GarageError; + +use garage_rpc::ring::Ring; +use 
garage_table::util::*; + +use garage_model::garage::Garage; +use garage_model::k2v::counter_table::{BYTES, CONFLICTS, ENTRIES, VALUES}; + +use crate::error::*; +use crate::k2v::range::read_range; + +pub async fn handle_read_index( + garage: Arc<Garage>, + bucket_id: Uuid, + prefix: Option<String>, + start: Option<String>, + end: Option<String>, + limit: Option<u64>, + reverse: Option<bool>, +) -> Result<Response<Body>, Error> { + let reverse = reverse.unwrap_or(false); + + let ring: Arc<Ring> = garage.system.ring.borrow().clone(); + + let (partition_keys, more, next_start) = read_range( + &garage.k2v.counter_table.table, + &bucket_id, + &prefix, + &start, + &end, + limit, + Some((DeletedFilter::NotDeleted, ring.layout.node_id_vec.clone())), + EnumerationOrder::from_reverse(reverse), + ) + .await?; + + let s_entries = ENTRIES.to_string(); + let s_conflicts = CONFLICTS.to_string(); + let s_values = VALUES.to_string(); + let s_bytes = BYTES.to_string(); + + let resp = ReadIndexResponse { + prefix, + start, + end, + limit, + reverse, + partition_keys: partition_keys + .into_iter() + .map(|part| { + let vals = part.filtered_values(&ring); + ReadIndexResponseEntry { + pk: part.sk, + entries: *vals.get(&s_entries).unwrap_or(&0), + conflicts: *vals.get(&s_conflicts).unwrap_or(&0), + values: *vals.get(&s_values).unwrap_or(&0), + bytes: *vals.get(&s_bytes).unwrap_or(&0), + } + }) + .collect::<Vec<_>>(), + more, + next_start, + }; + + let resp_json = serde_json::to_string_pretty(&resp).map_err(GarageError::from)?; + Ok(Response::builder() + .status(StatusCode::OK) + .body(Body::from(resp_json))?) +} + +#[derive(Serialize)] +struct ReadIndexResponse { + prefix: Option<String>, + start: Option<String>, + end: Option<String>, + limit: Option<u64>, + reverse: bool, + + #[serde(rename = "partitionKeys")] + partition_keys: Vec<ReadIndexResponseEntry>, + + more: bool, + #[serde(rename = "nextStart")] + next_start: Option<String>, +} + +#[derive(Serialize)] +struct ReadIndexResponseEntry { + pk: String, + entries: i64, + conflicts: i64, + values: i64, + bytes: i64, +} diff --git a/src/api/k2v/item.rs b/src/api/k2v/item.rs new file mode 100644 index 00000000..1860863e --- /dev/null +++ b/src/api/k2v/item.rs @@ -0,0 +1,230 @@ +use std::sync::Arc; + +use http::header; + +use hyper::{Body, Request, Response, StatusCode}; + +use garage_util::data::*; + +use garage_model::garage::Garage; +use garage_model::k2v::causality::*; +use garage_model::k2v::item_table::*; + +use crate::error::*; + +pub const X_GARAGE_CAUSALITY_TOKEN: &str = "X-Garage-Causality-Token"; + +pub enum ReturnFormat { + Json, + Binary, + Either, +} + +impl ReturnFormat { + pub fn from(req: &Request<Body>) -> Result<Self, Error> { + let accept = match req.headers().get(header::ACCEPT) { + Some(a) => a.to_str()?, + None => return Ok(Self::Json), + }; + + let accept = accept.split(',').map(|s| s.trim()).collect::<Vec<_>>(); + let accept_json = accept.contains(&"application/json") || accept.contains(&"*/*"); + let accept_binary = accept.contains(&"application/octet-stream") || accept.contains(&"*/*"); + + match (accept_json, accept_binary) { + (true, true) => Ok(Self::Either), + (true, false) => Ok(Self::Json), + (false, true) => Ok(Self::Binary), + (false, false) => Err(Error::NotAcceptable("Invalid Accept: header value, must contain either application/json or application/octet-stream (or both)".into())), + } + } + + pub fn make_response(&self, item: &K2VItem) -> Result<Response<Body>, Error> { + let vals = item.values(); + + if vals.is_empty() { + 
return Err(Error::NoSuchKey); + } + + let ct = item.causal_context().serialize(); + match self { + Self::Binary if vals.len() > 1 => Ok(Response::builder() + .header(X_GARAGE_CAUSALITY_TOKEN, ct) + .status(StatusCode::CONFLICT) + .body(Body::empty())?), + Self::Binary => { + assert!(vals.len() == 1); + Self::make_binary_response(ct, vals[0]) + } + Self::Either if vals.len() == 1 => Self::make_binary_response(ct, vals[0]), + _ => Self::make_json_response(ct, &vals[..]), + } + } + + fn make_binary_response(ct: String, v: &DvvsValue) -> Result<Response<Body>, Error> { + match v { + DvvsValue::Deleted => Ok(Response::builder() + .header(X_GARAGE_CAUSALITY_TOKEN, ct) + .header(header::CONTENT_TYPE, "application/octet-stream") + .status(StatusCode::NO_CONTENT) + .body(Body::empty())?), + DvvsValue::Value(v) => Ok(Response::builder() + .header(X_GARAGE_CAUSALITY_TOKEN, ct) + .header(header::CONTENT_TYPE, "application/octet-stream") + .status(StatusCode::OK) + .body(Body::from(v.to_vec()))?), + } + } + + fn make_json_response(ct: String, v: &[&DvvsValue]) -> Result<Response<Body>, Error> { + let items = v + .iter() + .map(|v| match v { + DvvsValue::Deleted => serde_json::Value::Null, + DvvsValue::Value(v) => serde_json::Value::String(base64::encode(v)), + }) + .collect::<Vec<_>>(); + let json_body = + serde_json::to_string_pretty(&items).ok_or_internal_error("JSON encoding error")?; + Ok(Response::builder() + .header(X_GARAGE_CAUSALITY_TOKEN, ct) + .header(header::CONTENT_TYPE, "application/json") + .status(StatusCode::OK) + .body(Body::from(json_body))?) + } +} + +/// Handle ReadItem request +#[allow(clippy::ptr_arg)] +pub async fn handle_read_item( + garage: Arc<Garage>, + req: &Request<Body>, + bucket_id: Uuid, + partition_key: &str, + sort_key: &String, +) -> Result<Response<Body>, Error> { + let format = ReturnFormat::from(req)?; + + let item = garage + .k2v + .item_table + .get( + &K2VItemPartition { + bucket_id, + partition_key: partition_key.to_string(), + }, + sort_key, + ) + .await? + .ok_or(Error::NoSuchKey)?; + + format.make_response(&item) +} + +pub async fn handle_insert_item( + garage: Arc<Garage>, + req: Request<Body>, + bucket_id: Uuid, + partition_key: &str, + sort_key: &str, +) -> Result<Response<Body>, Error> { + let causal_context = req + .headers() + .get(X_GARAGE_CAUSALITY_TOKEN) + .map(|s| s.to_str()) + .transpose()? + .map(CausalContext::parse) + .transpose() + .ok_or_bad_request("Invalid causality token")?; + + let body = hyper::body::to_bytes(req.into_body()).await?; + let value = DvvsValue::Value(body.to_vec()); + + garage + .k2v + .rpc + .insert( + bucket_id, + partition_key.to_string(), + sort_key.to_string(), + causal_context, + value, + ) + .await?; + + Ok(Response::builder() + .status(StatusCode::OK) + .body(Body::empty())?) +} + +pub async fn handle_delete_item( + garage: Arc<Garage>, + req: Request<Body>, + bucket_id: Uuid, + partition_key: &str, + sort_key: &str, +) -> Result<Response<Body>, Error> { + let causal_context = req + .headers() + .get(X_GARAGE_CAUSALITY_TOKEN) + .map(|s| s.to_str()) + .transpose()? + .map(CausalContext::parse) + .transpose() + .ok_or_bad_request("Invalid causality token")?; + + let value = DvvsValue::Deleted; + + garage + .k2v + .rpc + .insert( + bucket_id, + partition_key.to_string(), + sort_key.to_string(), + causal_context, + value, + ) + .await?; + + Ok(Response::builder() + .status(StatusCode::NO_CONTENT) + .body(Body::empty())?) 
+} + +/// Handle PollItem request +#[allow(clippy::ptr_arg)] +pub async fn handle_poll_item( + garage: Arc<Garage>, + req: &Request<Body>, + bucket_id: Uuid, + partition_key: String, + sort_key: String, + causality_token: String, + timeout_secs: Option<u64>, +) -> Result<Response<Body>, Error> { + let format = ReturnFormat::from(req)?; + + let causal_context = + CausalContext::parse(&causality_token).ok_or_bad_request("Invalid causality token")?; + + let item = garage + .k2v + .rpc + .poll( + bucket_id, + partition_key, + sort_key, + causal_context, + timeout_secs.unwrap_or(300) * 1000, + ) + .await?; + + if let Some(item) = item { + format.make_response(&item) + } else { + Ok(Response::builder() + .status(StatusCode::NOT_MODIFIED) + .body(Body::empty())?) + } +} diff --git a/src/api/k2v/mod.rs b/src/api/k2v/mod.rs new file mode 100644 index 00000000..ee210ad5 --- /dev/null +++ b/src/api/k2v/mod.rs @@ -0,0 +1,8 @@ +pub mod api_server; +mod router; + +mod batch; +mod index; +mod item; + +mod range; diff --git a/src/api/k2v/range.rs b/src/api/k2v/range.rs new file mode 100644 index 00000000..cd019723 --- /dev/null +++ b/src/api/k2v/range.rs @@ -0,0 +1,96 @@ +//! Utility module for retrieving ranges of items in Garage tables +//! Implements parameters (prefix, start, end, limit) as specified +//! for endpoints ReadIndex, ReadBatch and DeleteBatch + +use std::sync::Arc; + +use garage_table::replication::TableShardedReplication; +use garage_table::*; + +use crate::error::*; +use crate::helpers::key_after_prefix; + +/// Read range in a Garage table. +/// Returns (entries, more?, nextStart) +#[allow(clippy::too_many_arguments)] +pub(crate) async fn read_range<F>( + table: &Arc<Table<F, TableShardedReplication>>, + partition_key: &F::P, + prefix: &Option<String>, + start: &Option<String>, + end: &Option<String>, + limit: Option<u64>, + filter: Option<F::Filter>, + enumeration_order: EnumerationOrder, +) -> Result<(Vec<F::E>, bool, Option<String>), Error> +where + F: TableSchema<S = String> + 'static, +{ + let (mut start, mut start_ignore) = match (prefix, start) { + (None, None) => (None, false), + (None, Some(s)) => (Some(s.clone()), false), + (Some(p), Some(s)) => { + if !s.starts_with(p) { + return Err(Error::BadRequest(format!( + "Start key '{}' does not start with prefix '{}'", + s, p + ))); + } + (Some(s.clone()), false) + } + (Some(p), None) if enumeration_order == EnumerationOrder::Reverse => { + let start = key_after_prefix(p) + .ok_or_internal_error("Sorry, can't list this prefix in reverse order")?; + (Some(start), true) + } + (Some(p), None) => (Some(p.clone()), false), + }; + + let mut entries = vec![]; + loop { + let n_get = std::cmp::min( + 1000, + limit.map(|x| x as usize).unwrap_or(usize::MAX - 10) - entries.len() + 2, + ); + let get_ret = table + .get_range( + partition_key, + start.clone(), + filter.clone(), + n_get, + enumeration_order, + ) + .await?; + + let get_ret_len = get_ret.len(); + + for entry in get_ret { + if start_ignore && Some(entry.sort_key()) == start.as_ref() { + continue; + } + if let Some(p) = prefix { + if !entry.sort_key().starts_with(p) { + return Ok((entries, false, None)); + } + } + if let Some(e) = end { + if entry.sort_key() == e { + return Ok((entries, false, None)); + } + } + if let Some(l) = limit { + if entries.len() >= l as usize { + return Ok((entries, true, Some(entry.sort_key().clone()))); + } + } + entries.push(entry); + } + + if get_ret_len < n_get { + return Ok((entries, false, None)); + } + + start =
Some(entries.last().unwrap().sort_key().clone()); + start_ignore = true; + } +} diff --git a/src/api/k2v/router.rs b/src/api/k2v/router.rs new file mode 100644 index 00000000..f948ffce --- /dev/null +++ b/src/api/k2v/router.rs @@ -0,0 +1,252 @@ +use crate::error::*; + +use std::borrow::Cow; + +use hyper::{Method, Request}; + +use crate::helpers::Authorization; +use crate::router_macros::{generateQueryParameters, router_match}; + +router_match! {@func + + +/// List of all K2V API endpoints. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum Endpoint { + DeleteBatch { + }, + DeleteItem { + partition_key: String, + sort_key: String, + }, + InsertBatch { + }, + InsertItem { + partition_key: String, + sort_key: String, + }, + Options, + PollItem { + partition_key: String, + sort_key: String, + causality_token: String, + timeout: Option<u64>, + }, + ReadBatch { + }, + ReadIndex { + prefix: Option<String>, + start: Option<String>, + end: Option<String>, + limit: Option<u64>, + reverse: Option<bool>, + }, + ReadItem { + partition_key: String, + sort_key: String, + }, +}} + +impl Endpoint { + /// Determine which K2V endpoint a request is for, using the request and a bucket which was + /// possibly extracted from the Host header. + /// Returns Self plus the bucket name. + pub fn from_request<T>(req: &Request<T>) -> Result<(Self, String), Error> { + let uri = req.uri(); + let path = uri.path().trim_start_matches('/'); + let query = uri.query(); + + let (bucket, partition_key) = path + .split_once('/') + .map(|(b, p)| (b.to_owned(), p.trim_start_matches('/'))) + .unwrap_or((path.to_owned(), "")); + + if bucket.is_empty() { + return Err(Error::BadRequest("Missing bucket name".to_owned())); + } + + if *req.method() == Method::OPTIONS { + return Ok((Self::Options, bucket)); + } + + let partition_key = percent_encoding::percent_decode_str(partition_key) + .decode_utf8()? + .into_owned(); + + let mut query = QueryParameters::from_query(query.unwrap_or_default())?; + + let method_search = Method::from_bytes(b"SEARCH").unwrap(); + let res = match *req.method() { + Method::GET => Self::from_get(partition_key, &mut query)?, + //&Method::HEAD => Self::from_head(partition_key, &mut query)?, + Method::POST => Self::from_post(partition_key, &mut query)?, + Method::PUT => Self::from_put(partition_key, &mut query)?, + Method::DELETE => Self::from_delete(partition_key, &mut query)?, + _ if req.method() == method_search => Self::from_search(partition_key, &mut query)?, + _ => return Err(Error::BadRequest("Unknown method".to_owned())), + }; + + if let Some(message) = query.nonempty_message() { + debug!("Unused query parameter: {}", message) + } + Ok((res, bucket)) + } + + /// Determine which endpoint a request is for, knowing it is a GET. + fn from_get(partition_key: String, query: &mut QueryParameters<'_>) -> Result<Self, Error> { + router_match! { + @gen_parser + (query.keyword.take().unwrap_or_default().as_ref(), partition_key, query, None), + key: [ + EMPTY if causality_token => PollItem (query::sort_key, query::causality_token, opt_parse::timeout), + EMPTY => ReadItem (query::sort_key), + ], + no_key: [ + EMPTY => ReadIndex (query_opt::prefix, query_opt::start, query_opt::end, opt_parse::limit, opt_parse::reverse), + ] + } + } + + /// Determine which endpoint a request is for, knowing it is a SEARCH. + fn from_search(partition_key: String, query: &mut QueryParameters<'_>) -> Result<Self, Error> { + router_match!
+
+    /// Determine which endpoint a request is for, knowing it is a SEARCH.
+    fn from_search(partition_key: String, query: &mut QueryParameters<'_>) -> Result<Self, Error> {
+        router_match! {
+            @gen_parser
+            (query.keyword.take().unwrap_or_default().as_ref(), partition_key, query, None),
+            key: [
+            ],
+            no_key: [
+                EMPTY => ReadBatch,
+            ]
+        }
+    }
+
+    /*
+    /// Determine which endpoint a request is for, knowing it is a HEAD.
+    fn from_head(partition_key: String, query: &mut QueryParameters<'_>) -> Result<Self, Error> {
+        router_match! {
+            @gen_parser
+            (query.keyword.take().unwrap_or_default().as_ref(), partition_key, query, None),
+            key: [
+                EMPTY => HeadObject(opt_parse::part_number, query_opt::version_id),
+            ],
+            no_key: [
+                EMPTY => HeadBucket,
+            ]
+        }
+    }
+    */
+
+    /// Determine which endpoint a request is for, knowing it is a POST.
+    fn from_post(partition_key: String, query: &mut QueryParameters<'_>) -> Result<Self, Error> {
+        router_match! {
+            @gen_parser
+            (query.keyword.take().unwrap_or_default().as_ref(), partition_key, query, None),
+            key: [
+            ],
+            no_key: [
+                EMPTY => InsertBatch,
+                DELETE => DeleteBatch,
+                SEARCH => ReadBatch,
+            ]
+        }
+    }
+
+    /// Determine which endpoint a request is for, knowing it is a PUT.
+    fn from_put(partition_key: String, query: &mut QueryParameters<'_>) -> Result<Self, Error> {
+        router_match! {
+            @gen_parser
+            (query.keyword.take().unwrap_or_default().as_ref(), partition_key, query, None),
+            key: [
+                EMPTY => InsertItem (query::sort_key),
+
+            ],
+            no_key: [
+            ]
+        }
+    }
+
+    /// Determine which endpoint a request is for, knowing it is a DELETE.
+    fn from_delete(partition_key: String, query: &mut QueryParameters<'_>) -> Result<Self, Error> {
+        router_match! {
+            @gen_parser
+            (query.keyword.take().unwrap_or_default().as_ref(), partition_key, query, None),
+            key: [
+                EMPTY => DeleteItem (query::sort_key),
+            ],
+            no_key: [
+            ]
+        }
+    }
+
+    /// Get the partition key of the request target. Returns None for requests which don't use a partition key.
+    #[allow(dead_code)]
+    pub fn get_partition_key(&self) -> Option<&str> {
+        router_match! {
+            @extract
+            self,
+            partition_key,
+            [
+                DeleteItem,
+                InsertItem,
+                PollItem,
+                ReadItem,
+            ]
+        }
+    }
+
+    /// Get the sort key of the request target. Returns None for requests which don't use a sort key.
+    #[allow(dead_code)]
+    pub fn get_sort_key(&self) -> Option<&str> {
+        router_match! {
+            @extract
+            self,
+            sort_key,
+            [
+                DeleteItem,
+                InsertItem,
+                PollItem,
+                ReadItem,
+            ]
+        }
+    }
+
+    /// Get the kind of authorization which is required to perform the operation.
+    pub fn authorization_type(&self) -> Authorization {
+        let readonly = router_match! {
+            @match
+            self,
+            [
+                PollItem,
+                ReadBatch,
+                ReadIndex,
+                ReadItem,
+            ]
+        };
+        if readonly {
+            Authorization::Read
+        } else {
+            Authorization::Write
+        }
+    }
+}
+
+// parameter name => struct field
+generateQueryParameters! {
+    "prefix" => prefix,
+    "start" => start,
+    "causality_token" => causality_token,
+    "end" => end,
+    "limit" => limit,
+    "reverse" => reverse,
+    "sort_key" => sort_key,
+    "timeout" => timeout
+}
+
+mod keywords {
+    //! This module contains all query parameters with no associated value
+    //! used to differentiate endpoints.
+    pub const EMPTY: &str = "";
+
+    pub const DELETE: &str = "delete";
+    pub const SEARCH: &str = "search";
+}
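To make the GET routing above concrete, here is a hypothetical unit test (a sketch, not part of this patch) exercising Endpoint::from_request as defined in this file:

    #[cfg(test)]
    mod routing_sketch {
        use super::*;
        use hyper::{Body, Method, Request};

        fn req(method: Method, uri: &str) -> Request<Body> {
            Request::builder()
                .method(method)
                .uri(uri)
                .body(Body::empty())
                .unwrap()
        }

        #[test]
        fn get_requests_route_as_documented() {
            // GET bucket/partition?sort_key=... => ReadItem
            let (ep, bucket) =
                Endpoint::from_request(&req(Method::GET, "/b/part?sort_key=a")).unwrap();
            assert_eq!(bucket, "b");
            assert!(matches!(ep, Endpoint::ReadItem { .. }));

            // Adding causality_token selects PollItem instead
            let (ep, _) = Endpoint::from_request(&req(
                Method::GET,
                "/b/part?sort_key=a&causality_token=xyz",
            ))
            .unwrap();
            assert!(matches!(ep, Endpoint::PollItem { .. }));

            // GET on the bare bucket (no partition key) => ReadIndex
            let (ep, _) = Endpoint::from_request(&req(Method::GET, "/b")).unwrap();
            assert!(matches!(ep, Endpoint::ReadIndex { .. }));
        }
    }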
diff --git a/src/api/lib.rs b/src/api/lib.rs
index de60ec53..0078f7b5 100644
--- a/src/api/lib.rs
+++ b/src/api/lib.rs
@@ -6,22 +6,12 @@ pub mod error;
 pub use error::Error;
 
 mod encoding;
-
-mod api_server;
-pub use api_server::run_api_server;
-
+mod generic_server;
+pub mod helpers;
+mod router_macros;
 /// This module is public only to help testing. Don't expect stability here
 pub mod signature;
 
-pub mod helpers;
-mod s3_bucket;
-mod s3_copy;
-pub mod s3_cors;
-mod s3_delete;
-pub mod s3_get;
-mod s3_list;
-mod s3_post_object;
-mod s3_put;
-mod s3_router;
-mod s3_website;
-mod s3_xml;
+#[cfg(feature = "k2v")]
+pub mod k2v;
+pub mod s3;
diff --git a/src/api/router_macros.rs b/src/api/router_macros.rs
new file mode 100644
index 00000000..8471407c
--- /dev/null
+++ b/src/api/router_macros.rs
@@ -0,0 +1,190 @@
+/// This macro is used to generate very repetitive match {} blocks in this module
+/// It is _not_ made to be used anywhere else
+macro_rules! router_match {
+    (@match $enum:expr , [ $($endpoint:ident,)* ]) => {{
+        // usage: router_match {@match my_enum, [ VariantWithField1, VariantWithField2 ..] }
+        // returns true if the variant was one of the listed variants, false otherwise.
+        use Endpoint::*;
+        match $enum {
+            $(
+            $endpoint { .. } => true,
+            )*
+            _ => false
+        }
+    }};
+    (@extract $enum:expr , $param:ident, [ $($endpoint:ident,)* ]) => {{
+        // usage: router_match {@extract my_enum, field_name, [ VariantWithField1, VariantWithField2 ..] }
+        // returns Some(field_value), or None if the variant was not one of the listed variants.
+        use Endpoint::*;
+        match $enum {
+            $(
+            $endpoint {$param, ..} => Some($param),
+            )*
+            _ => None
+        }
+    }};
+    (@gen_parser ($keyword:expr, $key:ident, $query:expr, $header:expr),
+    key: [$($kw_k:ident $(if $required_k:ident)? $(header $header_k:expr)? => $api_k:ident $(($($conv_k:ident :: $param_k:ident),*))?,)*],
+    no_key: [$($kw_nk:ident $(if $required_nk:ident)? $(if_header $header_nk:expr)? => $api_nk:ident $(($($conv_nk:ident :: $param_nk:ident),*))?,)*]) => {{
+        // usage: router_match {@gen_parser (keyword, key, query, header),
+        //   key: [
+        //      SOME_KEYWORD => VariantWithKey,
+        //      ...
+        //   ],
+        //   no_key: [
+        //      SOME_KEYWORD => VariantWithoutKey,
+        //      ...
+        //   ]
+        // }
+        // See in from_{method} for more detailed usage.
+        use Endpoint::*;
+        use keywords::*;
+        match ($keyword, !$key.is_empty()){
+            $(
+            ($kw_k, true) if true $(&& $query.$required_k.is_some())? $(&& $header.contains_key($header_k))? => Ok($api_k {
+                $key,
+                $($(
+                    $param_k: router_match!(@@parse_param $query, $conv_k, $param_k),
+                )*)?
+            }),
+            )*
+            $(
+            ($kw_nk, false) $(if $query.$required_nk.is_some())? $(if $header.contains($header_nk))? => Ok($api_nk {
+                $($(
+                    $param_nk: router_match!(@@parse_param $query, $conv_nk, $param_nk),
+                )*)?
+            }),
+            )*
+            (kw, _) => Err(Error::BadRequest(format!("Invalid endpoint: {}", kw)))
+        }
+    }};
+
+    (@@parse_param $query:expr, query_opt, $param:ident) => {{
+        // extract optional query parameter
+        $query.$param.take().map(|param| param.into_owned())
+    }};
+    (@@parse_param $query:expr, query, $param:ident) => {{
+        // extract mandatory query parameter
+        $query.$param.take().ok_or_bad_request("Missing argument for endpoint")?.into_owned()
+    }};
+    (@@parse_param $query:expr, opt_parse, $param:ident) => {{
+        // extract and parse optional query parameter
+        // a missing parameter is fine, however a parse error is reported as an error
+        $query.$param
+            .take()
+            .map(|param| param.parse())
+            .transpose()
+            .map_err(|_| Error::BadRequest("Failed to parse query parameter".to_owned()))?
+    }};
+    (@@parse_param $query:expr, parse, $param:ident) => {{
+        // extract and parse mandatory query parameter
+        // both missing and un-parseable parameters are reported as errors
+        $query.$param.take().ok_or_bad_request("Missing argument for endpoint")?
+            .parse()
+            .map_err(|_| Error::BadRequest("Failed to parse query parameter".to_owned()))?
+    }};
+    (@func
+    $(#[$doc:meta])*
+    pub enum Endpoint {
+        $(
+            $(#[$outer:meta])*
+            $variant:ident $({
+                $($name:ident: $ty:ty,)*
+            })?,
+        )*
+    }) => {
+        $(#[$doc])*
+        pub enum Endpoint {
+            $(
+                $(#[$outer])*
+                $variant $({
+                    $($name: $ty, )*
+                })?,
+            )*
+        }
+        impl Endpoint {
+            pub fn name(&self) -> &'static str {
+                match self {
+                    $(Endpoint::$variant $({ $($name: _,)* .. })? => stringify!($variant),)*
+                }
+            }
+        }
+    };
+    (@if ($($cond:tt)+) then ($($then:tt)*) else ($($else:tt)*)) => {
+        $($then)*
+    };
+    (@if () then ($($then:tt)*) else ($($else:tt)*)) => {
+        $($else)*
+    };
+}
+
+/// This macro is used to generate part of the code in this module. It must be called only once,
+/// and is useless outside of this module.
macro_rules! generateQueryParameters {
+    ( $($rest:expr => $name:ident),* ) => {
+        /// Struct containing all query parameters used in endpoints. Think of it as a HashMap,
+        /// but with keys statically known.
+        #[derive(Debug, Default)]
+        struct QueryParameters<'a> {
+            keyword: Option<Cow<'a, str>>,
+            $(
+            $name: Option<Cow<'a, str>>,
+            )*
+        }
+
+        impl<'a> QueryParameters<'a> {
+            /// Build this struct from the query part of a URI.
+            fn from_query(query: &'a str) -> Result<Self, Error> {
+                let mut res: Self = Default::default();
+                for (k, v) in url::form_urlencoded::parse(query.as_bytes()) {
+                    let repeated = match k.as_ref() {
+                        $(
+                        $rest => if !v.is_empty() {
+                            res.$name.replace(v).is_some()
+                        } else {
+                            false
+                        },
+                        )*
+                        _ => {
+                            if k.starts_with("response-") || k.starts_with("X-Amz-") {
+                                false
+                            } else if v.as_ref().is_empty() {
+                                if res.keyword.replace(k).is_some() {
+                                    return Err(Error::BadRequest("Multiple keywords".to_owned()));
+                                }
+                                continue;
+                            } else {
+                                debug!("Received an unknown query parameter: '{}'", k);
+                                false
+                            }
+                        }
+                    };
+                    if repeated {
+                        return Err(Error::BadRequest(format!(
+                            "Query parameter repeated: '{}'",
+                            k
+                        )));
+                    }
+                }
+                Ok(res)
+            }
+
+            /// Get an error message in case not all parameters were used when extracting them to
+            /// build an Endpoint variant
+            fn nonempty_message(&self) -> Option<&str> {
+                if self.keyword.is_some() {
+                    Some("Keyword not used")
+                } $(
+                else if self.$name.is_some() {
+                    Some(concat!("'", $rest, "'"))
+                }
+                )* else {
+                    None
+                }
+            }
+        }
+    }
+}
+
+pub(crate) use generateQueryParameters;
+pub(crate) use router_match;
diff --git a/src/api/s3/api_server.rs b/src/api/s3/api_server.rs
new file mode 100644
index 00000000..78a69d53
--- /dev/null
+++ b/src/api/s3/api_server.rs
@@ -0,0 +1,401 @@
+use std::sync::Arc;
+
+use async_trait::async_trait;
+
+use futures::future::Future;
+use hyper::header;
+use hyper::{Body, Method, Request, Response};
+
+use opentelemetry::{trace::SpanRef, KeyValue};
+
+use garage_table::util::*;
+use garage_util::error::Error as GarageError;
+
+use garage_model::garage::Garage;
+use garage_model::key_table::Key;
+
+use crate::error::*;
+use crate::generic_server::*;
+
+use crate::signature::payload::check_payload_signature;
+use crate::signature::streaming::*;
+
+use crate::helpers::*;
+use crate::s3::bucket::*;
+use crate::s3::copy::*;
+use crate::s3::cors::*;
+use crate::s3::delete::*;
+use crate::s3::get::*;
+use crate::s3::list::*;
+use crate::s3::post_object::handle_post_object;
+use crate::s3::put::*;
+use crate::s3::router::Endpoint;
+use crate::s3::website::*;
+
+pub struct S3ApiServer {
+    garage: Arc<Garage>,
+}
+
+pub(crate) struct S3ApiEndpoint {
+    bucket_name: Option<String>,
+    endpoint: Endpoint,
+}
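+// S3ApiServer plugs into the scaffolding in generic_server.rs (not shown in
+// this diff): the ApiHandler impl below supplies a parse_endpoint() step that
+// turns an HTTP request into a typed S3ApiEndpoint, and a handle() step that
+// executes it, while the generic server owns the common hyper setup and
+// request tracing. K2VApiServer is wired up through the same trait.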
+ +impl S3ApiServer { + pub async fn run( + garage: Arc<Garage>, + shutdown_signal: impl Future<Output = ()>, + ) -> Result<(), GarageError> { + let addr = garage.config.s3_api.api_bind_addr; + + ApiServer::new( + garage.config.s3_api.s3_region.clone(), + S3ApiServer { garage }, + ) + .run_server(addr, shutdown_signal) + .await + } + + async fn handle_request_without_bucket( + &self, + _req: Request<Body>, + api_key: Key, + endpoint: Endpoint, + ) -> Result<Response<Body>, Error> { + match endpoint { + Endpoint::ListBuckets => handle_list_buckets(&self.garage, &api_key).await, + endpoint => Err(Error::NotImplemented(endpoint.name().to_owned())), + } + } +} + +#[async_trait] +impl ApiHandler for S3ApiServer { + const API_NAME: &'static str = "s3"; + const API_NAME_DISPLAY: &'static str = "S3"; + + type Endpoint = S3ApiEndpoint; + + fn parse_endpoint(&self, req: &Request<Body>) -> Result<S3ApiEndpoint, Error> { + let authority = req + .headers() + .get(header::HOST) + .ok_or_bad_request("Host header required")? + .to_str()?; + + let host = authority_to_host(authority)?; + + let bucket_name = self + .garage + .config + .s3_api + .root_domain + .as_ref() + .and_then(|root_domain| host_to_bucket(&host, root_domain)); + + let (endpoint, bucket_name) = + Endpoint::from_request(req, bucket_name.map(ToOwned::to_owned))?; + + Ok(S3ApiEndpoint { + bucket_name, + endpoint, + }) + } + + async fn handle( + &self, + req: Request<Body>, + endpoint: S3ApiEndpoint, + ) -> Result<Response<Body>, Error> { + let S3ApiEndpoint { + bucket_name, + endpoint, + } = endpoint; + let garage = self.garage.clone(); + + // Some endpoints are processed early, before we even check for an API key + if let Endpoint::PostObject = endpoint { + return handle_post_object(garage, req, bucket_name.unwrap()).await; + } + if let Endpoint::Options = endpoint { + return handle_options_s3api(garage, &req, bucket_name).await; + } + + let (api_key, mut content_sha256) = check_payload_signature(&garage, "s3", &req).await?; + let api_key = api_key.ok_or_else(|| { + Error::Forbidden("Garage does not support anonymous access yet".to_string()) + })?; + + let req = parse_streaming_body( + &api_key, + req, + &mut content_sha256, + &garage.config.s3_api.s3_region, + "s3", + )?; + + let bucket_name = match bucket_name { + None => { + return self + .handle_request_without_bucket(req, api_key, endpoint) + .await + } + Some(bucket) => bucket.to_string(), + }; + + // Special code path for CreateBucket API endpoint + if let Endpoint::CreateBucket {} = endpoint { + return handle_create_bucket(&garage, req, content_sha256, api_key, bucket_name).await; + } + + let bucket_id = resolve_bucket(&garage, &bucket_name, &api_key).await?; + let bucket = garage + .bucket_table + .get(&EmptyKey, &bucket_id) + .await? + .filter(|b| !b.state.is_deleted()) + .ok_or(Error::NoSuchBucket)?; + + let allowed = match endpoint.authorization_type() { + Authorization::Read => api_key.allow_read(&bucket_id), + Authorization::Write => api_key.allow_write(&bucket_id), + Authorization::Owner => api_key.allow_owner(&bucket_id), + _ => unreachable!(), + }; + + if !allowed { + return Err(Error::Forbidden( + "Operation is not allowed for this key.".to_string(), + )); + } + + // Look up what CORS rule might apply to response. + // Requests for methods different than GET, HEAD or POST + // are always preflighted, i.e. 
the browser should make + // an OPTIONS call before to check it is allowed + let matching_cors_rule = match *req.method() { + Method::GET | Method::HEAD | Method::POST => find_matching_cors_rule(&bucket, &req)?, + _ => None, + }; + + let resp = match endpoint { + Endpoint::HeadObject { + key, part_number, .. + } => handle_head(garage, &req, bucket_id, &key, part_number).await, + Endpoint::GetObject { + key, part_number, .. + } => handle_get(garage, &req, bucket_id, &key, part_number).await, + Endpoint::UploadPart { + key, + part_number, + upload_id, + } => { + handle_put_part( + garage, + req, + bucket_id, + &key, + part_number, + &upload_id, + content_sha256, + ) + .await + } + Endpoint::CopyObject { key } => { + handle_copy(garage, &api_key, &req, bucket_id, &key).await + } + Endpoint::UploadPartCopy { + key, + part_number, + upload_id, + } => { + handle_upload_part_copy( + garage, + &api_key, + &req, + bucket_id, + &key, + part_number, + &upload_id, + ) + .await + } + Endpoint::PutObject { key } => { + handle_put(garage, req, bucket_id, &key, content_sha256).await + } + Endpoint::AbortMultipartUpload { key, upload_id } => { + handle_abort_multipart_upload(garage, bucket_id, &key, &upload_id).await + } + Endpoint::DeleteObject { key, .. } => handle_delete(garage, bucket_id, &key).await, + Endpoint::CreateMultipartUpload { key } => { + handle_create_multipart_upload(garage, &req, &bucket_name, bucket_id, &key).await + } + Endpoint::CompleteMultipartUpload { key, upload_id } => { + handle_complete_multipart_upload( + garage, + req, + &bucket_name, + bucket_id, + &key, + &upload_id, + content_sha256, + ) + .await + } + Endpoint::CreateBucket {} => unreachable!(), + Endpoint::HeadBucket {} => { + let empty_body: Body = Body::from(vec![]); + let response = Response::builder().body(empty_body).unwrap(); + Ok(response) + } + Endpoint::DeleteBucket {} => { + handle_delete_bucket(&garage, bucket_id, bucket_name, api_key).await + } + Endpoint::GetBucketLocation {} => handle_get_bucket_location(garage), + Endpoint::GetBucketVersioning {} => handle_get_bucket_versioning(), + Endpoint::ListObjects { + delimiter, + encoding_type, + marker, + max_keys, + prefix, + } => { + handle_list( + garage, + &ListObjectsQuery { + common: ListQueryCommon { + bucket_name, + bucket_id, + delimiter: delimiter.map(|d| d.to_string()), + page_size: max_keys.map(|p| p.clamp(1, 1000)).unwrap_or(1000), + prefix: prefix.unwrap_or_default(), + urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false), + }, + is_v2: false, + marker, + continuation_token: None, + start_after: None, + }, + ) + .await + } + Endpoint::ListObjectsV2 { + delimiter, + encoding_type, + max_keys, + prefix, + continuation_token, + start_after, + list_type, + .. 
+ } => { + if list_type == "2" { + handle_list( + garage, + &ListObjectsQuery { + common: ListQueryCommon { + bucket_name, + bucket_id, + delimiter: delimiter.map(|d| d.to_string()), + page_size: max_keys.map(|p| p.clamp(1, 1000)).unwrap_or(1000), + urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false), + prefix: prefix.unwrap_or_default(), + }, + is_v2: true, + marker: None, + continuation_token, + start_after, + }, + ) + .await + } else { + Err(Error::BadRequest(format!( + "Invalid endpoint: list-type={}", + list_type + ))) + } + } + Endpoint::ListMultipartUploads { + delimiter, + encoding_type, + key_marker, + max_uploads, + prefix, + upload_id_marker, + } => { + handle_list_multipart_upload( + garage, + &ListMultipartUploadsQuery { + common: ListQueryCommon { + bucket_name, + bucket_id, + delimiter: delimiter.map(|d| d.to_string()), + page_size: max_uploads.map(|p| p.clamp(1, 1000)).unwrap_or(1000), + prefix: prefix.unwrap_or_default(), + urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false), + }, + key_marker, + upload_id_marker, + }, + ) + .await + } + Endpoint::ListParts { + key, + max_parts, + part_number_marker, + upload_id, + } => { + handle_list_parts( + garage, + &ListPartsQuery { + bucket_name, + bucket_id, + key, + upload_id, + part_number_marker: part_number_marker.map(|p| p.clamp(1, 10000)), + max_parts: max_parts.map(|p| p.clamp(1, 1000)).unwrap_or(1000), + }, + ) + .await + } + Endpoint::DeleteObjects {} => { + handle_delete_objects(garage, bucket_id, req, content_sha256).await + } + Endpoint::GetBucketWebsite {} => handle_get_website(&bucket).await, + Endpoint::PutBucketWebsite {} => { + handle_put_website(garage, bucket_id, req, content_sha256).await + } + Endpoint::DeleteBucketWebsite {} => handle_delete_website(garage, bucket_id).await, + Endpoint::GetBucketCors {} => handle_get_cors(&bucket).await, + Endpoint::PutBucketCors {} => { + handle_put_cors(garage, bucket_id, req, content_sha256).await + } + Endpoint::DeleteBucketCors {} => handle_delete_cors(garage, bucket_id).await, + endpoint => Err(Error::NotImplemented(endpoint.name().to_owned())), + }; + + // If request was a success and we have a CORS rule that applies to it, + // add the corresponding CORS headers to the response + let mut resp_ok = resp?; + if let Some(rule) = matching_cors_rule { + add_cors_headers(&mut resp_ok, rule) + .ok_or_internal_error("Invalid bucket CORS configuration")?; + } + + Ok(resp_ok) + } +} + +impl ApiEndpoint for S3ApiEndpoint { + fn name(&self) -> &'static str { + self.endpoint.name() + } + + fn add_span_attributes(&self, span: SpanRef<'_>) { + span.set_attribute(KeyValue::new( + "bucket", + self.bucket_name.clone().unwrap_or_default(), + )); + } +} diff --git a/src/api/s3_bucket.rs b/src/api/s3/bucket.rs index 8a5407d3..93048a8c 100644 --- a/src/api/s3_bucket.rs +++ b/src/api/s3/bucket.rs @@ -7,15 +7,15 @@ use garage_model::bucket_alias_table::*; use garage_model::bucket_table::Bucket; use garage_model::garage::Garage; use garage_model::key_table::Key; -use garage_model::object_table::ObjectFilter; use garage_model::permission::BucketKeyPerm; +use garage_model::s3::object_table::ObjectFilter; use garage_table::util::*; use garage_util::crdt::*; use garage_util::data::*; use garage_util::time::*; use crate::error::*; -use crate::s3_xml; +use crate::s3::xml as s3_xml; use crate::signature::verify_signed_content; pub fn handle_get_bucket_location(garage: Arc<Garage>) -> Result<Response<Body>, Error> { @@ -230,7 +230,13 @@ pub async fn handle_delete_bucket( 
// Check bucket is empty let objects = garage .object_table - .get_range(&bucket_id, None, Some(ObjectFilter::IsData), 10) + .get_range( + &bucket_id, + None, + Some(ObjectFilter::IsData), + 10, + EnumerationOrder::Forward, + ) .await?; if !objects.is_empty() { return Err(Error::BucketNotEmpty); diff --git a/src/api/s3_copy.rs b/src/api/s3/copy.rs index fc4707e2..4e94d887 100644 --- a/src/api/s3_copy.rs +++ b/src/api/s3/copy.rs @@ -12,16 +12,16 @@ use garage_table::*; use garage_util::data::*; use garage_util::time::*; -use garage_model::block_ref_table::*; use garage_model::garage::Garage; use garage_model::key_table::Key; -use garage_model::object_table::*; -use garage_model::version_table::*; +use garage_model::s3::block_ref_table::*; +use garage_model::s3::object_table::*; +use garage_model::s3::version_table::*; -use crate::api_server::{parse_bucket_key, resolve_bucket}; use crate::error::*; -use crate::s3_put::{decode_upload_id, get_headers}; -use crate::s3_xml::{self, xmlns_tag}; +use crate::helpers::{parse_bucket_key, resolve_bucket}; +use crate::s3::put::{decode_upload_id, get_headers}; +use crate::s3::xml::{self as s3_xml, xmlns_tag}; pub async fn handle_copy( garage: Arc<Garage>, @@ -619,7 +619,7 @@ pub struct CopyPartResult { #[cfg(test)] mod tests { use super::*; - use crate::s3_xml::to_xml_with_header; + use crate::s3::xml::to_xml_with_header; #[test] fn copy_object_result() -> Result<(), Error> { diff --git a/src/api/s3_cors.rs b/src/api/s3/cors.rs index ab77e23a..37ea2e43 100644 --- a/src/api/s3_cors.rs +++ b/src/api/s3/cors.rs @@ -10,7 +10,7 @@ use hyper::{header::HeaderName, Body, Method, Request, Response, StatusCode}; use serde::{Deserialize, Serialize}; use crate::error::*; -use crate::s3_xml::{to_xml_with_header, xmlns_tag, IntValue, Value}; +use crate::s3::xml::{to_xml_with_header, xmlns_tag, IntValue, Value}; use crate::signature::verify_signed_content; use garage_model::bucket_table::{Bucket, CorsRule as GarageCorsRule}; diff --git a/src/api/s3_delete.rs b/src/api/s3/delete.rs index b243d982..1e3f1249 100644 --- a/src/api/s3_delete.rs +++ b/src/api/s3/delete.rs @@ -6,10 +6,10 @@ use garage_util::data::*; use garage_util::time::*; use garage_model::garage::Garage; -use garage_model::object_table::*; +use garage_model::s3::object_table::*; use crate::error::*; -use crate::s3_xml; +use crate::s3::xml as s3_xml; use crate::signature::verify_signed_content; async fn handle_delete_internal( diff --git a/src/api/s3_get.rs b/src/api/s3/get.rs index 7f647e15..3edf22a6 100644 --- a/src/api/s3_get.rs +++ b/src/api/s3/get.rs @@ -14,8 +14,8 @@ use garage_table::EmptyKey; use garage_util::data::*; use garage_model::garage::Garage; -use garage_model::object_table::*; -use garage_model::version_table::*; +use garage_model::s3::object_table::*; +use garage_model::s3::version_table::*; use crate::error::*; diff --git a/src/api/s3_list.rs b/src/api/s3/list.rs index 5852fc1b..e2848c57 100644 --- a/src/api/s3_list.rs +++ b/src/api/s3/list.rs @@ -10,15 +10,16 @@ use garage_util::error::Error as GarageError; use garage_util::time::*; use garage_model::garage::Garage; -use garage_model::object_table::*; -use garage_model::version_table::Version; +use garage_model::s3::object_table::*; +use garage_model::s3::version_table::Version; -use garage_table::EmptyKey; +use garage_table::{EmptyKey, EnumerationOrder}; use crate::encoding::*; use crate::error::*; -use crate::s3_put; -use crate::s3_xml; +use crate::helpers::key_after_prefix; +use crate::s3::put as s3_put; +use crate::s3::xml as 
s3_xml; const DUMMY_NAME: &str = "Dummy Key"; const DUMMY_KEY: &str = "GKDummyKey"; @@ -66,8 +67,14 @@ pub async fn handle_list( let io = |bucket, key, count| { let t = &garage.object_table; async move { - t.get_range(&bucket, key, Some(ObjectFilter::IsData), count) - .await + t.get_range( + &bucket, + key, + Some(ObjectFilter::IsData), + count, + EnumerationOrder::Forward, + ) + .await } }; @@ -165,8 +172,14 @@ pub async fn handle_list_multipart_upload( let io = |bucket, key, count| { let t = &garage.object_table; async move { - t.get_range(&bucket, key, Some(ObjectFilter::IsUploading), count) - .await + t.get_range( + &bucket, + key, + Some(ObjectFilter::IsUploading), + count, + EnumerationOrder::Forward, + ) + .await } }; @@ -923,39 +936,13 @@ fn uriencode_maybe(s: &str, yes: bool) -> s3_xml::Value { } } -const UTF8_BEFORE_LAST_CHAR: char = '\u{10FFFE}'; - -/// Compute the key after the prefix -fn key_after_prefix(pfx: &str) -> Option<String> { - let mut next = pfx.to_string(); - while !next.is_empty() { - let tail = next.pop().unwrap(); - if tail >= char::MAX { - continue; - } - - // Circumvent a limitation of RangeFrom that overflow earlier than needed - // See: https://doc.rust-lang.org/core/ops/struct.RangeFrom.html - let new_tail = if tail == UTF8_BEFORE_LAST_CHAR { - char::MAX - } else { - (tail..).nth(1).unwrap() - }; - - next.push(new_tail); - return Some(next); - } - - None -} - /* * Unit tests of this module */ #[cfg(test)] mod tests { use super::*; - use garage_model::version_table::*; + use garage_model::s3::version_table::*; use garage_util::*; use std::iter::FromIterator; @@ -1003,39 +990,6 @@ mod tests { } #[test] - fn test_key_after_prefix() { - assert_eq!(UTF8_BEFORE_LAST_CHAR as u32, (char::MAX as u32) - 1); - assert_eq!(key_after_prefix("a/b/").unwrap().as_str(), "a/b0"); - assert_eq!(key_after_prefix("€").unwrap().as_str(), "₭"); - assert_eq!( - key_after_prefix("").unwrap().as_str(), - String::from(char::from_u32(0x10FFFE).unwrap()) - ); - - // When the last character is the biggest UTF8 char - let a = String::from_iter(['a', char::MAX].iter()); - assert_eq!(key_after_prefix(a.as_str()).unwrap().as_str(), "b"); - - // When all characters are the biggest UTF8 char - let b = String::from_iter([char::MAX; 3].iter()); - assert!(key_after_prefix(b.as_str()).is_none()); - - // Check utf8 surrogates - let c = String::from('\u{D7FF}'); - assert_eq!( - key_after_prefix(c.as_str()).unwrap().as_str(), - String::from('\u{E000}') - ); - - // Check the character before the biggest one - let d = String::from('\u{10FFFE}'); - assert_eq!( - key_after_prefix(d.as_str()).unwrap().as_str(), - String::from(char::MAX) - ); - } - - #[test] fn test_common_prefixes() { let mut query = query(); let objs = objs(); diff --git a/src/api/s3/mod.rs b/src/api/s3/mod.rs new file mode 100644 index 00000000..3f5c1915 --- /dev/null +++ b/src/api/s3/mod.rs @@ -0,0 +1,14 @@ +pub mod api_server; + +mod bucket; +mod copy; +pub mod cors; +mod delete; +pub mod get; +mod list; +mod post_object; +mod put; +mod website; + +mod router; +pub mod xml; diff --git a/src/api/s3_post_object.rs b/src/api/s3/post_object.rs index 585e0304..86fa7880 100644 --- a/src/api/s3_post_object.rs +++ b/src/api/s3/post_object.rs @@ -14,10 +14,10 @@ use serde::Deserialize; use garage_model::garage::Garage; -use crate::api_server::resolve_bucket; use crate::error::*; -use crate::s3_put::{get_headers, save_stream}; -use crate::s3_xml; +use crate::helpers::resolve_bucket; +use crate::s3::put::{get_headers, save_stream}; +use 
crate::s3::xml as s3_xml; use crate::signature::payload::{parse_date, verify_v4}; pub async fn handle_post_object( @@ -119,7 +119,15 @@ pub async fn handle_post_object( }; let date = parse_date(date)?; - let api_key = verify_v4(&garage, credential, &date, signature, policy.as_bytes()).await?; + let api_key = verify_v4( + &garage, + "s3", + credential, + &date, + signature, + policy.as_bytes(), + ) + .await?; let bucket_id = resolve_bucket(&garage, &bucket, &api_key).await?; diff --git a/src/api/s3_put.rs b/src/api/s3/put.rs index ed0bf00b..89aa8d84 100644 --- a/src/api/s3_put.rs +++ b/src/api/s3/put.rs @@ -14,13 +14,13 @@ use garage_util::error::Error as GarageError; use garage_util::time::*; use garage_block::manager::INLINE_THRESHOLD; -use garage_model::block_ref_table::*; use garage_model::garage::Garage; -use garage_model::object_table::*; -use garage_model::version_table::*; +use garage_model::s3::block_ref_table::*; +use garage_model::s3::object_table::*; +use garage_model::s3::version_table::*; use crate::error::*; -use crate::s3_xml; +use crate::s3::xml as s3_xml; use crate::signature::verify_signed_content; pub async fn handle_put( diff --git a/src/api/s3_router.rs b/src/api/s3/router.rs index 95a7eceb..0525c649 100644 --- a/src/api/s3_router.rs +++ b/src/api/s3/router.rs @@ -5,127 +5,10 @@ use std::borrow::Cow; use hyper::header::HeaderValue; use hyper::{HeaderMap, Method, Request}; -/// This macro is used to generate very repetitive match {} blocks in this module -/// It is _not_ made to be used anywhere else -macro_rules! s3_match { - (@match $enum:expr , [ $($endpoint:ident,)* ]) => {{ - // usage: s3_match {@match my_enum, [ VariantWithField1, VariantWithField2 ..] } - // returns true if the variant was one of the listed variants, false otherwise. - use Endpoint::*; - match $enum { - $( - $endpoint { .. } => true, - )* - _ => false - } - }}; - (@extract $enum:expr , $param:ident, [ $($endpoint:ident,)* ]) => {{ - // usage: s3_match {@extract my_enum, field_name, [ VariantWithField1, VariantWithField2 ..] } - // returns Some(field_value), or None if the variant was not one of the listed variants. - use Endpoint::*; - match $enum { - $( - $endpoint {$param, ..} => Some($param), - )* - _ => None - } - }}; - (@gen_parser ($keyword:expr, $key:expr, $query:expr, $header:expr), - key: [$($kw_k:ident $(if $required_k:ident)? $(header $header_k:expr)? => $api_k:ident $(($($conv_k:ident :: $param_k:ident),*))?,)*], - no_key: [$($kw_nk:ident $(if $required_nk:ident)? $(if_header $header_nk:expr)? => $api_nk:ident $(($($conv_nk:ident :: $param_nk:ident),*))?,)*]) => {{ - // usage: s3_match {@gen_parser (keyword, key, query, header), - // key: [ - // SOME_KEYWORD => VariantWithKey, - // ... - // ], - // no_key: [ - // SOME_KEYWORD => VariantWithoutKey, - // ... - // ] - // } - // See in from_{method} for more detailed usage. - use Endpoint::*; - use keywords::*; - match ($keyword, !$key.is_empty()){ - $( - ($kw_k, true) if true $(&& $query.$required_k.is_some())? $(&& $header.contains_key($header_k))? => Ok($api_k { - key: $key, - $($( - $param_k: s3_match!(@@parse_param $query, $conv_k, $param_k), - )*)? - }), - )* - $( - ($kw_nk, false) $(if $query.$required_nk.is_some())? $(if $header.contains($header_nk))? => Ok($api_nk { - $($( - $param_nk: s3_match!(@@parse_param $query, $conv_nk, $param_nk), - )*)? 
- }), - )* - (kw, _) => Err(Error::BadRequest(format!("Invalid endpoint: {}", kw))) - } - }}; - - (@@parse_param $query:expr, query_opt, $param:ident) => {{ - // extract optional query parameter - $query.$param.take().map(|param| param.into_owned()) - }}; - (@@parse_param $query:expr, query, $param:ident) => {{ - // extract mendatory query parameter - $query.$param.take().ok_or_bad_request("Missing argument for endpoint")?.into_owned() - }}; - (@@parse_param $query:expr, opt_parse, $param:ident) => {{ - // extract and parse optional query parameter - // missing parameter is file, however parse error is reported as an error - $query.$param - .take() - .map(|param| param.parse()) - .transpose() - .map_err(|_| Error::BadRequest("Failed to parse query parameter".to_owned()))? - }}; - (@@parse_param $query:expr, parse, $param:ident) => {{ - // extract and parse mandatory query parameter - // both missing and un-parseable parameters are reported as errors - $query.$param.take().ok_or_bad_request("Missing argument for endpoint")? - .parse() - .map_err(|_| Error::BadRequest("Failed to parse query parameter".to_owned()))? - }}; - (@func - $(#[$doc:meta])* - pub enum Endpoint { - $( - $(#[$outer:meta])* - $variant:ident $({ - $($name:ident: $ty:ty,)* - })?, - )* - }) => { - $(#[$doc])* - pub enum Endpoint { - $( - $(#[$outer])* - $variant $({ - $($name: $ty, )* - })?, - )* - } - impl Endpoint { - pub fn name(&self) -> &'static str { - match self { - $(Endpoint::$variant $({ $($name: _,)* .. })? => stringify!($variant),)* - } - } - } - }; - (@if ($($cond:tt)+) then ($($then:tt)*) else ($($else:tt)*)) => { - $($then)* - }; - (@if () then ($($then:tt)*) else ($($else:tt)*)) => { - $($else)* - }; -} +use crate::helpers::Authorization; +use crate::router_macros::{generateQueryParameters, router_match}; -s3_match! {@func +router_match! {@func /// List of all S3 API endpoints. /// @@ -471,7 +354,7 @@ impl Endpoint { /// Determine which endpoint a request is for, knowing it is a GET. fn from_get(key: String, query: &mut QueryParameters<'_>) -> Result<Self, Error> { - s3_match! { + router_match! { @gen_parser (query.keyword.take().unwrap_or_default().as_ref(), key, query, None), key: [ @@ -528,7 +411,7 @@ impl Endpoint { /// Determine which endpoint a request is for, knowing it is a HEAD. fn from_head(key: String, query: &mut QueryParameters<'_>) -> Result<Self, Error> { - s3_match! { + router_match! { @gen_parser (query.keyword.take().unwrap_or_default().as_ref(), key, query, None), key: [ @@ -542,7 +425,7 @@ impl Endpoint { /// Determine which endpoint a request is for, knowing it is a POST. fn from_post(key: String, query: &mut QueryParameters<'_>) -> Result<Self, Error> { - s3_match! { + router_match! { @gen_parser (query.keyword.take().unwrap_or_default().as_ref(), key, query, None), key: [ @@ -564,7 +447,7 @@ impl Endpoint { query: &mut QueryParameters<'_>, headers: &HeaderMap<HeaderValue>, ) -> Result<Self, Error> { - s3_match! { + router_match! { @gen_parser (query.keyword.take().unwrap_or_default().as_ref(), key, query, headers), key: [ @@ -606,7 +489,7 @@ impl Endpoint { /// Determine which endpoint a request is for, knowing it is a DELETE. fn from_delete(key: String, query: &mut QueryParameters<'_>) -> Result<Self, Error> { - s3_match! { + router_match! { @gen_parser (query.keyword.take().unwrap_or_default().as_ref(), key, query, None), key: [ @@ -636,7 +519,7 @@ impl Endpoint { /// Get the key the request target. Returns None for requests which don't use a key. 
#[allow(dead_code)] pub fn get_key(&self) -> Option<&str> { - s3_match! { + router_match! { @extract self, key, @@ -673,7 +556,7 @@ impl Endpoint { if let Endpoint::ListBuckets = self { return Authorization::None; }; - let readonly = s3_match! { + let readonly = router_match! { @match self, [ @@ -717,7 +600,7 @@ impl Endpoint { SelectObjectContent, ] }; - let owner = s3_match! { + let owner = router_match! { @match self, [ @@ -740,87 +623,6 @@ impl Endpoint { } } -/// What kind of authorization is required to perform a given action -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum Authorization { - /// No authorization is required - None, - /// Having Read permission on bucket - Read, - /// Having Write permission on bucket - Write, - /// Having Owner permission on bucket - Owner, -} - -/// This macro is used to generate part of the code in this module. It must be called only one, and -/// is useless outside of this module. -macro_rules! generateQueryParameters { - ( $($rest:expr => $name:ident),* ) => { - /// Struct containing all query parameters used in endpoints. Think of it as an HashMap, - /// but with keys statically known. - #[derive(Debug, Default)] - struct QueryParameters<'a> { - keyword: Option<Cow<'a, str>>, - $( - $name: Option<Cow<'a, str>>, - )* - } - - impl<'a> QueryParameters<'a> { - /// Build this struct from the query part of an URI. - fn from_query(query: &'a str) -> Result<Self, Error> { - let mut res: Self = Default::default(); - for (k, v) in url::form_urlencoded::parse(query.as_bytes()) { - let repeated = match k.as_ref() { - $( - $rest => if !v.is_empty() { - res.$name.replace(v).is_some() - } else { - false - }, - )* - _ => { - if k.starts_with("response-") || k.starts_with("X-Amz-") { - false - } else if v.as_ref().is_empty() { - if res.keyword.replace(k).is_some() { - return Err(Error::BadRequest("Multiple keywords".to_owned())); - } - continue; - } else { - debug!("Received an unknown query parameter: '{}'", k); - false - } - } - }; - if repeated { - return Err(Error::BadRequest(format!( - "Query parameter repeated: '{}'", - k - ))); - } - } - Ok(res) - } - - /// Get an error message in case not all parameters where used when extracting them to - /// build an Enpoint variant - fn nonempty_message(&self) -> Option<&str> { - if self.keyword.is_some() { - Some("Keyword not used") - } $( - else if self.$name.is_some() { - Some(concat!("'", $rest, "'")) - } - )* else { - None - } - } - } - } -} - // parameter name => struct field generateQueryParameters! 
{ "continuation-token" => continuation_token, diff --git a/src/api/s3_website.rs b/src/api/s3/website.rs index b464dd45..561130dc 100644 --- a/src/api/s3_website.rs +++ b/src/api/s3/website.rs @@ -5,7 +5,7 @@ use hyper::{Body, Request, Response, StatusCode}; use serde::{Deserialize, Serialize}; use crate::error::*; -use crate::s3_xml::{to_xml_with_header, xmlns_tag, IntValue, Value}; +use crate::s3::xml::{to_xml_with_header, xmlns_tag, IntValue, Value}; use crate::signature::verify_signed_content; use garage_model::bucket_table::*; diff --git a/src/api/s3_xml.rs b/src/api/s3/xml.rs index 75ec4559..75ec4559 100644 --- a/src/api/s3_xml.rs +++ b/src/api/s3/xml.rs diff --git a/src/api/signature/mod.rs b/src/api/signature/mod.rs index ebdee6da..5646f4fa 100644 --- a/src/api/signature/mod.rs +++ b/src/api/signature/mod.rs @@ -42,6 +42,11 @@ pub fn signing_hmac( Ok(hmac) } -pub fn compute_scope(datetime: &DateTime<Utc>, region: &str) -> String { - format!("{}/{}/s3/aws4_request", datetime.format(SHORT_DATE), region,) +pub fn compute_scope(datetime: &DateTime<Utc>, region: &str, service: &str) -> String { + format!( + "{}/{}/{}/aws4_request", + datetime.format(SHORT_DATE), + region, + service + ) } diff --git a/src/api/signature/payload.rs b/src/api/signature/payload.rs index 2a41b307..9137dd2d 100644 --- a/src/api/signature/payload.rs +++ b/src/api/signature/payload.rs @@ -11,14 +11,15 @@ use garage_util::data::Hash; use garage_model::garage::Garage; use garage_model::key_table::*; -use super::signing_hmac; -use super::{LONG_DATETIME, SHORT_DATE}; +use super::LONG_DATETIME; +use super::{compute_scope, signing_hmac}; use crate::encoding::uri_encode; use crate::error::*; pub async fn check_payload_signature( garage: &Garage, + service: &str, request: &Request<Body>, ) -> Result<(Option<Key>, Option<Hash>), Error> { let mut headers = HashMap::new(); @@ -64,6 +65,7 @@ pub async fn check_payload_signature( let key = verify_v4( garage, + service, &authorization.credential, &authorization.date, &authorization.signature, @@ -281,6 +283,7 @@ pub fn parse_date(date: &str) -> Result<DateTime<Utc>, Error> { pub async fn verify_v4( garage: &Garage, + service: &str, credential: &str, date: &DateTime<Utc>, signature: &str, @@ -288,11 +291,7 @@ pub async fn verify_v4( ) -> Result<Key, Error> { let (key_id, scope) = parse_credential(credential)?; - let scope_expected = format!( - "{}/{}/s3/aws4_request", - date.format(SHORT_DATE), - garage.config.s3_api.s3_region - ); + let scope_expected = compute_scope(date, &garage.config.s3_api.s3_region, service); if scope != scope_expected { return Err(Error::AuthorizationHeaderMalformed(scope.to_string())); } @@ -309,7 +308,7 @@ pub async fn verify_v4( date, &key_p.secret_key, &garage.config.s3_api.s3_region, - "s3", + service, ) .ok_or_internal_error("Unable to build signing HMAC")?; hmac.update(payload); diff --git a/src/api/signature/streaming.rs b/src/api/signature/streaming.rs index 969a45d6..ded9d993 100644 --- a/src/api/signature/streaming.rs +++ b/src/api/signature/streaming.rs @@ -1,19 +1,68 @@ use std::pin::Pin; -use chrono::{DateTime, Utc}; +use chrono::{DateTime, NaiveDateTime, Utc}; use futures::prelude::*; use futures::task; +use garage_model::key_table::Key; +use hmac::Mac; use hyper::body::Bytes; +use hyper::{Body, Request}; use garage_util::data::Hash; -use hmac::Mac; -use super::sha256sum; -use super::HmacSha256; -use super::LONG_DATETIME; +use super::{compute_scope, sha256sum, HmacSha256, LONG_DATETIME}; use crate::error::*; +pub fn parse_streaming_body( 
+ api_key: &Key, + req: Request<Body>, + content_sha256: &mut Option<Hash>, + region: &str, + service: &str, +) -> Result<Request<Body>, Error> { + match req.headers().get("x-amz-content-sha256") { + Some(header) if header == "STREAMING-AWS4-HMAC-SHA256-PAYLOAD" => { + let signature = content_sha256 + .take() + .ok_or_bad_request("No signature provided")?; + + let secret_key = &api_key + .state + .as_option() + .ok_or_internal_error("Deleted key state")? + .secret_key; + + let date = req + .headers() + .get("x-amz-date") + .ok_or_bad_request("Missing X-Amz-Date field")? + .to_str()?; + let date: NaiveDateTime = NaiveDateTime::parse_from_str(date, LONG_DATETIME) + .ok_or_bad_request("Invalid date")?; + let date: DateTime<Utc> = DateTime::from_utc(date, Utc); + + let scope = compute_scope(&date, region, service); + let signing_hmac = crate::signature::signing_hmac(&date, secret_key, region, service) + .ok_or_internal_error("Unable to build signing HMAC")?; + + Ok(req.map(move |body| { + Body::wrap_stream( + SignedPayloadStream::new( + body.map_err(Error::from), + signing_hmac, + date, + &scope, + signature, + ) + .map_err(Error::from), + ) + })) + } + _ => Ok(req), + } +} + /// Result of `sha256("")` const EMPTY_STRING_HEX_DIGEST: &str = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"; @@ -295,7 +344,7 @@ mod tests { .with_timezone(&Utc); let secret_key = "test"; let region = "test"; - let scope = crate::signature::compute_scope(&datetime, region); + let scope = crate::signature::compute_scope(&datetime, region, "s3"); let signing_hmac = crate::signature::signing_hmac(&datetime, secret_key, region, "s3").unwrap(); diff --git a/src/block/manager.rs b/src/block/manager.rs index 1c04a335..9b2d9cad 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -132,7 +132,7 @@ impl BlockManager { let endpoint = system .netapp - .endpoint("garage_model/block.rs/Rpc".to_string()); + .endpoint("garage_block/manager.rs/Rpc".to_string()); let manager_locked = BlockManagerLocked(); diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml index 59f402ff..3b69d7bc 100644 --- a/src/garage/Cargo.toml +++ b/src/garage/Cargo.toml @@ -63,3 +63,11 @@ hyper = { version = "0.14", features = ["client", "http1", "runtime"] } sha2 = "0.9" static_init = "1.0" +assert-json-diff = "2.0" +serde_json = "1.0" +base64 = "0.13" + + +[features] +kubernetes-discovery = [ "garage_rpc/kubernetes-discovery" ] +k2v = [ "garage_util/k2v", "garage_api/k2v" ] diff --git a/src/garage/admin.rs b/src/garage/admin.rs index 0b20bb20..af0c3f22 100644 --- a/src/garage/admin.rs +++ b/src/garage/admin.rs @@ -21,8 +21,8 @@ use garage_model::garage::Garage; use garage_model::helper::error::{Error, OkOrBadRequest}; use garage_model::key_table::*; use garage_model::migrate::Migrate; -use garage_model::object_table::ObjectFilter; use garage_model::permission::*; +use garage_model::s3::object_table::ObjectFilter; use crate::cli::*; use crate::repair::Repair; @@ -80,7 +80,13 @@ impl AdminRpcHandler { let buckets = self .garage .bucket_table - .get_range(&EmptyKey, None, Some(DeletedFilter::NotDeleted), 10000) + .get_range( + &EmptyKey, + None, + Some(DeletedFilter::NotDeleted), + 10000, + EnumerationOrder::Forward, + ) .await?; Ok(AdminRpc::BucketList(buckets)) } @@ -210,7 +216,13 @@ impl AdminRpcHandler { let objects = self .garage .object_table - .get_range(&bucket_id, None, Some(ObjectFilter::IsData), 10) + .get_range( + &bucket_id, + None, + Some(ObjectFilter::IsData), + 10, + EnumerationOrder::Forward, + ) .await?; if 
!objects.is_empty() { return Err(Error::BadRequest(format!( @@ -445,6 +457,7 @@ impl AdminRpcHandler { None, Some(KeyFilter::Deleted(DeletedFilter::NotDeleted)), 10000, + EnumerationOrder::Forward, ) .await? .iter() diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs index a90277a0..2a799868 100644 --- a/src/garage/cli/cmd.rs +++ b/src/garage/cli/cmd.rs @@ -85,13 +85,14 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) -> format_table(healthy_nodes); let status_keys = status.iter().map(|adv| adv.id).collect::<HashSet<_>>(); - let failure_case_1 = status.iter().any(|adv| !adv.is_up); + let failure_case_1 = status + .iter() + .any(|adv| !adv.is_up && matches!(layout.roles.get(&adv.id), Some(NodeRoleV(Some(_))))); let failure_case_2 = layout .roles .items() .iter() - .filter(|(_, _, v)| v.0.is_some()) - .any(|(id, _, _)| !status_keys.contains(id)); + .any(|(id, _, v)| !status_keys.contains(id) && v.0.is_some()); if failure_case_1 || failure_case_2 { println!("\n==== FAILED NODES ===="); let mut failed_nodes = diff --git a/src/garage/repair.rs b/src/garage/repair.rs index 3666ca8f..830eac71 100644 --- a/src/garage/repair.rs +++ b/src/garage/repair.rs @@ -2,10 +2,10 @@ use std::sync::Arc; use tokio::sync::watch; -use garage_model::block_ref_table::*; use garage_model::garage::Garage; -use garage_model::object_table::*; -use garage_model::version_table::*; +use garage_model::s3::block_ref_table::*; +use garage_model::s3::object_table::*; +use garage_model::s3::version_table::*; use garage_table::*; use garage_util::error::Error; diff --git a/src/garage/server.rs b/src/garage/server.rs index 58c9e782..24bb25b3 100644 --- a/src/garage/server.rs +++ b/src/garage/server.rs @@ -8,10 +8,13 @@ use garage_util::error::Error; use garage_admin::metrics::*; use garage_admin::tracing_setup::*; -use garage_api::run_api_server; +use garage_api::s3::api_server::S3ApiServer; use garage_model::garage::Garage; use garage_web::run_web_server; +#[cfg(feature = "k2v")] +use garage_api::k2v::api_server::K2VApiServer; + use crate::admin::*; async fn wait_from(mut chan: watch::Receiver<bool>) { @@ -56,12 +59,21 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { info!("Create admin RPC handler..."); AdminRpcHandler::new(garage.clone()); - info!("Initializing API server..."); - let api_server = tokio::spawn(run_api_server( + info!("Initializing S3 API server..."); + let s3_api_server = tokio::spawn(S3ApiServer::run( garage.clone(), wait_from(watch_cancel.clone()), )); + #[cfg(feature = "k2v")] + let k2v_api_server = { + info!("Initializing K2V API server..."); + tokio::spawn(K2VApiServer::run( + garage.clone(), + wait_from(watch_cancel.clone()), + )) + }; + info!("Initializing web server..."); let web_server = tokio::spawn(run_web_server( garage.clone(), @@ -80,8 +92,12 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { // Stuff runs // When a cancel signal is sent, stuff stops - if let Err(e) = api_server.await? { - warn!("API server exited with error: {}", e); + if let Err(e) = s3_api_server.await? { + warn!("S3 API server exited with error: {}", e); + } + #[cfg(feature = "k2v")] + if let Err(e) = k2v_api_server.await? { + warn!("K2V API server exited with error: {}", e); } if let Err(e) = web_server.await? 
{ warn!("Web server exited with error: {}", e); diff --git a/src/garage/tests/common/client.rs b/src/garage/tests/common/client.rs index c5ddc6e5..212588b5 100644 --- a/src/garage/tests/common/client.rs +++ b/src/garage/tests/common/client.rs @@ -10,7 +10,7 @@ pub fn build_client(instance: &Instance) -> Client { None, "garage-integ-test", ); - let endpoint = Endpoint::immutable(instance.uri()); + let endpoint = Endpoint::immutable(instance.s3_uri()); let config = Config::builder() .region(super::REGION) diff --git a/src/garage/tests/common/custom_requester.rs b/src/garage/tests/common/custom_requester.rs index 580691a1..1700cc90 100644 --- a/src/garage/tests/common/custom_requester.rs +++ b/src/garage/tests/common/custom_requester.rs @@ -17,14 +17,25 @@ use garage_api::signature; pub struct CustomRequester { key: Key, uri: Uri, + service: &'static str, client: Client<HttpConnector>, } impl CustomRequester { - pub fn new(instance: &Instance) -> Self { + pub fn new_s3(instance: &Instance) -> Self { CustomRequester { key: instance.key.clone(), - uri: instance.uri(), + uri: instance.s3_uri(), + service: "s3", + client: Client::new(), + } + } + + pub fn new_k2v(instance: &Instance) -> Self { + CustomRequester { + key: instance.key.clone(), + uri: instance.k2v_uri(), + service: "k2v", client: Client::new(), } } @@ -32,6 +43,7 @@ impl CustomRequester { pub fn builder(&self, bucket: String) -> RequestBuilder<'_> { RequestBuilder { requester: self, + service: self.service, bucket, method: Method::GET, path: String::new(), @@ -47,6 +59,7 @@ impl CustomRequester { pub struct RequestBuilder<'a> { requester: &'a CustomRequester, + service: &'static str, bucket: String, method: Method, path: String, @@ -59,13 +72,17 @@ pub struct RequestBuilder<'a> { } impl<'a> RequestBuilder<'a> { + pub fn service(&mut self, service: &'static str) -> &mut Self { + self.service = service; + self + } pub fn method(&mut self, method: Method) -> &mut Self { self.method = method; self } - pub fn path(&mut self, path: String) -> &mut Self { - self.path = path; + pub fn path(&mut self, path: impl ToString) -> &mut Self { + self.path = path.to_string(); self } @@ -74,16 +91,38 @@ impl<'a> RequestBuilder<'a> { self } + pub fn query_param<T, U>(&mut self, param: T, value: Option<U>) -> &mut Self + where + T: ToString, + U: ToString, + { + self.query_params + .insert(param.to_string(), value.as_ref().map(ToString::to_string)); + self + } + pub fn signed_headers(&mut self, signed_headers: HashMap<String, String>) -> &mut Self { self.signed_headers = signed_headers; self } + pub fn signed_header(&mut self, name: impl ToString, value: impl ToString) -> &mut Self { + self.signed_headers + .insert(name.to_string(), value.to_string()); + self + } + pub fn unsigned_headers(&mut self, unsigned_headers: HashMap<String, String>) -> &mut Self { self.unsigned_headers = unsigned_headers; self } + pub fn unsigned_header(&mut self, name: impl ToString, value: impl ToString) -> &mut Self { + self.unsigned_headers + .insert(name.to_string(), value.to_string()); + self + } + pub fn body(&mut self, body: Vec<u8>) -> &mut Self { self.body = body; self @@ -106,24 +145,24 @@ impl<'a> RequestBuilder<'a> { let query = query_param_to_string(&self.query_params); let (host, path) = if self.vhost_style { ( - format!("{}.s3.garage", self.bucket), + format!("{}.{}.garage", self.bucket, self.service), format!("{}{}", self.path, query), ) } else { ( - "s3.garage".to_owned(), + format!("{}.garage", self.service), format!("{}/{}{}", self.bucket, self.path, 
query), ) }; let uri = format!("{}{}", self.requester.uri, path); let now = Utc::now(); - let scope = signature::compute_scope(&now, super::REGION.as_ref()); + let scope = signature::compute_scope(&now, super::REGION.as_ref(), self.service); let mut signer = signature::signing_hmac( &now, &self.requester.key.secret, super::REGION.as_ref(), - "s3", + self.service, ) .unwrap(); let streaming_signer = signer.clone(); diff --git a/src/garage/tests/common/garage.rs b/src/garage/tests/common/garage.rs index 88c51501..44d727f9 100644 --- a/src/garage/tests/common/garage.rs +++ b/src/garage/tests/common/garage.rs @@ -22,7 +22,9 @@ pub struct Instance { process: process::Child, pub path: PathBuf, pub key: Key, - pub api_port: u16, + pub s3_port: u16, + pub k2v_port: u16, + pub web_port: u16, } impl Instance { @@ -58,9 +60,12 @@ rpc_secret = "{secret}" [s3_api] s3_region = "{region}" -api_bind_addr = "127.0.0.1:{api_port}" +api_bind_addr = "127.0.0.1:{s3_port}" root_domain = ".s3.garage" +[k2v_api] +api_bind_addr = "127.0.0.1:{k2v_port}" + [s3_web] bind_addr = "127.0.0.1:{web_port}" root_domain = ".web.garage" @@ -72,10 +77,11 @@ api_bind_addr = "127.0.0.1:{admin_port}" path = path.display(), secret = GARAGE_TEST_SECRET, region = super::REGION, - api_port = port, - rpc_port = port + 1, - web_port = port + 2, - admin_port = port + 3, + s3_port = port, + k2v_port = port + 1, + rpc_port = port + 2, + web_port = port + 3, + admin_port = port + 4, ); fs::write(path.join("config.toml"), config).expect("Could not write garage config file"); @@ -88,7 +94,7 @@ api_bind_addr = "127.0.0.1:{admin_port}" .arg("server") .stdout(stdout) .stderr(stderr) - .env("RUST_LOG", "garage=info,garage_api=debug") + .env("RUST_LOG", "garage=info,garage_api=trace") .spawn() .expect("Could not start garage"); @@ -96,7 +102,9 @@ api_bind_addr = "127.0.0.1:{admin_port}" process: child, path, key: Key::default(), - api_port: port, + s3_port: port, + k2v_port: port + 1, + web_port: port + 3, } } @@ -147,8 +155,14 @@ api_bind_addr = "127.0.0.1:{admin_port}" String::from_utf8(output.stdout).unwrap() } - pub fn uri(&self) -> http::Uri { - format!("http://127.0.0.1:{api_port}", api_port = self.api_port) + pub fn s3_uri(&self) -> http::Uri { + format!("http://127.0.0.1:{s3_port}", s3_port = self.s3_port) + .parse() + .expect("Could not build garage endpoint URI") + } + + pub fn k2v_uri(&self) -> http::Uri { + format!("http://127.0.0.1:{k2v_port}", k2v_port = self.k2v_port) .parse() .expect("Could not build garage endpoint URI") } diff --git a/src/garage/tests/common/mod.rs b/src/garage/tests/common/mod.rs index 8f88c731..28874b02 100644 --- a/src/garage/tests/common/mod.rs +++ b/src/garage/tests/common/mod.rs @@ -17,18 +17,27 @@ pub struct Context { pub garage: &'static garage::Instance, pub client: Client, pub custom_request: CustomRequester, + pub k2v: K2VContext, +} + +pub struct K2VContext { + pub request: CustomRequester, } impl Context { fn new() -> Self { let garage = garage::instance(); let client = client::build_client(garage); - let custom_request = CustomRequester::new(garage); + let custom_request = CustomRequester::new_s3(garage); + let k2v_request = CustomRequester::new_k2v(garage); Context { garage, client, custom_request, + k2v: K2VContext { + request: k2v_request, + }, } } diff --git a/src/garage/tests/k2v/batch.rs b/src/garage/tests/k2v/batch.rs new file mode 100644 index 00000000..1182a298 --- /dev/null +++ b/src/garage/tests/k2v/batch.rs @@ -0,0 +1,525 @@ +use std::collections::HashMap; + +use crate::common; + +use 
assert_json_diff::assert_json_eq; +use serde_json::json; + +use super::json_body; +use hyper::Method; + +#[tokio::test] +async fn test_batch() { + let ctx = common::context(); + let bucket = ctx.create_bucket("test-k2v-batch"); + + let mut values = HashMap::new(); + values.insert("a", "initial test 1"); + values.insert("b", "initial test 2"); + values.insert("c", "initial test 3"); + values.insert("d.1", "initial test 4"); + values.insert("d.2", "initial test 5"); + values.insert("e", "initial test 6"); + let mut ct = HashMap::new(); + + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .body( + format!( + r#"[ + {{"pk": "root", "sk": "a", "ct": null, "v": "{}"}}, + {{"pk": "root", "sk": "b", "ct": null, "v": "{}"}}, + {{"pk": "root", "sk": "c", "ct": null, "v": "{}"}}, + {{"pk": "root", "sk": "d.1", "ct": null, "v": "{}"}}, + {{"pk": "root", "sk": "d.2", "ct": null, "v": "{}"}}, + {{"pk": "root", "sk": "e", "ct": null, "v": "{}"}} + ]"#, + base64::encode(values.get(&"a").unwrap()), + base64::encode(values.get(&"b").unwrap()), + base64::encode(values.get(&"c").unwrap()), + base64::encode(values.get(&"d.1").unwrap()), + base64::encode(values.get(&"d.2").unwrap()), + base64::encode(values.get(&"e").unwrap()), + ) + .into_bytes(), + ) + .method(Method::POST) + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + + for sk in ["a", "b", "c", "d.1", "d.2", "e"] { + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some(sk)) + .signed_header("accept", "*/*") + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + assert_eq!( + res.headers().get("content-type").unwrap().to_str().unwrap(), + "application/octet-stream" + ); + ct.insert( + sk, + res.headers() + .get("x-garage-causality-token") + .unwrap() + .to_str() + .unwrap() + .to_string(), + ); + let res_body = hyper::body::to_bytes(res.into_body()) + .await + .unwrap() + .to_vec(); + assert_eq!(res_body, values.get(sk).unwrap().as_bytes()); + } + + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .query_param("search", Option::<&str>::None) + .body( + br#"[ + {"partitionKey": "root"}, + {"partitionKey": "root", "start": "c"}, + {"partitionKey": "root", "start": "c", "reverse": true, "end": "a"}, + {"partitionKey": "root", "limit": 1}, + {"partitionKey": "root", "prefix": "d"} + ]"# + .to_vec(), + ) + .method(Method::POST) + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + let json_res = json_body(res).await; + assert_json_eq!( + json_res, + json!([ + { + "partitionKey": "root", + "prefix": null, + "start": null, + "end": null, + "limit": null, + "reverse": false, + "conflictsOnly": false, + "tombstones": false, + "singleItem": false, + "items": [ + {"sk": "a", "ct": ct.get("a").unwrap(), "v": [base64::encode(values.get("a").unwrap())]}, + {"sk": "b", "ct": ct.get("b").unwrap(), "v": [base64::encode(values.get("b").unwrap())]}, + {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap())]}, + {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1").unwrap())]}, + {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap())]}, + {"sk": "e", "ct": ct.get("e").unwrap(), "v": [base64::encode(values.get("e").unwrap())]} + ], + "more": false, + "nextStart": null, + }, + { + "partitionKey": "root", + "prefix": null, + "start": "c", + "end": null, + "limit": null, + "reverse": false, + "conflictsOnly": false, + "tombstones": false, + "singleItem": false, + 
"items": [ + {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap())]}, + {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1").unwrap())]}, + {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap())]}, + {"sk": "e", "ct": ct.get("e").unwrap(), "v": [base64::encode(values.get("e").unwrap())]} + ], + "more": false, + "nextStart": null, + }, + { + "partitionKey": "root", + "prefix": null, + "start": "c", + "end": "a", + "limit": null, + "reverse": true, + "conflictsOnly": false, + "tombstones": false, + "singleItem": false, + "items": [ + {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap())]}, + {"sk": "b", "ct": ct.get("b").unwrap(), "v": [base64::encode(values.get("b").unwrap())]}, + ], + "more": false, + "nextStart": null, + }, + { + "partitionKey": "root", + "prefix": null, + "start": null, + "end": null, + "limit": 1, + "reverse": false, + "conflictsOnly": false, + "tombstones": false, + "singleItem": false, + "items": [ + {"sk": "a", "ct": ct.get("a").unwrap(), "v": [base64::encode(values.get("a").unwrap())]} + ], + "more": true, + "nextStart": "b", + }, + { + "partitionKey": "root", + "prefix": "d", + "start": null, + "end": null, + "limit": null, + "reverse": false, + "conflictsOnly": false, + "tombstones": false, + "singleItem": false, + "items": [ + {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1").unwrap())]}, + {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap())]} + ], + "more": false, + "nextStart": null, + }, + ]) + ); + + // Insert some new values + values.insert("c'", "new test 3"); + values.insert("d.1'", "new test 4"); + values.insert("d.2'", "new test 5"); + + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .body( + format!( + r#"[ + {{"pk": "root", "sk": "b", "ct": "{}", "v": null}}, + {{"pk": "root", "sk": "c", "ct": null, "v": "{}"}}, + {{"pk": "root", "sk": "d.1", "ct": "{}", "v": "{}"}}, + {{"pk": "root", "sk": "d.2", "ct": null, "v": "{}"}} + ]"#, + ct.get(&"b").unwrap(), + base64::encode(values.get(&"c'").unwrap()), + ct.get(&"d.1").unwrap(), + base64::encode(values.get(&"d.1'").unwrap()), + base64::encode(values.get(&"d.2'").unwrap()), + ) + .into_bytes(), + ) + .method(Method::POST) + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + + for sk in ["b", "c", "d.1", "d.2"] { + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some(sk)) + .signed_header("accept", "*/*") + .send() + .await + .unwrap(); + if sk == "b" { + assert_eq!(res.status(), 204); + } else { + assert_eq!(res.status(), 200); + } + ct.insert( + sk, + res.headers() + .get("x-garage-causality-token") + .unwrap() + .to_str() + .unwrap() + .to_string(), + ); + } + + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .query_param("search", Option::<&str>::None) + .body( + br#"[ + {"partitionKey": "root"}, + {"partitionKey": "root", "prefix": "d"}, + {"partitionKey": "root", "prefix": "d.", "end": "d.2"}, + {"partitionKey": "root", "prefix": "d.", "limit": 1}, + {"partitionKey": "root", "prefix": "d.", "start": "d.2", "limit": 1}, + {"partitionKey": "root", "prefix": "d.", "reverse": true}, + {"partitionKey": "root", "prefix": "d.", "start": "d.2", "reverse": true}, + {"partitionKey": "root", "prefix": "d.", "limit": 2} + ]"# + .to_vec(), + ) + .method(Method::POST) + .send() + .await + .unwrap(); + 
assert_eq!(res.status(), 200); + let json_res = json_body(res).await; + assert_json_eq!( + json_res, + json!([ + { + "partitionKey": "root", + "prefix": null, + "start": null, + "end": null, + "limit": null, + "reverse": false, + "conflictsOnly": false, + "tombstones": false, + "singleItem": false, + "items": [ + {"sk": "a", "ct": ct.get("a").unwrap(), "v": [base64::encode(values.get("a").unwrap())]}, + {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap()), base64::encode(values.get("c'").unwrap())]}, + {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]}, + {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap()), base64::encode(values.get("d.2'").unwrap())]}, + {"sk": "e", "ct": ct.get("e").unwrap(), "v": [base64::encode(values.get("e").unwrap())]} + ], + "more": false, + "nextStart": null, + }, + { + "partitionKey": "root", + "prefix": "d", + "start": null, + "end": null, + "limit": null, + "reverse": false, + "conflictsOnly": false, + "tombstones": false, + "singleItem": false, + "items": [ + {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]}, + {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap()), base64::encode(values.get("d.2'").unwrap())]}, + ], + "more": false, + "nextStart": null, + }, + { + "partitionKey": "root", + "prefix": "d.", + "start": null, + "end": "d.2", + "limit": null, + "reverse": false, + "conflictsOnly": false, + "tombstones": false, + "singleItem": false, + "items": [ + {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]}, + ], + "more": false, + "nextStart": null, + }, + { + "partitionKey": "root", + "prefix": "d.", + "start": null, + "end": null, + "limit": 1, + "reverse": false, + "conflictsOnly": false, + "tombstones": false, + "singleItem": false, + "items": [ + {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]}, + ], + "more": true, + "nextStart": "d.2", + }, + { + "partitionKey": "root", + "prefix": "d.", + "start": "d.2", + "end": null, + "limit": 1, + "reverse": false, + "conflictsOnly": false, + "tombstones": false, + "singleItem": false, + "items": [ + {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap()), base64::encode(values.get("d.2'").unwrap())]}, + ], + "more": false, + "nextStart": null, + }, + { + "partitionKey": "root", + "prefix": "d.", + "start": null, + "end": null, + "limit": null, + "reverse": true, + "conflictsOnly": false, + "tombstones": false, + "singleItem": false, + "items": [ + {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap()), base64::encode(values.get("d.2'").unwrap())]}, + {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]}, + ], + "more": false, + "nextStart": null, + }, + { + "partitionKey": "root", + "prefix": "d.", + "start": "d.2", + "end": null, + "limit": null, + "reverse": true, + "conflictsOnly": false, + "tombstones": false, + "singleItem": false, + "items": [ + {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap()), base64::encode(values.get("d.2'").unwrap())]}, + {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]}, + ], + "more": false, + "nextStart": null, + }, + { + "partitionKey": "root", + "prefix": "d.", + "start": null, + 
"end": null, + "limit": 2, + "reverse": false, + "conflictsOnly": false, + "tombstones": false, + "singleItem": false, + "items": [ + {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]}, + {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap()), base64::encode(values.get("d.2'").unwrap())]}, + ], + "more": false, + "nextStart": null, + }, + ]) + ); + + // Test DeleteBatch + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .query_param("delete", Option::<&str>::None) + .body( + br#"[ + {"partitionKey": "root", "start": "a", "end": "c"}, + {"partitionKey": "root", "prefix": "d"} + ]"# + .to_vec(), + ) + .method(Method::POST) + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + let json_res = json_body(res).await; + assert_json_eq!( + json_res, + json!([ + { + "partitionKey": "root", + "prefix": null, + "start": "a", + "end": "c", + "singleItem": false, + "deletedItems": 1, + }, + { + "partitionKey": "root", + "prefix": "d", + "start": null, + "end": null, + "singleItem": false, + "deletedItems": 2, + }, + ]) + ); + + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .query_param("search", Option::<&str>::None) + .body( + br#"[ + {"partitionKey": "root"}, + {"partitionKey": "root", "reverse": true} + ]"# + .to_vec(), + ) + .method(Method::POST) + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + let json_res = json_body(res).await; + assert_json_eq!( + json_res, + json!([ + { + "partitionKey": "root", + "prefix": null, + "start": null, + "end": null, + "limit": null, + "reverse": false, + "conflictsOnly": false, + "tombstones": false, + "singleItem": false, + "items": [ + {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap()), base64::encode(values.get("c'").unwrap())]}, + {"sk": "e", "ct": ct.get("e").unwrap(), "v": [base64::encode(values.get("e").unwrap())]} + ], + "more": false, + "nextStart": null, + }, + { + "partitionKey": "root", + "prefix": null, + "start": null, + "end": null, + "limit": null, + "reverse": true, + "conflictsOnly": false, + "tombstones": false, + "singleItem": false, + "items": [ + {"sk": "e", "ct": ct.get("e").unwrap(), "v": [base64::encode(values.get("e").unwrap())]}, + {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap()), base64::encode(values.get("c'").unwrap())]}, + ], + "more": false, + "nextStart": null, + }, + ]) + ); +} diff --git a/src/garage/tests/k2v/errorcodes.rs b/src/garage/tests/k2v/errorcodes.rs new file mode 100644 index 00000000..2fcc45bc --- /dev/null +++ b/src/garage/tests/k2v/errorcodes.rs @@ -0,0 +1,141 @@ +use crate::common; + +use hyper::Method; + +#[tokio::test] +async fn test_error_codes() { + let ctx = common::context(); + let bucket = ctx.create_bucket("test-k2v-error-codes"); + + // Regular insert should work (code 200) + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .method(Method::PUT) + .path("root") + .query_param("sort_key", Some("test1")) + .body(b"Hello, world!".to_vec()) + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + + // Insert with trash causality token: invalid request + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .method(Method::PUT) + .path("root") + .query_param("sort_key", Some("test1")) + .signed_header("x-garage-causality-token", "tra$sh") + .body(b"Hello, world!".to_vec()) + .send() + .await + .unwrap(); + assert_eq!(res.status(), 400); + + // Search without partition key: 
invalid request + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .query_param("search", Option::<&str>::None) + .body( + br#"[ + {}, + ]"# + .to_vec(), + ) + .method(Method::POST) + .send() + .await + .unwrap(); + assert_eq!(res.status(), 400); + + // Search with start that is not in prefix: invalid request + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .query_param("search", Option::<&str>::None) + .body( + br#"[ + {"partition_key": "root", "prefix": "a", "start": "bx"}, + ]"# + .to_vec(), + ) + .method(Method::POST) + .send() + .await + .unwrap(); + assert_eq!(res.status(), 400); + + // Search with invalid json: 400 + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .query_param("search", Option::<&str>::None) + .body( + br#"[ + {"partition_key": "root" + ]"# + .to_vec(), + ) + .method(Method::POST) + .send() + .await + .unwrap(); + assert_eq!(res.status(), 400); + + // Batch insert with invalid causality token: 400 + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .body( + br#"[ + {"pk": "root", "sk": "a", "ct": "tra$h", "v": "aGVsbG8sIHdvcmxkCg=="} + ]"# + .to_vec(), + ) + .method(Method::POST) + .send() + .await + .unwrap(); + assert_eq!(res.status(), 400); + + // Batch insert with invalid data: 400 + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .body( + br#"[ + {"pk": "root", "sk": "a", "ct": null, "v": "aGVsbG8sIHdvcmx$Cg=="} + ]"# + .to_vec(), + ) + .method(Method::POST) + .send() + .await + .unwrap(); + assert_eq!(res.status(), 400); + + // Poll with invalid causality token: 400 + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some("test1")) + .query_param("causality_token", Some("tra$h")) + .query_param("timeout", Some("10")) + .signed_header("accept", "application/octet-stream") + .send() + .await + .unwrap(); + assert_eq!(res.status(), 400); +} diff --git a/src/garage/tests/k2v/item.rs b/src/garage/tests/k2v/item.rs new file mode 100644 index 00000000..bf2b01f8 --- /dev/null +++ b/src/garage/tests/k2v/item.rs @@ -0,0 +1,719 @@ +use crate::common; + +use assert_json_diff::assert_json_eq; +use serde_json::json; + +use super::json_body; +use hyper::Method; + +#[tokio::test] +async fn test_items_and_indices() { + let ctx = common::context(); + let bucket = ctx.create_bucket("test-k2v-item-and-index"); + + // ReadIndex -- there should be nothing + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .send() + .await + .unwrap(); + let res_body = json_body(res).await; + assert_json_eq!( + res_body, + json!({ + "prefix": null, + "start": null, + "end": null, + "limit": null, + "reverse": false, + "partitionKeys": [], + "more": false, + "nextStart": null + }) + ); + + let content2_len = "_: hello universe".len(); + let content3_len = "_: concurrent value".len(); + + for (i, sk) in ["a", "b", "c", "d"].iter().enumerate() { + let content = format!("{}: hello world", sk).into_bytes(); + let content2 = format!("{}: hello universe", sk).into_bytes(); + let content3 = format!("{}: concurrent value", sk).into_bytes(); + + // Put initially, no causality token + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some(sk)) + .body(content.clone()) + .method(Method::PUT) + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + + // Get value back + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some(sk)) + .signed_header("accept", "*/*") + .send() + 
.await + .unwrap(); + assert_eq!(res.status(), 200); + assert_eq!( + res.headers().get("content-type").unwrap().to_str().unwrap(), + "application/octet-stream" + ); + let ct = res + .headers() + .get("x-garage-causality-token") + .unwrap() + .to_str() + .unwrap() + .to_string(); + let res_body = hyper::body::to_bytes(res.into_body()) + .await + .unwrap() + .to_vec(); + assert_eq!(res_body, content); + + // ReadIndex -- now there should be some stuff + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .send() + .await + .unwrap(); + let res_body = json_body(res).await; + assert_json_eq!( + res_body, + json!({ + "prefix": null, + "start": null, + "end": null, + "limit": null, + "reverse": false, + "partitionKeys": [ + { + "pk": "root", + "entries": i+1, + "conflicts": i, + "values": i+i+1, + "bytes": i*(content2.len() + content3.len()) + content.len(), + } + ], + "more": false, + "nextStart": null + }) + ); + + // Put again, this time with causality token + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some(sk)) + .signed_header("x-garage-causality-token", ct.clone()) + .body(content2.clone()) + .method(Method::PUT) + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + + // Get value back + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some(sk)) + .signed_header("accept", "*/*") + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + assert_eq!( + res.headers().get("content-type").unwrap().to_str().unwrap(), + "application/octet-stream" + ); + let res_body = hyper::body::to_bytes(res.into_body()) + .await + .unwrap() + .to_vec(); + assert_eq!(res_body, content2); + + // ReadIndex -- now there should be some stuff + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .send() + .await + .unwrap(); + let res_body = json_body(res).await; + assert_json_eq!( + res_body, + json!({ + "prefix": null, + "start": null, + "end": null, + "limit": null, + "reverse": false, + "partitionKeys": [ + { + "pk": "root", + "entries": i+1, + "conflicts": i, + "values": i+i+1, + "bytes": i*content3.len() + (i+1)*content2.len(), + } + ], + "more": false, + "nextStart": null + }) + ); + + // Put again with same CT, now we have concurrent values + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some(sk)) + .signed_header("x-garage-causality-token", ct.clone()) + .body(content3.clone()) + .method(Method::PUT) + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + + // Get value back + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some(sk)) + .signed_header("accept", "*/*") + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + assert_eq!( + res.headers().get("content-type").unwrap().to_str().unwrap(), + "application/json" + ); + let res_json = json_body(res).await; + assert_json_eq!( + res_json, + [base64::encode(&content2), base64::encode(&content3)] + ); + + // ReadIndex -- now there should be some stuff + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .send() + .await + .unwrap(); + let res_body = json_body(res).await; + assert_json_eq!( + res_body, + json!({ + "prefix": null, + "start": null, + "end": null, + "limit": null, + "reverse": false, + "partitionKeys": [ + { + "pk": "root", + "entries": i+1, + "conflicts": i+1, + "values": 2*(i+1), + "bytes": (i+1)*(content2.len() + content3.len()), + } + ], + "more": 
false, + "nextStart": null + }) + ); + } + + // Now delete things + for (i, sk) in ["a", "b", "c", "d"].iter().enumerate() { + // Get value back (we just need the CT) + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some(sk)) + .signed_header("accept", "*/*") + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + let ct = res + .headers() + .get("x-garage-causality-token") + .unwrap() + .to_str() + .unwrap() + .to_string(); + + // Delete it + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .method(Method::DELETE) + .path("root") + .query_param("sort_key", Some(sk)) + .signed_header("x-garage-causality-token", ct) + .send() + .await + .unwrap(); + assert_eq!(res.status(), 204); + + // ReadIndex -- now there should be some stuff + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .send() + .await + .unwrap(); + let res_body = json_body(res).await; + if i < 3 { + assert_json_eq!( + res_body, + json!({ + "prefix": null, + "start": null, + "end": null, + "limit": null, + "reverse": false, + "partitionKeys": [ + { + "pk": "root", + "entries": 3-i, + "conflicts": 3-i, + "values": 2*(3-i), + "bytes": (3-i)*(content2_len + content3_len), + } + ], + "more": false, + "nextStart": null + }) + ); + } else { + assert_json_eq!( + res_body, + json!({ + "prefix": null, + "start": null, + "end": null, + "limit": null, + "reverse": false, + "partitionKeys": [], + "more": false, + "nextStart": null + }) + ); + } + } +} + +#[tokio::test] +async fn test_item_return_format() { + let ctx = common::context(); + let bucket = ctx.create_bucket("test-k2v-item-return-format"); + + let single_value = b"A single value".to_vec(); + let concurrent_value = b"A concurrent value".to_vec(); + + // -- Test with a single value -- + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some("v1")) + .body(single_value.clone()) + .method(Method::PUT) + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + + // f0: either + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some("v1")) + .signed_header("accept", "*/*") + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + assert_eq!( + res.headers().get("content-type").unwrap().to_str().unwrap(), + "application/octet-stream" + ); + let ct = res + .headers() + .get("x-garage-causality-token") + .unwrap() + .to_str() + .unwrap() + .to_string(); + let res_body = hyper::body::to_bytes(res.into_body()) + .await + .unwrap() + .to_vec(); + assert_eq!(res_body, single_value); + + // f1: not specified + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some("v1")) + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + assert_eq!( + res.headers().get("content-type").unwrap().to_str().unwrap(), + "application/json" + ); + let res_body = json_body(res).await; + assert_json_eq!(res_body, json!([base64::encode(&single_value)])); + + // f2: binary + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some("v1")) + .signed_header("accept", "application/octet-stream") + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + assert_eq!( + res.headers().get("content-type").unwrap().to_str().unwrap(), + "application/octet-stream" + ); + let res_body = hyper::body::to_bytes(res.into_body()) + .await + .unwrap() + .to_vec(); + assert_eq!(res_body, single_value); + + // 
f3: json + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some("v1")) + .signed_header("accept", "application/json") + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + assert_eq!( + res.headers().get("content-type").unwrap().to_str().unwrap(), + "application/json" + ); + let res_body = json_body(res).await; + assert_json_eq!(res_body, json!([base64::encode(&single_value)])); + + // -- Test with a second, concurrent value -- + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some("v1")) + .body(concurrent_value.clone()) + .method(Method::PUT) + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + + // f0: either + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some("v1")) + .signed_header("accept", "*/*") + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + assert_eq!( + res.headers().get("content-type").unwrap().to_str().unwrap(), + "application/json" + ); + let res_body = json_body(res).await; + assert_json_eq!( + res_body, + json!([ + base64::encode(&single_value), + base64::encode(&concurrent_value) + ]) + ); + + // f1: not specified + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some("v1")) + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + assert_eq!( + res.headers().get("content-type").unwrap().to_str().unwrap(), + "application/json" + ); + let res_body = json_body(res).await; + assert_json_eq!( + res_body, + json!([ + base64::encode(&single_value), + base64::encode(&concurrent_value) + ]) + ); + + // f2: binary + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some("v1")) + .signed_header("accept", "application/octet-stream") + .send() + .await + .unwrap(); + assert_eq!(res.status(), 409); // CONFLICT + + // f3: json + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some("v1")) + .signed_header("accept", "application/json") + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + assert_eq!( + res.headers().get("content-type").unwrap().to_str().unwrap(), + "application/json" + ); + let res_body = json_body(res).await; + assert_json_eq!( + res_body, + json!([ + base64::encode(&single_value), + base64::encode(&concurrent_value) + ]) + ); + + // -- Delete first value, concurrently with second insert -- + // -- (we now have a concurrent value and a deletion) -- + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some("v1")) + .method(Method::DELETE) + .signed_header("x-garage-causality-token", ct) + .send() + .await + .unwrap(); + assert_eq!(res.status(), 204); + + // f0: either + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some("v1")) + .signed_header("accept", "*/*") + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + assert_eq!( + res.headers().get("content-type").unwrap().to_str().unwrap(), + "application/json" + ); + let res_body = json_body(res).await; + assert_json_eq!(res_body, json!([base64::encode(&concurrent_value), null])); + + // f1: not specified + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some("v1")) + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + assert_eq!( + 
res.headers().get("content-type").unwrap().to_str().unwrap(), + "application/json" + ); + let ct = res + .headers() + .get("x-garage-causality-token") + .unwrap() + .to_str() + .unwrap() + .to_string(); + let res_body = json_body(res).await; + assert_json_eq!(res_body, json!([base64::encode(&concurrent_value), null])); + + // f2: binary + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some("v1")) + .signed_header("accept", "application/octet-stream") + .send() + .await + .unwrap(); + assert_eq!(res.status(), 409); // CONFLICT + + // f3: json + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some("v1")) + .signed_header("accept", "application/json") + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + assert_eq!( + res.headers().get("content-type").unwrap().to_str().unwrap(), + "application/json" + ); + let res_body = json_body(res).await; + assert_json_eq!(res_body, json!([base64::encode(&concurrent_value), null])); + + // -- Delete everything -- + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some("v1")) + .method(Method::DELETE) + .signed_header("x-garage-causality-token", ct) + .send() + .await + .unwrap(); + assert_eq!(res.status(), 204); + + // f0: either + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some("v1")) + .signed_header("accept", "*/*") + .send() + .await + .unwrap(); + assert_eq!(res.status(), 204); // NO CONTENT + + // f1: not specified + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some("v1")) + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + assert_eq!( + res.headers().get("content-type").unwrap().to_str().unwrap(), + "application/json" + ); + let res_body = json_body(res).await; + assert_json_eq!(res_body, json!([null])); + + // f2: binary + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some("v1")) + .signed_header("accept", "application/octet-stream") + .send() + .await + .unwrap(); + assert_eq!(res.status(), 204); // NO CONTENT + + // f3: json + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some("v1")) + .signed_header("accept", "application/json") + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + assert_eq!( + res.headers().get("content-type").unwrap().to_str().unwrap(), + "application/json" + ); + let res_body = json_body(res).await; + assert_json_eq!(res_body, json!([null])); +} diff --git a/src/garage/tests/k2v/mod.rs b/src/garage/tests/k2v/mod.rs new file mode 100644 index 00000000..a009460e --- /dev/null +++ b/src/garage/tests/k2v/mod.rs @@ -0,0 +1,18 @@ +pub mod batch; +pub mod errorcodes; +pub mod item; +pub mod poll; +pub mod simple; + +use hyper::{Body, Response}; + +pub async fn json_body(res: Response<Body>) -> serde_json::Value { + let res_body: serde_json::Value = serde_json::from_slice( + &hyper::body::to_bytes(res.into_body()) + .await + .unwrap() + .to_vec()[..], + ) + .unwrap(); + res_body +} diff --git a/src/garage/tests/k2v/poll.rs b/src/garage/tests/k2v/poll.rs new file mode 100644 index 00000000..70dc0410 --- /dev/null +++ b/src/garage/tests/k2v/poll.rs @@ -0,0 +1,98 @@ +use hyper::Method; +use std::time::Duration; + +use crate::common; + +#[tokio::test] +async fn test_poll() { + let ctx = common::context(); + let 
bucket = ctx.create_bucket("test-k2v-poll"); + + // Write initial value + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .method(Method::PUT) + .path("root") + .query_param("sort_key", Some("test1")) + .body(b"Initial value".to_vec()) + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + + // Retrieve initial value to get its causality token + let res2 = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some("test1")) + .signed_header("accept", "application/octet-stream") + .send() + .await + .unwrap(); + assert_eq!(res2.status(), 200); + let ct = res2 + .headers() + .get("x-garage-causality-token") + .unwrap() + .to_str() + .unwrap() + .to_string(); + + let res2_body = hyper::body::to_bytes(res2.into_body()) + .await + .unwrap() + .to_vec(); + assert_eq!(res2_body, b"Initial value"); + + // Start poll operation + let poll = { + let bucket = bucket.clone(); + let ct = ct.clone(); + tokio::spawn(async move { + let ctx = common::context(); + ctx.k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some("test1")) + .query_param("causality_token", Some(ct)) + .query_param("timeout", Some("10")) + .signed_header("accept", "application/octet-stream") + .send() + .await + }) + }; + + // Write new value that supersedes initial one + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .method(Method::PUT) + .path("root") + .query_param("sort_key", Some("test1")) + .signed_header("x-garage-causality-token", ct) + .body(b"New value".to_vec()) + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + + // Check poll finishes with correct value + let poll_res = tokio::select! { + _ = tokio::time::sleep(Duration::from_secs(10)) => panic!("poll did not terminate in time"), + res = poll => res.unwrap().unwrap(), + }; + + assert_eq!(poll_res.status(), 200); + + let poll_res_body = hyper::body::to_bytes(poll_res.into_body()) + .await + .unwrap() + .to_vec(); + assert_eq!(poll_res_body, b"New value"); +} diff --git a/src/garage/tests/k2v/simple.rs b/src/garage/tests/k2v/simple.rs new file mode 100644 index 00000000..ae9a8674 --- /dev/null +++ b/src/garage/tests/k2v/simple.rs @@ -0,0 +1,40 @@ +use crate::common; + +use hyper::Method; + +#[tokio::test] +async fn test_simple() { + let ctx = common::context(); + let bucket = ctx.create_bucket("test-k2v-simple"); + + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .method(Method::PUT) + .path("root") + .query_param("sort_key", Some("test1")) + .body(b"Hello, world!".to_vec()) + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + + let res2 = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some("test1")) + .signed_header("accept", "application/octet-stream") + .send() + .await + .unwrap(); + assert_eq!(res2.status(), 200); + + let res2_body = hyper::body::to_bytes(res2.into_body()) + .await + .unwrap() + .to_vec(); + assert_eq!(res2_body, b"Hello, world!"); +} diff --git a/src/garage/tests/lib.rs b/src/garage/tests/lib.rs index 8799c395..0106ad10 100644 --- a/src/garage/tests/lib.rs +++ b/src/garage/tests/lib.rs @@ -3,9 +3,5 @@ mod common; mod admin; mod bucket; -mod list; -mod multipart; -mod objects; -mod simple; -mod streaming_signature; -mod website; +mod k2v; +mod s3; diff --git a/src/garage/tests/list.rs b/src/garage/tests/s3/list.rs index bb03f250..bb03f250 100644 --- a/src/garage/tests/list.rs +++ b/src/garage/tests/s3/list.rs diff --git a/src/garage/tests/s3/mod.rs 
b/src/garage/tests/s3/mod.rs new file mode 100644 index 00000000..623eb665 --- /dev/null +++ b/src/garage/tests/s3/mod.rs @@ -0,0 +1,6 @@ +mod list; +mod multipart; +mod objects; +mod simple; +mod streaming_signature; +mod website; diff --git a/src/garage/tests/multipart.rs b/src/garage/tests/s3/multipart.rs index 895a2993..895a2993 100644 --- a/src/garage/tests/multipart.rs +++ b/src/garage/tests/s3/multipart.rs diff --git a/src/garage/tests/objects.rs b/src/garage/tests/s3/objects.rs index e1175b81..e1175b81 100644 --- a/src/garage/tests/objects.rs +++ b/src/garage/tests/s3/objects.rs diff --git a/src/garage/tests/simple.rs b/src/garage/tests/s3/simple.rs index f54ae9ac..f54ae9ac 100644 --- a/src/garage/tests/simple.rs +++ b/src/garage/tests/s3/simple.rs diff --git a/src/garage/tests/streaming_signature.rs b/src/garage/tests/s3/streaming_signature.rs index c68f7dfc..c68f7dfc 100644 --- a/src/garage/tests/streaming_signature.rs +++ b/src/garage/tests/s3/streaming_signature.rs diff --git a/src/garage/tests/website.rs b/src/garage/tests/s3/website.rs index 963d11ea..0570ac6a 100644 --- a/src/garage/tests/website.rs +++ b/src/garage/tests/s3/website.rs @@ -35,10 +35,7 @@ async fn test_website() { let req = || { Request::builder() .method("GET") - .uri(format!( - "http://127.0.0.1:{}/", - common::garage::DEFAULT_PORT + 2 - )) + .uri(format!("http://127.0.0.1:{}/", ctx.garage.web_port)) .header("Host", format!("{}.web.garage", BCKT_NAME)) .body(Body::empty()) .unwrap() @@ -170,10 +167,7 @@ async fn test_website_s3_api() { { let req = Request::builder() .method("GET") - .uri(format!( - "http://127.0.0.1:{}/site/", - common::garage::DEFAULT_PORT + 2 - )) + .uri(format!("http://127.0.0.1:{}/site/", ctx.garage.web_port)) .header("Host", format!("{}.web.garage", BCKT_NAME)) .header("Origin", "https://example.com") .body(Body::empty()) @@ -198,7 +192,7 @@ async fn test_website_s3_api() { .method("GET") .uri(format!( "http://127.0.0.1:{}/wrong.html", - common::garage::DEFAULT_PORT + 2 + ctx.garage.web_port )) .header("Host", format!("{}.web.garage", BCKT_NAME)) .body(Body::empty()) @@ -217,10 +211,7 @@ async fn test_website_s3_api() { { let req = Request::builder() .method("OPTIONS") - .uri(format!( - "http://127.0.0.1:{}/site/", - common::garage::DEFAULT_PORT + 2 - )) + .uri(format!("http://127.0.0.1:{}/site/", ctx.garage.web_port)) .header("Host", format!("{}.web.garage", BCKT_NAME)) .header("Origin", "https://example.com") .header("Access-Control-Request-Method", "PUT") @@ -244,10 +235,7 @@ async fn test_website_s3_api() { { let req = Request::builder() .method("OPTIONS") - .uri(format!( - "http://127.0.0.1:{}/site/", - common::garage::DEFAULT_PORT + 2 - )) + .uri(format!("http://127.0.0.1:{}/site/", ctx.garage.web_port)) .header("Host", format!("{}.web.garage", BCKT_NAME)) .header("Origin", "https://example.com") .header("Access-Control-Request-Method", "DELETE") @@ -288,10 +276,7 @@ async fn test_website_s3_api() { { let req = Request::builder() .method("OPTIONS") - .uri(format!( - "http://127.0.0.1:{}/site/", - common::garage::DEFAULT_PORT + 2 - )) + .uri(format!("http://127.0.0.1:{}/site/", ctx.garage.web_port)) .header("Host", format!("{}.web.garage", BCKT_NAME)) .header("Origin", "https://example.com") .header("Access-Control-Request-Method", "PUT") @@ -319,10 +304,7 @@ async fn test_website_s3_api() { { let req = Request::builder() .method("GET") - .uri(format!( - "http://127.0.0.1:{}/site/", - common::garage::DEFAULT_PORT + 2 - )) + .uri(format!("http://127.0.0.1:{}/site/", 
ctx.garage.web_port))
 			.header("Host", format!("{}.web.garage", BCKT_NAME))
 			.body(Body::empty())
 			.unwrap();
diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml
index 007cec89..133fe44e 100644
--- a/src/model/Cargo.toml
+++ b/src/model/Cargo.toml
@@ -22,8 +22,10 @@ garage_model_050 = { package = "garage_model", version = "0.5.1" }
 
 async-trait = "0.1.7"
 arc-swap = "1.0"
+blake2 = "0.9"
 err-derive = "0.3"
 hex = "0.4"
+base64 = "0.13"
 tracing = "0.1.30"
 rand = "0.8"
 zstd = { version = "0.9", default-features = false }
@@ -42,3 +44,6 @@ opentelemetry = "0.17"
 #netapp = { version = "0.3.0", git = "https://git.deuxfleurs.fr/lx/netapp" }
 #netapp = { version = "0.4", path = "../../../netapp" }
 netapp = "0.4"
+
+[features]
+k2v = [ "garage_util/k2v" ]
diff --git a/src/model/garage.rs b/src/model/garage.rs
index abdb920a..03e21f8a 100644
--- a/src/model/garage.rs
+++ b/src/model/garage.rs
@@ -13,13 +13,19 @@ use garage_table::replication::TableFullReplication;
 use garage_table::replication::TableShardedReplication;
 use garage_table::*;
 
-use crate::block_ref_table::*;
+use crate::s3::block_ref_table::*;
+use crate::s3::object_table::*;
+use crate::s3::version_table::*;
+
 use crate::bucket_alias_table::*;
 use crate::bucket_table::*;
 use crate::helper;
 use crate::key_table::*;
-use crate::object_table::*;
-use crate::version_table::*;
+
+#[cfg(feature = "k2v")]
+use crate::index_counter::*;
+#[cfg(feature = "k2v")]
+use crate::k2v::{counter_table::*, item_table::*, poll::*, rpc::*};
 
 /// An entire Garage full of data
 pub struct Garage {
@@ -35,16 +41,32 @@ pub struct Garage {
 	/// The block manager
 	pub block_manager: Arc<BlockManager>,
 
-	/// Table containing informations about buckets
+	/// Table containing buckets
 	pub bucket_table: Arc<Table<BucketTable, TableFullReplication>>,
-	/// Table containing informations about bucket aliases
+	/// Table containing bucket aliases
 	pub bucket_alias_table: Arc<Table<BucketAliasTable, TableFullReplication>>,
-	/// Table containing informations about api keys
+	/// Table containing API keys
 	pub key_table: Arc<Table<KeyTable, TableFullReplication>>,
 
+	/// Table containing S3 objects
 	pub object_table: Arc<Table<ObjectTable, TableShardedReplication>>,
+	/// Table containing S3 object versions
 	pub version_table: Arc<Table<VersionTable, TableShardedReplication>>,
+	/// Table containing S3 block references (not blocks themselves)
 	pub block_ref_table: Arc<Table<BlockRefTable, TableShardedReplication>>,
+
+	#[cfg(feature = "k2v")]
+	pub k2v: GarageK2V,
+}
+
+#[cfg(feature = "k2v")]
+pub struct GarageK2V {
+	/// Table containing K2V items
+	pub item_table: Arc<Table<K2VItemTable, TableShardedReplication>>,
+	/// Indexing table containing K2V item counters
+	pub counter_table: Arc<IndexCounter<K2VCounterTable>>,
+	/// K2V RPC handler
+	pub rpc: Arc<K2VRpcHandler>,
 }
 
 impl Garage {
@@ -95,6 +117,21 @@ impl Garage {
 			system.clone(),
 		);
 
+		// ---- admin tables ----
+		info!("Initialize bucket_table...");
+		let bucket_table = Table::new(BucketTable, control_rep_param.clone(), system.clone(), &db);
+
+		info!("Initialize bucket_alias_table...");
+		let bucket_alias_table = Table::new(
+			BucketAliasTable,
+			control_rep_param.clone(),
+			system.clone(),
+			&db,
+		);
+		info!("Initialize key_table...");
+		let key_table = Table::new(KeyTable, control_rep_param, system.clone(), &db);
+
+		// ---- S3 tables ----
 		info!("Initialize block_ref_table...");
 		let block_ref_table = Table::new(
 			BlockRefTable {
@@ -117,29 +154,20 @@ impl Garage {
 		);
 
 		info!("Initialize object_table...");
+		
#[allow(clippy::redundant_clone)] let object_table = Table::new( ObjectTable { background: background.clone(), version_table: version_table.clone(), }, - meta_rep_param, - system.clone(), - &db, - ); - - info!("Initialize bucket_table..."); - let bucket_table = Table::new(BucketTable, control_rep_param.clone(), system.clone(), &db); - - info!("Initialize bucket_alias_table..."); - let bucket_alias_table = Table::new( - BucketAliasTable, - control_rep_param.clone(), + meta_rep_param.clone(), system.clone(), &db, ); - info!("Initialize key_table_table..."); - let key_table = Table::new(KeyTable, control_rep_param, system.clone(), &db); + // ---- K2V ---- + #[cfg(feature = "k2v")] + let k2v = GarageK2V::new(system.clone(), &db, meta_rep_param); info!("Initialize Garage..."); @@ -155,6 +183,8 @@ impl Garage { object_table, version_table, block_ref_table, + #[cfg(feature = "k2v")] + k2v, }) } @@ -162,3 +192,30 @@ impl Garage { helper::bucket::BucketHelper(self) } } + +#[cfg(feature = "k2v")] +impl GarageK2V { + fn new(system: Arc<System>, db: &sled::Db, meta_rep_param: TableShardedReplication) -> Self { + info!("Initialize K2V counter table..."); + let counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), db); + info!("Initialize K2V subscription manager..."); + let subscriptions = Arc::new(SubscriptionManager::new()); + info!("Initialize K2V item table..."); + let item_table = Table::new( + K2VItemTable { + counter_table: counter_table.clone(), + subscriptions: subscriptions.clone(), + }, + meta_rep_param, + system.clone(), + db, + ); + let rpc = K2VRpcHandler::new(system, item_table.clone(), subscriptions); + + Self { + item_table, + counter_table, + rpc, + } + } +} diff --git a/src/model/helper/bucket.rs b/src/model/helper/bucket.rs index 706faf26..54d2f97b 100644 --- a/src/model/helper/bucket.rs +++ b/src/model/helper/bucket.rs @@ -1,4 +1,4 @@ -use garage_table::util::EmptyKey; +use garage_table::util::*; use garage_util::crdt::*; use garage_util::data::*; use garage_util::error::{Error as GarageError, OkOrMessage}; @@ -116,6 +116,7 @@ impl<'a> BucketHelper<'a> { None, Some(KeyFilter::MatchesAndNotDeleted(pattern.to_string())), 10, + EnumerationOrder::Forward, ) .await? 
 			.into_iter()
diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs
new file mode 100644
index 00000000..123154d4
--- /dev/null
+++ b/src/model/index_counter.rs
@@ -0,0 +1,305 @@
+use std::collections::{hash_map, BTreeMap, HashMap};
+use std::marker::PhantomData;
+use std::sync::Arc;
+use std::time::Duration;
+
+use serde::{Deserialize, Serialize};
+use tokio::sync::{mpsc, watch};
+
+use garage_rpc::ring::Ring;
+use garage_rpc::system::System;
+use garage_util::data::*;
+use garage_util::error::*;
+
+use garage_table::crdt::*;
+use garage_table::replication::TableShardedReplication;
+use garage_table::*;
+
+pub trait CounterSchema: Clone + PartialEq + Send + Sync + 'static {
+	const NAME: &'static str;
+	type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
+	type S: SortKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
+}
+
+/// A counter entry in the global table
+#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+pub struct CounterEntry<T: CounterSchema> {
+	pub pk: T::P,
+	pub sk: T::S,
+	pub values: BTreeMap<String, CounterValue>,
+}
+
+impl<T: CounterSchema> Entry<T::P, T::S> for CounterEntry<T> {
+	fn partition_key(&self) -> &T::P {
+		&self.pk
+	}
+	fn sort_key(&self) -> &T::S {
+		&self.sk
+	}
+	fn is_tombstone(&self) -> bool {
+		self.values
+			.iter()
+			.all(|(_, v)| v.node_values.iter().all(|(_, (_, v))| *v == 0))
+	}
+}
+
+impl<T: CounterSchema> CounterEntry<T> {
+	pub fn filtered_values(&self, ring: &Ring) -> HashMap<String, i64> {
+		let nodes = &ring.layout.node_id_vec[..];
+		self.filtered_values_with_nodes(nodes)
+	}
+
+	pub fn filtered_values_with_nodes(&self, nodes: &[Uuid]) -> HashMap<String, i64> {
+		let mut ret = HashMap::new();
+		for (name, vals) in self.values.iter() {
+			let new_vals = vals
+				.node_values
+				.iter()
+				.filter(|(n, _)| nodes.contains(n))
+				.map(|(_, (_, v))| *v)
+				.collect::<Vec<_>>();
+			if !new_vals.is_empty() {
+				ret.insert(
+					name.clone(),
+					new_vals.iter().fold(i64::MIN, |a, b| std::cmp::max(a, *b)),
+				);
+			}
+		}
+
+		ret
+	}
+}
+
+/// The value of a counter: a last-writer-wins (timestamp, value) pair per node
+#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+pub struct CounterValue {
+	pub node_values: BTreeMap<Uuid, (u64, i64)>,
+}
+
+impl<T: CounterSchema> Crdt for CounterEntry<T> {
+	fn merge(&mut self, other: &Self) {
+		for (name, e2) in other.values.iter() {
+			if let Some(e) = self.values.get_mut(name) {
+				e.merge(e2);
+			} else {
+				self.values.insert(name.clone(), e2.clone());
+			}
+		}
+	}
+}
+
+impl Crdt for CounterValue {
+	fn merge(&mut self, other: &Self) {
+		for (node, (t2, e2)) in other.node_values.iter() {
+			if let Some((t, e)) = self.node_values.get_mut(node) {
+				if t2 > t {
+					// adopt the value *and* its timestamp, otherwise this
+					// last-writer-wins register could later be overwritten
+					// by an older update
+					*t = *t2;
+					*e = *e2;
+				}
+			} else {
+				self.node_values.insert(*node, (*t2, *e2));
+			}
+		}
+	}
+}
+
+pub struct CounterTable<T: CounterSchema> {
+	_phantom_t: PhantomData<T>,
+}
+
+impl<T: CounterSchema> TableSchema for CounterTable<T> {
+	const TABLE_NAME: &'static str = T::NAME;
+
+	type P = T::P;
+	type S = T::S;
+	type E = CounterEntry<T>;
+	type Filter = (DeletedFilter, Vec<Uuid>);
+
+	fn updated(&self, _old: Option<&Self::E>, _new: Option<&Self::E>) {
+		// nothing for now
+	}
+
+	fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
+		if filter.0 == DeletedFilter::Any {
+			return true;
+		}
+
+		let is_tombstone = entry
+			.filtered_values_with_nodes(&filter.1[..])
+			.iter()
+			.all(|(_, v)| *v == 0);
+		filter.0.apply(is_tombstone)
+	}
+}
+
+// ----
+
+pub struct IndexCounter<T: CounterSchema> {
+	
this_node: Uuid, + local_counter: sled::Tree, + propagate_tx: mpsc::UnboundedSender<(T::P, T::S, LocalCounterEntry)>, + pub table: Arc<Table<CounterTable<T>, TableShardedReplication>>, +} + +impl<T: CounterSchema> IndexCounter<T> { + pub fn new( + system: Arc<System>, + replication: TableShardedReplication, + db: &sled::Db, + ) -> Arc<Self> { + let background = system.background.clone(); + + let (propagate_tx, propagate_rx) = mpsc::unbounded_channel(); + + let this = Arc::new(Self { + this_node: system.id, + local_counter: db + .open_tree(format!("local_counter:{}", T::NAME)) + .expect("Unable to open local counter tree"), + propagate_tx, + table: Table::new( + CounterTable { + _phantom_t: Default::default(), + }, + replication, + system, + db, + ), + }); + + let this2 = this.clone(); + background.spawn_worker( + format!("{} index counter propagator", T::NAME), + move |must_exit| this2.clone().propagate_loop(propagate_rx, must_exit), + ); + this + } + + pub fn count(&self, pk: &T::P, sk: &T::S, counts: &[(&str, i64)]) -> Result<(), Error> { + let tree_key = self.table.data.tree_key(pk, sk); + + let new_entry = self.local_counter.transaction(|tx| { + let mut entry = match tx.get(&tree_key[..])? { + Some(old_bytes) => { + rmp_serde::decode::from_read_ref::<_, LocalCounterEntry>(&old_bytes) + .map_err(Error::RmpDecode) + .map_err(sled::transaction::ConflictableTransactionError::Abort)? + } + None => LocalCounterEntry { + values: BTreeMap::new(), + }, + }; + + for (s, inc) in counts.iter() { + let mut ent = entry.values.entry(s.to_string()).or_insert((0, 0)); + ent.0 += 1; + ent.1 += *inc; + } + + let new_entry_bytes = rmp_to_vec_all_named(&entry) + .map_err(Error::RmpEncode) + .map_err(sled::transaction::ConflictableTransactionError::Abort)?; + tx.insert(&tree_key[..], new_entry_bytes)?; + + Ok(entry) + })?; + + if let Err(e) = self.propagate_tx.send((pk.clone(), sk.clone(), new_entry)) { + error!( + "Could not propagate updated counter values, failed to send to channel: {}", + e + ); + } + + Ok(()) + } + + async fn propagate_loop( + self: Arc<Self>, + mut propagate_rx: mpsc::UnboundedReceiver<(T::P, T::S, LocalCounterEntry)>, + must_exit: watch::Receiver<bool>, + ) { + // This loop batches updates to counters to be sent all at once. + // They are sent once the propagate_rx channel has been emptied (or is closed). 
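+	// In other words (a summary of the loop below): the channel is drained
+	// with try_recv(), each update being merged into `buf` (a HashMap keyed
+	// by the counter's tree key, merged through the CounterEntry CRDT); the
+	// batch is flushed with insert_many() once the channel runs empty, and
+	// the loop only blocks on an awaited recv() when both the channel and
+	// the batch are empty.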
+ let mut buf = HashMap::new(); + let mut errors = 0; + + loop { + let (ent, closed) = match propagate_rx.try_recv() { + Ok(ent) => (Some(ent), false), + Err(mpsc::error::TryRecvError::Empty) if buf.is_empty() => { + match propagate_rx.recv().await { + Some(ent) => (Some(ent), false), + None => (None, true), + } + } + Err(mpsc::error::TryRecvError::Empty) => (None, false), + Err(mpsc::error::TryRecvError::Disconnected) => (None, true), + }; + + if let Some((pk, sk, counters)) = ent { + let tree_key = self.table.data.tree_key(&pk, &sk); + let dist_entry = counters.into_counter_entry::<T>(self.this_node, pk, sk); + match buf.entry(tree_key) { + hash_map::Entry::Vacant(e) => { + e.insert(dist_entry); + } + hash_map::Entry::Occupied(mut e) => { + e.get_mut().merge(&dist_entry); + } + } + // As long as we can add entries, loop back and add them to batch + // before sending batch to other nodes + continue; + } + + if !buf.is_empty() { + let entries = buf.iter().map(|(_k, v)| v); + if let Err(e) = self.table.insert_many(entries).await { + errors += 1; + if errors >= 2 && *must_exit.borrow() { + error!("({}) Could not propagate {} counter values: {}, these counters will not be updated correctly.", T::NAME, buf.len(), e); + break; + } + warn!("({}) Could not propagate {} counter values: {}, retrying in 5 seconds (retry #{})", T::NAME, buf.len(), e, errors); + tokio::time::sleep(Duration::from_secs(5)).await; + continue; + } + + buf.clear(); + errors = 0; + } + + if closed || *must_exit.borrow() { + break; + } + } + } +} + +#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +struct LocalCounterEntry { + values: BTreeMap<String, (u64, i64)>, +} + +impl LocalCounterEntry { + fn into_counter_entry<T: CounterSchema>( + self, + this_node: Uuid, + pk: T::P, + sk: T::S, + ) -> CounterEntry<T> { + CounterEntry { + pk, + sk, + values: self + .values + .into_iter() + .map(|(name, (ts, v))| { + let mut node_values = BTreeMap::new(); + node_values.insert(this_node, (ts, v)); + (name, CounterValue { node_values }) + }) + .collect(), + } + } +} diff --git a/src/model/k2v/causality.rs b/src/model/k2v/causality.rs new file mode 100644 index 00000000..8c76a32b --- /dev/null +++ b/src/model/k2v/causality.rs @@ -0,0 +1,96 @@ +use std::collections::BTreeMap; +use std::convert::TryInto; + +use serde::{Deserialize, Serialize}; + +use garage_util::data::*; + +/// Node IDs used in K2V are u64 integers that are the abbreviation +/// of full Garage node IDs which are 256-bit UUIDs. 
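+/// For example, a node whose full ID starts with the bytes
+/// `ef c0 c1 c4 7f 9d e4 33` gets the K2V node ID `0xefc0c1c47f9de433`:
+/// `make_node_id` below simply reads the first 8 bytes as a big-endian u64.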
+pub type K2VNodeId = u64; + +pub fn make_node_id(node_id: Uuid) -> K2VNodeId { + let mut tmp = [0u8; 8]; + tmp.copy_from_slice(&node_id.as_slice()[..8]); + u64::from_be_bytes(tmp) +} + +#[derive(PartialEq, Debug, Serialize, Deserialize)] +pub struct CausalContext { + pub vector_clock: BTreeMap<K2VNodeId, u64>, +} + +impl CausalContext { + /// Empty causality context + pub fn new_empty() -> Self { + Self { + vector_clock: BTreeMap::new(), + } + } + /// Make binary representation and encode in base64 + pub fn serialize(&self) -> String { + let mut ints = Vec::with_capacity(2 * self.vector_clock.len()); + for (node, time) in self.vector_clock.iter() { + ints.push(*node); + ints.push(*time); + } + let checksum = ints.iter().fold(0, |acc, v| acc ^ *v); + + let mut bytes = u64::to_be_bytes(checksum).to_vec(); + for i in ints { + bytes.extend(u64::to_be_bytes(i)); + } + + base64::encode_config(bytes, base64::URL_SAFE_NO_PAD) + } + /// Parse from base64-encoded binary representation + pub fn parse(s: &str) -> Result<Self, String> { + let bytes = base64::decode_config(s, base64::URL_SAFE_NO_PAD) + .map_err(|e| format!("bad causality token base64: {}", e))?; + if bytes.len() % 16 != 8 || bytes.len() < 8 { + return Err("bad causality token length".into()); + } + + let checksum = u64::from_be_bytes(bytes[..8].try_into().unwrap()); + let mut ret = CausalContext { + vector_clock: BTreeMap::new(), + }; + + for i in 0..(bytes.len() / 16) { + let node_id = u64::from_be_bytes(bytes[8 + i * 16..16 + i * 16].try_into().unwrap()); + let time = u64::from_be_bytes(bytes[16 + i * 16..24 + i * 16].try_into().unwrap()); + ret.vector_clock.insert(node_id, time); + } + + let check = ret.vector_clock.iter().fold(0, |acc, (n, t)| acc ^ *n ^ *t); + + if check != checksum { + return Err("bad causality token checksum".into()); + } + + Ok(ret) + } + /// Check if this causal context contains newer items than another one + pub fn is_newer_than(&self, other: &Self) -> bool { + self.vector_clock + .iter() + .any(|(k, v)| v > other.vector_clock.get(k).unwrap_or(&0)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_causality_token_serialization() { + let ct = CausalContext { + vector_clock: [(4, 42), (1928131023, 76), (0xefc0c1c47f9de433, 2)] + .iter() + .cloned() + .collect(), + }; + + assert_eq!(CausalContext::parse(&ct.serialize()).unwrap(), ct); + } +} diff --git a/src/model/k2v/counter_table.rs b/src/model/k2v/counter_table.rs new file mode 100644 index 00000000..4856eb2b --- /dev/null +++ b/src/model/k2v/counter_table.rs @@ -0,0 +1,20 @@ +use garage_util::data::*; + +use crate::index_counter::*; + +pub const ENTRIES: &str = "entries"; +pub const CONFLICTS: &str = "conflicts"; +pub const VALUES: &str = "values"; +pub const BYTES: &str = "bytes"; + +#[derive(PartialEq, Clone)] +pub struct K2VCounterTable; + +impl CounterSchema for K2VCounterTable { + const NAME: &'static str = "k2v_index_counter"; + + // Partition key = bucket id + type P = Uuid; + // Sort key = K2V item's partition key + type S = String; +} diff --git a/src/model/k2v/item_table.rs b/src/model/k2v/item_table.rs new file mode 100644 index 00000000..8b7cc08a --- /dev/null +++ b/src/model/k2v/item_table.rs @@ -0,0 +1,291 @@ +use serde::{Deserialize, Serialize}; +use std::collections::BTreeMap; +use std::sync::Arc; + +use garage_util::data::*; + +use garage_table::crdt::*; +use garage_table::*; + +use crate::index_counter::*; +use crate::k2v::causality::*; +use crate::k2v::counter_table::*; +use crate::k2v::poll::*; + 
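+/// A K2V item, stored as a CRDT. Values are kept per (abbreviated) node ID
+/// in a dotted-version-vector-like structure (`DvvsEntry`), so that writes
+/// which raced through different nodes stay visible as concurrent values
+/// until a later write with the right causal context supersedes them.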
+#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+pub struct K2VItem {
+    pub partition: K2VItemPartition,
+    pub sort_key: String,
+
+    items: BTreeMap<K2VNodeId, DvvsEntry>,
+}
+
+#[derive(PartialEq, Clone, Debug, Serialize, Deserialize, Hash, Eq)]
+pub struct K2VItemPartition {
+    pub bucket_id: Uuid,
+    pub partition_key: String,
+}
+
+#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+struct DvvsEntry {
+    t_discard: u64,
+    values: Vec<(u64, DvvsValue)>,
+}
+
+#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+pub enum DvvsValue {
+    Value(#[serde(with = "serde_bytes")] Vec<u8>),
+    Deleted,
+}
+
+impl K2VItem {
+    /// Creates a new K2VItem when no previous entry existed in the db
+    pub fn new(bucket_id: Uuid, partition_key: String, sort_key: String) -> Self {
+        Self {
+            partition: K2VItemPartition {
+                bucket_id,
+                partition_key,
+            },
+            sort_key,
+            items: BTreeMap::new(),
+        }
+    }
+    /// Updates a K2VItem with a new value or a deletion event
+    pub fn update(
+        &mut self,
+        this_node: Uuid,
+        context: &Option<CausalContext>,
+        new_value: DvvsValue,
+    ) {
+        if let Some(context) = context {
+            for (node, t_discard) in context.vector_clock.iter() {
+                if let Some(e) = self.items.get_mut(node) {
+                    e.t_discard = std::cmp::max(e.t_discard, *t_discard);
+                } else {
+                    self.items.insert(
+                        *node,
+                        DvvsEntry {
+                            t_discard: *t_discard,
+                            values: vec![],
+                        },
+                    );
+                }
+            }
+        }
+
+        self.discard();
+
+        let node_id = make_node_id(this_node);
+        let e = self.items.entry(node_id).or_insert(DvvsEntry {
+            t_discard: 0,
+            values: vec![],
+        });
+        let t_prev = e.max_time();
+        e.values.push((t_prev + 1, new_value));
+    }
+
+    /// Extract the causality context of a K2V Item
+    pub fn causal_context(&self) -> CausalContext {
+        let mut cc = CausalContext::new_empty();
+        for (node, ent) in self.items.iter() {
+            cc.vector_clock.insert(*node, ent.max_time());
+        }
+        cc
+    }
+
+    /// Extract the list of values
+    pub fn values(&'_ self) -> Vec<&'_ DvvsValue> {
+        let mut ret = vec![];
+        for (_, ent) in self.items.iter() {
+            for (_, v) in ent.values.iter() {
+                if !ret.contains(&v) {
+                    ret.push(v);
+                }
+            }
+        }
+        ret
+    }
+
+    fn discard(&mut self) {
+        for (_, ent) in self.items.iter_mut() {
+            ent.discard();
+        }
+    }
+
+    // returns counters: (non-deleted entries, conflict entries, non-tombstone values, bytes used)
+    fn stats(&self) -> (i64, i64, i64, i64) {
+        let values = self.values();
+
+        let n_entries = if self.is_tombstone() { 0 } else { 1 };
+        let n_conflicts = if values.len() > 1 { 1 } else { 0 };
+        let n_values = values
+            .iter()
+            .filter(|v| matches!(v, DvvsValue::Value(_)))
+            .count() as i64;
+        let n_bytes = values
+            .iter()
+            .map(|v| match v {
+                DvvsValue::Deleted => 0,
+                DvvsValue::Value(v) => v.len() as i64,
+            })
+            .sum();
+
+        (n_entries, n_conflicts, n_values, n_bytes)
+    }
+}
+
+impl DvvsEntry {
+    fn max_time(&self) -> u64 {
+        self.values
+            .iter()
+            .fold(self.t_discard, |acc, (vts, _)| std::cmp::max(acc, *vts))
+    }
+
+    fn discard(&mut self) {
+        self.values = std::mem::take(&mut self.values)
+            .into_iter()
+            .filter(|(t, _)| *t > self.t_discard)
+            .collect::<Vec<_>>();
+    }
+}
+
+impl Crdt for K2VItem {
+    fn merge(&mut self, other: &Self) {
+        for (node, e2) in other.items.iter() {
+            if let Some(e) = self.items.get_mut(node) {
+                e.merge(e2);
+            } else {
+                self.items.insert(*node, e2.clone());
+            }
+        }
+    }
+}
+
+impl Crdt for DvvsEntry {
+    fn merge(&mut self, other: &Self) {
+        self.t_discard = std::cmp::max(self.t_discard, other.t_discard);
+        self.discard();
+
+        let t_max = self.max_time();
+        for (vt, vv) in other.values.iter() {
+            if *vt > t_max {
+                self.values.push((*vt, vv.clone()));
+            }
+        }
+    }
+}
+
+impl PartitionKey for K2VItemPartition {
+    fn hash(&self) -> Hash {
+        use blake2::{Blake2b, Digest};
+
+        let mut hasher = Blake2b::new();
+        hasher.update(self.bucket_id.as_slice());
+        hasher.update(self.partition_key.as_bytes());
+        let mut hash = [0u8; 32];
+        hash.copy_from_slice(&hasher.finalize()[..32]);
+        hash.into()
+    }
+}
+
+impl Entry<K2VItemPartition, String> for K2VItem {
+    fn partition_key(&self) -> &K2VItemPartition {
+        &self.partition
+    }
+    fn sort_key(&self) -> &String {
+        &self.sort_key
+    }
+    fn is_tombstone(&self) -> bool {
+        self.values()
+            .iter()
+            .all(|v| matches!(v, DvvsValue::Deleted))
+    }
+}
+
+pub struct K2VItemTable {
+    pub(crate) counter_table: Arc<IndexCounter<K2VCounterTable>>,
+    pub(crate) subscriptions: Arc<SubscriptionManager>,
+}
+
+#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
+pub struct ItemFilter {
+    pub exclude_only_tombstones: bool,
+    pub conflicts_only: bool,
+}
+
+impl TableSchema for K2VItemTable {
+    const TABLE_NAME: &'static str = "k2v_item";
+
+    type P = K2VItemPartition;
+    type S = String;
+    type E = K2VItem;
+    type Filter = ItemFilter;
+
+    fn updated(&self, old: Option<&Self::E>, new: Option<&Self::E>) {
+        // 1. Count
+        let (old_entries, old_conflicts, old_values, old_bytes) = match old {
+            None => (0, 0, 0, 0),
+            Some(e) => e.stats(),
+        };
+        let (new_entries, new_conflicts, new_values, new_bytes) = match new {
+            None => (0, 0, 0, 0),
+            Some(e) => e.stats(),
+        };
+
+        let count_pk = old
+            .map(|e| e.partition.bucket_id)
+            .unwrap_or_else(|| new.unwrap().partition.bucket_id);
+        let count_sk = old
+            .map(|e| &e.partition.partition_key)
+            .unwrap_or_else(|| &new.unwrap().partition.partition_key);
+
+        if let Err(e) = self.counter_table.count(
+            &count_pk,
+            count_sk,
+            &[
+                (ENTRIES, new_entries - old_entries),
+                (CONFLICTS, new_conflicts - old_conflicts),
+                (VALUES, new_values - old_values),
+                (BYTES, new_bytes - old_bytes),
+            ],
+        ) {
+            error!("Could not update K2V counter for bucket {:?} partition {}; counts will now be inconsistent. {}", count_pk, count_sk, e);
+        }
+
+        // 2. Notify
+        if let Some(new_ent) = new {
+            self.subscriptions.notify(new_ent);
+        }
+    }
+
+    #[allow(clippy::nonminimal_bool)]
+    fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
+        let v = entry.values();
+        !(filter.conflicts_only && v.len() < 2)
+            && !(filter.exclude_only_tombstones && entry.is_tombstone())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_dvvsentry_merge_simple() {
+        let e1 = DvvsEntry {
+            t_discard: 4,
+            values: vec![
+                (5, DvvsValue::Value(vec![15])),
+                (6, DvvsValue::Value(vec![16])),
+            ],
+        };
+        let e2 = DvvsEntry {
+            t_discard: 5,
+            values: vec![(6, DvvsValue::Value(vec![16])), (7, DvvsValue::Deleted)],
+        };
+
+        let mut e3 = e1.clone();
+        e3.merge(&e2);
+        assert_eq!(e2, e3);
+    }
+}
diff --git a/src/model/k2v/mod.rs b/src/model/k2v/mod.rs
new file mode 100644
index 00000000..664172a6
--- /dev/null
+++ b/src/model/k2v/mod.rs
@@ -0,0 +1,7 @@
+pub mod causality;
+
+pub mod counter_table;
+pub mod item_table;
+
+pub mod poll;
+pub mod rpc;
diff --git a/src/model/k2v/poll.rs b/src/model/k2v/poll.rs
new file mode 100644
index 00000000..93105207
--- /dev/null
+++ b/src/model/k2v/poll.rs
@@ -0,0 +1,50 @@
+use std::collections::HashMap;
+use std::sync::Mutex;
+
+use serde::{Deserialize, Serialize};
+use tokio::sync::broadcast;
+
+use crate::k2v::item_table::*;
+
+#[derive(Debug, Hash, Clone, PartialEq, Eq, Serialize, Deserialize)]
+pub struct PollKey {
+    pub partition: K2VItemPartition,
+    pub sort_key: String,
+}
+
+#[derive(Default)]
+pub struct SubscriptionManager {
+    subscriptions: Mutex<HashMap<PollKey, broadcast::Sender<K2VItem>>>,
+}
+
+impl SubscriptionManager {
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    pub fn subscribe(&self, key: &PollKey) -> broadcast::Receiver<K2VItem> {
+        let mut subs = self.subscriptions.lock().unwrap();
+        if let Some(s) = subs.get(key) {
+            s.subscribe()
+        } else {
+            let (tx, rx) = broadcast::channel(8);
+            subs.insert(key.clone(), tx);
+            rx
+        }
+    }
+
+    pub fn notify(&self, item: &K2VItem) {
+        let key = PollKey {
+            partition: item.partition.clone(),
+            sort_key: item.sort_key.clone(),
+        };
+        let mut subs = self.subscriptions.lock().unwrap();
+        if let Some(s) = subs.get(&key) {
+            if s.send(item.clone()).is_err() {
+                // no more subscribers, remove channel from here
+                // (we will re-create it later if we need to subscribe again)
+                subs.remove(&key);
+            }
+        }
+    }
+}
diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs
new file mode 100644
index 00000000..90101d0f
--- /dev/null
+++ b/src/model/k2v/rpc.rs
@@ -0,0 +1,343 @@
+//! Module that implements RPCs specific to K2V.
+//! This is necessary for insertions into the K2V store,
+//! as they have to be transmitted to one of the nodes responsible
+//! for storing the entry to be processed (the API entry
+//! node does not process the entry directly, as this would
+//! mean the vector clock gets much larger than needed).
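The module comment above captures the key design constraint behind this file: a K2V causal context carries one (node, timestamp) pair for every node that has ever tagged a write to the item, so the clock only stays small if writes are always tagged by one of the few nodes that actually store the item. Below is a minimal, self-contained sketch of that size argument, using toy types rather than the CausalContext and K2VNodeId defined in this patch:

use std::collections::BTreeMap;

type NodeId = u64;
// node -> highest write timestamp issued by that node
type VectorClock = BTreeMap<NodeId, u64>;

// One write, tagged by `node`: bump that node's entry in the clock.
fn record_write(clock: &mut VectorClock, node: NodeId) {
    *clock.entry(node).or_insert(0) += 1;
}

fn main() {
    let storage_nodes = [1u64, 2, 3]; // the replicas responsible for this item
    let gateways: Vec<u64> = (100..200).collect(); // 100 API nodes taking client requests

    // If every API gateway tagged the writes it receives, the clock would
    // grow with the number of gateways that ever touched the item...
    let mut unbounded = VectorClock::new();
    for gw in &gateways {
        record_write(&mut unbounded, *gw);
    }

    // ...whereas forwarding each write to one of the storage nodes keeps the
    // clock bounded by the replication factor, whatever the write volume.
    let mut bounded = VectorClock::new();
    for i in 0..10_000usize {
        record_write(&mut bounded, storage_nodes[i % 3]);
    }

    assert_eq!(unbounded.len(), 100); // one entry per gateway
    assert_eq!(bounded.len(), 3); // never more than the replica count
}

This is why K2VRpcHandler::insert below forwards each item to its write_nodes instead of applying the update locally on the API node.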
+
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::time::Duration;
+
+use async_trait::async_trait;
+use futures::stream::FuturesUnordered;
+use futures::StreamExt;
+use serde::{Deserialize, Serialize};
+use tokio::select;
+
+use garage_util::crdt::*;
+use garage_util::data::*;
+use garage_util::error::*;
+
+use garage_rpc::system::System;
+use garage_rpc::*;
+
+use garage_table::replication::{TableReplication, TableShardedReplication};
+use garage_table::table::TABLE_RPC_TIMEOUT;
+use garage_table::{PartitionKey, Table};
+
+use crate::k2v::causality::*;
+use crate::k2v::item_table::*;
+use crate::k2v::poll::*;
+
+/// RPC messages for K2V
+#[derive(Debug, Serialize, Deserialize)]
+enum K2VRpc {
+    Ok,
+    InsertItem(InsertedItem),
+    InsertManyItems(Vec<InsertedItem>),
+    PollItem {
+        key: PollKey,
+        causal_context: CausalContext,
+        timeout_msec: u64,
+    },
+    PollItemResponse(Option<K2VItem>),
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+struct InsertedItem {
+    partition: K2VItemPartition,
+    sort_key: String,
+    causal_context: Option<CausalContext>,
+    value: DvvsValue,
+}
+
+impl Rpc for K2VRpc {
+    type Response = Result<K2VRpc, Error>;
+}
+
+/// The RPC handler for K2V requests: forwards insertions and polls to the nodes responsible for storing each item
+pub struct K2VRpcHandler {
+    system: Arc<System>,
+    item_table: Arc<Table<K2VItemTable, TableShardedReplication>>,
+    endpoint: Arc<Endpoint<K2VRpc, Self>>,
+    subscriptions: Arc<SubscriptionManager>,
+}
+
+impl K2VRpcHandler {
+    pub fn new(
+        system: Arc<System>,
+        item_table: Arc<Table<K2VItemTable, TableShardedReplication>>,
+        subscriptions: Arc<SubscriptionManager>,
+    ) -> Arc<Self> {
+        let endpoint = system.netapp.endpoint("garage_model/k2v/Rpc".to_string());
+
+        let rpc_handler = Arc::new(Self {
+            system,
+            item_table,
+            endpoint,
+            subscriptions,
+        });
+        rpc_handler.endpoint.set_handler(rpc_handler.clone());
+
+        rpc_handler
+    }
+
+    // ---- public interface ----
+
+    pub async fn insert(
+        &self,
+        bucket_id: Uuid,
+        partition_key: String,
+        sort_key: String,
+        causal_context: Option<CausalContext>,
+        value: DvvsValue,
+    ) -> Result<(), Error> {
+        let partition = K2VItemPartition {
+            bucket_id,
+            partition_key,
+        };
+        let mut who = self
+            .item_table
+            .data
+            .replication
+            .write_nodes(&partition.hash());
+        who.sort();
+
+        self.system
+            .rpc
+            .try_call_many(
+                &self.endpoint,
+                &who[..],
+                K2VRpc::InsertItem(InsertedItem {
+                    partition,
+                    sort_key,
+                    causal_context,
+                    value,
+                }),
+                RequestStrategy::with_priority(PRIO_NORMAL)
+                    .with_quorum(1)
+                    .with_timeout(TABLE_RPC_TIMEOUT)
+                    .interrupt_after_quorum(true),
+            )
+            .await?;
+
+        Ok(())
+    }
+
+    pub async fn insert_batch(
+        &self,
+        bucket_id: Uuid,
+        items: Vec<(String, String, Option<CausalContext>, DvvsValue)>,
+    ) -> Result<(), Error> {
+        let n_items = items.len();
+
+        let mut call_list: HashMap<_, Vec<_>> = HashMap::new();
+
+        for (partition_key, sort_key, causal_context, value) in items {
+            let partition = K2VItemPartition {
+                bucket_id,
+                partition_key,
+            };
+            let mut who = self
+                .item_table
+                .data
+                .replication
+                .write_nodes(&partition.hash());
+            who.sort();
+
+            call_list.entry(who).or_default().push(InsertedItem {
+                partition,
+                sort_key,
+                causal_context,
+                value,
+            });
+        }
+
+        debug!(
+            "K2V insert_batch: {} requests to insert {} items",
+            call_list.len(),
+            n_items
+        );
+        let call_futures = call_list.into_iter().map(|(nodes, items)| async move {
+            let resp = self
+                .system
+                .rpc
+                .try_call_many(
+                    &self.endpoint,
+                    &nodes[..],
+                    K2VRpc::InsertManyItems(items),
+                    RequestStrategy::with_priority(PRIO_NORMAL)
+                        .with_quorum(1)
+                        .with_timeout(TABLE_RPC_TIMEOUT)
+                        .interrupt_after_quorum(true),
+                )
+                .await?;
+            Ok::<_, Error>((nodes, resp))
+        });
+
+        let mut resps = call_futures.collect::<FuturesUnordered<_>>();
+        while let Some(resp) = resps.next().await {
+            resp?;
+        }
+
+        Ok(())
+    }
+
+    pub async fn poll(
+        &self,
+        bucket_id: Uuid,
+        partition_key: String,
+        sort_key: String,
+        causal_context: CausalContext,
+        timeout_msec: u64,
+    ) -> Result<Option<K2VItem>, Error> {
+        let poll_key = PollKey {
+            partition: K2VItemPartition {
+                bucket_id,
+                partition_key,
+            },
+            sort_key,
+        };
+        let nodes = self
+            .item_table
+            .data
+            .replication
+            .write_nodes(&poll_key.partition.hash());
+
+        let resps = self
+            .system
+            .rpc
+            .try_call_many(
+                &self.endpoint,
+                &nodes[..],
+                K2VRpc::PollItem {
+                    key: poll_key,
+                    causal_context,
+                    timeout_msec,
+                },
+                RequestStrategy::with_priority(PRIO_NORMAL)
+                    .with_quorum(self.item_table.data.replication.read_quorum())
+                    .with_timeout(Duration::from_millis(timeout_msec) + TABLE_RPC_TIMEOUT),
+            )
+            .await?;
+
+        let mut resp: Option<K2VItem> = None;
+        for v in resps {
+            match v {
+                K2VRpc::PollItemResponse(Some(x)) => {
+                    if let Some(y) = &mut resp {
+                        y.merge(&x);
+                    } else {
+                        resp = Some(x);
+                    }
+                }
+                K2VRpc::PollItemResponse(None) => {
+                    return Ok(None);
+                }
+                v => return Err(Error::unexpected_rpc_message(v)),
+            }
+        }
+
+        Ok(resp)
+    }
+
+    // ---- internal handlers ----
+
+    async fn handle_insert(&self, item: &InsertedItem) -> Result<K2VRpc, Error> {
+        let new = self.local_insert(item)?;
+
+        // Propagate to rest of network
+        if let Some(updated) = new {
+            self.item_table.insert(&updated).await?;
+        }
+
+        Ok(K2VRpc::Ok)
+    }
+
+    async fn handle_insert_many(&self, items: &[InsertedItem]) -> Result<K2VRpc, Error> {
+        let mut updated_vec = vec![];
+
+        for item in items {
+            let new = self.local_insert(item)?;
+
+            if let Some(updated) = new {
+                updated_vec.push(updated);
+            }
+        }
+
+        // Propagate to rest of network
+        if !updated_vec.is_empty() {
+            self.item_table.insert_many(&updated_vec).await?;
+        }
+
+        Ok(K2VRpc::Ok)
+    }
+
+    fn local_insert(&self, item: &InsertedItem) -> Result<Option<K2VItem>, Error> {
+        let tree_key = self
+            .item_table
+            .data
+            .tree_key(&item.partition, &item.sort_key);
+
+        self.item_table
+            .data
+            .update_entry_with(&tree_key[..], |ent| {
+                let mut ent = ent.unwrap_or_else(|| {
+                    K2VItem::new(
+                        item.partition.bucket_id,
+                        item.partition.partition_key.clone(),
+                        item.sort_key.clone(),
+                    )
+                });
+                ent.update(self.system.id, &item.causal_context, item.value.clone());
+                ent
+            })
+    }
+
+    async fn handle_poll(&self, key: &PollKey, ct: &CausalContext) -> Result<K2VItem, Error> {
+        let mut chan = self.subscriptions.subscribe(key);
+
+        let mut value = self
+            .item_table
+            .data
+            .read_entry(&key.partition, &key.sort_key)?
+            .map(|bytes| self.item_table.data.decode_entry(&bytes[..]))
+            .transpose()?
+            .unwrap_or_else(|| {
+                K2VItem::new(
+                    key.partition.bucket_id,
+                    key.partition.partition_key.clone(),
+                    key.sort_key.clone(),
+                )
+            });
+
+        while !value.causal_context().is_newer_than(ct) {
+            value = chan.recv().await?;
+        }
+
+        Ok(value)
+    }
+}
+
+#[async_trait]
+impl EndpointHandler<K2VRpc> for K2VRpcHandler {
+    async fn handle(self: &Arc<Self>, message: &K2VRpc, _from: NodeID) -> Result<K2VRpc, Error> {
+        match message {
+            K2VRpc::InsertItem(item) => self.handle_insert(item).await,
+            K2VRpc::InsertManyItems(items) => self.handle_insert_many(&items[..]).await,
+            K2VRpc::PollItem {
+                key,
+                causal_context,
+                timeout_msec,
+            } => {
+                let delay = tokio::time::sleep(Duration::from_millis(*timeout_msec));
+                select! {
+                    ret = self.handle_poll(key, causal_context) => ret.map(Some).map(K2VRpc::PollItemResponse),
+                    _ = delay => Ok(K2VRpc::PollItemResponse(None)),
+                }
+            }
+            m => Err(Error::unexpected_rpc_message(m)),
+        }
+    }
+}
diff --git a/src/model/lib.rs b/src/model/lib.rs
index 05a4cdc7..7c9d9270 100644
--- a/src/model/lib.rs
+++ b/src/model/lib.rs
@@ -3,12 +3,15 @@ extern crate tracing;
 
 pub mod permission;
 
-pub mod block_ref_table;
+pub mod index_counter;
+
 pub mod bucket_alias_table;
 pub mod bucket_table;
 pub mod key_table;
-pub mod object_table;
-pub mod version_table;
+
+#[cfg(feature = "k2v")]
+pub mod k2v;
+pub mod s3;
 
 pub mod garage;
 pub mod helper;
diff --git a/src/model/block_ref_table.rs b/src/model/s3/block_ref_table.rs
index b6945403..9b3991bf 100644
--- a/src/model/block_ref_table.rs
+++ b/src/model/s3/block_ref_table.rs
@@ -51,11 +51,11 @@ impl TableSchema for BlockRefTable {
 	type E = BlockRef;
 	type Filter = DeletedFilter;
 
-	fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) {
+	fn updated(&self, old: Option<&Self::E>, new: Option<&Self::E>) {
 		#[allow(clippy::or_fun_call)]
-		let block = &old.as_ref().or(new.as_ref()).unwrap().block;
-		let was_before = old.as_ref().map(|x| !x.deleted.get()).unwrap_or(false);
-		let is_after = new.as_ref().map(|x| !x.deleted.get()).unwrap_or(false);
+		let block = &old.or(new).unwrap().block;
+		let was_before = old.map(|x| !x.deleted.get()).unwrap_or(false);
+		let is_after = new.map(|x| !x.deleted.get()).unwrap_or(false);
 		if is_after && !was_before {
 			if let Err(e) = self.block_manager.block_incref(block) {
 				warn!("block_incref failed for block {:?}: {}", block, e);
diff --git a/src/model/s3/mod.rs b/src/model/s3/mod.rs
new file mode 100644
index 00000000..4e94337d
--- /dev/null
+++ b/src/model/s3/mod.rs
@@ -0,0 +1,3 @@
+pub mod block_ref_table;
+pub mod object_table;
+pub mod version_table;
diff --git a/src/model/object_table.rs b/src/model/s3/object_table.rs
index da53878e..3d9a89f7 100644
--- a/src/model/object_table.rs
+++ b/src/model/s3/object_table.rs
@@ -9,7 +9,7 @@ use garage_table::crdt::*;
 use garage_table::replication::TableShardedReplication;
 use garage_table::*;
 
-use crate::version_table::*;
+use crate::s3::version_table::*;
 
 use garage_model_050::object_table as old;
 
@@ -232,8 +232,11 @@ impl TableSchema for ObjectTable {
 	type E = Object;
 	type Filter = ObjectFilter;
 
-	fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) {
+	fn updated(&self, old: Option<&Self::E>, new: Option<&Self::E>) {
 		let version_table = self.version_table.clone();
+		let old = old.cloned();
+		let new = new.cloned();
+
 		self.background.spawn(async move {
 			if let (Some(old_v), Some(new_v)) = (old, new) {
 				// Propagate deletion of old versions
diff --git a/src/model/version_table.rs b/src/model/s3/version_table.rs
index 839b1f4f..ad096772 100644
--- a/src/model/version_table.rs
+++ b/src/model/s3/version_table.rs
@@ -8,7 +8,7 @@ use garage_table::crdt::*;
 use garage_table::replication::TableShardedReplication;
 use garage_table::*;
 
-use crate::block_ref_table::*;
+use crate::s3::block_ref_table::*;
 
 use garage_model_050::version_table as old;
 
@@ -137,8 +137,11 @@ impl TableSchema for VersionTable {
 	type E = Version;
 	type Filter = DeletedFilter;
 
-	fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) {
+	fn updated(&self, old: Option<&Self::E>, new: Option<&Self::E>) {
 		let block_ref_table = self.block_ref_table.clone();
+		let old = old.cloned();
+		let new = new.cloned();
+
 		self.background.spawn(async move {
 			if let (Some(old_v), Some(new_v)) = (old, new) {
 				// Propagate deletion of version blocks
diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml
index 46d0dc1e..bed7f44a 100644
--- a/src/rpc/Cargo.toml
+++ b/src/rpc/Cargo.toml
@@ -52,5 +52,6 @@ netapp = { version = "0.4.4", features = ["telemetry"] }
 
 hyper = { version = "0.14", features = ["client", "http1", "runtime", "tcp"] }
 
+
 [features]
 kubernetes-discovery = [ "kube", "k8s-openapi", "openssl", "schemars" ]
diff --git a/src/table/data.rs b/src/table/data.rs
index ff7965f5..5cb10066 100644
--- a/src/table/data.rs
+++ b/src/table/data.rs
@@ -1,8 +1,9 @@
 use core::borrow::Borrow;
+use std::convert::TryInto;
 use std::sync::Arc;
 
 use serde_bytes::ByteBuf;
-use sled::Transactional;
+use sled::{IVec, Transactional};
 use tokio::sync::Notify;
 
 use garage_util::data::*;
@@ -16,12 +17,13 @@ use crate::gc::GcTodoEntry;
 use crate::metrics::*;
 use crate::replication::*;
 use crate::schema::*;
+use crate::util::*;
 
 pub struct TableData<F: TableSchema, R: TableReplication> {
 	system: Arc<System>,
 
-	pub(crate) instance: F,
-	pub(crate) replication: R,
+	pub instance: F,
+	pub replication: R,
 
 	pub store: sled::Tree,
 
@@ -83,18 +85,48 @@ where
 
 	pub fn read_range(
 		&self,
-		p: &F::P,
-		s: &Option<F::S>,
+		partition_key: &F::P,
+		start: &Option<F::S>,
+		filter: &Option<F::Filter>,
+		limit: usize,
+		enumeration_order: EnumerationOrder,
+	) -> Result<Vec<Arc<ByteBuf>>, Error> {
+		let partition_hash = partition_key.hash();
+		match enumeration_order {
+			EnumerationOrder::Forward => {
+				let first_key = match start {
+					None => partition_hash.to_vec(),
+					Some(sk) => self.tree_key(partition_key, sk),
+				};
+				let range = self.store.range(first_key..);
+				self.read_range_aux(partition_hash, range, filter, limit)
+			}
+			EnumerationOrder::Reverse => match start {
+				Some(sk) => {
+					let last_key = self.tree_key(partition_key, sk);
+					let range = self.store.range(..=last_key).rev();
+					self.read_range_aux(partition_hash, range, filter, limit)
+				}
+				None => {
+					let mut last_key = partition_hash.to_vec();
+					let lower = u128::from_be_bytes(last_key[16..32].try_into().unwrap());
+					last_key[16..32].copy_from_slice(&u128::to_be_bytes(lower + 1));
+					let range = self.store.range(..last_key).rev();
+					self.read_range_aux(partition_hash, range, filter, limit)
+				}
+			},
+		}
+	}
+
+	fn read_range_aux(
+		&self,
+		partition_hash: Hash,
+		range: impl Iterator<Item = sled::Result<(IVec, IVec)>>,
 		filter: &Option<F::Filter>,
 		limit: usize,
 	) -> Result<Vec<Arc<ByteBuf>>, Error> {
-		let partition_hash = p.hash();
-		let first_key = match s {
-			None => partition_hash.to_vec(),
-			Some(sk) => self.tree_key(p, sk),
-		};
 		let mut ret = vec![];
-		for item in self.store.range(first_key..) {
+		for item in range {
 			let (key, value) = item?;
 			if &key[..32] != partition_hash.as_slice() {
 				break;
@@ -136,17 +168,31 @@ where
 		let update = self.decode_entry(update_bytes)?;
 		let tree_key = self.tree_key(update.partition_key(), update.sort_key());
 
+		self.update_entry_with(&tree_key[..], |ent| match ent {
+			Some(mut ent) => {
+				ent.merge(&update);
+				ent
+			}
+			None => update.clone(),
+		})?;
+		Ok(())
+	}
+
+	pub fn update_entry_with(
+		&self,
+		tree_key: &[u8],
+		f: impl Fn(Option<F::E>) -> F::E,
+	) -> Result<Option<F::E>, Error> {
 		let changed = (&self.store, &self.merkle_todo).transaction(|(store, mkl_todo)| {
-			let (old_entry, old_bytes, new_entry) = match store.get(&tree_key)? {
+			let (old_entry, old_bytes, new_entry) = match store.get(tree_key)? {
 				Some(old_bytes) => {
 					let old_entry = self
 						.decode_entry(&old_bytes)
 						.map_err(sled::transaction::ConflictableTransactionError::Abort)?;
-					let mut new_entry = old_entry.clone();
-					new_entry.merge(&update);
+					let new_entry = f(Some(old_entry.clone()));
 					(Some(old_entry), Some(old_bytes), new_entry)
 				}
-				None => (None, None, update.clone()),
+				None => (None, None, f(None)),
 			};
 
 			// Scenario 1: the value changed, so of course there is a change
@@ -163,8 +209,8 @@ where
 			if value_changed || encoding_changed {
 				let new_bytes_hash = blake2sum(&new_bytes[..]);
-				mkl_todo.insert(tree_key.clone(), new_bytes_hash.as_slice())?;
-				store.insert(tree_key.clone(), new_bytes)?;
+				mkl_todo.insert(tree_key.to_vec(), new_bytes_hash.as_slice())?;
+				store.insert(tree_key.to_vec(), new_bytes)?;
 				Ok(Some((old_entry, new_entry, new_bytes_hash)))
 			} else {
 				Ok(None)
@@ -175,7 +221,7 @@ where
 			self.metrics.internal_update_counter.add(1);
 
 			let is_tombstone = new_entry.is_tombstone();
-			self.instance.updated(old_entry, Some(new_entry));
+			self.instance.updated(old_entry.as_ref(), Some(&new_entry));
 			self.merkle_todo_notify.notify_one();
 			if is_tombstone {
 				// We are only responsible for GC'ing this item if we are the
@@ -187,12 +233,14 @@ where
 				let pk_hash = Hash::try_from(&tree_key[..32]).unwrap();
 				let nodes = self.replication.write_nodes(&pk_hash);
 				if nodes.first() == Some(&self.system.id) {
-					GcTodoEntry::new(tree_key, new_bytes_hash).save(&self.gc_todo)?;
+					GcTodoEntry::new(tree_key.to_vec(), new_bytes_hash).save(&self.gc_todo)?;
 				}
 			}
-		}
 
-		Ok(())
+			Ok(Some(new_entry))
+		} else {
+			Ok(None)
+		}
 	}
 
 	pub(crate) fn delete_if_equal(self: &Arc<Self>, k: &[u8], v: &[u8]) -> Result<bool, Error> {
@@ -211,7 +259,7 @@ where
 			self.metrics.internal_delete_counter.add(1);
 
 			let old_entry = self.decode_entry(v)?;
-			self.instance.updated(Some(old_entry), None);
+			self.instance.updated(Some(&old_entry), None);
 			self.merkle_todo_notify.notify_one();
 		}
 		Ok(removed)
@@ -235,7 +283,7 @@ where
 
 		if let Some(old_v) = removed {
 			let old_entry = self.decode_entry(&old_v[..])?;
-			self.instance.updated(Some(old_entry), None);
+			self.instance.updated(Some(&old_entry), None);
 			self.merkle_todo_notify.notify_one();
 			Ok(true)
 		} else {
@@ -245,13 +293,13 @@ where
 
 	// ---- Utility functions ----
 
-	pub(crate) fn tree_key(&self, p: &F::P, s: &F::S) -> Vec<u8> {
+	pub fn tree_key(&self, p: &F::P, s: &F::S) -> Vec<u8> {
 		let mut ret = p.hash().to_vec();
 		ret.extend(s.sort_key());
 		ret
 	}
 
-	pub(crate) fn decode_entry(&self, bytes: &[u8]) -> Result<F::E, Error> {
+	pub fn decode_entry(&self, bytes: &[u8]) -> Result<F::E, Error> {
 		match rmp_serde::decode::from_read_ref::<_, F::E>(bytes) {
 			Ok(x) => Ok(x),
 			Err(e) => match F::try_migrate(bytes) {
diff --git a/src/table/schema.rs b/src/table/schema.rs
index eba918a2..37327037 100644
--- a/src/table/schema.rs
+++ b/src/table/schema.rs
@@ -86,7 +86,7 @@ pub trait TableSchema: Send + Sync {
 	// as the update itself is an unchangeable fact that will never go back
 	// due to CRDT logic. Typically errors in propagation of info should be logged
 	// to stderr.
-	fn updated(&self, _old: Option<Self::E>, _new: Option<Self::E>) {}
+	fn updated(&self, _old: Option<&Self::E>, _new: Option<&Self::E>) {}
 
 	fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool;
 }
diff --git a/src/table/table.rs b/src/table/table.rs
index 7f87a449..2a167604 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -1,4 +1,5 @@
-use std::collections::{BTreeMap, HashMap};
+use std::borrow::Borrow;
+use std::collections::{BTreeMap, BTreeSet, HashMap};
 use std::sync::Arc;
 use std::time::Duration;
 
@@ -26,8 +27,9 @@ use crate::merkle::*;
 use crate::replication::*;
 use crate::schema::*;
 use crate::sync::*;
+use crate::util::*;
 
-const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10);
+pub const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10);
 
 pub struct Table<F: TableSchema + 'static, R: TableReplication + 'static> {
 	pub system: Arc<System>,
@@ -45,7 +47,13 @@ pub(crate) enum TableRpc<F: TableSchema> {
 	ReadEntryResponse(Option<ByteBuf>),
 
 	// Read range: read all keys in partition P, possibly starting at a certain sort key offset
-	ReadRange(F::P, Option<F::S>, Option<F::Filter>, usize),
+	ReadRange {
+		partition: F::P,
+		begin_sort_key: Option<F::S>,
+		filter: Option<F::Filter>,
+		limit: usize,
+		enumeration_order: EnumerationOrder,
+	},
 
 	Update(Vec<Arc<ByteBuf>>),
 }
@@ -123,9 +131,13 @@ where
 		Ok(())
 	}
 
-	pub async fn insert_many(&self, entries: &[F::E]) -> Result<(), Error> {
+	pub async fn insert_many<I, IE>(&self, entries: I) -> Result<(), Error>
+	where
+		I: IntoIterator<Item = IE> + Send + Sync,
+		IE: Borrow<F::E> + Send + Sync,
+	{
 		let tracer = opentelemetry::global::tracer("garage_table");
-		let span = tracer.start(format!("{} insert_many {}", F::TABLE_NAME, entries.len()));
+		let span = tracer.start(format!("{} insert_many", F::TABLE_NAME));
 
 		self.insert_many_internal(entries)
 			.bound_record_duration(&self.data.metrics.put_request_duration)
@@ -137,10 +149,15 @@
 		Ok(())
 	}
 
-	async fn insert_many_internal(&self, entries: &[F::E]) -> Result<(), Error> {
+	async fn insert_many_internal<I, IE>(&self, entries: I) -> Result<(), Error>
+	where
+		I: IntoIterator<Item = IE> + Send + Sync,
+		IE: Borrow<F::E> + Send + Sync,
+	{
 		let mut call_list: HashMap<_, Vec<_>> = HashMap::new();
 
-		for entry in entries.iter() {
+		for entry in entries.into_iter() {
+			let entry = entry.borrow();
 			let hash = entry.partition_key().hash();
 			let who = self.data.replication.write_nodes(&hash);
 			let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(entry)?));
@@ -261,12 +278,19 @@ where
 		begin_sort_key: Option<F::S>,
 		filter: Option<F::Filter>,
 		limit: usize,
+		enumeration_order: EnumerationOrder,
 	) -> Result<Vec<F::E>, Error> {
 		let tracer = opentelemetry::global::tracer("garage_table");
 		let span = tracer.start(format!("{} get_range", F::TABLE_NAME));
 
 		let res = self
-			.get_range_internal(partition_key, begin_sort_key, filter, limit)
+			.get_range_internal(
+				partition_key,
+				begin_sort_key,
+				filter,
+				limit,
+				enumeration_order,
+			)
 			.bound_record_duration(&self.data.metrics.get_request_duration)
 			.with_context(Context::current_with_span(span))
 			.await?;
@@ -282,11 +306,18 @@ where
 		begin_sort_key: Option<F::S>,
 		filter: Option<F::Filter>,
 		limit: usize,
+		enumeration_order: EnumerationOrder,
 	) -> Result<Vec<F::E>, Error> {
 		let hash = partition_key.hash();
 		let who = self.data.replication.read_nodes(&hash);
 
-		let rpc = TableRpc::<F>::ReadRange(partition_key.clone(), begin_sort_key, filter, limit);
+		let rpc = TableRpc::<F>::ReadRange {
+			partition: partition_key.clone(),
+			begin_sort_key,
+			filter,
+			limit,
+			enumeration_order,
+		};
 
 		let resps = self
 			.system
@@ -302,44 +333,65 @@ where
 			)
 			.await?;
 
-		let mut ret = BTreeMap::new();
-		let mut to_repair = BTreeMap::new();
+		let mut ret: BTreeMap<Vec<u8>, F::E> = BTreeMap::new();
+		let mut to_repair = BTreeSet::new();
 		for resp in resps {
 			if let TableRpc::Update(entries) = resp {
 				for entry_bytes in entries.iter() {
 					let entry = self.data.decode_entry(entry_bytes.as_slice())?;
 					let entry_key = self.data.tree_key(entry.partition_key(), entry.sort_key());
-					match ret.remove(&entry_key) {
-						None => {
-							ret.insert(entry_key, Some(entry));
-						}
-						Some(Some(mut prev)) => {
-							let must_repair = prev != entry;
-							prev.merge(&entry);
-							if must_repair {
-								to_repair.insert(entry_key.clone(), Some(prev.clone()));
+					match ret.get_mut(&entry_key) {
+						Some(e) => {
+							if *e != entry {
+								e.merge(&entry);
+								to_repair.insert(entry_key.clone());
 							}
-							ret.insert(entry_key, Some(prev));
 						}
-						Some(None) => unreachable!(),
+						None => {
+							ret.insert(entry_key, entry);
+						}
 					}
 				}
+			} else {
+				return Err(Error::unexpected_rpc_message(resp));
 			}
 		}
 
+		if !to_repair.is_empty() {
 			let self2 = self.clone();
+			let to_repair = to_repair
+				.into_iter()
+				.map(|k| ret.get(&k).unwrap().clone())
+				.collect::<Vec<_>>();
 			self.system.background.spawn_cancellable(async move {
-				for (_, v) in to_repair.iter_mut() {
-					self2.repair_on_read(&who[..], v.take().unwrap()).await?;
+				for v in to_repair {
+					self2.repair_on_read(&who[..], v).await?;
 				}
 				Ok(())
 			});
 		}
-		let ret_vec = ret
-			.iter_mut()
-			.take(limit)
-			.map(|(_k, v)| v.take().unwrap())
-			.collect::<Vec<_>>();
+
+		// At this point, the `ret` btreemap might contain more than `limit`
+		// items, because nodes might have returned us each `limit` items
+		// but for different keys. We have to take only the first `limit` items
+		// in this map, in the specified enumeration order, for two reasons:
+		// 1. To return to the user no more than the number of items that they requested
+		// 2. To return only items for which we have a read quorum: we do not know
+		//    that we have a read quorum for the items after the first `limit`
+		//    of them
+		let ret_vec = match enumeration_order {
+			EnumerationOrder::Forward => ret
+				.into_iter()
+				.take(limit)
+				.map(|(_k, v)| v)
+				.collect::<Vec<_>>(),
+			EnumerationOrder::Reverse => ret
+				.into_iter()
+				.rev()
+				.take(limit)
+				.map(|(_k, v)| v)
+				.collect::<Vec<_>>(),
		};
 		Ok(ret_vec)
 	}
@@ -378,8 +430,20 @@ where
 				let value = self.data.read_entry(key, sort_key)?;
 				Ok(TableRpc::ReadEntryResponse(value))
 			}
-			TableRpc::ReadRange(key, begin_sort_key, filter, limit) => {
-				let values = self.data.read_range(key, begin_sort_key, filter, *limit)?;
+			TableRpc::ReadRange {
+				partition,
+				begin_sort_key,
+				filter,
+				limit,
+				enumeration_order,
+			} => {
+				let values = self.data.read_range(
+					partition,
+					begin_sort_key,
+					filter,
+					*limit,
+					*enumeration_order,
+				)?;
 				Ok(TableRpc::Update(values))
 			}
 			TableRpc::Update(pairs) => {
diff --git a/src/table/util.rs b/src/table/util.rs
index 2a5c3afe..20595a94 100644
--- a/src/table/util.rs
+++ b/src/table/util.rs
@@ -17,7 +17,7 @@ impl PartitionKey for EmptyKey {
 	}
 }
 
-#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
+#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq)]
 pub enum DeletedFilter {
 	Any,
 	Deleted,
@@ -33,3 +33,19 @@ impl DeletedFilter {
 		}
 	}
 }
+
+#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq)]
+pub enum EnumerationOrder {
+	Forward,
+	Reverse,
+}
+
+impl EnumerationOrder {
+	pub fn from_reverse(reverse: bool) -> Self {
+		if reverse {
+			Self::Reverse
+		} else {
+			Self::Forward
+		}
+	}
+}
diff --git a/src/util/Cargo.toml b/src/util/Cargo.toml
index f13c1589..95cde531 100644
--- a/src/util/Cargo.toml
+++ b/src/util/Cargo.toml
@@ -41,3 +41,6 @@ http = "0.2"
 hyper = "0.14"
 
 opentelemetry = { version = "0.17", features = [ "rt-tokio", "metrics", "trace" ] }
+
+[features]
+k2v = []
diff --git a/src/util/config.rs b/src/util/config.rs
index e4d96476..4d66bfe4 100644
--- a/src/util/config.rs
+++ b/src/util/config.rs
@@ -73,7 +73,11 @@ pub struct Config {
 	pub sled_flush_every_ms: u64,
 
 	/// Configuration for S3 api
-	pub s3_api: ApiConfig,
+	pub s3_api: S3ApiConfig,
+
+	/// Configuration for K2V api
+	#[cfg(feature = "k2v")]
+	pub k2v_api: Option<K2VApiConfig>,
 
 	/// Configuration for serving files as normal web server
 	pub s3_web: WebConfig,
@@ -85,7 +89,7 @@ pub struct Config {
 
 /// Configuration for S3 api
 #[derive(Deserialize, Debug, Clone)]
-pub struct ApiConfig {
+pub struct S3ApiConfig {
 	/// Address and port to bind for api serving
 	pub api_bind_addr: SocketAddr,
 	/// S3 region to use
@@ -95,6 +99,14 @@ pub struct S3ApiConfig {
 	pub root_domain: Option<String>,
 }
 
+/// Configuration for K2V api
+#[cfg(feature = "k2v")]
+#[derive(Deserialize, Debug, Clone)]
+pub struct K2VApiConfig {
+	/// Address and port to bind for api serving
+	pub api_bind_addr: SocketAddr,
+}
+
 /// Configuration for serving files as normal web server
 #[derive(Deserialize, Debug, Clone)]
 pub struct WebConfig {
diff --git a/src/util/error.rs b/src/util/error.rs
index bdb3a69b..8734a0c8 100644
--- a/src/util/error.rs
+++ b/src/util/error.rs
@@ -44,6 +44,9 @@ pub enum Error {
 	#[error(display = "Tokio semaphore acquire error: {}", _0)]
 	TokioSemAcquire(#[error(source)] tokio::sync::AcquireError),
 
+	#[error(display = "Tokio broadcast receive error: {}", _0)]
+	TokioBcastRecv(#[error(source)] tokio::sync::broadcast::error::RecvError),
+
 	#[error(display = "Remote error: {}", _0)]
 	RemoteError(String),
 
diff --git a/src/web/web_server.rs b/src/web/web_server.rs
index c3d691d0..867adc51 100644
--- a/src/web/web_server.rs
+++ b/src/web/web_server.rs
@@ -20,8 +20,8 @@ use crate::error::*;
 
 use garage_api::error::{Error as ApiError, OkOrBadRequest, OkOrInternalError};
 use garage_api::helpers::{authority_to_host, host_to_bucket};
-use garage_api::s3_cors::{add_cors_headers, find_matching_cors_rule, handle_options_for_bucket};
-use garage_api::s3_get::{handle_get, handle_head};
+use garage_api::s3::cors::{add_cors_headers, find_matching_cors_rule, handle_options_for_bucket};
+use garage_api::s3::get::{handle_get, handle_head};
 use garage_model::garage::Garage;
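
One subtlety in the src/table/data.rs hunk above deserves a note: for EnumerationOrder::Reverse with no start key, the exclusive upper bound of the key range is computed by reading the last 16 bytes of the 32-byte partition hash as a big-endian u128 and adding one, which yields the smallest key strictly greater than every key of the partition (all partition keys share the full 32-byte hash prefix). A standalone sketch of the same trick on a plain BTreeMap follows; the helper name is hypothetical, and like the patch it assumes the low 128 bits of the hash are not all ones, since `lower + 1` would overflow in that vanishingly improbable case:

use std::collections::BTreeMap;
use std::convert::TryInto;

// Hypothetical helper mirroring the `EnumerationOrder::Reverse, start: None`
// branch of TableData::read_range: the partition hash with its low 128 bits
// incremented by one, i.e. the smallest key above the whole partition.
fn partition_upper_bound(partition_hash: &[u8; 32]) -> Vec<u8> {
    let mut last_key = partition_hash.to_vec();
    let lower = u128::from_be_bytes(last_key[16..32].try_into().unwrap());
    last_key[16..32].copy_from_slice(&u128::to_be_bytes(lower + 1));
    last_key
}

fn main() {
    // A toy ordered store standing in for the sled tree: keys are
    // `partition_hash ++ sort_key`, exactly as produced by tree_key().
    let mut store: BTreeMap<Vec<u8>, &str> = BTreeMap::new();
    let hash = [0x42u8; 32];
    for (sk, v) in [(b"a", "first"), (b"b", "second"), (b"c", "third")] {
        let mut key = hash.to_vec();
        key.extend_from_slice(sk);
        store.insert(key, v);
    }

    // Reverse enumeration of the whole partition: iterate ..upper, backwards,
    // and stop as soon as a key no longer carries the partition's hash prefix.
    let upper = partition_upper_bound(&hash);
    let rev: Vec<&str> = store
        .range(..upper)
        .rev()
        .take_while(|(k, _)| k[..32] == hash[..])
        .map(|(_, v)| *v)
        .collect();

    assert_eq!(rev, vec!["third", "second", "first"]);
}

Incrementing only the low half of the hash suffices because, compared as byte strings, every `hash ++ sort_key` is smaller than the incremented 32-byte prefix, so the reverse iteration starts exactly at the partition's last entry.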