aboutsummaryrefslogtreecommitdiff
path: root/src/api/k2v
diff options
context:
space:
mode:
Diffstat (limited to 'src/api/k2v')
-rw-r--r--src/api/k2v/api_server.rs33
-rw-r--r--src/api/k2v/batch.rs19
-rw-r--r--src/api/k2v/error.rs134
-rw-r--r--src/api/k2v/index.rs2
-rw-r--r--src/api/k2v/item.rs2
-rw-r--r--src/api/k2v/mod.rs1
-rw-r--r--src/api/k2v/range.rs4
-rw-r--r--src/api/k2v/router.rs6
8 files changed, 166 insertions, 35 deletions
diff --git a/src/api/k2v/api_server.rs b/src/api/k2v/api_server.rs
index 5f5e9030..eb0fbdd7 100644
--- a/src/api/k2v/api_server.rs
+++ b/src/api/k2v/api_server.rs
@@ -7,13 +7,12 @@ use hyper::{Body, Method, Request, Response};
use opentelemetry::{trace::SpanRef, KeyValue};
-use garage_table::util::*;
use garage_util::error::Error as GarageError;
use garage_model::garage::Garage;
-use crate::error::*;
use crate::generic_server::*;
+use crate::k2v::error::*;
use crate::signature::payload::check_payload_signature;
use crate::signature::streaming::*;
@@ -60,6 +59,7 @@ impl ApiHandler for K2VApiServer {
const API_NAME_DISPLAY: &'static str = "K2V";
type Endpoint = K2VApiEndpoint;
+ type Error = Error;
fn parse_endpoint(&self, req: &Request<Body>) -> Result<K2VApiEndpoint, Error> {
let (endpoint, bucket_name) = Endpoint::from_request(req)?;
@@ -83,13 +83,14 @@ impl ApiHandler for K2VApiServer {
// The OPTIONS method is procesed early, before we even check for an API key
if let Endpoint::Options = endpoint {
- return handle_options_s3api(garage, &req, Some(bucket_name)).await;
+ return Ok(handle_options_s3api(garage, &req, Some(bucket_name))
+ .await
+ .ok_or_bad_request("Error handling OPTIONS")?);
}
let (api_key, mut content_sha256) = check_payload_signature(&garage, "k2v", &req).await?;
- let api_key = api_key.ok_or_else(|| {
- Error::Forbidden("Garage does not support anonymous access yet".to_string())
- })?;
+ let api_key = api_key
+ .ok_or_else(|| Error::forbidden("Garage does not support anonymous access yet"))?;
let req = parse_streaming_body(
&api_key,
@@ -99,13 +100,14 @@ impl ApiHandler for K2VApiServer {
"k2v",
)?;
- let bucket_id = resolve_bucket(&garage, &bucket_name, &api_key).await?;
+ let bucket_id = garage
+ .bucket_helper()
+ .resolve_bucket(&bucket_name, &api_key)
+ .await?;
let bucket = garage
- .bucket_table
- .get(&EmptyKey, &bucket_id)
- .await?
- .filter(|b| !b.state.is_deleted())
- .ok_or(Error::NoSuchBucket)?;
+ .bucket_helper()
+ .get_existing_bucket(bucket_id)
+ .await?;
let allowed = match endpoint.authorization_type() {
Authorization::Read => api_key.allow_read(&bucket_id),
@@ -115,9 +117,7 @@ impl ApiHandler for K2VApiServer {
};
if !allowed {
- return Err(Error::Forbidden(
- "Operation is not allowed for this key.".to_string(),
- ));
+ return Err(Error::forbidden("Operation is not allowed for this key."));
}
// Look up what CORS rule might apply to response.
@@ -125,7 +125,8 @@ impl ApiHandler for K2VApiServer {
// are always preflighted, i.e. the browser should make
// an OPTIONS call before to check it is allowed
let matching_cors_rule = match *req.method() {
- Method::GET | Method::HEAD | Method::POST => find_matching_cors_rule(&bucket, &req)?,
+ Method::GET | Method::HEAD | Method::POST => find_matching_cors_rule(&bucket, &req)
+ .ok_or_internal_error("Error looking up CORS rule")?,
_ => None,
};
diff --git a/src/api/k2v/batch.rs b/src/api/k2v/batch.rs
index 4ecddeb9..db9901cf 100644
--- a/src/api/k2v/batch.rs
+++ b/src/api/k2v/batch.rs
@@ -12,7 +12,8 @@ use garage_model::garage::Garage;
use garage_model::k2v::causality::*;
use garage_model::k2v::item_table::*;
-use crate::error::*;
+use crate::helpers::*;
+use crate::k2v::error::*;
use crate::k2v::range::read_range;
pub async fn handle_insert_batch(
@@ -20,9 +21,7 @@ pub async fn handle_insert_batch(
bucket_id: Uuid,
req: Request<Body>,
) -> Result<Response<Body>, Error> {
- let body = hyper::body::to_bytes(req.into_body()).await?;
- let items: Vec<InsertBatchItem> =
- serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?;
+ let items = parse_json_body::<Vec<InsertBatchItem>>(req).await?;
let mut items2 = vec![];
for it in items {
@@ -52,9 +51,7 @@ pub async fn handle_read_batch(
bucket_id: Uuid,
req: Request<Body>,
) -> Result<Response<Body>, Error> {
- let body = hyper::body::to_bytes(req.into_body()).await?;
- let queries: Vec<ReadBatchQuery> =
- serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?;
+ let queries = parse_json_body::<Vec<ReadBatchQuery>>(req).await?;
let resp_results = futures::future::join_all(
queries
@@ -91,7 +88,7 @@ async fn handle_read_batch_query(
let (items, more, next_start) = if query.single_item {
if query.prefix.is_some() || query.end.is_some() || query.limit.is_some() || query.reverse {
- return Err(Error::BadRequest("Batch query parameters 'prefix', 'end', 'limit' and 'reverse' must not be set when singleItem is true.".into()));
+ return Err(Error::bad_request("Batch query parameters 'prefix', 'end', 'limit' and 'reverse' must not be set when singleItem is true."));
}
let sk = query
.start
@@ -149,9 +146,7 @@ pub async fn handle_delete_batch(
bucket_id: Uuid,
req: Request<Body>,
) -> Result<Response<Body>, Error> {
- let body = hyper::body::to_bytes(req.into_body()).await?;
- let queries: Vec<DeleteBatchQuery> =
- serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?;
+ let queries = parse_json_body::<Vec<DeleteBatchQuery>>(req).await?;
let resp_results = futures::future::join_all(
queries
@@ -188,7 +183,7 @@ async fn handle_delete_batch_query(
let deleted_items = if query.single_item {
if query.prefix.is_some() || query.end.is_some() {
- return Err(Error::BadRequest("Batch query parameters 'prefix' and 'end' must not be set when singleItem is true.".into()));
+ return Err(Error::bad_request("Batch query parameters 'prefix' and 'end' must not be set when singleItem is true."));
}
let sk = query
.start
diff --git a/src/api/k2v/error.rs b/src/api/k2v/error.rs
new file mode 100644
index 00000000..4c55d8b5
--- /dev/null
+++ b/src/api/k2v/error.rs
@@ -0,0 +1,134 @@
+use err_derive::Error;
+use hyper::header::HeaderValue;
+use hyper::{Body, HeaderMap, StatusCode};
+
+use garage_model::helper::error::Error as HelperError;
+
+use crate::common_error::CommonError;
+pub use crate::common_error::{CommonErrorDerivative, OkOrBadRequest, OkOrInternalError};
+use crate::generic_server::ApiError;
+use crate::helpers::CustomApiErrorBody;
+use crate::signature::error::Error as SignatureError;
+
+/// Errors of this crate
+#[derive(Debug, Error)]
+pub enum Error {
+ #[error(display = "{}", _0)]
+ /// Error from common error
+ Common(CommonError),
+
+ // Category: cannot process
+ /// Authorization Header Malformed
+ #[error(display = "Authorization header malformed, expected scope: {}", _0)]
+ AuthorizationHeaderMalformed(String),
+
+ /// The object requested don't exists
+ #[error(display = "Key not found")]
+ NoSuchKey,
+
+ /// Some base64 encoded data was badly encoded
+ #[error(display = "Invalid base64: {}", _0)]
+ InvalidBase64(#[error(source)] base64::DecodeError),
+
+ /// The client sent a header with invalid value
+ #[error(display = "Invalid header value: {}", _0)]
+ InvalidHeader(#[error(source)] hyper::header::ToStrError),
+
+ /// The client asked for an invalid return format (invalid Accept header)
+ #[error(display = "Not acceptable: {}", _0)]
+ NotAcceptable(String),
+
+ /// The request contained an invalid UTF-8 sequence in its path or in other parameters
+ #[error(display = "Invalid UTF-8: {}", _0)]
+ InvalidUtf8Str(#[error(source)] std::str::Utf8Error),
+}
+
+impl<T> From<T> for Error
+where
+ CommonError: From<T>,
+{
+ fn from(err: T) -> Self {
+ Error::Common(CommonError::from(err))
+ }
+}
+
+impl CommonErrorDerivative for Error {}
+
+impl From<HelperError> for Error {
+ fn from(err: HelperError) -> Self {
+ match err {
+ HelperError::Internal(i) => Self::Common(CommonError::InternalError(i)),
+ HelperError::BadRequest(b) => Self::Common(CommonError::BadRequest(b)),
+ HelperError::InvalidBucketName(n) => Self::Common(CommonError::InvalidBucketName(n)),
+ HelperError::NoSuchBucket(n) => Self::Common(CommonError::NoSuchBucket(n)),
+ e => Self::Common(CommonError::BadRequest(format!("{}", e))),
+ }
+ }
+}
+
+impl From<SignatureError> for Error {
+ fn from(err: SignatureError) -> Self {
+ match err {
+ SignatureError::Common(c) => Self::Common(c),
+ SignatureError::AuthorizationHeaderMalformed(c) => {
+ Self::AuthorizationHeaderMalformed(c)
+ }
+ SignatureError::InvalidUtf8Str(i) => Self::InvalidUtf8Str(i),
+ SignatureError::InvalidHeader(h) => Self::InvalidHeader(h),
+ }
+ }
+}
+
+impl Error {
+ /// This returns a keyword for the corresponding error.
+ /// Here, these keywords are not necessarily those from AWS S3,
+ /// as we are building a custom API
+ fn code(&self) -> &'static str {
+ match self {
+ Error::Common(c) => c.aws_code(),
+ Error::NoSuchKey => "NoSuchKey",
+ Error::NotAcceptable(_) => "NotAcceptable",
+ Error::AuthorizationHeaderMalformed(_) => "AuthorizationHeaderMalformed",
+ Error::InvalidBase64(_) => "InvalidBase64",
+ Error::InvalidHeader(_) => "InvalidHeaderValue",
+ Error::InvalidUtf8Str(_) => "InvalidUtf8String",
+ }
+ }
+}
+
+impl ApiError for Error {
+ /// Get the HTTP status code that best represents the meaning of the error for the client
+ fn http_status_code(&self) -> StatusCode {
+ match self {
+ Error::Common(c) => c.http_status_code(),
+ Error::NoSuchKey => StatusCode::NOT_FOUND,
+ Error::NotAcceptable(_) => StatusCode::NOT_ACCEPTABLE,
+ Error::AuthorizationHeaderMalformed(_)
+ | Error::InvalidBase64(_)
+ | Error::InvalidHeader(_)
+ | Error::InvalidUtf8Str(_) => StatusCode::BAD_REQUEST,
+ }
+ }
+
+ fn add_http_headers(&self, _header_map: &mut HeaderMap<HeaderValue>) {
+ // nothing
+ }
+
+ fn http_body(&self, garage_region: &str, path: &str) -> Body {
+ let error = CustomApiErrorBody {
+ code: self.code().to_string(),
+ message: format!("{}", self),
+ path: path.to_string(),
+ region: garage_region.to_string(),
+ };
+ Body::from(serde_json::to_string_pretty(&error).unwrap_or_else(|_| {
+ r#"
+{
+ "code": "InternalError",
+ "message": "JSON encoding of error failed"
+}
+ "#
+ .into()
+ }))
+ }
+}
diff --git a/src/api/k2v/index.rs b/src/api/k2v/index.rs
index 896dbcf0..d5db906d 100644
--- a/src/api/k2v/index.rs
+++ b/src/api/k2v/index.rs
@@ -12,7 +12,7 @@ use garage_table::util::*;
use garage_model::garage::Garage;
use garage_model::k2v::counter_table::{BYTES, CONFLICTS, ENTRIES, VALUES};
-use crate::error::*;
+use crate::k2v::error::*;
use crate::k2v::range::read_range;
pub async fn handle_read_index(
diff --git a/src/api/k2v/item.rs b/src/api/k2v/item.rs
index 1860863e..836d386f 100644
--- a/src/api/k2v/item.rs
+++ b/src/api/k2v/item.rs
@@ -10,7 +10,7 @@ use garage_model::garage::Garage;
use garage_model::k2v::causality::*;
use garage_model::k2v::item_table::*;
-use crate::error::*;
+use crate::k2v::error::*;
pub const X_GARAGE_CAUSALITY_TOKEN: &str = "X-Garage-Causality-Token";
diff --git a/src/api/k2v/mod.rs b/src/api/k2v/mod.rs
index ee210ad5..b6a8c5cf 100644
--- a/src/api/k2v/mod.rs
+++ b/src/api/k2v/mod.rs
@@ -1,4 +1,5 @@
pub mod api_server;
+mod error;
mod router;
mod batch;
diff --git a/src/api/k2v/range.rs b/src/api/k2v/range.rs
index 295c34aa..bb9d3be5 100644
--- a/src/api/k2v/range.rs
+++ b/src/api/k2v/range.rs
@@ -7,8 +7,8 @@ use std::sync::Arc;
use garage_table::replication::TableShardedReplication;
use garage_table::*;
-use crate::error::*;
use crate::helpers::key_after_prefix;
+use crate::k2v::error::*;
/// Read range in a Garage table.
/// Returns (entries, more?, nextStart)
@@ -31,7 +31,7 @@ where
(None, Some(s)) => (Some(s.clone()), false),
(Some(p), Some(s)) => {
if !s.starts_with(p) {
- return Err(Error::BadRequest(format!(
+ return Err(Error::bad_request(format!(
"Start key '{}' does not start with prefix '{}'",
s, p
)));
diff --git a/src/api/k2v/router.rs b/src/api/k2v/router.rs
index f948ffce..50e6965b 100644
--- a/src/api/k2v/router.rs
+++ b/src/api/k2v/router.rs
@@ -1,4 +1,4 @@
-use crate::error::*;
+use crate::k2v::error::*;
use std::borrow::Cow;
@@ -62,7 +62,7 @@ impl Endpoint {
.unwrap_or((path.to_owned(), ""));
if bucket.is_empty() {
- return Err(Error::BadRequest("Missing bucket name".to_owned()));
+ return Err(Error::bad_request("Missing bucket name"));
}
if *req.method() == Method::OPTIONS {
@@ -83,7 +83,7 @@ impl Endpoint {
Method::PUT => Self::from_put(partition_key, &mut query)?,
Method::DELETE => Self::from_delete(partition_key, &mut query)?,
_ if req.method() == method_search => Self::from_search(partition_key, &mut query)?,
- _ => return Err(Error::BadRequest("Unknown method".to_owned())),
+ _ => return Err(Error::bad_request("Unknown method")),
};
if let Some(message) = query.nonempty_message() {