aboutsummaryrefslogtreecommitdiff
path: root/src/api/k2v
diff options
context:
space:
mode:
Diffstat (limited to 'src/api/k2v')
-rw-r--r--src/api/k2v/api_server.rs190
-rw-r--r--src/api/k2v/batch.rs363
-rw-r--r--src/api/k2v/error.rs135
-rw-r--r--src/api/k2v/index.rs100
-rw-r--r--src/api/k2v/item.rs230
-rw-r--r--src/api/k2v/mod.rs9
-rw-r--r--src/api/k2v/range.rs100
-rw-r--r--src/api/k2v/router.rs252
8 files changed, 1379 insertions, 0 deletions
diff --git a/src/api/k2v/api_server.rs b/src/api/k2v/api_server.rs
new file mode 100644
index 00000000..084867b5
--- /dev/null
+++ b/src/api/k2v/api_server.rs
@@ -0,0 +1,190 @@
+use std::net::SocketAddr;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+
+use futures::future::Future;
+use hyper::{Body, Method, Request, Response};
+
+use opentelemetry::{trace::SpanRef, KeyValue};
+
+use garage_util::error::Error as GarageError;
+
+use garage_model::garage::Garage;
+
+use crate::generic_server::*;
+use crate::k2v::error::*;
+
+use crate::signature::payload::check_payload_signature;
+use crate::signature::streaming::*;
+
+use crate::helpers::*;
+use crate::k2v::batch::*;
+use crate::k2v::index::*;
+use crate::k2v::item::*;
+use crate::k2v::router::Endpoint;
+use crate::s3::cors::*;
+
+pub struct K2VApiServer {
+ garage: Arc<Garage>,
+}
+
+pub(crate) struct K2VApiEndpoint {
+ bucket_name: String,
+ endpoint: Endpoint,
+}
+
+impl K2VApiServer {
+ pub async fn run(
+ garage: Arc<Garage>,
+ bind_addr: SocketAddr,
+ s3_region: String,
+ shutdown_signal: impl Future<Output = ()>,
+ ) -> Result<(), GarageError> {
+ ApiServer::new(s3_region, K2VApiServer { garage })
+ .run_server(bind_addr, shutdown_signal)
+ .await
+ }
+}
+
+#[async_trait]
+impl ApiHandler for K2VApiServer {
+ const API_NAME: &'static str = "k2v";
+ const API_NAME_DISPLAY: &'static str = "K2V";
+
+ type Endpoint = K2VApiEndpoint;
+ type Error = Error;
+
+ fn parse_endpoint(&self, req: &Request<Body>) -> Result<K2VApiEndpoint, Error> {
+ let (endpoint, bucket_name) = Endpoint::from_request(req)?;
+
+ Ok(K2VApiEndpoint {
+ bucket_name,
+ endpoint,
+ })
+ }
+
+ async fn handle(
+ &self,
+ req: Request<Body>,
+ endpoint: K2VApiEndpoint,
+ ) -> Result<Response<Body>, Error> {
+ let K2VApiEndpoint {
+ bucket_name,
+ endpoint,
+ } = endpoint;
+ let garage = self.garage.clone();
+
+ // The OPTIONS method is procesed early, before we even check for an API key
+ if let Endpoint::Options = endpoint {
+ return Ok(handle_options_s3api(garage, &req, Some(bucket_name))
+ .await
+ .ok_or_bad_request("Error handling OPTIONS")?);
+ }
+
+ let (api_key, mut content_sha256) = check_payload_signature(&garage, "k2v", &req).await?;
+ let api_key = api_key
+ .ok_or_else(|| Error::forbidden("Garage does not support anonymous access yet"))?;
+
+ let req = parse_streaming_body(
+ &api_key,
+ req,
+ &mut content_sha256,
+ &garage.config.s3_api.s3_region,
+ "k2v",
+ )?;
+
+ let bucket_id = garage
+ .bucket_helper()
+ .resolve_bucket(&bucket_name, &api_key)
+ .await?;
+ let bucket = garage
+ .bucket_helper()
+ .get_existing_bucket(bucket_id)
+ .await?;
+
+ let allowed = match endpoint.authorization_type() {
+ Authorization::Read => api_key.allow_read(&bucket_id),
+ Authorization::Write => api_key.allow_write(&bucket_id),
+ Authorization::Owner => api_key.allow_owner(&bucket_id),
+ _ => unreachable!(),
+ };
+
+ if !allowed {
+ return Err(Error::forbidden("Operation is not allowed for this key."));
+ }
+
+ // Look up what CORS rule might apply to response.
+ // Requests for methods different than GET, HEAD or POST
+ // are always preflighted, i.e. the browser should make
+ // an OPTIONS call before to check it is allowed
+ let matching_cors_rule = match *req.method() {
+ Method::GET | Method::HEAD | Method::POST => find_matching_cors_rule(&bucket, &req)
+ .ok_or_internal_error("Error looking up CORS rule")?,
+ _ => None,
+ };
+
+ let resp = match endpoint {
+ Endpoint::DeleteItem {
+ partition_key,
+ sort_key,
+ } => handle_delete_item(garage, req, bucket_id, &partition_key, &sort_key).await,
+ Endpoint::InsertItem {
+ partition_key,
+ sort_key,
+ } => handle_insert_item(garage, req, bucket_id, &partition_key, &sort_key).await,
+ Endpoint::ReadItem {
+ partition_key,
+ sort_key,
+ } => handle_read_item(garage, &req, bucket_id, &partition_key, &sort_key).await,
+ Endpoint::PollItem {
+ partition_key,
+ sort_key,
+ causality_token,
+ timeout,
+ } => {
+ handle_poll_item(
+ garage,
+ &req,
+ bucket_id,
+ partition_key,
+ sort_key,
+ causality_token,
+ timeout,
+ )
+ .await
+ }
+ Endpoint::ReadIndex {
+ prefix,
+ start,
+ end,
+ limit,
+ reverse,
+ } => handle_read_index(garage, bucket_id, prefix, start, end, limit, reverse).await,
+ Endpoint::InsertBatch {} => handle_insert_batch(garage, bucket_id, req).await,
+ Endpoint::ReadBatch {} => handle_read_batch(garage, bucket_id, req).await,
+ Endpoint::DeleteBatch {} => handle_delete_batch(garage, bucket_id, req).await,
+ Endpoint::Options => unreachable!(),
+ };
+
+ // If request was a success and we have a CORS rule that applies to it,
+ // add the corresponding CORS headers to the response
+ let mut resp_ok = resp?;
+ if let Some(rule) = matching_cors_rule {
+ add_cors_headers(&mut resp_ok, rule)
+ .ok_or_internal_error("Invalid bucket CORS configuration")?;
+ }
+
+ Ok(resp_ok)
+ }
+}
+
+impl ApiEndpoint for K2VApiEndpoint {
+ fn name(&self) -> &'static str {
+ self.endpoint.name()
+ }
+
+ fn add_span_attributes(&self, span: SpanRef<'_>) {
+ span.set_attribute(KeyValue::new("bucket", self.bucket_name.clone()));
+ }
+}
diff --git a/src/api/k2v/batch.rs b/src/api/k2v/batch.rs
new file mode 100644
index 00000000..db9901cf
--- /dev/null
+++ b/src/api/k2v/batch.rs
@@ -0,0 +1,363 @@
+use std::sync::Arc;
+
+use hyper::{Body, Request, Response, StatusCode};
+use serde::{Deserialize, Serialize};
+
+use garage_util::data::*;
+use garage_util::error::Error as GarageError;
+
+use garage_table::{EnumerationOrder, TableSchema};
+
+use garage_model::garage::Garage;
+use garage_model::k2v::causality::*;
+use garage_model::k2v::item_table::*;
+
+use crate::helpers::*;
+use crate::k2v::error::*;
+use crate::k2v::range::read_range;
+
+pub async fn handle_insert_batch(
+ garage: Arc<Garage>,
+ bucket_id: Uuid,
+ req: Request<Body>,
+) -> Result<Response<Body>, Error> {
+ let items = parse_json_body::<Vec<InsertBatchItem>>(req).await?;
+
+ let mut items2 = vec![];
+ for it in items {
+ let ct = it
+ .ct
+ .map(|s| CausalContext::parse(&s))
+ .transpose()
+ .ok_or_bad_request("Invalid causality token")?;
+ let v = match it.v {
+ Some(vs) => {
+ DvvsValue::Value(base64::decode(vs).ok_or_bad_request("Invalid base64 value")?)
+ }
+ None => DvvsValue::Deleted,
+ };
+ items2.push((it.pk, it.sk, ct, v));
+ }
+
+ garage.k2v.rpc.insert_batch(bucket_id, items2).await?;
+
+ Ok(Response::builder()
+ .status(StatusCode::OK)
+ .body(Body::empty())?)
+}
+
+pub async fn handle_read_batch(
+ garage: Arc<Garage>,
+ bucket_id: Uuid,
+ req: Request<Body>,
+) -> Result<Response<Body>, Error> {
+ let queries = parse_json_body::<Vec<ReadBatchQuery>>(req).await?;
+
+ let resp_results = futures::future::join_all(
+ queries
+ .into_iter()
+ .map(|q| handle_read_batch_query(&garage, bucket_id, q)),
+ )
+ .await;
+
+ let mut resps: Vec<ReadBatchResponse> = vec![];
+ for resp in resp_results {
+ resps.push(resp?);
+ }
+
+ let resp_json = serde_json::to_string_pretty(&resps).map_err(GarageError::from)?;
+ Ok(Response::builder()
+ .status(StatusCode::OK)
+ .body(Body::from(resp_json))?)
+}
+
+async fn handle_read_batch_query(
+ garage: &Arc<Garage>,
+ bucket_id: Uuid,
+ query: ReadBatchQuery,
+) -> Result<ReadBatchResponse, Error> {
+ let partition = K2VItemPartition {
+ bucket_id,
+ partition_key: query.partition_key.clone(),
+ };
+
+ let filter = ItemFilter {
+ exclude_only_tombstones: !query.tombstones,
+ conflicts_only: query.conflicts_only,
+ };
+
+ let (items, more, next_start) = if query.single_item {
+ if query.prefix.is_some() || query.end.is_some() || query.limit.is_some() || query.reverse {
+ return Err(Error::bad_request("Batch query parameters 'prefix', 'end', 'limit' and 'reverse' must not be set when singleItem is true."));
+ }
+ let sk = query
+ .start
+ .as_ref()
+ .ok_or_bad_request("start should be specified if single_item is set")?;
+ let item = garage
+ .k2v
+ .item_table
+ .get(&partition, sk)
+ .await?
+ .filter(|e| K2VItemTable::matches_filter(e, &filter));
+ match item {
+ Some(i) => (vec![ReadBatchResponseItem::from(i)], false, None),
+ None => (vec![], false, None),
+ }
+ } else {
+ let (items, more, next_start) = read_range(
+ &garage.k2v.item_table,
+ &partition,
+ &query.prefix,
+ &query.start,
+ &query.end,
+ query.limit,
+ Some(filter),
+ EnumerationOrder::from_reverse(query.reverse),
+ )
+ .await?;
+
+ let items = items
+ .into_iter()
+ .map(ReadBatchResponseItem::from)
+ .collect::<Vec<_>>();
+
+ (items, more, next_start)
+ };
+
+ Ok(ReadBatchResponse {
+ partition_key: query.partition_key,
+ prefix: query.prefix,
+ start: query.start,
+ end: query.end,
+ limit: query.limit,
+ reverse: query.reverse,
+ single_item: query.single_item,
+ conflicts_only: query.conflicts_only,
+ tombstones: query.tombstones,
+ items,
+ more,
+ next_start,
+ })
+}
+
+pub async fn handle_delete_batch(
+ garage: Arc<Garage>,
+ bucket_id: Uuid,
+ req: Request<Body>,
+) -> Result<Response<Body>, Error> {
+ let queries = parse_json_body::<Vec<DeleteBatchQuery>>(req).await?;
+
+ let resp_results = futures::future::join_all(
+ queries
+ .into_iter()
+ .map(|q| handle_delete_batch_query(&garage, bucket_id, q)),
+ )
+ .await;
+
+ let mut resps: Vec<DeleteBatchResponse> = vec![];
+ for resp in resp_results {
+ resps.push(resp?);
+ }
+
+ let resp_json = serde_json::to_string_pretty(&resps).map_err(GarageError::from)?;
+ Ok(Response::builder()
+ .status(StatusCode::OK)
+ .body(Body::from(resp_json))?)
+}
+
+async fn handle_delete_batch_query(
+ garage: &Arc<Garage>,
+ bucket_id: Uuid,
+ query: DeleteBatchQuery,
+) -> Result<DeleteBatchResponse, Error> {
+ let partition = K2VItemPartition {
+ bucket_id,
+ partition_key: query.partition_key.clone(),
+ };
+
+ let filter = ItemFilter {
+ exclude_only_tombstones: true,
+ conflicts_only: false,
+ };
+
+ let deleted_items = if query.single_item {
+ if query.prefix.is_some() || query.end.is_some() {
+ return Err(Error::bad_request("Batch query parameters 'prefix' and 'end' must not be set when singleItem is true."));
+ }
+ let sk = query
+ .start
+ .as_ref()
+ .ok_or_bad_request("start should be specified if single_item is set")?;
+ let item = garage
+ .k2v
+ .item_table
+ .get(&partition, sk)
+ .await?
+ .filter(|e| K2VItemTable::matches_filter(e, &filter));
+ match item {
+ Some(i) => {
+ let cc = i.causal_context();
+ garage
+ .k2v
+ .rpc
+ .insert(
+ bucket_id,
+ i.partition.partition_key,
+ i.sort_key,
+ Some(cc),
+ DvvsValue::Deleted,
+ )
+ .await?;
+ 1
+ }
+ None => 0,
+ }
+ } else {
+ let (items, more, _next_start) = read_range(
+ &garage.k2v.item_table,
+ &partition,
+ &query.prefix,
+ &query.start,
+ &query.end,
+ None,
+ Some(filter),
+ EnumerationOrder::Forward,
+ )
+ .await?;
+ assert!(!more);
+
+ // TODO delete items
+ let items = items
+ .into_iter()
+ .map(|i| {
+ let cc = i.causal_context();
+ (
+ i.partition.partition_key,
+ i.sort_key,
+ Some(cc),
+ DvvsValue::Deleted,
+ )
+ })
+ .collect::<Vec<_>>();
+ let n = items.len();
+
+ garage.k2v.rpc.insert_batch(bucket_id, items).await?;
+
+ n
+ };
+
+ Ok(DeleteBatchResponse {
+ partition_key: query.partition_key,
+ prefix: query.prefix,
+ start: query.start,
+ end: query.end,
+ single_item: query.single_item,
+ deleted_items,
+ })
+}
+
+#[derive(Deserialize)]
+struct InsertBatchItem {
+ pk: String,
+ sk: String,
+ ct: Option<String>,
+ v: Option<String>,
+}
+
+#[derive(Deserialize)]
+struct ReadBatchQuery {
+ #[serde(rename = "partitionKey")]
+ partition_key: String,
+ #[serde(default)]
+ prefix: Option<String>,
+ #[serde(default)]
+ start: Option<String>,
+ #[serde(default)]
+ end: Option<String>,
+ #[serde(default)]
+ limit: Option<u64>,
+ #[serde(default)]
+ reverse: bool,
+ #[serde(default, rename = "singleItem")]
+ single_item: bool,
+ #[serde(default, rename = "conflictsOnly")]
+ conflicts_only: bool,
+ #[serde(default)]
+ tombstones: bool,
+}
+
+#[derive(Serialize)]
+struct ReadBatchResponse {
+ #[serde(rename = "partitionKey")]
+ partition_key: String,
+ prefix: Option<String>,
+ start: Option<String>,
+ end: Option<String>,
+ limit: Option<u64>,
+ reverse: bool,
+ #[serde(rename = "singleItem")]
+ single_item: bool,
+ #[serde(rename = "conflictsOnly")]
+ conflicts_only: bool,
+ tombstones: bool,
+
+ items: Vec<ReadBatchResponseItem>,
+ more: bool,
+ #[serde(rename = "nextStart")]
+ next_start: Option<String>,
+}
+
+#[derive(Serialize)]
+struct ReadBatchResponseItem {
+ sk: String,
+ ct: String,
+ v: Vec<Option<String>>,
+}
+
+impl ReadBatchResponseItem {
+ fn from(i: K2VItem) -> Self {
+ let ct = i.causal_context().serialize();
+ let v = i
+ .values()
+ .iter()
+ .map(|v| match v {
+ DvvsValue::Value(x) => Some(base64::encode(x)),
+ DvvsValue::Deleted => None,
+ })
+ .collect::<Vec<_>>();
+ Self {
+ sk: i.sort_key,
+ ct,
+ v,
+ }
+ }
+}
+
+#[derive(Deserialize)]
+struct DeleteBatchQuery {
+ #[serde(rename = "partitionKey")]
+ partition_key: String,
+ #[serde(default)]
+ prefix: Option<String>,
+ #[serde(default)]
+ start: Option<String>,
+ #[serde(default)]
+ end: Option<String>,
+ #[serde(default, rename = "singleItem")]
+ single_item: bool,
+}
+
+#[derive(Serialize)]
+struct DeleteBatchResponse {
+ #[serde(rename = "partitionKey")]
+ partition_key: String,
+ prefix: Option<String>,
+ start: Option<String>,
+ end: Option<String>,
+ #[serde(rename = "singleItem")]
+ single_item: bool,
+
+ #[serde(rename = "deletedItems")]
+ deleted_items: usize,
+}
diff --git a/src/api/k2v/error.rs b/src/api/k2v/error.rs
new file mode 100644
index 00000000..42491466
--- /dev/null
+++ b/src/api/k2v/error.rs
@@ -0,0 +1,135 @@
+use err_derive::Error;
+use hyper::header::HeaderValue;
+use hyper::{Body, HeaderMap, StatusCode};
+
+use garage_model::helper::error::Error as HelperError;
+
+use crate::common_error::CommonError;
+pub use crate::common_error::{CommonErrorDerivative, OkOrBadRequest, OkOrInternalError};
+use crate::generic_server::ApiError;
+use crate::helpers::CustomApiErrorBody;
+use crate::signature::error::Error as SignatureError;
+
+/// Errors of this crate
+#[derive(Debug, Error)]
+pub enum Error {
+ #[error(display = "{}", _0)]
+ /// Error from common error
+ Common(CommonError),
+
+ // Category: cannot process
+ /// Authorization Header Malformed
+ #[error(display = "Authorization header malformed, expected scope: {}", _0)]
+ AuthorizationHeaderMalformed(String),
+
+ /// The object requested don't exists
+ #[error(display = "Key not found")]
+ NoSuchKey,
+
+ /// Some base64 encoded data was badly encoded
+ #[error(display = "Invalid base64: {}", _0)]
+ InvalidBase64(#[error(source)] base64::DecodeError),
+
+ /// The client sent a header with invalid value
+ #[error(display = "Invalid header value: {}", _0)]
+ InvalidHeader(#[error(source)] hyper::header::ToStrError),
+
+ /// The client asked for an invalid return format (invalid Accept header)
+ #[error(display = "Not acceptable: {}", _0)]
+ NotAcceptable(String),
+
+ /// The request contained an invalid UTF-8 sequence in its path or in other parameters
+ #[error(display = "Invalid UTF-8: {}", _0)]
+ InvalidUtf8Str(#[error(source)] std::str::Utf8Error),
+}
+
+impl<T> From<T> for Error
+where
+ CommonError: From<T>,
+{
+ fn from(err: T) -> Self {
+ Error::Common(CommonError::from(err))
+ }
+}
+
+impl CommonErrorDerivative for Error {}
+
+impl From<HelperError> for Error {
+ fn from(err: HelperError) -> Self {
+ match err {
+ HelperError::Internal(i) => Self::Common(CommonError::InternalError(i)),
+ HelperError::BadRequest(b) => Self::Common(CommonError::BadRequest(b)),
+ HelperError::InvalidBucketName(n) => Self::Common(CommonError::InvalidBucketName(n)),
+ HelperError::NoSuchBucket(n) => Self::Common(CommonError::NoSuchBucket(n)),
+ e => Self::Common(CommonError::BadRequest(format!("{}", e))),
+ }
+ }
+}
+
+impl From<SignatureError> for Error {
+ fn from(err: SignatureError) -> Self {
+ match err {
+ SignatureError::Common(c) => Self::Common(c),
+ SignatureError::AuthorizationHeaderMalformed(c) => {
+ Self::AuthorizationHeaderMalformed(c)
+ }
+ SignatureError::InvalidUtf8Str(i) => Self::InvalidUtf8Str(i),
+ SignatureError::InvalidHeader(h) => Self::InvalidHeader(h),
+ }
+ }
+}
+
+impl Error {
+ /// This returns a keyword for the corresponding error.
+ /// Here, these keywords are not necessarily those from AWS S3,
+ /// as we are building a custom API
+ fn code(&self) -> &'static str {
+ match self {
+ Error::Common(c) => c.aws_code(),
+ Error::NoSuchKey => "NoSuchKey",
+ Error::NotAcceptable(_) => "NotAcceptable",
+ Error::AuthorizationHeaderMalformed(_) => "AuthorizationHeaderMalformed",
+ Error::InvalidBase64(_) => "InvalidBase64",
+ Error::InvalidHeader(_) => "InvalidHeaderValue",
+ Error::InvalidUtf8Str(_) => "InvalidUtf8String",
+ }
+ }
+}
+
+impl ApiError for Error {
+ /// Get the HTTP status code that best represents the meaning of the error for the client
+ fn http_status_code(&self) -> StatusCode {
+ match self {
+ Error::Common(c) => c.http_status_code(),
+ Error::NoSuchKey => StatusCode::NOT_FOUND,
+ Error::NotAcceptable(_) => StatusCode::NOT_ACCEPTABLE,
+ Error::AuthorizationHeaderMalformed(_)
+ | Error::InvalidBase64(_)
+ | Error::InvalidHeader(_)
+ | Error::InvalidUtf8Str(_) => StatusCode::BAD_REQUEST,
+ }
+ }
+
+ fn add_http_headers(&self, header_map: &mut HeaderMap<HeaderValue>) {
+ use hyper::header;
+ header_map.append(header::CONTENT_TYPE, "application/json".parse().unwrap());
+ }
+
+ fn http_body(&self, garage_region: &str, path: &str) -> Body {
+ let error = CustomApiErrorBody {
+ code: self.code().to_string(),
+ message: format!("{}", self),
+ path: path.to_string(),
+ region: garage_region.to_string(),
+ };
+ Body::from(serde_json::to_string_pretty(&error).unwrap_or_else(|_| {
+ r#"
+{
+ "code": "InternalError",
+ "message": "JSON encoding of error failed"
+}
+ "#
+ .into()
+ }))
+ }
+}
diff --git a/src/api/k2v/index.rs b/src/api/k2v/index.rs
new file mode 100644
index 00000000..210950bf
--- /dev/null
+++ b/src/api/k2v/index.rs
@@ -0,0 +1,100 @@
+use std::sync::Arc;
+
+use hyper::{Body, Response, StatusCode};
+use serde::Serialize;
+
+use garage_util::data::*;
+use garage_util::error::Error as GarageError;
+
+use garage_rpc::ring::Ring;
+use garage_table::util::*;
+
+use garage_model::garage::Garage;
+use garage_model::k2v::item_table::{BYTES, CONFLICTS, ENTRIES, VALUES};
+
+use crate::k2v::error::*;
+use crate::k2v::range::read_range;
+
+pub async fn handle_read_index(
+ garage: Arc<Garage>,
+ bucket_id: Uuid,
+ prefix: Option<String>,
+ start: Option<String>,
+ end: Option<String>,
+ limit: Option<u64>,
+ reverse: Option<bool>,
+) -> Result<Response<Body>, Error> {
+ let reverse = reverse.unwrap_or(false);
+
+ let ring: Arc<Ring> = garage.system.ring.borrow().clone();
+
+ let (partition_keys, more, next_start) = read_range(
+ &garage.k2v.counter_table.table,
+ &bucket_id,
+ &prefix,
+ &start,
+ &end,
+ limit,
+ Some((DeletedFilter::NotDeleted, ring.layout.node_id_vec.clone())),
+ EnumerationOrder::from_reverse(reverse),
+ )
+ .await?;
+
+ let s_entries = ENTRIES.to_string();
+ let s_conflicts = CONFLICTS.to_string();
+ let s_values = VALUES.to_string();
+ let s_bytes = BYTES.to_string();
+
+ let resp = ReadIndexResponse {
+ prefix,
+ start,
+ end,
+ limit,
+ reverse,
+ partition_keys: partition_keys
+ .into_iter()
+ .map(|part| {
+ let vals = part.filtered_values(&ring);
+ ReadIndexResponseEntry {
+ pk: part.sk,
+ entries: *vals.get(&s_entries).unwrap_or(&0),
+ conflicts: *vals.get(&s_conflicts).unwrap_or(&0),
+ values: *vals.get(&s_values).unwrap_or(&0),
+ bytes: *vals.get(&s_bytes).unwrap_or(&0),
+ }
+ })
+ .collect::<Vec<_>>(),
+ more,
+ next_start,
+ };
+
+ let resp_json = serde_json::to_string_pretty(&resp).map_err(GarageError::from)?;
+ Ok(Response::builder()
+ .status(StatusCode::OK)
+ .body(Body::from(resp_json))?)
+}
+
+#[derive(Serialize)]
+struct ReadIndexResponse {
+ prefix: Option<String>,
+ start: Option<String>,
+ end: Option<String>,
+ limit: Option<u64>,
+ reverse: bool,
+
+ #[serde(rename = "partitionKeys")]
+ partition_keys: Vec<ReadIndexResponseEntry>,
+
+ more: bool,
+ #[serde(rename = "nextStart")]
+ next_start: Option<String>,
+}
+
+#[derive(Serialize)]
+struct ReadIndexResponseEntry {
+ pk: String,
+ entries: i64,
+ conflicts: i64,
+ values: i64,
+ bytes: i64,
+}
diff --git a/src/api/k2v/item.rs b/src/api/k2v/item.rs
new file mode 100644
index 00000000..836d386f
--- /dev/null
+++ b/src/api/k2v/item.rs
@@ -0,0 +1,230 @@
+use std::sync::Arc;
+
+use http::header;
+
+use hyper::{Body, Request, Response, StatusCode};
+
+use garage_util::data::*;
+
+use garage_model::garage::Garage;
+use garage_model::k2v::causality::*;
+use garage_model::k2v::item_table::*;
+
+use crate::k2v::error::*;
+
+pub const X_GARAGE_CAUSALITY_TOKEN: &str = "X-Garage-Causality-Token";
+
+pub enum ReturnFormat {
+ Json,
+ Binary,
+ Either,
+}
+
+impl ReturnFormat {
+ pub fn from(req: &Request<Body>) -> Result<Self, Error> {
+ let accept = match req.headers().get(header::ACCEPT) {
+ Some(a) => a.to_str()?,
+ None => return Ok(Self::Json),
+ };
+
+ let accept = accept.split(',').map(|s| s.trim()).collect::<Vec<_>>();
+ let accept_json = accept.contains(&"application/json") || accept.contains(&"*/*");
+ let accept_binary = accept.contains(&"application/octet-stream") || accept.contains(&"*/*");
+
+ match (accept_json, accept_binary) {
+ (true, true) => Ok(Self::Either),
+ (true, false) => Ok(Self::Json),
+ (false, true) => Ok(Self::Binary),
+ (false, false) => Err(Error::NotAcceptable("Invalid Accept: header value, must contain either application/json or application/octet-stream (or both)".into())),
+ }
+ }
+
+ pub fn make_response(&self, item: &K2VItem) -> Result<Response<Body>, Error> {
+ let vals = item.values();
+
+ if vals.is_empty() {
+ return Err(Error::NoSuchKey);
+ }
+
+ let ct = item.causal_context().serialize();
+ match self {
+ Self::Binary if vals.len() > 1 => Ok(Response::builder()
+ .header(X_GARAGE_CAUSALITY_TOKEN, ct)
+ .status(StatusCode::CONFLICT)
+ .body(Body::empty())?),
+ Self::Binary => {
+ assert!(vals.len() == 1);
+ Self::make_binary_response(ct, vals[0])
+ }
+ Self::Either if vals.len() == 1 => Self::make_binary_response(ct, vals[0]),
+ _ => Self::make_json_response(ct, &vals[..]),
+ }
+ }
+
+ fn make_binary_response(ct: String, v: &DvvsValue) -> Result<Response<Body>, Error> {
+ match v {
+ DvvsValue::Deleted => Ok(Response::builder()
+ .header(X_GARAGE_CAUSALITY_TOKEN, ct)
+ .header(header::CONTENT_TYPE, "application/octet-stream")
+ .status(StatusCode::NO_CONTENT)
+ .body(Body::empty())?),
+ DvvsValue::Value(v) => Ok(Response::builder()
+ .header(X_GARAGE_CAUSALITY_TOKEN, ct)
+ .header(header::CONTENT_TYPE, "application/octet-stream")
+ .status(StatusCode::OK)
+ .body(Body::from(v.to_vec()))?),
+ }
+ }
+
+ fn make_json_response(ct: String, v: &[&DvvsValue]) -> Result<Response<Body>, Error> {
+ let items = v
+ .iter()
+ .map(|v| match v {
+ DvvsValue::Deleted => serde_json::Value::Null,
+ DvvsValue::Value(v) => serde_json::Value::String(base64::encode(v)),
+ })
+ .collect::<Vec<_>>();
+ let json_body =
+ serde_json::to_string_pretty(&items).ok_or_internal_error("JSON encoding error")?;
+ Ok(Response::builder()
+ .header(X_GARAGE_CAUSALITY_TOKEN, ct)
+ .header(header::CONTENT_TYPE, "application/json")
+ .status(StatusCode::OK)
+ .body(Body::from(json_body))?)
+ }
+}
+
+/// Handle ReadItem request
+#[allow(clippy::ptr_arg)]
+pub async fn handle_read_item(
+ garage: Arc<Garage>,
+ req: &Request<Body>,
+ bucket_id: Uuid,
+ partition_key: &str,
+ sort_key: &String,
+) -> Result<Response<Body>, Error> {
+ let format = ReturnFormat::from(req)?;
+
+ let item = garage
+ .k2v
+ .item_table
+ .get(
+ &K2VItemPartition {
+ bucket_id,
+ partition_key: partition_key.to_string(),
+ },
+ sort_key,
+ )
+ .await?
+ .ok_or(Error::NoSuchKey)?;
+
+ format.make_response(&item)
+}
+
+pub async fn handle_insert_item(
+ garage: Arc<Garage>,
+ req: Request<Body>,
+ bucket_id: Uuid,
+ partition_key: &str,
+ sort_key: &str,
+) -> Result<Response<Body>, Error> {
+ let causal_context = req
+ .headers()
+ .get(X_GARAGE_CAUSALITY_TOKEN)
+ .map(|s| s.to_str())
+ .transpose()?
+ .map(CausalContext::parse)
+ .transpose()
+ .ok_or_bad_request("Invalid causality token")?;
+
+ let body = hyper::body::to_bytes(req.into_body()).await?;
+ let value = DvvsValue::Value(body.to_vec());
+
+ garage
+ .k2v
+ .rpc
+ .insert(
+ bucket_id,
+ partition_key.to_string(),
+ sort_key.to_string(),
+ causal_context,
+ value,
+ )
+ .await?;
+
+ Ok(Response::builder()
+ .status(StatusCode::OK)
+ .body(Body::empty())?)
+}
+
+pub async fn handle_delete_item(
+ garage: Arc<Garage>,
+ req: Request<Body>,
+ bucket_id: Uuid,
+ partition_key: &str,
+ sort_key: &str,
+) -> Result<Response<Body>, Error> {
+ let causal_context = req
+ .headers()
+ .get(X_GARAGE_CAUSALITY_TOKEN)
+ .map(|s| s.to_str())
+ .transpose()?
+ .map(CausalContext::parse)
+ .transpose()
+ .ok_or_bad_request("Invalid causality token")?;
+
+ let value = DvvsValue::Deleted;
+
+ garage
+ .k2v
+ .rpc
+ .insert(
+ bucket_id,
+ partition_key.to_string(),
+ sort_key.to_string(),
+ causal_context,
+ value,
+ )
+ .await?;
+
+ Ok(Response::builder()
+ .status(StatusCode::NO_CONTENT)
+ .body(Body::empty())?)
+}
+
+/// Handle ReadItem request
+#[allow(clippy::ptr_arg)]
+pub async fn handle_poll_item(
+ garage: Arc<Garage>,
+ req: &Request<Body>,
+ bucket_id: Uuid,
+ partition_key: String,
+ sort_key: String,
+ causality_token: String,
+ timeout_secs: Option<u64>,
+) -> Result<Response<Body>, Error> {
+ let format = ReturnFormat::from(req)?;
+
+ let causal_context =
+ CausalContext::parse(&causality_token).ok_or_bad_request("Invalid causality token")?;
+
+ let item = garage
+ .k2v
+ .rpc
+ .poll(
+ bucket_id,
+ partition_key,
+ sort_key,
+ causal_context,
+ timeout_secs.unwrap_or(300) * 1000,
+ )
+ .await?;
+
+ if let Some(item) = item {
+ format.make_response(&item)
+ } else {
+ Ok(Response::builder()
+ .status(StatusCode::NOT_MODIFIED)
+ .body(Body::empty())?)
+ }
+}
diff --git a/src/api/k2v/mod.rs b/src/api/k2v/mod.rs
new file mode 100644
index 00000000..b6a8c5cf
--- /dev/null
+++ b/src/api/k2v/mod.rs
@@ -0,0 +1,9 @@
+pub mod api_server;
+mod error;
+mod router;
+
+mod batch;
+mod index;
+mod item;
+
+mod range;
diff --git a/src/api/k2v/range.rs b/src/api/k2v/range.rs
new file mode 100644
index 00000000..bb9d3be5
--- /dev/null
+++ b/src/api/k2v/range.rs
@@ -0,0 +1,100 @@
+//! Utility module for retrieving ranges of items in Garage tables
+//! Implements parameters (prefix, start, end, limit) as specified
+//! for endpoints ReadIndex, ReadBatch and DeleteBatch
+
+use std::sync::Arc;
+
+use garage_table::replication::TableShardedReplication;
+use garage_table::*;
+
+use crate::helpers::key_after_prefix;
+use crate::k2v::error::*;
+
+/// Read range in a Garage table.
+/// Returns (entries, more?, nextStart)
+#[allow(clippy::too_many_arguments)]
+pub(crate) async fn read_range<F>(
+ table: &Arc<Table<F, TableShardedReplication>>,
+ partition_key: &F::P,
+ prefix: &Option<String>,
+ start: &Option<String>,
+ end: &Option<String>,
+ limit: Option<u64>,
+ filter: Option<F::Filter>,
+ enumeration_order: EnumerationOrder,
+) -> Result<(Vec<F::E>, bool, Option<String>), Error>
+where
+ F: TableSchema<S = String> + 'static,
+{
+ let (mut start, mut start_ignore) = match (prefix, start) {
+ (None, None) => (None, false),
+ (None, Some(s)) => (Some(s.clone()), false),
+ (Some(p), Some(s)) => {
+ if !s.starts_with(p) {
+ return Err(Error::bad_request(format!(
+ "Start key '{}' does not start with prefix '{}'",
+ s, p
+ )));
+ }
+ (Some(s.clone()), false)
+ }
+ (Some(p), None) if enumeration_order == EnumerationOrder::Reverse => {
+ let start = key_after_prefix(p)
+ .ok_or_internal_error("Sorry, can't list this prefix in reverse order")?;
+ (Some(start), true)
+ }
+ (Some(p), None) => (Some(p.clone()), false),
+ };
+
+ let mut entries = vec![];
+ loop {
+ let n_get = std::cmp::min(
+ 1000,
+ limit.map(|x| x as usize).unwrap_or(usize::MAX - 10) - entries.len() + 2,
+ );
+ let get_ret = table
+ .get_range(
+ partition_key,
+ start.clone(),
+ filter.clone(),
+ n_get,
+ enumeration_order,
+ )
+ .await?;
+
+ let get_ret_len = get_ret.len();
+
+ for entry in get_ret {
+ if start_ignore && Some(entry.sort_key()) == start.as_ref() {
+ continue;
+ }
+ if let Some(p) = prefix {
+ if !entry.sort_key().starts_with(p) {
+ return Ok((entries, false, None));
+ }
+ }
+ if let Some(e) = end {
+ let is_finished = match enumeration_order {
+ EnumerationOrder::Forward => entry.sort_key() >= e,
+ EnumerationOrder::Reverse => entry.sort_key() <= e,
+ };
+ if is_finished {
+ return Ok((entries, false, None));
+ }
+ }
+ if let Some(l) = limit {
+ if entries.len() >= l as usize {
+ return Ok((entries, true, Some(entry.sort_key().clone())));
+ }
+ }
+ entries.push(entry);
+ }
+
+ if get_ret_len < n_get {
+ return Ok((entries, false, None));
+ }
+
+ start = Some(entries.last().unwrap().sort_key().clone());
+ start_ignore = true;
+ }
+}
diff --git a/src/api/k2v/router.rs b/src/api/k2v/router.rs
new file mode 100644
index 00000000..50e6965b
--- /dev/null
+++ b/src/api/k2v/router.rs
@@ -0,0 +1,252 @@
+use crate::k2v::error::*;
+
+use std::borrow::Cow;
+
+use hyper::{Method, Request};
+
+use crate::helpers::Authorization;
+use crate::router_macros::{generateQueryParameters, router_match};
+
+router_match! {@func
+
+
+/// List of all K2V API endpoints.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum Endpoint {
+ DeleteBatch {
+ },
+ DeleteItem {
+ partition_key: String,
+ sort_key: String,
+ },
+ InsertBatch {
+ },
+ InsertItem {
+ partition_key: String,
+ sort_key: String,
+ },
+ Options,
+ PollItem {
+ partition_key: String,
+ sort_key: String,
+ causality_token: String,
+ timeout: Option<u64>,
+ },
+ ReadBatch {
+ },
+ ReadIndex {
+ prefix: Option<String>,
+ start: Option<String>,
+ end: Option<String>,
+ limit: Option<u64>,
+ reverse: Option<bool>,
+ },
+ ReadItem {
+ partition_key: String,
+ sort_key: String,
+ },
+}}
+
+impl Endpoint {
+ /// Determine which S3 endpoint a request is for using the request, and a bucket which was
+ /// possibly extracted from the Host header.
+ /// Returns Self plus bucket name, if endpoint is not Endpoint::ListBuckets
+ pub fn from_request<T>(req: &Request<T>) -> Result<(Self, String), Error> {
+ let uri = req.uri();
+ let path = uri.path().trim_start_matches('/');
+ let query = uri.query();
+
+ let (bucket, partition_key) = path
+ .split_once('/')
+ .map(|(b, p)| (b.to_owned(), p.trim_start_matches('/')))
+ .unwrap_or((path.to_owned(), ""));
+
+ if bucket.is_empty() {
+ return Err(Error::bad_request("Missing bucket name"));
+ }
+
+ if *req.method() == Method::OPTIONS {
+ return Ok((Self::Options, bucket));
+ }
+
+ let partition_key = percent_encoding::percent_decode_str(partition_key)
+ .decode_utf8()?
+ .into_owned();
+
+ let mut query = QueryParameters::from_query(query.unwrap_or_default())?;
+
+ let method_search = Method::from_bytes(b"SEARCH").unwrap();
+ let res = match *req.method() {
+ Method::GET => Self::from_get(partition_key, &mut query)?,
+ //&Method::HEAD => Self::from_head(partition_key, &mut query)?,
+ Method::POST => Self::from_post(partition_key, &mut query)?,
+ Method::PUT => Self::from_put(partition_key, &mut query)?,
+ Method::DELETE => Self::from_delete(partition_key, &mut query)?,
+ _ if req.method() == method_search => Self::from_search(partition_key, &mut query)?,
+ _ => return Err(Error::bad_request("Unknown method")),
+ };
+
+ if let Some(message) = query.nonempty_message() {
+ debug!("Unused query parameter: {}", message)
+ }
+ Ok((res, bucket))
+ }
+
+ /// Determine which endpoint a request is for, knowing it is a GET.
+ fn from_get(partition_key: String, query: &mut QueryParameters<'_>) -> Result<Self, Error> {
+ router_match! {
+ @gen_parser
+ (query.keyword.take().unwrap_or_default().as_ref(), partition_key, query, None),
+ key: [
+ EMPTY if causality_token => PollItem (query::sort_key, query::causality_token, opt_parse::timeout),
+ EMPTY => ReadItem (query::sort_key),
+ ],
+ no_key: [
+ EMPTY => ReadIndex (query_opt::prefix, query_opt::start, query_opt::end, opt_parse::limit, opt_parse::reverse),
+ ]
+ }
+ }
+
+ /// Determine which endpoint a request is for, knowing it is a SEARCH.
+ fn from_search(partition_key: String, query: &mut QueryParameters<'_>) -> Result<Self, Error> {
+ router_match! {
+ @gen_parser
+ (query.keyword.take().unwrap_or_default().as_ref(), partition_key, query, None),
+ key: [
+ ],
+ no_key: [
+ EMPTY => ReadBatch,
+ ]
+ }
+ }
+
+ /*
+ /// Determine which endpoint a request is for, knowing it is a HEAD.
+ fn from_head(partition_key: String, query: &mut QueryParameters<'_>) -> Result<Self, Error> {
+ router_match! {
+ @gen_parser
+ (query.keyword.take().unwrap_or_default().as_ref(), partition_key, query, None),
+ key: [
+ EMPTY => HeadObject(opt_parse::part_number, query_opt::version_id),
+ ],
+ no_key: [
+ EMPTY => HeadBucket,
+ ]
+ }
+ }
+ */
+
+ /// Determine which endpoint a request is for, knowing it is a POST.
+ fn from_post(partition_key: String, query: &mut QueryParameters<'_>) -> Result<Self, Error> {
+ router_match! {
+ @gen_parser
+ (query.keyword.take().unwrap_or_default().as_ref(), partition_key, query, None),
+ key: [
+ ],
+ no_key: [
+ EMPTY => InsertBatch,
+ DELETE => DeleteBatch,
+ SEARCH => ReadBatch,
+ ]
+ }
+ }
+
+ /// Determine which endpoint a request is for, knowing it is a PUT.
+ fn from_put(partition_key: String, query: &mut QueryParameters<'_>) -> Result<Self, Error> {
+ router_match! {
+ @gen_parser
+ (query.keyword.take().unwrap_or_default().as_ref(), partition_key, query, None),
+ key: [
+ EMPTY => InsertItem (query::sort_key),
+
+ ],
+ no_key: [
+ ]
+ }
+ }
+
+ /// Determine which endpoint a request is for, knowing it is a DELETE.
+ fn from_delete(partition_key: String, query: &mut QueryParameters<'_>) -> Result<Self, Error> {
+ router_match! {
+ @gen_parser
+ (query.keyword.take().unwrap_or_default().as_ref(), partition_key, query, None),
+ key: [
+ EMPTY => DeleteItem (query::sort_key),
+ ],
+ no_key: [
+ ]
+ }
+ }
+
+ /// Get the partition key the request target. Returns None for requests which don't use a partition key.
+ #[allow(dead_code)]
+ pub fn get_partition_key(&self) -> Option<&str> {
+ router_match! {
+ @extract
+ self,
+ partition_key,
+ [
+ DeleteItem,
+ InsertItem,
+ PollItem,
+ ReadItem,
+ ]
+ }
+ }
+
+ /// Get the sort key the request target. Returns None for requests which don't use a sort key.
+ #[allow(dead_code)]
+ pub fn get_sort_key(&self) -> Option<&str> {
+ router_match! {
+ @extract
+ self,
+ sort_key,
+ [
+ DeleteItem,
+ InsertItem,
+ PollItem,
+ ReadItem,
+ ]
+ }
+ }
+
+ /// Get the kind of authorization which is required to perform the operation.
+ pub fn authorization_type(&self) -> Authorization {
+ let readonly = router_match! {
+ @match
+ self,
+ [
+ PollItem,
+ ReadBatch,
+ ReadIndex,
+ ReadItem,
+ ]
+ };
+ if readonly {
+ Authorization::Read
+ } else {
+ Authorization::Write
+ }
+ }
+}
+
+// parameter name => struct field
+generateQueryParameters! {
+ "prefix" => prefix,
+ "start" => start,
+ "causality_token" => causality_token,
+ "end" => end,
+ "limit" => limit,
+ "reverse" => reverse,
+ "sort_key" => sort_key,
+ "timeout" => timeout
+}
+
+mod keywords {
+ //! This module contain all query parameters with no associated value
+ //! used to differentiate endpoints.
+ pub const EMPTY: &str = "";
+
+ pub const DELETE: &str = "delete";
+ pub const SEARCH: &str = "search";
+}