diff options
Diffstat (limited to 'src/api/s3/put.rs')
-rw-r--r-- | src/api/s3/put.rs | 127 |
1 files changed, 101 insertions, 26 deletions
diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs index 8b06ef3f..881320af 100644 --- a/src/api/s3/put.rs +++ b/src/api/s3/put.rs @@ -1,4 +1,4 @@ -use std::collections::{BTreeMap, BTreeSet, VecDeque}; +use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque}; use std::sync::Arc; use futures::prelude::*; @@ -14,7 +14,9 @@ use garage_util::error::Error as GarageError; use garage_util::time::*; use garage_block::manager::INLINE_THRESHOLD; +use garage_model::bucket_table::Bucket; use garage_model::garage::Garage; +use garage_model::index_counter::CountedItem; use garage_model::s3::block_ref_table::*; use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; @@ -26,7 +28,7 @@ use crate::signature::verify_signed_content; pub async fn handle_put( garage: Arc<Garage>, req: Request<Body>, - bucket_id: Uuid, + bucket: &Bucket, key: &str, content_sha256: Option<Hash>, ) -> Result<Response<Body>, Error> { @@ -46,7 +48,7 @@ pub async fn handle_put( garage, headers, body, - bucket_id, + bucket, key, content_md5, content_sha256, @@ -59,7 +61,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( garage: Arc<Garage>, headers: ObjectVersionHeaders, body: S, - bucket_id: Uuid, + bucket: &Bucket, key: &str, content_md5: Option<String>, content_sha256: Option<FixedBytes32>, @@ -80,6 +82,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( let data_md5sum_hex = hex::encode(data_md5sum); let data_sha256sum = sha256sum(&first_block[..]); + let size = first_block.len() as u64; ensure_checksum_matches( data_md5sum.as_slice(), @@ -88,20 +91,22 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( content_sha256, )?; + check_quotas(&garage, bucket, key, size).await?; + let object_version = ObjectVersion { uuid: version_uuid, timestamp: version_timestamp, state: ObjectVersionState::Complete(ObjectVersionData::Inline( ObjectVersionMeta { headers, - size: first_block.len() as u64, + size, etag: data_md5sum_hex.clone(), }, first_block, )), }; - let object = Object::new(bucket_id, key.into(), vec![object_version]); + let object = Object::new(bucket.id, key.into(), vec![object_version]); garage.object_table.insert(&object).await?; return Ok((version_uuid, data_md5sum_hex)); @@ -114,36 +119,42 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( timestamp: version_timestamp, state: ObjectVersionState::Uploading(headers.clone()), }; - let object = Object::new(bucket_id, key.into(), vec![object_version.clone()]); + let object = Object::new(bucket.id, key.into(), vec![object_version.clone()]); garage.object_table.insert(&object).await?; // Initialize corresponding entry in version table // Write this entry now, even with empty block list, // to prevent block_ref entries from being deleted (they can be deleted // if the reference a version that isn't found in the version table) - let version = Version::new(version_uuid, bucket_id, key.into(), false); + let version = Version::new(version_uuid, bucket.id, key.into(), false); garage.version_table.insert(&version).await?; // Transfer data and verify checksum let first_block_hash = blake2sum(&first_block[..]); - let tx_result = read_and_put_blocks( - &garage, - &version, - 1, - first_block, - first_block_hash, - &mut chunker, - ) - .await - .and_then(|(total_size, data_md5sum, data_sha256sum)| { + + let tx_result = (|| async { + let (total_size, data_md5sum, data_sha256sum) = read_and_put_blocks( + &garage, + &version, + 1, + first_block, + first_block_hash, + &mut chunker, + ) + .await?; + ensure_checksum_matches( data_md5sum.as_slice(), data_sha256sum, content_md5.as_deref(), content_sha256, - ) - .map(|()| (total_size, data_md5sum)) - }); + )?; + + check_quotas(&garage, bucket, key, total_size).await?; + + Ok((total_size, data_md5sum)) + })() + .await; // If something went wrong, clean up let (total_size, md5sum_arr) = match tx_result { @@ -151,7 +162,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( Err(e) => { // Mark object as aborted, this will free the blocks further down object_version.state = ObjectVersionState::Aborted; - let object = Object::new(bucket_id, key.into(), vec![object_version.clone()]); + let object = Object::new(bucket.id, key.into(), vec![object_version.clone()]); garage.object_table.insert(&object).await?; return Err(e); } @@ -167,7 +178,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( }, first_block_hash, )); - let object = Object::new(bucket_id, key.into(), vec![object_version]); + let object = Object::new(bucket.id, key.into(), vec![object_version]); garage.object_table.insert(&object).await?; Ok((version_uuid, md5sum_hex)) @@ -200,6 +211,62 @@ fn ensure_checksum_matches( Ok(()) } +/// Check that inserting this object with this size doesn't exceed bucket quotas +async fn check_quotas( + garage: &Arc<Garage>, + bucket: &Bucket, + key: &str, + size: u64, +) -> Result<(), Error> { + let quotas = bucket.state.as_option().unwrap().quotas.get(); + if quotas.max_objects.is_none() && quotas.max_size.is_none() { + return Ok(()); + }; + + let key = key.to_string(); + let (prev_object, counters) = futures::try_join!( + garage.object_table.get(&bucket.id, &key), + garage.object_counter_table.table.get(&EmptyKey, &bucket.id), + )?; + + let counters = counters + .map(|x| x.filtered_values(&garage.system.ring.borrow())) + .unwrap_or_default(); + + let (prev_cnt_obj, prev_cnt_size) = match prev_object { + Some(o) => { + let prev_cnt = o.counts().into_iter().collect::<HashMap<_, _>>(); + ( + prev_cnt.get(OBJECTS).cloned().unwrap_or_default(), + prev_cnt.get(BYTES).cloned().unwrap_or_default(), + ) + } + None => (0, 0), + }; + let cnt_obj_diff = 1 - prev_cnt_obj; + let cnt_size_diff = size as i64 - prev_cnt_size; + + if let Some(mo) = quotas.max_objects { + let current_objects = counters.get(OBJECTS).cloned().unwrap_or_default(); + if cnt_obj_diff > 0 && current_objects + cnt_obj_diff > mo as i64 { + return Err(Error::forbidden(format!( + "Object quota is reached, maximum objects for this bucket: {}", + mo + ))); + } + } + + if let Some(ms) = quotas.max_size { + let current_size = counters.get(BYTES).cloned().unwrap_or_default(); + if cnt_size_diff > 0 && current_size + cnt_obj_diff > ms as i64 { + return Err(Error::forbidden(format!("Bucket size quota is reached, maximum total sie of objects for this bucket: {}. The bucket is already {} bytes, and this object would add {} bytes.", ms, + current_size, size))); + } + } + + Ok(()) +} + async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + Unpin>( garage: &Garage, version: &Version, @@ -473,7 +540,7 @@ pub async fn handle_complete_multipart_upload( garage: Arc<Garage>, req: Request<Body>, bucket_name: &str, - bucket_id: Uuid, + bucket: &Bucket, key: &str, upload_id: &str, content_sha256: Option<Hash>, @@ -497,7 +564,7 @@ pub async fn handle_complete_multipart_upload( // Get object and version let key = key.to_string(); let (object, version) = futures::try_join!( - garage.object_table.get(&bucket_id, &key), + garage.object_table.get(&bucket.id, &key), garage.version_table.get(&version_uuid, &EmptyKey), )?; @@ -590,6 +657,14 @@ pub async fn handle_complete_multipart_upload( // Calculate total size of final object let total_size = version.blocks.items().iter().map(|x| x.1.size).sum(); + if let Err(e) = check_quotas(&garage, bucket, &key, total_size).await { + object_version.state = ObjectVersionState::Aborted; + let final_object = Object::new(bucket.id, key.clone(), vec![object_version]); + garage.object_table.insert(&final_object).await?; + + return Err(e); + } + // Write final object version object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock( ObjectVersionMeta { @@ -600,7 +675,7 @@ pub async fn handle_complete_multipart_upload( version.blocks.items()[0].1.hash, )); - let final_object = Object::new(bucket_id, key.clone(), vec![object_version]); + let final_object = Object::new(bucket.id, key.clone(), vec![object_version]); garage.object_table.insert(&final_object).await?; // Send response saying ok we're done |