aboutsummaryrefslogtreecommitdiff
path: root/src/api/k2v
diff options
context:
space:
mode:
Diffstat (limited to 'src/api/k2v')
-rw-r--r--src/api/k2v/api_server.rs195
-rw-r--r--src/api/k2v/batch.rs368
-rw-r--r--src/api/k2v/index.rs100
-rw-r--r--src/api/k2v/item.rs230
-rw-r--r--src/api/k2v/mod.rs8
-rw-r--r--src/api/k2v/range.rs96
-rw-r--r--src/api/k2v/router.rs252
7 files changed, 1249 insertions, 0 deletions
diff --git a/src/api/k2v/api_server.rs b/src/api/k2v/api_server.rs
new file mode 100644
index 00000000..5f5e9030
--- /dev/null
+++ b/src/api/k2v/api_server.rs
@@ -0,0 +1,195 @@
+use std::sync::Arc;
+
+use async_trait::async_trait;
+
+use futures::future::Future;
+use hyper::{Body, Method, Request, Response};
+
+use opentelemetry::{trace::SpanRef, KeyValue};
+
+use garage_table::util::*;
+use garage_util::error::Error as GarageError;
+
+use garage_model::garage::Garage;
+
+use crate::error::*;
+use crate::generic_server::*;
+
+use crate::signature::payload::check_payload_signature;
+use crate::signature::streaming::*;
+
+use crate::helpers::*;
+use crate::k2v::batch::*;
+use crate::k2v::index::*;
+use crate::k2v::item::*;
+use crate::k2v::router::Endpoint;
+use crate::s3::cors::*;
+
+pub struct K2VApiServer {
+ garage: Arc<Garage>,
+}
+
+pub(crate) struct K2VApiEndpoint {
+ bucket_name: String,
+ endpoint: Endpoint,
+}
+
+impl K2VApiServer {
+ pub async fn run(
+ garage: Arc<Garage>,
+ shutdown_signal: impl Future<Output = ()>,
+ ) -> Result<(), GarageError> {
+ if let Some(cfg) = &garage.config.k2v_api {
+ let bind_addr = cfg.api_bind_addr;
+
+ ApiServer::new(
+ garage.config.s3_api.s3_region.clone(),
+ K2VApiServer { garage },
+ )
+ .run_server(bind_addr, shutdown_signal)
+ .await
+ } else {
+ Ok(())
+ }
+ }
+}
+
+#[async_trait]
+impl ApiHandler for K2VApiServer {
+ const API_NAME: &'static str = "k2v";
+ const API_NAME_DISPLAY: &'static str = "K2V";
+
+ type Endpoint = K2VApiEndpoint;
+
+ fn parse_endpoint(&self, req: &Request<Body>) -> Result<K2VApiEndpoint, Error> {
+ let (endpoint, bucket_name) = Endpoint::from_request(req)?;
+
+ Ok(K2VApiEndpoint {
+ bucket_name,
+ endpoint,
+ })
+ }
+
+ async fn handle(
+ &self,
+ req: Request<Body>,
+ endpoint: K2VApiEndpoint,
+ ) -> Result<Response<Body>, Error> {
+ let K2VApiEndpoint {
+ bucket_name,
+ endpoint,
+ } = endpoint;
+ let garage = self.garage.clone();
+
+ // The OPTIONS method is procesed early, before we even check for an API key
+ if let Endpoint::Options = endpoint {
+ return handle_options_s3api(garage, &req, Some(bucket_name)).await;
+ }
+
+ let (api_key, mut content_sha256) = check_payload_signature(&garage, "k2v", &req).await?;
+ let api_key = api_key.ok_or_else(|| {
+ Error::Forbidden("Garage does not support anonymous access yet".to_string())
+ })?;
+
+ let req = parse_streaming_body(
+ &api_key,
+ req,
+ &mut content_sha256,
+ &garage.config.s3_api.s3_region,
+ "k2v",
+ )?;
+
+ let bucket_id = resolve_bucket(&garage, &bucket_name, &api_key).await?;
+ let bucket = garage
+ .bucket_table
+ .get(&EmptyKey, &bucket_id)
+ .await?
+ .filter(|b| !b.state.is_deleted())
+ .ok_or(Error::NoSuchBucket)?;
+
+ let allowed = match endpoint.authorization_type() {
+ Authorization::Read => api_key.allow_read(&bucket_id),
+ Authorization::Write => api_key.allow_write(&bucket_id),
+ Authorization::Owner => api_key.allow_owner(&bucket_id),
+ _ => unreachable!(),
+ };
+
+ if !allowed {
+ return Err(Error::Forbidden(
+ "Operation is not allowed for this key.".to_string(),
+ ));
+ }
+
+ // Look up what CORS rule might apply to response.
+ // Requests for methods different than GET, HEAD or POST
+ // are always preflighted, i.e. the browser should make
+ // an OPTIONS call before to check it is allowed
+ let matching_cors_rule = match *req.method() {
+ Method::GET | Method::HEAD | Method::POST => find_matching_cors_rule(&bucket, &req)?,
+ _ => None,
+ };
+
+ let resp = match endpoint {
+ Endpoint::DeleteItem {
+ partition_key,
+ sort_key,
+ } => handle_delete_item(garage, req, bucket_id, &partition_key, &sort_key).await,
+ Endpoint::InsertItem {
+ partition_key,
+ sort_key,
+ } => handle_insert_item(garage, req, bucket_id, &partition_key, &sort_key).await,
+ Endpoint::ReadItem {
+ partition_key,
+ sort_key,
+ } => handle_read_item(garage, &req, bucket_id, &partition_key, &sort_key).await,
+ Endpoint::PollItem {
+ partition_key,
+ sort_key,
+ causality_token,
+ timeout,
+ } => {
+ handle_poll_item(
+ garage,
+ &req,
+ bucket_id,
+ partition_key,
+ sort_key,
+ causality_token,
+ timeout,
+ )
+ .await
+ }
+ Endpoint::ReadIndex {
+ prefix,
+ start,
+ end,
+ limit,
+ reverse,
+ } => handle_read_index(garage, bucket_id, prefix, start, end, limit, reverse).await,
+ Endpoint::InsertBatch {} => handle_insert_batch(garage, bucket_id, req).await,
+ Endpoint::ReadBatch {} => handle_read_batch(garage, bucket_id, req).await,
+ Endpoint::DeleteBatch {} => handle_delete_batch(garage, bucket_id, req).await,
+ Endpoint::Options => unreachable!(),
+ };
+
+ // If request was a success and we have a CORS rule that applies to it,
+ // add the corresponding CORS headers to the response
+ let mut resp_ok = resp?;
+ if let Some(rule) = matching_cors_rule {
+ add_cors_headers(&mut resp_ok, rule)
+ .ok_or_internal_error("Invalid bucket CORS configuration")?;
+ }
+
+ Ok(resp_ok)
+ }
+}
+
+impl ApiEndpoint for K2VApiEndpoint {
+ fn name(&self) -> &'static str {
+ self.endpoint.name()
+ }
+
+ fn add_span_attributes(&self, span: SpanRef<'_>) {
+ span.set_attribute(KeyValue::new("bucket", self.bucket_name.clone()));
+ }
+}
diff --git a/src/api/k2v/batch.rs b/src/api/k2v/batch.rs
new file mode 100644
index 00000000..4ecddeb9
--- /dev/null
+++ b/src/api/k2v/batch.rs
@@ -0,0 +1,368 @@
+use std::sync::Arc;
+
+use hyper::{Body, Request, Response, StatusCode};
+use serde::{Deserialize, Serialize};
+
+use garage_util::data::*;
+use garage_util::error::Error as GarageError;
+
+use garage_table::{EnumerationOrder, TableSchema};
+
+use garage_model::garage::Garage;
+use garage_model::k2v::causality::*;
+use garage_model::k2v::item_table::*;
+
+use crate::error::*;
+use crate::k2v::range::read_range;
+
+pub async fn handle_insert_batch(
+ garage: Arc<Garage>,
+ bucket_id: Uuid,
+ req: Request<Body>,
+) -> Result<Response<Body>, Error> {
+ let body = hyper::body::to_bytes(req.into_body()).await?;
+ let items: Vec<InsertBatchItem> =
+ serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?;
+
+ let mut items2 = vec![];
+ for it in items {
+ let ct = it
+ .ct
+ .map(|s| CausalContext::parse(&s))
+ .transpose()
+ .ok_or_bad_request("Invalid causality token")?;
+ let v = match it.v {
+ Some(vs) => {
+ DvvsValue::Value(base64::decode(vs).ok_or_bad_request("Invalid base64 value")?)
+ }
+ None => DvvsValue::Deleted,
+ };
+ items2.push((it.pk, it.sk, ct, v));
+ }
+
+ garage.k2v.rpc.insert_batch(bucket_id, items2).await?;
+
+ Ok(Response::builder()
+ .status(StatusCode::OK)
+ .body(Body::empty())?)
+}
+
+pub async fn handle_read_batch(
+ garage: Arc<Garage>,
+ bucket_id: Uuid,
+ req: Request<Body>,
+) -> Result<Response<Body>, Error> {
+ let body = hyper::body::to_bytes(req.into_body()).await?;
+ let queries: Vec<ReadBatchQuery> =
+ serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?;
+
+ let resp_results = futures::future::join_all(
+ queries
+ .into_iter()
+ .map(|q| handle_read_batch_query(&garage, bucket_id, q)),
+ )
+ .await;
+
+ let mut resps: Vec<ReadBatchResponse> = vec![];
+ for resp in resp_results {
+ resps.push(resp?);
+ }
+
+ let resp_json = serde_json::to_string_pretty(&resps).map_err(GarageError::from)?;
+ Ok(Response::builder()
+ .status(StatusCode::OK)
+ .body(Body::from(resp_json))?)
+}
+
+async fn handle_read_batch_query(
+ garage: &Arc<Garage>,
+ bucket_id: Uuid,
+ query: ReadBatchQuery,
+) -> Result<ReadBatchResponse, Error> {
+ let partition = K2VItemPartition {
+ bucket_id,
+ partition_key: query.partition_key.clone(),
+ };
+
+ let filter = ItemFilter {
+ exclude_only_tombstones: !query.tombstones,
+ conflicts_only: query.conflicts_only,
+ };
+
+ let (items, more, next_start) = if query.single_item {
+ if query.prefix.is_some() || query.end.is_some() || query.limit.is_some() || query.reverse {
+ return Err(Error::BadRequest("Batch query parameters 'prefix', 'end', 'limit' and 'reverse' must not be set when singleItem is true.".into()));
+ }
+ let sk = query
+ .start
+ .as_ref()
+ .ok_or_bad_request("start should be specified if single_item is set")?;
+ let item = garage
+ .k2v
+ .item_table
+ .get(&partition, sk)
+ .await?
+ .filter(|e| K2VItemTable::matches_filter(e, &filter));
+ match item {
+ Some(i) => (vec![ReadBatchResponseItem::from(i)], false, None),
+ None => (vec![], false, None),
+ }
+ } else {
+ let (items, more, next_start) = read_range(
+ &garage.k2v.item_table,
+ &partition,
+ &query.prefix,
+ &query.start,
+ &query.end,
+ query.limit,
+ Some(filter),
+ EnumerationOrder::from_reverse(query.reverse),
+ )
+ .await?;
+
+ let items = items
+ .into_iter()
+ .map(ReadBatchResponseItem::from)
+ .collect::<Vec<_>>();
+
+ (items, more, next_start)
+ };
+
+ Ok(ReadBatchResponse {
+ partition_key: query.partition_key,
+ prefix: query.prefix,
+ start: query.start,
+ end: query.end,
+ limit: query.limit,
+ reverse: query.reverse,
+ single_item: query.single_item,
+ conflicts_only: query.conflicts_only,
+ tombstones: query.tombstones,
+ items,
+ more,
+ next_start,
+ })
+}
+
+pub async fn handle_delete_batch(
+ garage: Arc<Garage>,
+ bucket_id: Uuid,
+ req: Request<Body>,
+) -> Result<Response<Body>, Error> {
+ let body = hyper::body::to_bytes(req.into_body()).await?;
+ let queries: Vec<DeleteBatchQuery> =
+ serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?;
+
+ let resp_results = futures::future::join_all(
+ queries
+ .into_iter()
+ .map(|q| handle_delete_batch_query(&garage, bucket_id, q)),
+ )
+ .await;
+
+ let mut resps: Vec<DeleteBatchResponse> = vec![];
+ for resp in resp_results {
+ resps.push(resp?);
+ }
+
+ let resp_json = serde_json::to_string_pretty(&resps).map_err(GarageError::from)?;
+ Ok(Response::builder()
+ .status(StatusCode::OK)
+ .body(Body::from(resp_json))?)
+}
+
+async fn handle_delete_batch_query(
+ garage: &Arc<Garage>,
+ bucket_id: Uuid,
+ query: DeleteBatchQuery,
+) -> Result<DeleteBatchResponse, Error> {
+ let partition = K2VItemPartition {
+ bucket_id,
+ partition_key: query.partition_key.clone(),
+ };
+
+ let filter = ItemFilter {
+ exclude_only_tombstones: true,
+ conflicts_only: false,
+ };
+
+ let deleted_items = if query.single_item {
+ if query.prefix.is_some() || query.end.is_some() {
+ return Err(Error::BadRequest("Batch query parameters 'prefix' and 'end' must not be set when singleItem is true.".into()));
+ }
+ let sk = query
+ .start
+ .as_ref()
+ .ok_or_bad_request("start should be specified if single_item is set")?;
+ let item = garage
+ .k2v
+ .item_table
+ .get(&partition, sk)
+ .await?
+ .filter(|e| K2VItemTable::matches_filter(e, &filter));
+ match item {
+ Some(i) => {
+ let cc = i.causal_context();
+ garage
+ .k2v
+ .rpc
+ .insert(
+ bucket_id,
+ i.partition.partition_key,
+ i.sort_key,
+ Some(cc),
+ DvvsValue::Deleted,
+ )
+ .await?;
+ 1
+ }
+ None => 0,
+ }
+ } else {
+ let (items, more, _next_start) = read_range(
+ &garage.k2v.item_table,
+ &partition,
+ &query.prefix,
+ &query.start,
+ &query.end,
+ None,
+ Some(filter),
+ EnumerationOrder::Forward,
+ )
+ .await?;
+ assert!(!more);
+
+ // TODO delete items
+ let items = items
+ .into_iter()
+ .map(|i| {
+ let cc = i.causal_context();
+ (
+ i.partition.partition_key,
+ i.sort_key,
+ Some(cc),
+ DvvsValue::Deleted,
+ )
+ })
+ .collect::<Vec<_>>();
+ let n = items.len();
+
+ garage.k2v.rpc.insert_batch(bucket_id, items).await?;
+
+ n
+ };
+
+ Ok(DeleteBatchResponse {
+ partition_key: query.partition_key,
+ prefix: query.prefix,
+ start: query.start,
+ end: query.end,
+ single_item: query.single_item,
+ deleted_items,
+ })
+}
+
+#[derive(Deserialize)]
+struct InsertBatchItem {
+ pk: String,
+ sk: String,
+ ct: Option<String>,
+ v: Option<String>,
+}
+
+#[derive(Deserialize)]
+struct ReadBatchQuery {
+ #[serde(rename = "partitionKey")]
+ partition_key: String,
+ #[serde(default)]
+ prefix: Option<String>,
+ #[serde(default)]
+ start: Option<String>,
+ #[serde(default)]
+ end: Option<String>,
+ #[serde(default)]
+ limit: Option<u64>,
+ #[serde(default)]
+ reverse: bool,
+ #[serde(default, rename = "singleItem")]
+ single_item: bool,
+ #[serde(default, rename = "conflictsOnly")]
+ conflicts_only: bool,
+ #[serde(default)]
+ tombstones: bool,
+}
+
+#[derive(Serialize)]
+struct ReadBatchResponse {
+ #[serde(rename = "partitionKey")]
+ partition_key: String,
+ prefix: Option<String>,
+ start: Option<String>,
+ end: Option<String>,
+ limit: Option<u64>,
+ reverse: bool,
+ #[serde(rename = "singleItem")]
+ single_item: bool,
+ #[serde(rename = "conflictsOnly")]
+ conflicts_only: bool,
+ tombstones: bool,
+
+ items: Vec<ReadBatchResponseItem>,
+ more: bool,
+ #[serde(rename = "nextStart")]
+ next_start: Option<String>,
+}
+
+#[derive(Serialize)]
+struct ReadBatchResponseItem {
+ sk: String,
+ ct: String,
+ v: Vec<Option<String>>,
+}
+
+impl ReadBatchResponseItem {
+ fn from(i: K2VItem) -> Self {
+ let ct = i.causal_context().serialize();
+ let v = i
+ .values()
+ .iter()
+ .map(|v| match v {
+ DvvsValue::Value(x) => Some(base64::encode(x)),
+ DvvsValue::Deleted => None,
+ })
+ .collect::<Vec<_>>();
+ Self {
+ sk: i.sort_key,
+ ct,
+ v,
+ }
+ }
+}
+
+#[derive(Deserialize)]
+struct DeleteBatchQuery {
+ #[serde(rename = "partitionKey")]
+ partition_key: String,
+ #[serde(default)]
+ prefix: Option<String>,
+ #[serde(default)]
+ start: Option<String>,
+ #[serde(default)]
+ end: Option<String>,
+ #[serde(default, rename = "singleItem")]
+ single_item: bool,
+}
+
+#[derive(Serialize)]
+struct DeleteBatchResponse {
+ #[serde(rename = "partitionKey")]
+ partition_key: String,
+ prefix: Option<String>,
+ start: Option<String>,
+ end: Option<String>,
+ #[serde(rename = "singleItem")]
+ single_item: bool,
+
+ #[serde(rename = "deletedItems")]
+ deleted_items: usize,
+}
diff --git a/src/api/k2v/index.rs b/src/api/k2v/index.rs
new file mode 100644
index 00000000..896dbcf0
--- /dev/null
+++ b/src/api/k2v/index.rs
@@ -0,0 +1,100 @@
+use std::sync::Arc;
+
+use hyper::{Body, Response, StatusCode};
+use serde::Serialize;
+
+use garage_util::data::*;
+use garage_util::error::Error as GarageError;
+
+use garage_rpc::ring::Ring;
+use garage_table::util::*;
+
+use garage_model::garage::Garage;
+use garage_model::k2v::counter_table::{BYTES, CONFLICTS, ENTRIES, VALUES};
+
+use crate::error::*;
+use crate::k2v::range::read_range;
+
+pub async fn handle_read_index(
+ garage: Arc<Garage>,
+ bucket_id: Uuid,
+ prefix: Option<String>,
+ start: Option<String>,
+ end: Option<String>,
+ limit: Option<u64>,
+ reverse: Option<bool>,
+) -> Result<Response<Body>, Error> {
+ let reverse = reverse.unwrap_or(false);
+
+ let ring: Arc<Ring> = garage.system.ring.borrow().clone();
+
+ let (partition_keys, more, next_start) = read_range(
+ &garage.k2v.counter_table.table,
+ &bucket_id,
+ &prefix,
+ &start,
+ &end,
+ limit,
+ Some((DeletedFilter::NotDeleted, ring.layout.node_id_vec.clone())),
+ EnumerationOrder::from_reverse(reverse),
+ )
+ .await?;
+
+ let s_entries = ENTRIES.to_string();
+ let s_conflicts = CONFLICTS.to_string();
+ let s_values = VALUES.to_string();
+ let s_bytes = BYTES.to_string();
+
+ let resp = ReadIndexResponse {
+ prefix,
+ start,
+ end,
+ limit,
+ reverse,
+ partition_keys: partition_keys
+ .into_iter()
+ .map(|part| {
+ let vals = part.filtered_values(&ring);
+ ReadIndexResponseEntry {
+ pk: part.sk,
+ entries: *vals.get(&s_entries).unwrap_or(&0),
+ conflicts: *vals.get(&s_conflicts).unwrap_or(&0),
+ values: *vals.get(&s_values).unwrap_or(&0),
+ bytes: *vals.get(&s_bytes).unwrap_or(&0),
+ }
+ })
+ .collect::<Vec<_>>(),
+ more,
+ next_start,
+ };
+
+ let resp_json = serde_json::to_string_pretty(&resp).map_err(GarageError::from)?;
+ Ok(Response::builder()
+ .status(StatusCode::OK)
+ .body(Body::from(resp_json))?)
+}
+
+#[derive(Serialize)]
+struct ReadIndexResponse {
+ prefix: Option<String>,
+ start: Option<String>,
+ end: Option<String>,
+ limit: Option<u64>,
+ reverse: bool,
+
+ #[serde(rename = "partitionKeys")]
+ partition_keys: Vec<ReadIndexResponseEntry>,
+
+ more: bool,
+ #[serde(rename = "nextStart")]
+ next_start: Option<String>,
+}
+
+#[derive(Serialize)]
+struct ReadIndexResponseEntry {
+ pk: String,
+ entries: i64,
+ conflicts: i64,
+ values: i64,
+ bytes: i64,
+}
diff --git a/src/api/k2v/item.rs b/src/api/k2v/item.rs
new file mode 100644
index 00000000..1860863e
--- /dev/null
+++ b/src/api/k2v/item.rs
@@ -0,0 +1,230 @@
+use std::sync::Arc;
+
+use http::header;
+
+use hyper::{Body, Request, Response, StatusCode};
+
+use garage_util::data::*;
+
+use garage_model::garage::Garage;
+use garage_model::k2v::causality::*;
+use garage_model::k2v::item_table::*;
+
+use crate::error::*;
+
+pub const X_GARAGE_CAUSALITY_TOKEN: &str = "X-Garage-Causality-Token";
+
+pub enum ReturnFormat {
+ Json,
+ Binary,
+ Either,
+}
+
+impl ReturnFormat {
+ pub fn from(req: &Request<Body>) -> Result<Self, Error> {
+ let accept = match req.headers().get(header::ACCEPT) {
+ Some(a) => a.to_str()?,
+ None => return Ok(Self::Json),
+ };
+
+ let accept = accept.split(',').map(|s| s.trim()).collect::<Vec<_>>();
+ let accept_json = accept.contains(&"application/json") || accept.contains(&"*/*");
+ let accept_binary = accept.contains(&"application/octet-stream") || accept.contains(&"*/*");
+
+ match (accept_json, accept_binary) {
+ (true, true) => Ok(Self::Either),
+ (true, false) => Ok(Self::Json),
+ (false, true) => Ok(Self::Binary),
+ (false, false) => Err(Error::NotAcceptable("Invalid Accept: header value, must contain either application/json or application/octet-stream (or both)".into())),
+ }
+ }
+
+ pub fn make_response(&self, item: &K2VItem) -> Result<Response<Body>, Error> {
+ let vals = item.values();
+
+ if vals.is_empty() {
+ return Err(Error::NoSuchKey);
+ }
+
+ let ct = item.causal_context().serialize();
+ match self {
+ Self::Binary if vals.len() > 1 => Ok(Response::builder()
+ .header(X_GARAGE_CAUSALITY_TOKEN, ct)
+ .status(StatusCode::CONFLICT)
+ .body(Body::empty())?),
+ Self::Binary => {
+ assert!(vals.len() == 1);
+ Self::make_binary_response(ct, vals[0])
+ }
+ Self::Either if vals.len() == 1 => Self::make_binary_response(ct, vals[0]),
+ _ => Self::make_json_response(ct, &vals[..]),
+ }
+ }
+
+ fn make_binary_response(ct: String, v: &DvvsValue) -> Result<Response<Body>, Error> {
+ match v {
+ DvvsValue::Deleted => Ok(Response::builder()
+ .header(X_GARAGE_CAUSALITY_TOKEN, ct)
+ .header(header::CONTENT_TYPE, "application/octet-stream")
+ .status(StatusCode::NO_CONTENT)
+ .body(Body::empty())?),
+ DvvsValue::Value(v) => Ok(Response::builder()
+ .header(X_GARAGE_CAUSALITY_TOKEN, ct)
+ .header(header::CONTENT_TYPE, "application/octet-stream")
+ .status(StatusCode::OK)
+ .body(Body::from(v.to_vec()))?),
+ }
+ }
+
+ fn make_json_response(ct: String, v: &[&DvvsValue]) -> Result<Response<Body>, Error> {
+ let items = v
+ .iter()
+ .map(|v| match v {
+ DvvsValue::Deleted => serde_json::Value::Null,
+ DvvsValue::Value(v) => serde_json::Value::String(base64::encode(v)),
+ })
+ .collect::<Vec<_>>();
+ let json_body =
+ serde_json::to_string_pretty(&items).ok_or_internal_error("JSON encoding error")?;
+ Ok(Response::builder()
+ .header(X_GARAGE_CAUSALITY_TOKEN, ct)
+ .header(header::CONTENT_TYPE, "application/json")
+ .status(StatusCode::OK)
+ .body(Body::from(json_body))?)
+ }
+}
+
+/// Handle ReadItem request
+#[allow(clippy::ptr_arg)]
+pub async fn handle_read_item(
+ garage: Arc<Garage>,
+ req: &Request<Body>,
+ bucket_id: Uuid,
+ partition_key: &str,
+ sort_key: &String,
+) -> Result<Response<Body>, Error> {
+ let format = ReturnFormat::from(req)?;
+
+ let item = garage
+ .k2v
+ .item_table
+ .get(
+ &K2VItemPartition {
+ bucket_id,
+ partition_key: partition_key.to_string(),
+ },
+ sort_key,
+ )
+ .await?
+ .ok_or(Error::NoSuchKey)?;
+
+ format.make_response(&item)
+}
+
+pub async fn handle_insert_item(
+ garage: Arc<Garage>,
+ req: Request<Body>,
+ bucket_id: Uuid,
+ partition_key: &str,
+ sort_key: &str,
+) -> Result<Response<Body>, Error> {
+ let causal_context = req
+ .headers()
+ .get(X_GARAGE_CAUSALITY_TOKEN)
+ .map(|s| s.to_str())
+ .transpose()?
+ .map(CausalContext::parse)
+ .transpose()
+ .ok_or_bad_request("Invalid causality token")?;
+
+ let body = hyper::body::to_bytes(req.into_body()).await?;
+ let value = DvvsValue::Value(body.to_vec());
+
+ garage
+ .k2v
+ .rpc
+ .insert(
+ bucket_id,
+ partition_key.to_string(),
+ sort_key.to_string(),
+ causal_context,
+ value,
+ )
+ .await?;
+
+ Ok(Response::builder()
+ .status(StatusCode::OK)
+ .body(Body::empty())?)
+}
+
+pub async fn handle_delete_item(
+ garage: Arc<Garage>,
+ req: Request<Body>,
+ bucket_id: Uuid,
+ partition_key: &str,
+ sort_key: &str,
+) -> Result<Response<Body>, Error> {
+ let causal_context = req
+ .headers()
+ .get(X_GARAGE_CAUSALITY_TOKEN)
+ .map(|s| s.to_str())
+ .transpose()?
+ .map(CausalContext::parse)
+ .transpose()
+ .ok_or_bad_request("Invalid causality token")?;
+
+ let value = DvvsValue::Deleted;
+
+ garage
+ .k2v
+ .rpc
+ .insert(
+ bucket_id,
+ partition_key.to_string(),
+ sort_key.to_string(),
+ causal_context,
+ value,
+ )
+ .await?;
+
+ Ok(Response::builder()
+ .status(StatusCode::NO_CONTENT)
+ .body(Body::empty())?)
+}
+
+/// Handle ReadItem request
+#[allow(clippy::ptr_arg)]
+pub async fn handle_poll_item(
+ garage: Arc<Garage>,
+ req: &Request<Body>,
+ bucket_id: Uuid,
+ partition_key: String,
+ sort_key: String,
+ causality_token: String,
+ timeout_secs: Option<u64>,
+) -> Result<Response<Body>, Error> {
+ let format = ReturnFormat::from(req)?;
+
+ let causal_context =
+ CausalContext::parse(&causality_token).ok_or_bad_request("Invalid causality token")?;
+
+ let item = garage
+ .k2v
+ .rpc
+ .poll(
+ bucket_id,
+ partition_key,
+ sort_key,
+ causal_context,
+ timeout_secs.unwrap_or(300) * 1000,
+ )
+ .await?;
+
+ if let Some(item) = item {
+ format.make_response(&item)
+ } else {
+ Ok(Response::builder()
+ .status(StatusCode::NOT_MODIFIED)
+ .body(Body::empty())?)
+ }
+}
diff --git a/src/api/k2v/mod.rs b/src/api/k2v/mod.rs
new file mode 100644
index 00000000..ee210ad5
--- /dev/null
+++ b/src/api/k2v/mod.rs
@@ -0,0 +1,8 @@
+pub mod api_server;
+mod router;
+
+mod batch;
+mod index;
+mod item;
+
+mod range;
diff --git a/src/api/k2v/range.rs b/src/api/k2v/range.rs
new file mode 100644
index 00000000..cd019723
--- /dev/null
+++ b/src/api/k2v/range.rs
@@ -0,0 +1,96 @@
+//! Utility module for retrieving ranges of items in Garage tables
+//! Implements parameters (prefix, start, end, limit) as specified
+//! for endpoints ReadIndex, ReadBatch and DeleteBatch
+
+use std::sync::Arc;
+
+use garage_table::replication::TableShardedReplication;
+use garage_table::*;
+
+use crate::error::*;
+use crate::helpers::key_after_prefix;
+
+/// Read range in a Garage table.
+/// Returns (entries, more?, nextStart)
+#[allow(clippy::too_many_arguments)]
+pub(crate) async fn read_range<F>(
+ table: &Arc<Table<F, TableShardedReplication>>,
+ partition_key: &F::P,
+ prefix: &Option<String>,
+ start: &Option<String>,
+ end: &Option<String>,
+ limit: Option<u64>,
+ filter: Option<F::Filter>,
+ enumeration_order: EnumerationOrder,
+) -> Result<(Vec<F::E>, bool, Option<String>), Error>
+where
+ F: TableSchema<S = String> + 'static,
+{
+ let (mut start, mut start_ignore) = match (prefix, start) {
+ (None, None) => (None, false),
+ (None, Some(s)) => (Some(s.clone()), false),
+ (Some(p), Some(s)) => {
+ if !s.starts_with(p) {
+ return Err(Error::BadRequest(format!(
+ "Start key '{}' does not start with prefix '{}'",
+ s, p
+ )));
+ }
+ (Some(s.clone()), false)
+ }
+ (Some(p), None) if enumeration_order == EnumerationOrder::Reverse => {
+ let start = key_after_prefix(p)
+ .ok_or_internal_error("Sorry, can't list this prefix in reverse order")?;
+ (Some(start), true)
+ }
+ (Some(p), None) => (Some(p.clone()), false),
+ };
+
+ let mut entries = vec![];
+ loop {
+ let n_get = std::cmp::min(
+ 1000,
+ limit.map(|x| x as usize).unwrap_or(usize::MAX - 10) - entries.len() + 2,
+ );
+ let get_ret = table
+ .get_range(
+ partition_key,
+ start.clone(),
+ filter.clone(),
+ n_get,
+ enumeration_order,
+ )
+ .await?;
+
+ let get_ret_len = get_ret.len();
+
+ for entry in get_ret {
+ if start_ignore && Some(entry.sort_key()) == start.as_ref() {
+ continue;
+ }
+ if let Some(p) = prefix {
+ if !entry.sort_key().starts_with(p) {
+ return Ok((entries, false, None));
+ }
+ }
+ if let Some(e) = end {
+ if entry.sort_key() == e {
+ return Ok((entries, false, None));
+ }
+ }
+ if let Some(l) = limit {
+ if entries.len() >= l as usize {
+ return Ok((entries, true, Some(entry.sort_key().clone())));
+ }
+ }
+ entries.push(entry);
+ }
+
+ if get_ret_len < n_get {
+ return Ok((entries, false, None));
+ }
+
+ start = Some(entries.last().unwrap().sort_key().clone());
+ start_ignore = true;
+ }
+}
diff --git a/src/api/k2v/router.rs b/src/api/k2v/router.rs
new file mode 100644
index 00000000..f948ffce
--- /dev/null
+++ b/src/api/k2v/router.rs
@@ -0,0 +1,252 @@
+use crate::error::*;
+
+use std::borrow::Cow;
+
+use hyper::{Method, Request};
+
+use crate::helpers::Authorization;
+use crate::router_macros::{generateQueryParameters, router_match};
+
+router_match! {@func
+
+
+/// List of all K2V API endpoints.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum Endpoint {
+ DeleteBatch {
+ },
+ DeleteItem {
+ partition_key: String,
+ sort_key: String,
+ },
+ InsertBatch {
+ },
+ InsertItem {
+ partition_key: String,
+ sort_key: String,
+ },
+ Options,
+ PollItem {
+ partition_key: String,
+ sort_key: String,
+ causality_token: String,
+ timeout: Option<u64>,
+ },
+ ReadBatch {
+ },
+ ReadIndex {
+ prefix: Option<String>,
+ start: Option<String>,
+ end: Option<String>,
+ limit: Option<u64>,
+ reverse: Option<bool>,
+ },
+ ReadItem {
+ partition_key: String,
+ sort_key: String,
+ },
+}}
+
+impl Endpoint {
+ /// Determine which S3 endpoint a request is for using the request, and a bucket which was
+ /// possibly extracted from the Host header.
+ /// Returns Self plus bucket name, if endpoint is not Endpoint::ListBuckets
+ pub fn from_request<T>(req: &Request<T>) -> Result<(Self, String), Error> {
+ let uri = req.uri();
+ let path = uri.path().trim_start_matches('/');
+ let query = uri.query();
+
+ let (bucket, partition_key) = path
+ .split_once('/')
+ .map(|(b, p)| (b.to_owned(), p.trim_start_matches('/')))
+ .unwrap_or((path.to_owned(), ""));
+
+ if bucket.is_empty() {
+ return Err(Error::BadRequest("Missing bucket name".to_owned()));
+ }
+
+ if *req.method() == Method::OPTIONS {
+ return Ok((Self::Options, bucket));
+ }
+
+ let partition_key = percent_encoding::percent_decode_str(partition_key)
+ .decode_utf8()?
+ .into_owned();
+
+ let mut query = QueryParameters::from_query(query.unwrap_or_default())?;
+
+ let method_search = Method::from_bytes(b"SEARCH").unwrap();
+ let res = match *req.method() {
+ Method::GET => Self::from_get(partition_key, &mut query)?,
+ //&Method::HEAD => Self::from_head(partition_key, &mut query)?,
+ Method::POST => Self::from_post(partition_key, &mut query)?,
+ Method::PUT => Self::from_put(partition_key, &mut query)?,
+ Method::DELETE => Self::from_delete(partition_key, &mut query)?,
+ _ if req.method() == method_search => Self::from_search(partition_key, &mut query)?,
+ _ => return Err(Error::BadRequest("Unknown method".to_owned())),
+ };
+
+ if let Some(message) = query.nonempty_message() {
+ debug!("Unused query parameter: {}", message)
+ }
+ Ok((res, bucket))
+ }
+
+ /// Determine which endpoint a request is for, knowing it is a GET.
+ fn from_get(partition_key: String, query: &mut QueryParameters<'_>) -> Result<Self, Error> {
+ router_match! {
+ @gen_parser
+ (query.keyword.take().unwrap_or_default().as_ref(), partition_key, query, None),
+ key: [
+ EMPTY if causality_token => PollItem (query::sort_key, query::causality_token, opt_parse::timeout),
+ EMPTY => ReadItem (query::sort_key),
+ ],
+ no_key: [
+ EMPTY => ReadIndex (query_opt::prefix, query_opt::start, query_opt::end, opt_parse::limit, opt_parse::reverse),
+ ]
+ }
+ }
+
+ /// Determine which endpoint a request is for, knowing it is a SEARCH.
+ fn from_search(partition_key: String, query: &mut QueryParameters<'_>) -> Result<Self, Error> {
+ router_match! {
+ @gen_parser
+ (query.keyword.take().unwrap_or_default().as_ref(), partition_key, query, None),
+ key: [
+ ],
+ no_key: [
+ EMPTY => ReadBatch,
+ ]
+ }
+ }
+
+ /*
+ /// Determine which endpoint a request is for, knowing it is a HEAD.
+ fn from_head(partition_key: String, query: &mut QueryParameters<'_>) -> Result<Self, Error> {
+ router_match! {
+ @gen_parser
+ (query.keyword.take().unwrap_or_default().as_ref(), partition_key, query, None),
+ key: [
+ EMPTY => HeadObject(opt_parse::part_number, query_opt::version_id),
+ ],
+ no_key: [
+ EMPTY => HeadBucket,
+ ]
+ }
+ }
+ */
+
+ /// Determine which endpoint a request is for, knowing it is a POST.
+ fn from_post(partition_key: String, query: &mut QueryParameters<'_>) -> Result<Self, Error> {
+ router_match! {
+ @gen_parser
+ (query.keyword.take().unwrap_or_default().as_ref(), partition_key, query, None),
+ key: [
+ ],
+ no_key: [
+ EMPTY => InsertBatch,
+ DELETE => DeleteBatch,
+ SEARCH => ReadBatch,
+ ]
+ }
+ }
+
+ /// Determine which endpoint a request is for, knowing it is a PUT.
+ fn from_put(partition_key: String, query: &mut QueryParameters<'_>) -> Result<Self, Error> {
+ router_match! {
+ @gen_parser
+ (query.keyword.take().unwrap_or_default().as_ref(), partition_key, query, None),
+ key: [
+ EMPTY => InsertItem (query::sort_key),
+
+ ],
+ no_key: [
+ ]
+ }
+ }
+
+ /// Determine which endpoint a request is for, knowing it is a DELETE.
+ fn from_delete(partition_key: String, query: &mut QueryParameters<'_>) -> Result<Self, Error> {
+ router_match! {
+ @gen_parser
+ (query.keyword.take().unwrap_or_default().as_ref(), partition_key, query, None),
+ key: [
+ EMPTY => DeleteItem (query::sort_key),
+ ],
+ no_key: [
+ ]
+ }
+ }
+
+ /// Get the partition key the request target. Returns None for requests which don't use a partition key.
+ #[allow(dead_code)]
+ pub fn get_partition_key(&self) -> Option<&str> {
+ router_match! {
+ @extract
+ self,
+ partition_key,
+ [
+ DeleteItem,
+ InsertItem,
+ PollItem,
+ ReadItem,
+ ]
+ }
+ }
+
+ /// Get the sort key the request target. Returns None for requests which don't use a sort key.
+ #[allow(dead_code)]
+ pub fn get_sort_key(&self) -> Option<&str> {
+ router_match! {
+ @extract
+ self,
+ sort_key,
+ [
+ DeleteItem,
+ InsertItem,
+ PollItem,
+ ReadItem,
+ ]
+ }
+ }
+
+ /// Get the kind of authorization which is required to perform the operation.
+ pub fn authorization_type(&self) -> Authorization {
+ let readonly = router_match! {
+ @match
+ self,
+ [
+ PollItem,
+ ReadBatch,
+ ReadIndex,
+ ReadItem,
+ ]
+ };
+ if readonly {
+ Authorization::Read
+ } else {
+ Authorization::Write
+ }
+ }
+}
+
+// parameter name => struct field
+generateQueryParameters! {
+ "prefix" => prefix,
+ "start" => start,
+ "causality_token" => causality_token,
+ "end" => end,
+ "limit" => limit,
+ "reverse" => reverse,
+ "sort_key" => sort_key,
+ "timeout" => timeout
+}
+
+mod keywords {
+ //! This module contain all query parameters with no associated value
+ //! used to differentiate endpoints.
+ pub const EMPTY: &str = "";
+
+ pub const DELETE: &str = "delete";
+ pub const SEARCH: &str = "search";
+}