From 5768bf362262f78376af14517c4921941986192e Mon Sep 17 00:00:00 2001 From: Alex Date: Tue, 10 May 2022 13:16:57 +0200 Subject: First implementation of K2V (#293) **Specification:** View spec at [this URL](https://git.deuxfleurs.fr/Deuxfleurs/garage/src/branch/k2v/doc/drafts/k2v-spec.md) - [x] Specify the structure of K2V triples - [x] Specify the DVVS format used for causality detection - [x] Specify the K2V index (just a counter of number of values per partition key) - [x] Specify single-item endpoints: ReadItem, InsertItem, DeleteItem - [x] Specify index endpoint: ReadIndex - [x] Specify multi-item endpoints: InsertBatch, ReadBatch, DeleteBatch - [x] Move to JSON objects instead of tuples - [x] Specify endpoints for polling for updates on single values (PollItem) **Implementation:** - [x] Table for K2V items, causal contexts - [x] Indexing mechanism and table for K2V index - [x] Make API handlers a bit more generic - [x] K2V API endpoint - [x] K2V API router - [x] ReadItem - [x] InsertItem - [x] DeleteItem - [x] PollItem - [x] ReadIndex - [x] InsertBatch - [x] ReadBatch - [x] DeleteBatch **Testing:** - [x] Just a simple Python script that does some requests to check visually that things are going right (does not contain parsing of results or assertions on returned values) - [x] Actual tests: - [x] Adapt testing framework - [x] Simple test with InsertItem + ReadItem - [x] Test with several Insert/Read/DeleteItem + ReadIndex - [x] Test all combinations of return formats for ReadItem - [x] Test with ReadBatch, InsertBatch, DeleteBatch - [x] Test with PollItem - [x] Test error codes - [ ] Fix most broken stuff - [x] test PollItem broken randomly - [x] when invalid causality tokens are given, errors should be 4xx not 5xx **Improvements:** - [x] Descending range queries - [x] Specify - [x] Implement - [x] Add test - [x] Batch updates to index counter - [x] Put K2V behind `k2v` feature flag Co-authored-by: Alex Auvolat Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/293 Co-authored-by: Alex Co-committed-by: Alex --- src/api/s3/post_object.rs | 507 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 507 insertions(+) create mode 100644 src/api/s3/post_object.rs (limited to 'src/api/s3/post_object.rs') diff --git a/src/api/s3/post_object.rs b/src/api/s3/post_object.rs new file mode 100644 index 00000000..86fa7880 --- /dev/null +++ b/src/api/s3/post_object.rs @@ -0,0 +1,507 @@ +use std::collections::HashMap; +use std::convert::TryInto; +use std::ops::RangeInclusive; +use std::sync::Arc; +use std::task::{Context, Poll}; + +use bytes::Bytes; +use chrono::{DateTime, Duration, Utc}; +use futures::{Stream, StreamExt}; +use hyper::header::{self, HeaderMap, HeaderName, HeaderValue}; +use hyper::{Body, Request, Response, StatusCode}; +use multer::{Constraints, Multipart, SizeLimit}; +use serde::Deserialize; + +use garage_model::garage::Garage; + +use crate::error::*; +use crate::helpers::resolve_bucket; +use crate::s3::put::{get_headers, save_stream}; +use crate::s3::xml as s3_xml; +use crate::signature::payload::{parse_date, verify_v4}; + +pub async fn handle_post_object( + garage: Arc, + req: Request, + bucket: String, +) -> Result, Error> { + let boundary = req + .headers() + .get(header::CONTENT_TYPE) + .and_then(|ct| ct.to_str().ok()) + .and_then(|ct| multer::parse_boundary(ct).ok()) + .ok_or_bad_request("Counld not get multipart boundary")?; + + // 16k seems plenty for a header. 5G is the max size of a single part, so it seems reasonable + // for a PostObject + let constraints = Constraints::new().size_limit( + SizeLimit::new() + .per_field(16 * 1024) + .for_field("file", 5 * 1024 * 1024 * 1024), + ); + + let (head, body) = req.into_parts(); + let mut multipart = Multipart::with_constraints(body, boundary, constraints); + + let mut params = HeaderMap::new(); + let field = loop { + let field = if let Some(field) = multipart.next_field().await? { + field + } else { + return Err(Error::BadRequest( + "Request did not contain a file".to_owned(), + )); + }; + let name: HeaderName = if let Some(Ok(name)) = field.name().map(TryInto::try_into) { + name + } else { + continue; + }; + if name == "file" { + break field; + } + + if let Ok(content) = HeaderValue::from_str(&field.text().await?) { + match name.as_str() { + "tag" => (/* tag need to be reencoded, but we don't support them yet anyway */), + "acl" => { + if params.insert("x-amz-acl", content).is_some() { + return Err(Error::BadRequest( + "Field 'acl' provided more than one time".to_string(), + )); + } + } + _ => { + if params.insert(&name, content).is_some() { + return Err(Error::BadRequest(format!( + "Field '{}' provided more than one time", + name + ))); + } + } + } + } + }; + + // Current part is file. Do some checks before handling to PutObject code + let key = params + .get("key") + .ok_or_bad_request("No key was provided")? + .to_str()?; + let credential = params + .get("x-amz-credential") + .ok_or_else(|| { + Error::Forbidden("Garage does not support anonymous access yet".to_string()) + })? + .to_str()?; + let policy = params + .get("policy") + .ok_or_bad_request("No policy was provided")? + .to_str()?; + let signature = params + .get("x-amz-signature") + .ok_or_bad_request("No signature was provided")? + .to_str()?; + let date = params + .get("x-amz-date") + .ok_or_bad_request("No date was provided")? + .to_str()?; + + let key = if key.contains("${filename}") { + // if no filename is provided, don't replace. This matches the behavior of AWS. + if let Some(filename) = field.file_name() { + key.replace("${filename}", filename) + } else { + key.to_owned() + } + } else { + key.to_owned() + }; + + let date = parse_date(date)?; + let api_key = verify_v4( + &garage, + "s3", + credential, + &date, + signature, + policy.as_bytes(), + ) + .await?; + + let bucket_id = resolve_bucket(&garage, &bucket, &api_key).await?; + + if !api_key.allow_write(&bucket_id) { + return Err(Error::Forbidden( + "Operation is not allowed for this key.".to_string(), + )); + } + + let decoded_policy = base64::decode(&policy)?; + let decoded_policy: Policy = + serde_json::from_slice(&decoded_policy).ok_or_bad_request("Invalid policy")?; + + let expiration: DateTime = DateTime::parse_from_rfc3339(&decoded_policy.expiration) + .ok_or_bad_request("Invalid expiration date")? + .into(); + if Utc::now() - expiration > Duration::zero() { + return Err(Error::BadRequest( + "Expiration date is in the paste".to_string(), + )); + } + + let mut conditions = decoded_policy.into_conditions()?; + + for (param_key, value) in params.iter() { + let mut param_key = param_key.to_string(); + param_key.make_ascii_lowercase(); + match param_key.as_str() { + "policy" | "x-amz-signature" => (), // this is always accepted, as it's required to validate other fields + "content-type" => { + let conds = conditions.params.remove("content-type").ok_or_else(|| { + Error::BadRequest(format!("Key '{}' is not allowed in policy", param_key)) + })?; + for cond in conds { + let ok = match cond { + Operation::Equal(s) => s.as_str() == value, + Operation::StartsWith(s) => { + value.to_str()?.split(',').all(|v| v.starts_with(&s)) + } + }; + if !ok { + return Err(Error::BadRequest(format!( + "Key '{}' has value not allowed in policy", + param_key + ))); + } + } + } + "key" => { + let conds = conditions.params.remove("key").ok_or_else(|| { + Error::BadRequest(format!("Key '{}' is not allowed in policy", param_key)) + })?; + for cond in conds { + let ok = match cond { + Operation::Equal(s) => s == key, + Operation::StartsWith(s) => key.starts_with(&s), + }; + if !ok { + return Err(Error::BadRequest(format!( + "Key '{}' has value not allowed in policy", + param_key + ))); + } + } + } + _ => { + if param_key.starts_with("x-ignore-") { + // if a x-ignore is provided in policy, it's not removed here, so it will be + // rejected as provided in policy but not in the request. As odd as it is, it's + // how aws seems to behave. + continue; + } + let conds = conditions.params.remove(¶m_key).ok_or_else(|| { + Error::BadRequest(format!("Key '{}' is not allowed in policy", param_key)) + })?; + for cond in conds { + let ok = match cond { + Operation::Equal(s) => s.as_str() == value, + Operation::StartsWith(s) => value.to_str()?.starts_with(s.as_str()), + }; + if !ok { + return Err(Error::BadRequest(format!( + "Key '{}' has value not allowed in policy", + param_key + ))); + } + } + } + } + } + + if let Some((param_key, _)) = conditions.params.iter().next() { + return Err(Error::BadRequest(format!( + "Key '{}' is required in policy, but no value was provided", + param_key + ))); + } + + let headers = get_headers(¶ms)?; + + let stream = field.map(|r| r.map_err(Into::into)); + let (_, md5) = save_stream( + garage, + headers, + StreamLimiter::new(stream, conditions.content_length), + bucket_id, + &key, + None, + None, + ) + .await?; + + let etag = format!("\"{}\"", md5); + + let resp = if let Some(mut target) = params + .get("success_action_redirect") + .and_then(|h| h.to_str().ok()) + .and_then(|u| url::Url::parse(u).ok()) + .filter(|u| u.scheme() == "https" || u.scheme() == "http") + { + target + .query_pairs_mut() + .append_pair("bucket", &bucket) + .append_pair("key", &key) + .append_pair("etag", &etag); + let target = target.to_string(); + Response::builder() + .status(StatusCode::SEE_OTHER) + .header(header::LOCATION, target.clone()) + .header(header::ETAG, etag) + .body(target.into())? + } else { + let path = head + .uri + .into_parts() + .path_and_query + .map(|paq| paq.path().to_string()) + .unwrap_or_else(|| "/".to_string()); + let authority = head + .headers + .get(header::HOST) + .and_then(|h| h.to_str().ok()) + .unwrap_or_default(); + let proto = if !authority.is_empty() { + "https://" + } else { + "" + }; + + let url_key: String = form_urlencoded::byte_serialize(key.as_bytes()) + .flat_map(str::chars) + .collect(); + let location = format!("{}{}{}{}", proto, authority, path, url_key); + + let action = params + .get("success_action_status") + .and_then(|h| h.to_str().ok()) + .unwrap_or("204"); + let builder = Response::builder() + .header(header::LOCATION, location.clone()) + .header(header::ETAG, etag.clone()); + match action { + "200" => builder.status(StatusCode::OK).body(Body::empty())?, + "201" => { + let xml = s3_xml::PostObject { + xmlns: (), + location: s3_xml::Value(location), + bucket: s3_xml::Value(bucket), + key: s3_xml::Value(key), + etag: s3_xml::Value(etag), + }; + let body = s3_xml::to_xml_with_header(&xml)?; + builder + .status(StatusCode::CREATED) + .body(Body::from(body.into_bytes()))? + } + _ => builder.status(StatusCode::NO_CONTENT).body(Body::empty())?, + } + }; + + Ok(resp) +} + +#[derive(Deserialize)] +struct Policy { + expiration: String, + conditions: Vec, +} + +impl Policy { + fn into_conditions(self) -> Result { + let mut params = HashMap::<_, Vec<_>>::new(); + + let mut length = (0, u64::MAX); + for condition in self.conditions { + match condition { + PolicyCondition::Equal(map) => { + if map.len() != 1 { + return Err(Error::BadRequest("Invalid policy item".to_owned())); + } + let (mut k, v) = map.into_iter().next().expect("size was verified"); + k.make_ascii_lowercase(); + params.entry(k).or_default().push(Operation::Equal(v)); + } + PolicyCondition::OtherOp([cond, mut key, value]) => { + if key.remove(0) != '$' { + return Err(Error::BadRequest("Invalid policy item".to_owned())); + } + key.make_ascii_lowercase(); + match cond.as_str() { + "eq" => { + params.entry(key).or_default().push(Operation::Equal(value)); + } + "starts-with" => { + params + .entry(key) + .or_default() + .push(Operation::StartsWith(value)); + } + _ => return Err(Error::BadRequest("Invalid policy item".to_owned())), + } + } + PolicyCondition::SizeRange(key, min, max) => { + if key == "content-length-range" { + length.0 = length.0.max(min); + length.1 = length.1.min(max); + } else { + return Err(Error::BadRequest("Invalid policy item".to_owned())); + } + } + } + } + Ok(Conditions { + params, + content_length: RangeInclusive::new(length.0, length.1), + }) + } +} + +/// A single condition from a policy +#[derive(Debug, Deserialize)] +#[serde(untagged)] +enum PolicyCondition { + // will contain a single key-value pair + Equal(HashMap), + OtherOp([String; 3]), + SizeRange(String, u64, u64), +} + +#[derive(Debug)] +struct Conditions { + params: HashMap>, + content_length: RangeInclusive, +} + +#[derive(Debug, PartialEq, Eq)] +enum Operation { + Equal(String), + StartsWith(String), +} + +struct StreamLimiter { + inner: T, + length: RangeInclusive, + read: u64, +} + +impl StreamLimiter { + fn new(stream: T, length: RangeInclusive) -> Self { + StreamLimiter { + inner: stream, + length, + read: 0, + } + } +} + +impl Stream for StreamLimiter +where + T: Stream> + Unpin, +{ + type Item = Result; + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + ctx: &mut Context<'_>, + ) -> Poll> { + let res = std::pin::Pin::new(&mut self.inner).poll_next(ctx); + match &res { + Poll::Ready(Some(Ok(bytes))) => { + self.read += bytes.len() as u64; + // optimization to fail early when we know before the end it's too long + if self.length.end() < &self.read { + return Poll::Ready(Some(Err(Error::BadRequest( + "File size does not match policy".to_owned(), + )))); + } + } + Poll::Ready(None) => { + if !self.length.contains(&self.read) { + return Poll::Ready(Some(Err(Error::BadRequest( + "File size does not match policy".to_owned(), + )))); + } + } + _ => {} + } + res + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_policy_1() { + let policy_json = br#" +{ "expiration": "2007-12-01T12:00:00.000Z", + "conditions": [ + {"acl": "public-read" }, + {"bucket": "johnsmith" }, + ["starts-with", "$key", "user/eric/"] + ] +} + "#; + let policy_2: Policy = serde_json::from_slice(&policy_json[..]).unwrap(); + let mut conditions = policy_2.into_conditions().unwrap(); + + assert_eq!( + conditions.params.remove(&"acl".to_string()), + Some(vec![Operation::Equal("public-read".into())]) + ); + assert_eq!( + conditions.params.remove(&"bucket".to_string()), + Some(vec![Operation::Equal("johnsmith".into())]) + ); + assert_eq!( + conditions.params.remove(&"key".to_string()), + Some(vec![Operation::StartsWith("user/eric/".into())]) + ); + assert!(conditions.params.is_empty()); + assert_eq!(conditions.content_length, 0..=u64::MAX); + } + + #[test] + fn test_policy_2() { + let policy_json = br#" +{ "expiration": "2007-12-01T12:00:00.000Z", + "conditions": [ + [ "eq", "$acl", "public-read" ], + ["starts-with", "$Content-Type", "image/"], + ["starts-with", "$success_action_redirect", ""], + ["content-length-range", 1048576, 10485760] + ] +} + "#; + let policy_2: Policy = serde_json::from_slice(&policy_json[..]).unwrap(); + let mut conditions = policy_2.into_conditions().unwrap(); + + assert_eq!( + conditions.params.remove(&"acl".to_string()), + Some(vec![Operation::Equal("public-read".into())]) + ); + assert_eq!( + conditions.params.remove("content-type").unwrap(), + vec![Operation::StartsWith("image/".into())] + ); + assert_eq!( + conditions + .params + .remove(&"success_action_redirect".to_string()), + Some(vec![Operation::StartsWith("".into())]) + ); + assert!(conditions.params.is_empty()); + assert_eq!(conditions.content_length, 1048576..=10485760); + } +} -- cgit v1.2.3 From 382e74c798263d042b1c6ca3788c866a8c69c4f4 Mon Sep 17 00:00:00 2001 From: Alex Date: Tue, 24 May 2022 12:16:39 +0200 Subject: First version of admin API (#298) **Spec:** - [x] Start writing - [x] Specify all layout endpoints - [x] Specify all endpoints for operations on keys - [x] Specify all endpoints for operations on key/bucket permissions - [x] Specify all endpoints for operations on buckets - [x] Specify all endpoints for operations on bucket aliases View rendered spec at **Code:** - [x] Refactor code for admin api to use common api code that was created for K2V **General endpoints:** - [x] Metrics - [x] GetClusterStatus - [x] ConnectClusterNodes - [x] GetClusterLayout - [x] UpdateClusterLayout - [x] ApplyClusterLayout - [x] RevertClusterLayout **Key-related endpoints:** - [x] ListKeys - [x] CreateKey - [x] ImportKey - [x] GetKeyInfo - [x] UpdateKey - [x] DeleteKey **Bucket-related endpoints:** - [x] ListBuckets - [x] CreateBucket - [x] GetBucketInfo - [x] DeleteBucket - [x] PutBucketWebsite - [x] DeleteBucketWebsite **Operations on key/bucket permissions:** - [x] BucketAllowKey - [x] BucketDenyKey **Operations on bucket aliases:** - [x] GlobalAliasBucket - [x] GlobalUnaliasBucket - [x] LocalAliasBucket - [x] LocalUnaliasBucket **And also:** - [x] Separate error type for the admin API (this PR includes a quite big refactoring of error handling) - [x] Add management of website access - [ ] Check that nothing is missing wrt what can be done using the CLI - [ ] Improve formatting of the spec - [x] Make sure everyone is cool with the API design Fix #231 Fix #295 Co-authored-by: Alex Auvolat Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/298 Co-authored-by: Alex Co-committed-by: Alex --- src/api/s3/post_object.rs | 62 +++++++++++++++++++++-------------------------- 1 file changed, 28 insertions(+), 34 deletions(-) (limited to 'src/api/s3/post_object.rs') diff --git a/src/api/s3/post_object.rs b/src/api/s3/post_object.rs index 86fa7880..dc640f43 100644 --- a/src/api/s3/post_object.rs +++ b/src/api/s3/post_object.rs @@ -14,8 +14,7 @@ use serde::Deserialize; use garage_model::garage::Garage; -use crate::error::*; -use crate::helpers::resolve_bucket; +use crate::s3::error::*; use crate::s3::put::{get_headers, save_stream}; use crate::s3::xml as s3_xml; use crate::signature::payload::{parse_date, verify_v4}; @@ -48,9 +47,7 @@ pub async fn handle_post_object( let field = if let Some(field) = multipart.next_field().await? { field } else { - return Err(Error::BadRequest( - "Request did not contain a file".to_owned(), - )); + return Err(Error::bad_request("Request did not contain a file")); }; let name: HeaderName = if let Some(Ok(name)) = field.name().map(TryInto::try_into) { name @@ -66,14 +63,14 @@ pub async fn handle_post_object( "tag" => (/* tag need to be reencoded, but we don't support them yet anyway */), "acl" => { if params.insert("x-amz-acl", content).is_some() { - return Err(Error::BadRequest( - "Field 'acl' provided more than one time".to_string(), + return Err(Error::bad_request( + "Field 'acl' provided more than one time", )); } } _ => { if params.insert(&name, content).is_some() { - return Err(Error::BadRequest(format!( + return Err(Error::bad_request(format!( "Field '{}' provided more than one time", name ))); @@ -90,9 +87,7 @@ pub async fn handle_post_object( .to_str()?; let credential = params .get("x-amz-credential") - .ok_or_else(|| { - Error::Forbidden("Garage does not support anonymous access yet".to_string()) - })? + .ok_or_else(|| Error::forbidden("Garage does not support anonymous access yet"))? .to_str()?; let policy = params .get("policy") @@ -129,15 +124,16 @@ pub async fn handle_post_object( ) .await?; - let bucket_id = resolve_bucket(&garage, &bucket, &api_key).await?; + let bucket_id = garage + .bucket_helper() + .resolve_bucket(&bucket, &api_key) + .await?; if !api_key.allow_write(&bucket_id) { - return Err(Error::Forbidden( - "Operation is not allowed for this key.".to_string(), - )); + return Err(Error::forbidden("Operation is not allowed for this key.")); } - let decoded_policy = base64::decode(&policy)?; + let decoded_policy = base64::decode(&policy).ok_or_bad_request("Invalid policy")?; let decoded_policy: Policy = serde_json::from_slice(&decoded_policy).ok_or_bad_request("Invalid policy")?; @@ -145,9 +141,7 @@ pub async fn handle_post_object( .ok_or_bad_request("Invalid expiration date")? .into(); if Utc::now() - expiration > Duration::zero() { - return Err(Error::BadRequest( - "Expiration date is in the paste".to_string(), - )); + return Err(Error::bad_request("Expiration date is in the paste")); } let mut conditions = decoded_policy.into_conditions()?; @@ -159,7 +153,7 @@ pub async fn handle_post_object( "policy" | "x-amz-signature" => (), // this is always accepted, as it's required to validate other fields "content-type" => { let conds = conditions.params.remove("content-type").ok_or_else(|| { - Error::BadRequest(format!("Key '{}' is not allowed in policy", param_key)) + Error::bad_request(format!("Key '{}' is not allowed in policy", param_key)) })?; for cond in conds { let ok = match cond { @@ -169,7 +163,7 @@ pub async fn handle_post_object( } }; if !ok { - return Err(Error::BadRequest(format!( + return Err(Error::bad_request(format!( "Key '{}' has value not allowed in policy", param_key ))); @@ -178,7 +172,7 @@ pub async fn handle_post_object( } "key" => { let conds = conditions.params.remove("key").ok_or_else(|| { - Error::BadRequest(format!("Key '{}' is not allowed in policy", param_key)) + Error::bad_request(format!("Key '{}' is not allowed in policy", param_key)) })?; for cond in conds { let ok = match cond { @@ -186,7 +180,7 @@ pub async fn handle_post_object( Operation::StartsWith(s) => key.starts_with(&s), }; if !ok { - return Err(Error::BadRequest(format!( + return Err(Error::bad_request(format!( "Key '{}' has value not allowed in policy", param_key ))); @@ -201,7 +195,7 @@ pub async fn handle_post_object( continue; } let conds = conditions.params.remove(¶m_key).ok_or_else(|| { - Error::BadRequest(format!("Key '{}' is not allowed in policy", param_key)) + Error::bad_request(format!("Key '{}' is not allowed in policy", param_key)) })?; for cond in conds { let ok = match cond { @@ -209,7 +203,7 @@ pub async fn handle_post_object( Operation::StartsWith(s) => value.to_str()?.starts_with(s.as_str()), }; if !ok { - return Err(Error::BadRequest(format!( + return Err(Error::bad_request(format!( "Key '{}' has value not allowed in policy", param_key ))); @@ -220,7 +214,7 @@ pub async fn handle_post_object( } if let Some((param_key, _)) = conditions.params.iter().next() { - return Err(Error::BadRequest(format!( + return Err(Error::bad_request(format!( "Key '{}' is required in policy, but no value was provided", param_key ))); @@ -326,7 +320,7 @@ impl Policy { match condition { PolicyCondition::Equal(map) => { if map.len() != 1 { - return Err(Error::BadRequest("Invalid policy item".to_owned())); + return Err(Error::bad_request("Invalid policy item")); } let (mut k, v) = map.into_iter().next().expect("size was verified"); k.make_ascii_lowercase(); @@ -334,7 +328,7 @@ impl Policy { } PolicyCondition::OtherOp([cond, mut key, value]) => { if key.remove(0) != '$' { - return Err(Error::BadRequest("Invalid policy item".to_owned())); + return Err(Error::bad_request("Invalid policy item")); } key.make_ascii_lowercase(); match cond.as_str() { @@ -347,7 +341,7 @@ impl Policy { .or_default() .push(Operation::StartsWith(value)); } - _ => return Err(Error::BadRequest("Invalid policy item".to_owned())), + _ => return Err(Error::bad_request("Invalid policy item")), } } PolicyCondition::SizeRange(key, min, max) => { @@ -355,7 +349,7 @@ impl Policy { length.0 = length.0.max(min); length.1 = length.1.min(max); } else { - return Err(Error::BadRequest("Invalid policy item".to_owned())); + return Err(Error::bad_request("Invalid policy item")); } } } @@ -420,15 +414,15 @@ where self.read += bytes.len() as u64; // optimization to fail early when we know before the end it's too long if self.length.end() < &self.read { - return Poll::Ready(Some(Err(Error::BadRequest( - "File size does not match policy".to_owned(), + return Poll::Ready(Some(Err(Error::bad_request( + "File size does not match policy", )))); } } Poll::Ready(None) => { if !self.length.contains(&self.read) { - return Poll::Ready(Some(Err(Error::BadRequest( - "File size does not match policy".to_owned(), + return Poll::Ready(Some(Err(Error::bad_request( + "File size does not match policy", )))); } } -- cgit v1.2.3 From 77e3fd6db2c9cd3a10889bd071e95ef839cfbefc Mon Sep 17 00:00:00 2001 From: Alex Date: Wed, 15 Jun 2022 20:20:28 +0200 Subject: improve internal item counter mechanisms and implement bucket quotas (#326) - [x] Refactoring of internal counting API - [x] Repair procedure for counters (it's an offline procedure!!!) - [x] New counter for objects in buckets - [x] Add quotas to buckets struct - [x] Add CLI to manage bucket quotas - [x] Add admin API to manage bucket quotas - [x] Apply quotas by adding checks on put operations - [x] Proof-read Co-authored-by: Alex Auvolat Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/326 Co-authored-by: Alex Co-committed-by: Alex --- src/api/s3/post_object.rs | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) (limited to 'src/api/s3/post_object.rs') diff --git a/src/api/s3/post_object.rs b/src/api/s3/post_object.rs index dc640f43..d063faa4 100644 --- a/src/api/s3/post_object.rs +++ b/src/api/s3/post_object.rs @@ -22,7 +22,7 @@ use crate::signature::payload::{parse_date, verify_v4}; pub async fn handle_post_object( garage: Arc, req: Request, - bucket: String, + bucket_name: String, ) -> Result, Error> { let boundary = req .headers() @@ -126,13 +126,18 @@ pub async fn handle_post_object( let bucket_id = garage .bucket_helper() - .resolve_bucket(&bucket, &api_key) + .resolve_bucket(&bucket_name, &api_key) .await?; if !api_key.allow_write(&bucket_id) { return Err(Error::forbidden("Operation is not allowed for this key.")); } + let bucket = garage + .bucket_helper() + .get_existing_bucket(bucket_id) + .await?; + let decoded_policy = base64::decode(&policy).ok_or_bad_request("Invalid policy")?; let decoded_policy: Policy = serde_json::from_slice(&decoded_policy).ok_or_bad_request("Invalid policy")?; @@ -227,7 +232,7 @@ pub async fn handle_post_object( garage, headers, StreamLimiter::new(stream, conditions.content_length), - bucket_id, + &bucket, &key, None, None, @@ -244,7 +249,7 @@ pub async fn handle_post_object( { target .query_pairs_mut() - .append_pair("bucket", &bucket) + .append_pair("bucket", &bucket_name) .append_pair("key", &key) .append_pair("etag", &etag); let target = target.to_string(); @@ -289,7 +294,7 @@ pub async fn handle_post_object( let xml = s3_xml::PostObject { xmlns: (), location: s3_xml::Value(location), - bucket: s3_xml::Value(bucket), + bucket: s3_xml::Value(bucket_name), key: s3_xml::Value(key), etag: s3_xml::Value(etag), }; -- cgit v1.2.3