use std::sync::Arc; use base64::prelude::*; use http::header; use hyper::{Body, Request, Response, StatusCode}; use garage_util::data::*; use garage_model::garage::Garage; use garage_model::k2v::causality::*; use garage_model::k2v::item_table::*; use crate::k2v::error::*; pub const X_GARAGE_CAUSALITY_TOKEN: &str = "X-Garage-Causality-Token"; pub enum ReturnFormat { Json, Binary, Either, } impl ReturnFormat { pub fn from(req: &Request<Body>) -> Result<Self, Error> { let accept = match req.headers().get(header::ACCEPT) { Some(a) => a.to_str()?, None => return Ok(Self::Json), }; let accept = accept.split(',').map(|s| s.trim()).collect::<Vec<_>>(); let accept_json = accept.contains(&"application/json") || accept.contains(&"*/*"); let accept_binary = accept.contains(&"application/octet-stream") || accept.contains(&"*/*"); match (accept_json, accept_binary) { (true, true) => Ok(Self::Either), (true, false) => Ok(Self::Json), (false, true) => Ok(Self::Binary), (false, false) => Err(Error::NotAcceptable("Invalid Accept: header value, must contain either application/json or application/octet-stream (or both)".into())), } } pub fn make_response(&self, item: &K2VItem) -> Result<Response<Body>, Error> { let vals = item.values(); if vals.is_empty() { return Err(Error::NoSuchKey); } let ct = item.causal_context().serialize(); match self { Self::Binary if vals.len() > 1 => Ok(Response::builder() .header(X_GARAGE_CAUSALITY_TOKEN, ct) .status(StatusCode::CONFLICT) .body(Body::empty())?), Self::Binary => { assert!(vals.len() == 1); Self::make_binary_response(ct, vals[0]) } Self::Either if vals.len() == 1 => Self::make_binary_response(ct, vals[0]), _ => Self::make_json_response(ct, &vals[..]), } } fn make_binary_response(ct: String, v: &DvvsValue) -> Result<Response<Body>, Error> { match v { DvvsValue::Deleted => Ok(Response::builder() .header(X_GARAGE_CAUSALITY_TOKEN, ct) .header(header::CONTENT_TYPE, "application/octet-stream") .status(StatusCode::NO_CONTENT) .body(Body::empty())?), DvvsValue::Value(v) => Ok(Response::builder() .header(X_GARAGE_CAUSALITY_TOKEN, ct) .header(header::CONTENT_TYPE, "application/octet-stream") .status(StatusCode::OK) .body(Body::from(v.to_vec()))?), } } fn make_json_response(ct: String, v: &[&DvvsValue]) -> Result<Response<Body>, Error> { let items = v .iter() .map(|v| match v { DvvsValue::Deleted => serde_json::Value::Null, DvvsValue::Value(v) => serde_json::Value::String(BASE64_STANDARD.encode(v)), }) .collect::<Vec<_>>(); let json_body = serde_json::to_string_pretty(&items).ok_or_internal_error("JSON encoding error")?; Ok(Response::builder() .header(X_GARAGE_CAUSALITY_TOKEN, ct) .header(header::CONTENT_TYPE, "application/json") .status(StatusCode::OK) .body(Body::from(json_body))?) } } /// Handle ReadItem request #[allow(clippy::ptr_arg)] pub async fn handle_read_item( garage: Arc<Garage>, req: &Request<Body>, bucket_id: Uuid, partition_key: &str, sort_key: &String, ) -> Result<Response<Body>, Error> { let format = ReturnFormat::from(req)?; let item = garage .k2v .item_table .get( &K2VItemPartition { bucket_id, partition_key: partition_key.to_string(), }, sort_key, ) .await? .ok_or(Error::NoSuchKey)?; format.make_response(&item) } pub async fn handle_insert_item( garage: Arc<Garage>, req: Request<Body>, bucket_id: Uuid, partition_key: &str, sort_key: &str, ) -> Result<Response<Body>, Error> { let causal_context = req .headers() .get(X_GARAGE_CAUSALITY_TOKEN) .map(|s| s.to_str()) .transpose()? .map(CausalContext::parse_helper) .transpose()?; let body = hyper::body::to_bytes(req.into_body()).await?; let value = DvvsValue::Value(body.to_vec()); garage .k2v .rpc .insert( bucket_id, partition_key.to_string(), sort_key.to_string(), causal_context, value, ) .await?; Ok(Response::builder() .status(StatusCode::NO_CONTENT) .body(Body::empty())?) } pub async fn handle_delete_item( garage: Arc<Garage>, req: Request<Body>, bucket_id: Uuid, partition_key: &str, sort_key: &str, ) -> Result<Response<Body>, Error> { let causal_context = req .headers() .get(X_GARAGE_CAUSALITY_TOKEN) .map(|s| s.to_str()) .transpose()? .map(CausalContext::parse_helper) .transpose()?; let value = DvvsValue::Deleted; garage .k2v .rpc .insert( bucket_id, partition_key.to_string(), sort_key.to_string(), causal_context, value, ) .await?; Ok(Response::builder() .status(StatusCode::NO_CONTENT) .body(Body::empty())?) } /// Handle ReadItem request #[allow(clippy::ptr_arg)] pub async fn handle_poll_item( garage: Arc<Garage>, req: &Request<Body>, bucket_id: Uuid, partition_key: String, sort_key: String, causality_token: String, timeout_secs: Option<u64>, ) -> Result<Response<Body>, Error> { let format = ReturnFormat::from(req)?; let causal_context = CausalContext::parse(&causality_token).ok_or_bad_request("Invalid causality token")?; let timeout_msec = timeout_secs.unwrap_or(300).clamp(1, 600) * 1000; let item = garage .k2v .rpc .poll_item( bucket_id, partition_key, sort_key, causal_context, timeout_msec, ) .await?; if let Some(item) = item { format.make_response(&item) } else { Ok(Response::builder() .status(StatusCode::NOT_MODIFIED) .body(Body::empty())?) } }