From 0650a43cf14e7e52121a553130a9ea6c92b7bd4a Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 3 Jan 2023 16:48:51 +0100 Subject: PutObject: better cleanup on Drop (incl. when request is interrupted in the middle) --- src/api/s3/put.rs | 93 +++++++++++++++++++++++++++++++++++-------------------- 1 file changed, 59 insertions(+), 34 deletions(-) (limited to 'src/api') diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs index 97b8e4e3..c08fe40a 100644 --- a/src/api/s3/put.rs +++ b/src/api/s3/put.rs @@ -119,6 +119,17 @@ pub(crate) async fn save_stream> + Unpin>( return Ok((version_uuid, data_md5sum_hex)); } + // The following consists in many steps that can each fail. + // Keep track that some cleanup will be needed if things fail + // before everything is finished (cleanup is done using the Drop trait). + let mut interrupted_cleanup = InterruptedCleanup(Some(( + garage.clone(), + bucket.id, + key.into(), + version_uuid, + version_timestamp, + ))); + // Write version identifier in object table so that we have a trace // that we are uploading something let mut object_version = ObjectVersion { @@ -139,44 +150,27 @@ pub(crate) async fn save_stream> + Unpin>( // Transfer data and verify checksum let first_block_hash = async_blake2sum(first_block.clone()).await; - let tx_result = (|| async { - let (total_size, data_md5sum, data_sha256sum) = read_and_put_blocks( - &garage, - &version, - 1, - first_block, - first_block_hash, - &mut chunker, - ) - .await?; - - ensure_checksum_matches( - data_md5sum.as_slice(), - data_sha256sum, - content_md5.as_deref(), - content_sha256, - )?; - - check_quotas(&garage, bucket, key, total_size).await?; + let (total_size, data_md5sum, data_sha256sum) = read_and_put_blocks( + &garage, + &version, + 1, + first_block, + first_block_hash, + &mut chunker, + ) + .await?; - Ok((total_size, data_md5sum)) - })() - .await; + ensure_checksum_matches( + data_md5sum.as_slice(), + data_sha256sum, + content_md5.as_deref(), + content_sha256, + )?; - // If something went wrong, clean up - let (total_size, md5sum_arr) = match tx_result { - Ok(rv) => rv, - Err(e) => { - // Mark object as aborted, this will free the blocks further down - object_version.state = ObjectVersionState::Aborted; - let object = Object::new(bucket.id, key.into(), vec![object_version.clone()]); - garage.object_table.insert(&object).await?; - return Err(e); - } - }; + check_quotas(&garage, bucket, key, total_size).await?; // Save final object state, marked as Complete - let md5sum_hex = hex::encode(md5sum_arr); + let md5sum_hex = hex::encode(data_md5sum); object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock( ObjectVersionMeta { headers, @@ -188,6 +182,10 @@ pub(crate) async fn save_stream> + Unpin>( let object = Object::new(bucket.id, key.into(), vec![object_version]); garage.object_table.insert(&object).await?; + // We were not interrupted, everything went fine. + // We won't have to clean up on drop. + interrupted_cleanup.cancel(); + Ok((version_uuid, md5sum_hex)) } @@ -426,6 +424,33 @@ pub fn put_response(version_uuid: Uuid, md5sum_hex: String) -> Response { .unwrap() } +struct InterruptedCleanup(Option<(Arc, Uuid, String, Uuid, u64)>); + +impl InterruptedCleanup { + fn cancel(&mut self) { + drop(self.0.take()); + } +} +impl Drop for InterruptedCleanup { + fn drop(&mut self) { + if let Some((garage, bucket_id, key, version_uuid, version_ts)) = self.0.take() { + tokio::spawn(async move { + let object_version = ObjectVersion { + uuid: version_uuid, + timestamp: version_ts, + state: ObjectVersionState::Aborted, + }; + let object = Object::new(bucket_id, key, vec![object_version]); + if let Err(e) = garage.object_table.insert(&object).await { + warn!("Cannot cleanup after aborted PutObject: {}", e); + } + }); + } + } +} + +// ---- + pub async fn handle_create_multipart_upload( garage: Arc, req: &Request, -- cgit v1.2.3 From a48e2e0cb2bdc75e14dfde199dbca0a779b1316b Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 10 Jan 2023 10:30:59 +0100 Subject: K2V: Subscription to ranges of items --- src/api/k2v/item.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/api') diff --git a/src/api/k2v/item.rs b/src/api/k2v/item.rs index f85138c7..9b78bc07 100644 --- a/src/api/k2v/item.rs +++ b/src/api/k2v/item.rs @@ -211,7 +211,7 @@ pub async fn handle_poll_item( let item = garage .k2v .rpc - .poll( + .poll_item( bucket_id, partition_key, sort_key, -- cgit v1.2.3 From b83517d521b1bea7585ce45a803fad373f28225c Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 10 Jan 2023 15:22:25 +0100 Subject: Implement PollRange API endpoint --- src/api/k2v/api_server.rs | 3 ++ src/api/k2v/batch.rs | 79 +++++++++++++++++++++++++++++++++++++++++------ src/api/k2v/index.rs | 9 ++---- src/api/k2v/item.rs | 4 ++- src/api/k2v/router.rs | 8 ++++- 5 files changed, 86 insertions(+), 17 deletions(-) (limited to 'src/api') diff --git a/src/api/k2v/api_server.rs b/src/api/k2v/api_server.rs index 084867b5..bb85b2e7 100644 --- a/src/api/k2v/api_server.rs +++ b/src/api/k2v/api_server.rs @@ -164,6 +164,9 @@ impl ApiHandler for K2VApiServer { Endpoint::InsertBatch {} => handle_insert_batch(garage, bucket_id, req).await, Endpoint::ReadBatch {} => handle_read_batch(garage, bucket_id, req).await, Endpoint::DeleteBatch {} => handle_delete_batch(garage, bucket_id, req).await, + Endpoint::PollRange { partition_key } => { + handle_poll_range(garage, bucket_id, &partition_key, req).await + } Endpoint::Options => unreachable!(), }; diff --git a/src/api/k2v/batch.rs b/src/api/k2v/batch.rs index 78035362..be3fba07 100644 --- a/src/api/k2v/batch.rs +++ b/src/api/k2v/batch.rs @@ -4,7 +4,6 @@ use hyper::{Body, Request, Response, StatusCode}; use serde::{Deserialize, Serialize}; use garage_util::data::*; -use garage_util::error::Error as GarageError; use garage_table::{EnumerationOrder, TableSchema}; @@ -65,10 +64,7 @@ pub async fn handle_read_batch( resps.push(resp?); } - let resp_json = serde_json::to_string_pretty(&resps).map_err(GarageError::from)?; - Ok(Response::builder() - .status(StatusCode::OK) - .body(Body::from(resp_json))?) + Ok(json_ok_response(&resps)?) } async fn handle_read_batch_query( @@ -160,10 +156,7 @@ pub async fn handle_delete_batch( resps.push(resp?); } - let resp_json = serde_json::to_string_pretty(&resps).map_err(GarageError::from)?; - Ok(Response::builder() - .status(StatusCode::OK) - .body(Body::from(resp_json))?) + Ok(json_ok_response(&resps)?) } async fn handle_delete_batch_query( @@ -257,6 +250,53 @@ async fn handle_delete_batch_query( }) } +pub(crate) async fn handle_poll_range( + garage: Arc, + bucket_id: Uuid, + partition_key: &str, + req: Request, +) -> Result, Error> { + use garage_model::k2v::sub::PollRange; + + let query = parse_json_body::(req).await?; + + let timeout_msec = query.timeout.unwrap_or(300).clamp(10, 600) * 1000; + + let resp = garage + .k2v + .rpc + .poll_range( + PollRange { + partition: K2VItemPartition { + bucket_id, + partition_key: partition_key.to_string(), + }, + start: query.start, + end: query.end, + prefix: query.prefix, + }, + query.seen_marker, + timeout_msec, + ) + .await?; + + if let Some((items, seen_marker)) = resp { + let resp = PollRangeResponse { + items: items + .into_iter() + .map(|(_k, i)| ReadBatchResponseItem::from(i)) + .collect::>(), + seen_marker, + }; + + Ok(json_ok_response(&resp)?) + } else { + Ok(Response::builder() + .status(StatusCode::NOT_MODIFIED) + .body(Body::empty())?) + } +} + #[derive(Deserialize)] struct InsertBatchItem { pk: String, @@ -361,3 +401,24 @@ struct DeleteBatchResponse { #[serde(rename = "deletedItems")] deleted_items: usize, } + +#[derive(Deserialize)] +struct PollRangeQuery { + #[serde(default)] + prefix: Option, + #[serde(default)] + start: Option, + #[serde(default)] + end: Option, + #[serde(default)] + timeout: Option, + #[serde(default, rename = "seenMarker")] + seen_marker: Option, +} + +#[derive(Serialize)] +struct PollRangeResponse { + items: Vec, + #[serde(rename = "seenMarker")] + seen_marker: String, +} diff --git a/src/api/k2v/index.rs b/src/api/k2v/index.rs index 210950bf..6c1d4a91 100644 --- a/src/api/k2v/index.rs +++ b/src/api/k2v/index.rs @@ -1,10 +1,9 @@ use std::sync::Arc; -use hyper::{Body, Response, StatusCode}; +use hyper::{Body, Response}; use serde::Serialize; use garage_util::data::*; -use garage_util::error::Error as GarageError; use garage_rpc::ring::Ring; use garage_table::util::*; @@ -12,6 +11,7 @@ use garage_table::util::*; use garage_model::garage::Garage; use garage_model::k2v::item_table::{BYTES, CONFLICTS, ENTRIES, VALUES}; +use crate::helpers::*; use crate::k2v::error::*; use crate::k2v::range::read_range; @@ -68,10 +68,7 @@ pub async fn handle_read_index( next_start, }; - let resp_json = serde_json::to_string_pretty(&resp).map_err(GarageError::from)?; - Ok(Response::builder() - .status(StatusCode::OK) - .body(Body::from(resp_json))?) + Ok(json_ok_response(&resp)?) } #[derive(Serialize)] diff --git a/src/api/k2v/item.rs b/src/api/k2v/item.rs index 9b78bc07..ebf34723 100644 --- a/src/api/k2v/item.rs +++ b/src/api/k2v/item.rs @@ -208,6 +208,8 @@ pub async fn handle_poll_item( let causal_context = CausalContext::parse(&causality_token).ok_or_bad_request("Invalid causality token")?; + let timeout_msec = timeout_secs.unwrap_or(300).clamp(10, 600) * 1000; + let item = garage .k2v .rpc @@ -216,7 +218,7 @@ pub async fn handle_poll_item( partition_key, sort_key, causal_context, - timeout_secs.unwrap_or(300) * 1000, + timeout_msec, ) .await?; diff --git a/src/api/k2v/router.rs b/src/api/k2v/router.rs index e7a3dd69..1cc58be5 100644 --- a/src/api/k2v/router.rs +++ b/src/api/k2v/router.rs @@ -32,6 +32,9 @@ pub enum Endpoint { causality_token: String, timeout: Option, }, + PollRange { + partition_key: String, + }, ReadBatch { }, ReadIndex { @@ -113,6 +116,7 @@ impl Endpoint { @gen_parser (query.keyword.take().unwrap_or_default(), partition_key, query, None), key: [ + POLL_RANGE => PollRange, ], no_key: [ EMPTY => ReadBatch, @@ -142,6 +146,7 @@ impl Endpoint { @gen_parser (query.keyword.take().unwrap_or_default(), partition_key, query, None), key: [ + POLL_RANGE => PollRange, ], no_key: [ EMPTY => InsertBatch, @@ -234,7 +239,8 @@ impl Endpoint { generateQueryParameters! { keywords: [ "delete" => DELETE, - "search" => SEARCH + "search" => SEARCH, + "poll_range" => POLL_RANGE ], fields: [ "prefix" => prefix, -- cgit v1.2.3 From bba13f40fc2e411347ea83960935b39cedb0a7c4 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 11 Jan 2023 12:27:19 +0100 Subject: Correctly return bad requests when seeh marker is invalid --- src/api/k2v/batch.rs | 6 +----- src/api/k2v/item.rs | 10 ++++------ 2 files changed, 5 insertions(+), 11 deletions(-) (limited to 'src/api') diff --git a/src/api/k2v/batch.rs b/src/api/k2v/batch.rs index be3fba07..844faf89 100644 --- a/src/api/k2v/batch.rs +++ b/src/api/k2v/batch.rs @@ -24,11 +24,7 @@ pub async fn handle_insert_batch( let mut items2 = vec![]; for it in items { - let ct = it - .ct - .map(|s| CausalContext::parse(&s)) - .transpose() - .ok_or_bad_request("Invalid causality token")?; + let ct = it.ct.map(|s| CausalContext::parse_helper(&s)).transpose()?; let v = match it.v { Some(vs) => { DvvsValue::Value(base64::decode(vs).ok_or_bad_request("Invalid base64 value")?) diff --git a/src/api/k2v/item.rs b/src/api/k2v/item.rs index ebf34723..e7385bcc 100644 --- a/src/api/k2v/item.rs +++ b/src/api/k2v/item.rs @@ -133,9 +133,8 @@ pub async fn handle_insert_item( .get(X_GARAGE_CAUSALITY_TOKEN) .map(|s| s.to_str()) .transpose()? - .map(CausalContext::parse) - .transpose() - .ok_or_bad_request("Invalid causality token")?; + .map(CausalContext::parse_helper) + .transpose()?; let body = hyper::body::to_bytes(req.into_body()).await?; let value = DvvsValue::Value(body.to_vec()); @@ -169,9 +168,8 @@ pub async fn handle_delete_item( .get(X_GARAGE_CAUSALITY_TOKEN) .map(|s| s.to_str()) .transpose()? - .map(CausalContext::parse) - .transpose() - .ok_or_bad_request("Invalid causality token")?; + .map(CausalContext::parse_helper) + .transpose()?; let value = DvvsValue::Deleted; -- cgit v1.2.3 From cbfae673e83251e988d764d3a29f06f571ba8452 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 11 Jan 2023 15:03:08 +0100 Subject: PollRange & PollItem: min timeout = 1 sec --- src/api/k2v/batch.rs | 2 +- src/api/k2v/item.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) (limited to 'src/api') diff --git a/src/api/k2v/batch.rs b/src/api/k2v/batch.rs index 844faf89..abc9403c 100644 --- a/src/api/k2v/batch.rs +++ b/src/api/k2v/batch.rs @@ -256,7 +256,7 @@ pub(crate) async fn handle_poll_range( let query = parse_json_body::(req).await?; - let timeout_msec = query.timeout.unwrap_or(300).clamp(10, 600) * 1000; + let timeout_msec = query.timeout.unwrap_or(300).clamp(1, 600) * 1000; let resp = garage .k2v diff --git a/src/api/k2v/item.rs b/src/api/k2v/item.rs index e7385bcc..787a7df3 100644 --- a/src/api/k2v/item.rs +++ b/src/api/k2v/item.rs @@ -206,7 +206,7 @@ pub async fn handle_poll_item( let causal_context = CausalContext::parse(&causality_token).ok_or_bad_request("Invalid causality token")?; - let timeout_msec = timeout_secs.unwrap_or(300).clamp(10, 600) * 1000; + let timeout_msec = timeout_secs.unwrap_or(300).clamp(1, 600) * 1000; let item = garage .k2v -- cgit v1.2.3 From 100b01e85916d57ebff2eb63c915e2c4d2801b47 Mon Sep 17 00:00:00 2001 From: Jonathan Davies Date: Thu, 12 Jan 2023 13:35:14 +0000 Subject: Implemented website hosting authorization endpoint. Fixes: #468 --- src/api/admin/api_server.rs | 48 +++++++++++++++++++++++++++++++++++++++++++++ src/api/admin/router.rs | 3 +++ 2 files changed, 51 insertions(+) (limited to 'src/api') diff --git a/src/api/admin/api_server.rs b/src/api/admin/api_server.rs index 2d325fb1..7a534f32 100644 --- a/src/api/admin/api_server.rs +++ b/src/api/admin/api_server.rs @@ -77,6 +77,53 @@ impl AdminApiServer { .body(Body::empty())?) } + async fn handle_check_website_enabled( + &self, + req: Request, + ) -> Result, Error> { + let has_domain_header = req.headers().contains_key("domain"); + + if !has_domain_header { + return Err(Error::bad_request("No domain header found")); + } + + let domain = &req + .headers() + .get("domain") + .ok_or_internal_error("Could not parse domain header")?; + + let domain_string = String::from( + domain + .to_str() + .ok_or_bad_request("Invalid characters found in domain header")?, + ); + + let bucket_id = self + .garage + .bucket_helper() + .resolve_global_bucket_name(&domain_string) + .await? + .ok_or_else(|| HelperError::NoSuchBucket(domain_string))?; + + let bucket = self + .garage + .bucket_helper() + .get_existing_bucket(bucket_id) + .await?; + + let bucket_state = bucket.state.as_option().unwrap(); + let bucket_website_config = bucket_state.website_config.get(); + + match bucket_website_config { + Some(_v) => Ok(Response::builder() + .status(StatusCode::OK) + .body(Body::from("Bucket authorized for website hosting"))?), + None => Err(Error::bad_request( + "Bucket is not authorized for website hosting", + )), + } + } + fn handle_health(&self) -> Result, Error> { let health = self.garage.system.health(); @@ -174,6 +221,7 @@ impl ApiHandler for AdminApiServer { match endpoint { Endpoint::Options => self.handle_options(&req), + Endpoint::CheckWebsiteEnabled => self.handle_check_website_enabled(req).await, Endpoint::Health => self.handle_health(), Endpoint::Metrics => self.handle_metrics(), Endpoint::GetClusterStatus => handle_get_cluster_status(&self.garage).await, diff --git a/src/api/admin/router.rs b/src/api/admin/router.rs index 62e6abc3..0dcb1546 100644 --- a/src/api/admin/router.rs +++ b/src/api/admin/router.rs @@ -17,6 +17,7 @@ router_match! {@func #[derive(Debug, Clone, PartialEq, Eq)] pub enum Endpoint { Options, + CheckWebsiteEnabled, Health, Metrics, GetClusterStatus, @@ -91,6 +92,7 @@ impl Endpoint { let res = router_match!(@gen_path_parser (req.method(), path, query) [ OPTIONS _ => Options, + GET "/check" => CheckWebsiteEnabled, GET "/health" => Health, GET "/metrics" => Metrics, GET "/v0/status" => GetClusterStatus, @@ -136,6 +138,7 @@ impl Endpoint { pub fn authorization_type(&self) -> Authorization { match self { Self::Health => Authorization::None, + Self::CheckWebsiteEnabled => Authorization::None, Self::Metrics => Authorization::MetricsToken, _ => Authorization::AdminToken, } -- cgit v1.2.3 From dead123892f7f0b0840f3891718bf12e54d51384 Mon Sep 17 00:00:00 2001 From: Jonathan Davies Date: Mon, 23 Jan 2023 18:39:35 +0000 Subject: api/Cargo.toml: Updated pin-project to 1.0.12. --- src/api/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/api') diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml index dba0bbef..cb86a679 100644 --- a/src/api/Cargo.toml +++ b/src/api/Cargo.toml @@ -36,7 +36,7 @@ sha2 = "0.10" futures = "0.3" futures-util = "0.3" -pin-project = "1.0.11" +pin-project = "1.0.12" tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } tokio-stream = "0.1" -- cgit v1.2.3 From 1c435fce099ad41b395b11a05bbd17609e54249b Mon Sep 17 00:00:00 2001 From: Jonathan Davies Date: Mon, 23 Jan 2023 18:40:11 +0000 Subject: api/Cargo.toml: Updated httpdate from 0.3 to 1.0. --- src/api/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/api') diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml index cb86a679..40a446c5 100644 --- a/src/api/Cargo.toml +++ b/src/api/Cargo.toml @@ -42,7 +42,7 @@ tokio-stream = "0.1" form_urlencoded = "1.0.0" http = "0.2" -httpdate = "0.3" +httpdate = "1.0" http-range = "0.1" hyper = { version = "0.14", features = ["server", "http1", "runtime", "tcp", "stream"] } multer = "2.0" -- cgit v1.2.3 From 93c3f8fc8c9d849c26c2eccd551ddf1682e9494f Mon Sep 17 00:00:00 2001 From: Jonathan Davies Date: Mon, 23 Jan 2023 18:41:10 +0000 Subject: api/Cargo.toml: Updated url from 2.1 to 2.3. --- src/api/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/api') diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml index 40a446c5..c2fe6da8 100644 --- a/src/api/Cargo.toml +++ b/src/api/Cargo.toml @@ -52,7 +52,7 @@ serde = { version = "1.0", features = ["derive"] } serde_bytes = "0.11" serde_json = "1.0" quick-xml = { version = "0.21", features = [ "serialize" ] } -url = "2.1" +url = "2.3" opentelemetry = "0.17" opentelemetry-prometheus = { version = "0.10", optional = true } -- cgit v1.2.3 From 36944f1839b27d0c60feadbe15e1d91ad9b74538 Mon Sep 17 00:00:00 2001 From: Jonathan Davies Date: Mon, 23 Jan 2023 19:14:07 +0000 Subject: Cargo.toml: Updated base64 from 0.13 to 0.21. --- src/api/Cargo.toml | 2 +- src/api/k2v/batch.rs | 11 +++++++---- src/api/k2v/item.rs | 3 ++- src/api/s3/list.rs | 11 +++++++---- src/api/s3/post_object.rs | 5 ++++- src/api/s3/put.rs | 3 ++- 6 files changed, 23 insertions(+), 12 deletions(-) (limited to 'src/api') diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml index c2fe6da8..cf6dc909 100644 --- a/src/api/Cargo.toml +++ b/src/api/Cargo.toml @@ -21,7 +21,7 @@ garage_util = { version = "0.8.1", path = "../util" } garage_rpc = { version = "0.8.1", path = "../rpc" } async-trait = "0.1.7" -base64 = "0.13" +base64 = "0.21" bytes = "1.0" chrono = "0.4" crypto-common = "0.1" diff --git a/src/api/k2v/batch.rs b/src/api/k2v/batch.rs index 78035362..82b4f7e3 100644 --- a/src/api/k2v/batch.rs +++ b/src/api/k2v/batch.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use base64::prelude::*; use hyper::{Body, Request, Response, StatusCode}; use serde::{Deserialize, Serialize}; @@ -31,9 +32,11 @@ pub async fn handle_insert_batch( .transpose() .ok_or_bad_request("Invalid causality token")?; let v = match it.v { - Some(vs) => { - DvvsValue::Value(base64::decode(vs).ok_or_bad_request("Invalid base64 value")?) - } + Some(vs) => DvvsValue::Value( + BASE64_STANDARD + .decode(vs) + .ok_or_bad_request("Invalid base64 value")?, + ), None => DvvsValue::Deleted, }; items2.push((it.pk, it.sk, ct, v)); @@ -322,7 +325,7 @@ impl ReadBatchResponseItem { .values() .iter() .map(|v| match v { - DvvsValue::Value(x) => Some(base64::encode(x)), + DvvsValue::Value(x) => Some(BASE64_STANDARD.encode(x)), DvvsValue::Deleted => None, }) .collect::>(); diff --git a/src/api/k2v/item.rs b/src/api/k2v/item.rs index f85138c7..041382c0 100644 --- a/src/api/k2v/item.rs +++ b/src/api/k2v/item.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use base64::prelude::*; use http::header; use hyper::{Body, Request, Response, StatusCode}; @@ -81,7 +82,7 @@ impl ReturnFormat { .iter() .map(|v| match v { DvvsValue::Deleted => serde_json::Value::Null, - DvvsValue::Value(v) => serde_json::Value::String(base64::encode(v)), + DvvsValue::Value(v) => serde_json::Value::String(BASE64_STANDARD.encode(v)), }) .collect::>(); let json_body = diff --git a/src/api/s3/list.rs b/src/api/s3/list.rs index e5f486c8..5cb0d65a 100644 --- a/src/api/s3/list.rs +++ b/src/api/s3/list.rs @@ -3,6 +3,7 @@ use std::collections::{BTreeMap, BTreeSet}; use std::iter::{Iterator, Peekable}; use std::sync::Arc; +use base64::prelude::*; use hyper::{Body, Response}; use garage_util::data::*; @@ -129,11 +130,11 @@ pub async fn handle_list( next_continuation_token: match (query.is_v2, &pagination) { (true, Some(RangeBegin::AfterKey { key })) => Some(s3_xml::Value(format!( "]{}", - base64::encode(key.as_bytes()) + BASE64_STANDARD.encode(key.as_bytes()) ))), (true, Some(RangeBegin::IncludingKey { key, .. })) => Some(s3_xml::Value(format!( "[{}", - base64::encode(key.as_bytes()) + BASE64_STANDARD.encode(key.as_bytes()) ))), _ => None, }, @@ -583,14 +584,16 @@ impl ListObjectsQuery { (Some(token), _) => match &token[..1] { "[" => Ok(RangeBegin::IncludingKey { key: String::from_utf8( - base64::decode(token[1..].as_bytes()) + BASE64_STANDARD + .decode(token[1..].as_bytes()) .ok_or_bad_request("Invalid continuation token")?, )?, fallback_key: None, }), "]" => Ok(RangeBegin::AfterKey { key: String::from_utf8( - base64::decode(token[1..].as_bytes()) + BASE64_STANDARD + .decode(token[1..].as_bytes()) .ok_or_bad_request("Invalid continuation token")?, )?, }), diff --git a/src/api/s3/post_object.rs b/src/api/s3/post_object.rs index d063faa4..da542526 100644 --- a/src/api/s3/post_object.rs +++ b/src/api/s3/post_object.rs @@ -4,6 +4,7 @@ use std::ops::RangeInclusive; use std::sync::Arc; use std::task::{Context, Poll}; +use base64::prelude::*; use bytes::Bytes; use chrono::{DateTime, Duration, Utc}; use futures::{Stream, StreamExt}; @@ -138,7 +139,9 @@ pub async fn handle_post_object( .get_existing_bucket(bucket_id) .await?; - let decoded_policy = base64::decode(&policy).ok_or_bad_request("Invalid policy")?; + let decoded_policy = BASE64_STANDARD + .decode(&policy) + .ok_or_bad_request("Invalid policy")?; let decoded_policy: Policy = serde_json::from_slice(&decoded_policy).ok_or_bad_request("Invalid policy")?; diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs index c08fe40a..350ab884 100644 --- a/src/api/s3/put.rs +++ b/src/api/s3/put.rs @@ -1,6 +1,7 @@ use std::collections::{BTreeMap, BTreeSet, HashMap}; use std::sync::Arc; +use base64::prelude::*; use futures::prelude::*; use hyper::body::{Body, Bytes}; use hyper::header::{HeaderMap, HeaderValue}; @@ -207,7 +208,7 @@ fn ensure_checksum_matches( } } if let Some(expected_md5) = content_md5 { - if expected_md5.trim_matches('"') != base64::encode(data_md5sum) { + if expected_md5.trim_matches('"') != BASE64_STANDARD.encode(data_md5sum) { return Err(Error::bad_request("Unable to validate content-md5")); } else { trace!("Successfully validated content-md5"); -- cgit v1.2.3 From 63e22e71f27d5ad26ffe48361abfc96090c97c28 Mon Sep 17 00:00:00 2001 From: Jonathan Davies Date: Mon, 23 Jan 2023 22:10:25 +0000 Subject: api/Cargo.toml: Updated idna from 0.2 to 0.3. --- src/api/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/api') diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml index cf6dc909..2ea7e4cf 100644 --- a/src/api/Cargo.toml +++ b/src/api/Cargo.toml @@ -28,7 +28,7 @@ crypto-common = "0.1" err-derive = "0.3" hex = "0.4" hmac = "0.12" -idna = "0.2" +idna = "0.3" tracing = "0.1.30" md-5 = "0.10" nom = "7.1" -- cgit v1.2.3 From 20c1cdf662a0b5368a8020526f43e08baedfedaa Mon Sep 17 00:00:00 2001 From: Jonathan Davies Date: Mon, 23 Jan 2023 22:27:44 +0000 Subject: Cargo.toml: Loosen tracing dependency to just 0.1. --- src/api/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/api') diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml index 2ea7e4cf..24c48604 100644 --- a/src/api/Cargo.toml +++ b/src/api/Cargo.toml @@ -29,7 +29,7 @@ err-derive = "0.3" hex = "0.4" hmac = "0.12" idna = "0.3" -tracing = "0.1.30" +tracing = "0.1" md-5 = "0.10" nom = "7.1" sha2 = "0.10" -- cgit v1.2.3 From 8e93d6997415d60ba5c371da8b27065a57254a8c Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 26 Jan 2023 17:26:32 +0100 Subject: More clippy fixes --- src/api/admin/api_server.rs | 2 +- src/api/admin/bucket.rs | 4 ++-- src/api/s3/bucket.rs | 2 +- src/api/s3/post_object.rs | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) (limited to 'src/api') diff --git a/src/api/admin/api_server.rs b/src/api/admin/api_server.rs index 7a534f32..a115d732 100644 --- a/src/api/admin/api_server.rs +++ b/src/api/admin/api_server.rs @@ -103,7 +103,7 @@ impl AdminApiServer { .bucket_helper() .resolve_global_bucket_name(&domain_string) .await? - .ok_or_else(|| HelperError::NoSuchBucket(domain_string))?; + .ok_or(HelperError::NoSuchBucket(domain_string))?; let bucket = self .garage diff --git a/src/api/admin/bucket.rs b/src/api/admin/bucket.rs index 65034852..e60f07ca 100644 --- a/src/api/admin/bucket.rs +++ b/src/api/admin/bucket.rs @@ -167,7 +167,7 @@ async fn bucket_info_results( let quotas = state.quotas.get(); let res = GetBucketInfoResult { - id: hex::encode(&bucket.id), + id: hex::encode(bucket.id), global_aliases: state .aliases .items() @@ -575,6 +575,6 @@ pub async fn handle_local_unalias_bucket( // ---- HELPER ---- fn parse_bucket_id(id: &str) -> Result { - let id_hex = hex::decode(&id).ok_or_bad_request("Invalid bucket id")?; + let id_hex = hex::decode(id).ok_or_bad_request("Invalid bucket id")?; Ok(Uuid::try_from(&id_hex).ok_or_bad_request("Invalid bucket id")?) } diff --git a/src/api/s3/bucket.rs b/src/api/s3/bucket.rs index 8471385f..733981e1 100644 --- a/src/api/s3/bucket.rs +++ b/src/api/s3/bucket.rs @@ -305,7 +305,7 @@ fn parse_create_bucket_xml(xml_bytes: &[u8]) -> Option> { let mut ret = None; for item in cbc.children() { if item.has_tag_name("LocationConstraint") { - if ret != None { + if ret.is_some() { return None; } ret = Some(item.text()?.to_string()); diff --git a/src/api/s3/post_object.rs b/src/api/s3/post_object.rs index da542526..f2098ab0 100644 --- a/src/api/s3/post_object.rs +++ b/src/api/s3/post_object.rs @@ -140,7 +140,7 @@ pub async fn handle_post_object( .await?; let decoded_policy = BASE64_STANDARD - .decode(&policy) + .decode(policy) .ok_or_bad_request("Invalid policy")?; let decoded_policy: Policy = serde_json::from_slice(&decoded_policy).ok_or_bad_request("Invalid policy")?; -- cgit v1.2.3 From 004bb5b4f1b2086914376265425fd46df5059db3 Mon Sep 17 00:00:00 2001 From: Jonathan Davies Date: Sun, 29 Jan 2023 01:16:04 +0000 Subject: api_server.rs: Adapted to use query string per Caddy upstream change. --- src/api/admin/api_server.rs | 34 +++++++++++++++++++--------------- 1 file changed, 19 insertions(+), 15 deletions(-) (limited to 'src/api') diff --git a/src/api/admin/api_server.rs b/src/api/admin/api_server.rs index a115d732..dae42059 100644 --- a/src/api/admin/api_server.rs +++ b/src/api/admin/api_server.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use std::net::SocketAddr; use std::sync::Arc; @@ -81,29 +82,32 @@ impl AdminApiServer { &self, req: Request, ) -> Result, Error> { - let has_domain_header = req.headers().contains_key("domain"); - - if !has_domain_header { - return Err(Error::bad_request("No domain header found")); + let query_params: HashMap = req + .uri() + .query() + .map(|v| { + url::form_urlencoded::parse(v.as_bytes()) + .into_owned() + .collect() + }) + .unwrap_or_else(HashMap::new); + + let has_domain_key = query_params.contains_key("domain"); + + if !has_domain_key { + return Err(Error::bad_request("No domain query string found")); } - let domain = &req - .headers() + let domain = query_params .get("domain") - .ok_or_internal_error("Could not parse domain header")?; - - let domain_string = String::from( - domain - .to_str() - .ok_or_bad_request("Invalid characters found in domain header")?, - ); + .ok_or_internal_error("Could not parse domain query string")?; let bucket_id = self .garage .bucket_helper() - .resolve_global_bucket_name(&domain_string) + .resolve_global_bucket_name(&domain) .await? - .ok_or(HelperError::NoSuchBucket(domain_string))?; + .ok_or(HelperError::NoSuchBucket(domain.to_string()))?; let bucket = self .garage -- cgit v1.2.3 From 9c354f0a8ff258872aa3a4b7c116e1d66815afd1 Mon Sep 17 00:00:00 2001 From: Jonathan Davies Date: Sun, 29 Jan 2023 20:27:15 +0000 Subject: Improved bucket authorization response strings. --- src/api/admin/api_server.rs | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) (limited to 'src/api') diff --git a/src/api/admin/api_server.rs b/src/api/admin/api_server.rs index dae42059..58dd38d8 100644 --- a/src/api/admin/api_server.rs +++ b/src/api/admin/api_server.rs @@ -119,12 +119,16 @@ impl AdminApiServer { let bucket_website_config = bucket_state.website_config.get(); match bucket_website_config { - Some(_v) => Ok(Response::builder() - .status(StatusCode::OK) - .body(Body::from("Bucket authorized for website hosting"))?), - None => Err(Error::bad_request( - "Bucket is not authorized for website hosting", - )), + Some(_v) => { + Ok(Response::builder() + .status(StatusCode::OK) + .body(Body::from(format!( + "Bucket '{domain}' is authorized for website hosting" + )))?) + } + None => Err(Error::bad_request(format!( + "Bucket '{domain}' is not authorized for website hosting" + ))), } } -- cgit v1.2.3 From 1b6ec74748f1182fbfb9b4ce934351b000ccab22 Mon Sep 17 00:00:00 2001 From: Jonathan Davies Date: Thu, 2 Feb 2023 16:16:12 +0000 Subject: error.rs: Corrected error messages to say unexpected scope. --- src/api/k2v/error.rs | 2 +- src/api/s3/error.rs | 2 +- src/api/signature/error.rs | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) (limited to 'src/api') diff --git a/src/api/k2v/error.rs b/src/api/k2v/error.rs index 42491466..4eb017ab 100644 --- a/src/api/k2v/error.rs +++ b/src/api/k2v/error.rs @@ -19,7 +19,7 @@ pub enum Error { // Category: cannot process /// Authorization Header Malformed - #[error(display = "Authorization header malformed, expected scope: {}", _0)] + #[error(display = "Authorization header malformed, unexpected scope: {}", _0)] AuthorizationHeaderMalformed(String), /// The object requested don't exists diff --git a/src/api/s3/error.rs b/src/api/s3/error.rs index 67009d63..c50cff9f 100644 --- a/src/api/s3/error.rs +++ b/src/api/s3/error.rs @@ -21,7 +21,7 @@ pub enum Error { // Category: cannot process /// Authorization Header Malformed - #[error(display = "Authorization header malformed, expected scope: {}", _0)] + #[error(display = "Authorization header malformed, unexpected scope: {}", _0)] AuthorizationHeaderMalformed(String), /// The object requested don't exists diff --git a/src/api/signature/error.rs b/src/api/signature/error.rs index f5a067bd..f0d7c816 100644 --- a/src/api/signature/error.rs +++ b/src/api/signature/error.rs @@ -11,7 +11,7 @@ pub enum Error { Common(CommonError), /// Authorization Header Malformed - #[error(display = "Authorization header malformed, expected scope: {}", _0)] + #[error(display = "Authorization header malformed, unexpected scope: {}", _0)] AuthorizationHeaderMalformed(String), // Category: bad request -- cgit v1.2.3 From 94d70bec6962046377519521bd53dfcac37e3c3b Mon Sep 17 00:00:00 2001 From: Jonathan Davies Date: Sat, 4 Feb 2023 09:49:56 +0000 Subject: generic_server.rs: Added support for logging X-Forwarded-For header. Fixes: #460 --- src/api/generic_server.rs | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) (limited to 'src/api') diff --git a/src/api/generic_server.rs b/src/api/generic_server.rs index 62fe4e5a..aa90868a 100644 --- a/src/api/generic_server.rs +++ b/src/api/generic_server.rs @@ -125,7 +125,26 @@ impl ApiServer { addr: SocketAddr, ) -> Result, GarageError> { let uri = req.uri().clone(); - info!("{} {} {}", addr, req.method(), uri); + + let has_forwarded_for_header = req.headers().contains_key("x-forwarded-for"); + if has_forwarded_for_header { + let forwarded_for_ip_addr = &req + .headers() + .get("x-forwarded-for") + .expect("Could not parse X-Forwarded-For header") + .to_str() + .unwrap_or_default(); + + info!( + "{} (via {}) {} {}", + forwarded_for_ip_addr, + addr, + req.method(), + uri + ); + } else { + info!("{} {} {}", addr, req.method(), uri); + } debug!("{:?}", req); let tracer = opentelemetry::global::tracer("garage"); -- cgit v1.2.3 From e4e5196066a75ed9327a62d8082decb1e0c381e7 Mon Sep 17 00:00:00 2001 From: Jonathan Davies Date: Thu, 9 Feb 2023 13:49:43 +0000 Subject: api/generic_server.rs: Use new handle_forwarded_for_headers() function. --- src/api/generic_server.rs | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) (limited to 'src/api') diff --git a/src/api/generic_server.rs b/src/api/generic_server.rs index aa90868a..d0354d28 100644 --- a/src/api/generic_server.rs +++ b/src/api/generic_server.rs @@ -19,6 +19,7 @@ use opentelemetry::{ }; use garage_util::error::Error as GarageError; +use garage_util::forwarded_headers; use garage_util::metrics::{gen_trace_id, RecordDuration}; pub(crate) trait ApiEndpoint: Send + Sync + 'static { @@ -126,15 +127,9 @@ impl ApiServer { ) -> Result, GarageError> { let uri = req.uri().clone(); - let has_forwarded_for_header = req.headers().contains_key("x-forwarded-for"); - if has_forwarded_for_header { - let forwarded_for_ip_addr = &req - .headers() - .get("x-forwarded-for") - .expect("Could not parse X-Forwarded-For header") - .to_str() - .unwrap_or_default(); - + if let Ok(forwarded_for_ip_addr) = + forwarded_headers::handle_forwarded_for_headers(&req.headers()) + { info!( "{} (via {}) {} {}", forwarded_for_ip_addr, -- cgit v1.2.3 From 9ea154ae9c6b12175192419679ec1bc40da827d1 Mon Sep 17 00:00:00 2001 From: Jonathan Davies Date: Fri, 10 Mar 2023 14:45:18 +0000 Subject: admin/cluster.rs: Added rust_version. --- src/api/admin/cluster.rs | 2 ++ 1 file changed, 2 insertions(+) (limited to 'src/api') diff --git a/src/api/admin/cluster.rs b/src/api/admin/cluster.rs index 182a4f6f..98bf2b5a 100644 --- a/src/api/admin/cluster.rs +++ b/src/api/admin/cluster.rs @@ -20,6 +20,7 @@ pub async fn handle_get_cluster_status(garage: &Arc) -> Result, + rust_version: &'static str, db_engine: String, known_nodes: HashMap, layout: GetClusterLayoutResponse, -- cgit v1.2.3 From 0a1ddcf6301359cde654003b46636497f6a417a4 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 13 Mar 2023 18:44:09 +0100 Subject: Prepare for v0.8.2 --- src/api/Cargo.toml | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) (limited to 'src/api') diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml index 24c48604..40793fd5 100644 --- a/src/api/Cargo.toml +++ b/src/api/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "garage_api" -version = "0.8.1" +version = "0.8.2" authors = ["Alex Auvolat "] edition = "2018" license = "AGPL-3.0" @@ -14,11 +14,11 @@ path = "lib.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -garage_model = { version = "0.8.1", path = "../model" } -garage_table = { version = "0.8.1", path = "../table" } -garage_block = { version = "0.8.1", path = "../block" } -garage_util = { version = "0.8.1", path = "../util" } -garage_rpc = { version = "0.8.1", path = "../rpc" } +garage_model = { version = "0.8.2", path = "../model" } +garage_table = { version = "0.8.2", path = "../table" } +garage_block = { version = "0.8.2", path = "../block" } +garage_util = { version = "0.8.2", path = "../util" } +garage_rpc = { version = "0.8.2", path = "../rpc" } async-trait = "0.1.7" base64 = "0.21" -- cgit v1.2.3 From c168383113cff62e959b7164c83c483f32aeb59f Mon Sep 17 00:00:00 2001 From: Jonathan Davies Date: Sun, 23 Apr 2023 19:51:20 +0100 Subject: api/Cargo.toml: Bumped quick-xml to version 0.26. --- src/api/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/api') diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml index 40793fd5..9babec02 100644 --- a/src/api/Cargo.toml +++ b/src/api/Cargo.toml @@ -51,7 +51,7 @@ roxmltree = "0.14" serde = { version = "1.0", features = ["derive"] } serde_bytes = "0.11" serde_json = "1.0" -quick-xml = { version = "0.21", features = [ "serialize" ] } +quick-xml = { version = "0.26", features = [ "serialize" ] } url = "2.3" opentelemetry = "0.17" -- cgit v1.2.3