aboutsummaryrefslogtreecommitdiff
path: root/src/api/k2v/item.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/api/k2v/item.rs')
-rw-r--r--src/api/k2v/item.rs47
1 files changed, 26 insertions, 21 deletions
diff --git a/src/api/k2v/item.rs b/src/api/k2v/item.rs
index e13a0f30..0c5931a1 100644
--- a/src/api/k2v/item.rs
+++ b/src/api/k2v/item.rs
@@ -3,7 +3,7 @@ use std::sync::Arc;
use base64::prelude::*;
use http::header;
-use hyper::{Body, Request, Response, StatusCode};
+use hyper::{Request, Response, StatusCode};
use garage_util::data::*;
@@ -11,6 +11,8 @@ use garage_model::garage::Garage;
use garage_model::k2v::causality::*;
use garage_model::k2v::item_table::*;
+use crate::helpers::*;
+use crate::k2v::api_server::{ReqBody, ResBody};
use crate::k2v::error::*;
pub const X_GARAGE_CAUSALITY_TOKEN: &str = "X-Garage-Causality-Token";
@@ -22,7 +24,7 @@ pub enum ReturnFormat {
}
impl ReturnFormat {
- pub fn from(req: &Request<Body>) -> Result<Self, Error> {
+ pub fn from(req: &Request<ReqBody>) -> Result<Self, Error> {
let accept = match req.headers().get(header::ACCEPT) {
Some(a) => a.to_str()?,
None => return Ok(Self::Json),
@@ -40,7 +42,7 @@ impl ReturnFormat {
}
}
- pub fn make_response(&self, item: &K2VItem) -> Result<Response<Body>, Error> {
+ pub fn make_response(&self, item: &K2VItem) -> Result<Response<ResBody>, Error> {
let vals = item.values();
if vals.is_empty() {
@@ -52,7 +54,7 @@ impl ReturnFormat {
Self::Binary if vals.len() > 1 => Ok(Response::builder()
.header(X_GARAGE_CAUSALITY_TOKEN, ct)
.status(StatusCode::CONFLICT)
- .body(Body::empty())?),
+ .body(empty_body())?),
Self::Binary => {
assert!(vals.len() == 1);
Self::make_binary_response(ct, vals[0])
@@ -62,22 +64,22 @@ impl ReturnFormat {
}
}
- fn make_binary_response(ct: String, v: &DvvsValue) -> Result<Response<Body>, Error> {
+ fn make_binary_response(ct: String, v: &DvvsValue) -> Result<Response<ResBody>, Error> {
match v {
DvvsValue::Deleted => Ok(Response::builder()
.header(X_GARAGE_CAUSALITY_TOKEN, ct)
.header(header::CONTENT_TYPE, "application/octet-stream")
.status(StatusCode::NO_CONTENT)
- .body(Body::empty())?),
+ .body(empty_body())?),
DvvsValue::Value(v) => Ok(Response::builder()
.header(X_GARAGE_CAUSALITY_TOKEN, ct)
.header(header::CONTENT_TYPE, "application/octet-stream")
.status(StatusCode::OK)
- .body(Body::from(v.to_vec()))?),
+ .body(bytes_body(v.to_vec().into()))?),
}
}
- fn make_json_response(ct: String, v: &[&DvvsValue]) -> Result<Response<Body>, Error> {
+ fn make_json_response(ct: String, v: &[&DvvsValue]) -> Result<Response<ResBody>, Error> {
let items = v
.iter()
.map(|v| match v {
@@ -91,7 +93,7 @@ impl ReturnFormat {
.header(X_GARAGE_CAUSALITY_TOKEN, ct)
.header(header::CONTENT_TYPE, "application/json")
.status(StatusCode::OK)
- .body(Body::from(json_body))?)
+ .body(string_body(json_body))?)
}
}
@@ -99,11 +101,11 @@ impl ReturnFormat {
#[allow(clippy::ptr_arg)]
pub async fn handle_read_item(
garage: Arc<Garage>,
- req: &Request<Body>,
+ req: &Request<ReqBody>,
bucket_id: Uuid,
partition_key: &str,
sort_key: &String,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let format = ReturnFormat::from(req)?;
let item = garage
@@ -124,11 +126,11 @@ pub async fn handle_read_item(
pub async fn handle_insert_item(
garage: Arc<Garage>,
- req: Request<Body>,
+ req: Request<ReqBody>,
bucket_id: Uuid,
partition_key: &str,
sort_key: &str,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let causal_context = req
.headers()
.get(X_GARAGE_CAUSALITY_TOKEN)
@@ -137,7 +139,10 @@ pub async fn handle_insert_item(
.map(CausalContext::parse_helper)
.transpose()?;
- let body = hyper::body::to_bytes(req.into_body()).await?;
+ let body = http_body_util::BodyExt::collect(req.into_body())
+ .await?
+ .to_bytes();
+
let value = DvvsValue::Value(body.to_vec());
garage
@@ -154,16 +159,16 @@ pub async fn handle_insert_item(
Ok(Response::builder()
.status(StatusCode::NO_CONTENT)
- .body(Body::empty())?)
+ .body(empty_body())?)
}
pub async fn handle_delete_item(
garage: Arc<Garage>,
- req: Request<Body>,
+ req: Request<ReqBody>,
bucket_id: Uuid,
partition_key: &str,
sort_key: &str,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let causal_context = req
.headers()
.get(X_GARAGE_CAUSALITY_TOKEN)
@@ -188,20 +193,20 @@ pub async fn handle_delete_item(
Ok(Response::builder()
.status(StatusCode::NO_CONTENT)
- .body(Body::empty())?)
+ .body(empty_body())?)
}
/// Handle ReadItem request
#[allow(clippy::ptr_arg)]
pub async fn handle_poll_item(
garage: Arc<Garage>,
- req: &Request<Body>,
+ req: &Request<ReqBody>,
bucket_id: Uuid,
partition_key: String,
sort_key: String,
causality_token: String,
timeout_secs: Option<u64>,
-) -> Result<Response<Body>, Error> {
+) -> Result<Response<ResBody>, Error> {
let format = ReturnFormat::from(req)?;
let causal_context =
@@ -226,6 +231,6 @@ pub async fn handle_poll_item(
} else {
Ok(Response::builder()
.status(StatusCode::NOT_MODIFIED)
- .body(Body::empty())?)
+ .body(empty_body())?)
}
}