use base64::prelude::*; use hyper::{Request, Response, StatusCode}; use serde::{Deserialize, Serialize}; use garage_table::{EnumerationOrder, TableSchema}; use garage_model::k2v::causality::*; use garage_model::k2v::item_table::*; use crate::helpers::*; use crate::k2v::api_server::{ReqBody, ResBody}; use crate::k2v::error::*; use crate::k2v::range::read_range; pub async fn handle_insert_batch( ctx: ReqCtx, req: Request, ) -> Result, Error> { let ReqCtx { garage, bucket_id, .. } = &ctx; let items = parse_json_body::, _, Error>(req).await?; let mut items2 = vec![]; for it in items { let ct = it.ct.map(|s| CausalContext::parse_helper(&s)).transpose()?; let v = match it.v { Some(vs) => DvvsValue::Value( BASE64_STANDARD .decode(vs) .ok_or_bad_request("Invalid base64 value")?, ), None => DvvsValue::Deleted, }; items2.push((it.pk, it.sk, ct, v)); } garage.k2v.rpc.insert_batch(*bucket_id, items2).await?; Ok(Response::builder() .status(StatusCode::NO_CONTENT) .body(empty_body())?) } pub async fn handle_read_batch( ctx: ReqCtx, req: Request, ) -> Result, Error> { let queries = parse_json_body::, _, Error>(req).await?; let resp_results = futures::future::join_all( queries .into_iter() .map(|q| handle_read_batch_query(&ctx, q)), ) .await; let mut resps: Vec = vec![]; for resp in resp_results { resps.push(resp?); } Ok(json_ok_response(&resps)?) } async fn handle_read_batch_query( ctx: &ReqCtx, query: ReadBatchQuery, ) -> Result { let ReqCtx { garage, bucket_id, .. } = ctx; let partition = K2VItemPartition { bucket_id: *bucket_id, partition_key: query.partition_key.clone(), }; let filter = ItemFilter { exclude_only_tombstones: !query.tombstones, conflicts_only: query.conflicts_only, }; let (items, more, next_start) = if query.single_item { if query.prefix.is_some() || query.end.is_some() || query.limit.is_some() || query.reverse { return Err(Error::bad_request("Batch query parameters 'prefix', 'end', 'limit' and 'reverse' must not be set when singleItem is true.")); } let sk = query .start .as_ref() .ok_or_bad_request("start should be specified if single_item is set")?; let item = garage .k2v .item_table .get(&partition, sk) .await? .filter(|e| K2VItemTable::matches_filter(e, &filter)); match item { Some(i) => (vec![ReadBatchResponseItem::from(i)], false, None), None => (vec![], false, None), } } else { let (items, more, next_start) = read_range( &garage.k2v.item_table, &partition, &query.prefix, &query.start, &query.end, query.limit, Some(filter), EnumerationOrder::from_reverse(query.reverse), ) .await?; let items = items .into_iter() .map(ReadBatchResponseItem::from) .collect::>(); (items, more, next_start) }; Ok(ReadBatchResponse { partition_key: query.partition_key, prefix: query.prefix, start: query.start, end: query.end, limit: query.limit, reverse: query.reverse, single_item: query.single_item, conflicts_only: query.conflicts_only, tombstones: query.tombstones, items, more, next_start, }) } pub async fn handle_delete_batch( ctx: ReqCtx, req: Request, ) -> Result, Error> { let queries = parse_json_body::, _, Error>(req).await?; let resp_results = futures::future::join_all( queries .into_iter() .map(|q| handle_delete_batch_query(&ctx, q)), ) .await; let mut resps: Vec = vec![]; for resp in resp_results { resps.push(resp?); } Ok(json_ok_response(&resps)?) } async fn handle_delete_batch_query( ctx: &ReqCtx, query: DeleteBatchQuery, ) -> Result { let ReqCtx { garage, bucket_id, .. } = &ctx; let partition = K2VItemPartition { bucket_id: *bucket_id, partition_key: query.partition_key.clone(), }; let filter = ItemFilter { exclude_only_tombstones: true, conflicts_only: false, }; let deleted_items = if query.single_item { if query.prefix.is_some() || query.end.is_some() { return Err(Error::bad_request("Batch query parameters 'prefix' and 'end' must not be set when singleItem is true.")); } let sk = query .start .as_ref() .ok_or_bad_request("start should be specified if single_item is set")?; let item = garage .k2v .item_table .get(&partition, sk) .await? .filter(|e| K2VItemTable::matches_filter(e, &filter)); match item { Some(i) => { let cc = i.causal_context(); garage .k2v .rpc .insert( *bucket_id, i.partition.partition_key, i.sort_key, Some(cc), DvvsValue::Deleted, ) .await?; 1 } None => 0, } } else { let (items, more, _next_start) = read_range( &garage.k2v.item_table, &partition, &query.prefix, &query.start, &query.end, None, Some(filter), EnumerationOrder::Forward, ) .await?; assert!(!more); // TODO delete items let items = items .into_iter() .map(|i| { let cc = i.causal_context(); ( i.partition.partition_key, i.sort_key, Some(cc), DvvsValue::Deleted, ) }) .collect::>(); let n = items.len(); garage.k2v.rpc.insert_batch(*bucket_id, items).await?; n }; Ok(DeleteBatchResponse { partition_key: query.partition_key, prefix: query.prefix, start: query.start, end: query.end, single_item: query.single_item, deleted_items, }) } pub(crate) async fn handle_poll_range( ctx: ReqCtx, partition_key: &str, req: Request, ) -> Result, Error> { let ReqCtx { garage, bucket_id, .. } = ctx; use garage_model::k2v::sub::PollRange; let query = parse_json_body::(req).await?; let timeout_msec = query.timeout.unwrap_or(300).clamp(1, 600) * 1000; let resp = garage .k2v .rpc .poll_range( PollRange { partition: K2VItemPartition { bucket_id, partition_key: partition_key.to_string(), }, start: query.start, end: query.end, prefix: query.prefix, }, query.seen_marker, timeout_msec, ) .await?; if let Some((items, seen_marker)) = resp { let resp = PollRangeResponse { items: items .into_values() .map(ReadBatchResponseItem::from) .collect::>(), seen_marker, }; Ok(json_ok_response(&resp)?) } else { Ok(Response::builder() .status(StatusCode::NOT_MODIFIED) .body(empty_body())?) } } #[derive(Deserialize)] struct InsertBatchItem { pk: String, sk: String, ct: Option, v: Option, } #[derive(Deserialize)] struct ReadBatchQuery { #[serde(rename = "partitionKey")] partition_key: String, #[serde(default)] prefix: Option, #[serde(default)] start: Option, #[serde(default)] end: Option, #[serde(default)] limit: Option, #[serde(default)] reverse: bool, #[serde(default, rename = "singleItem")] single_item: bool, #[serde(default, rename = "conflictsOnly")] conflicts_only: bool, #[serde(default)] tombstones: bool, } #[derive(Serialize)] struct ReadBatchResponse { #[serde(rename = "partitionKey")] partition_key: String, prefix: Option, start: Option, end: Option, limit: Option, reverse: bool, #[serde(rename = "singleItem")] single_item: bool, #[serde(rename = "conflictsOnly")] conflicts_only: bool, tombstones: bool, items: Vec, more: bool, #[serde(rename = "nextStart")] next_start: Option, } #[derive(Serialize)] struct ReadBatchResponseItem { sk: String, ct: String, v: Vec>, } impl ReadBatchResponseItem { fn from(i: K2VItem) -> Self { let ct = i.causal_context().serialize(); let v = i .values() .iter() .map(|v| match v { DvvsValue::Value(x) => Some(BASE64_STANDARD.encode(x)), DvvsValue::Deleted => None, }) .collect::>(); Self { sk: i.sort_key, ct, v, } } } #[derive(Deserialize)] struct DeleteBatchQuery { #[serde(rename = "partitionKey")] partition_key: String, #[serde(default)] prefix: Option, #[serde(default)] start: Option, #[serde(default)] end: Option, #[serde(default, rename = "singleItem")] single_item: bool, } #[derive(Serialize)] struct DeleteBatchResponse { #[serde(rename = "partitionKey")] partition_key: String, prefix: Option, start: Option, end: Option, #[serde(rename = "singleItem")] single_item: bool, #[serde(rename = "deletedItems")] deleted_items: usize, } #[derive(Deserialize)] struct PollRangeQuery { #[serde(default)] prefix: Option, #[serde(default)] start: Option, #[serde(default)] end: Option, #[serde(default)] timeout: Option, #[serde(default, rename = "seenMarker")] seen_marker: Option, } #[derive(Serialize)] struct PollRangeResponse { items: Vec, #[serde(rename = "seenMarker")] seen_marker: String, }