diff options
Diffstat (limited to 'src/api/s3/put.rs')
-rw-r--r-- | src/api/s3/put.rs | 68 |
1 files changed, 33 insertions, 35 deletions
diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs index 489f1136..36523b30 100644 --- a/src/api/s3/put.rs +++ b/src/api/s3/put.rs @@ -28,7 +28,6 @@ use garage_util::error::Error as GarageError; use garage_util::time::*; use garage_block::manager::INLINE_THRESHOLD; -use garage_model::bucket_table::Bucket; use garage_model::garage::Garage; use garage_model::index_counter::CountedItem; use garage_model::s3::block_ref_table::*; @@ -42,9 +41,8 @@ use crate::s3::error::*; const PUT_BLOCKS_MAX_PARALLEL: usize = 3; pub async fn handle_put( - garage: Arc<Garage>, + ctx: ReqCtx, req: Request<ReqBody>, - bucket: &Bucket, key: &String, content_sha256: Option<Hash>, ) -> Result<Response<ResBody>, Error> { @@ -59,35 +57,27 @@ pub async fn handle_put( let stream = body_stream(req.into_body()); - save_stream( - garage, - headers, - stream, - bucket, - key, - content_md5, - content_sha256, - ) - .await - .map(|(uuid, md5)| put_response(uuid, md5)) + save_stream(&ctx, headers, stream, key, content_md5, content_sha256) + .await + .map(|(uuid, md5)| put_response(uuid, md5)) } pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( - garage: Arc<Garage>, + ctx: &ReqCtx, headers: ObjectVersionHeaders, body: S, - bucket: &Bucket, key: &String, content_md5: Option<String>, content_sha256: Option<FixedBytes32>, ) -> Result<(Uuid, String), Error> { + let ReqCtx { + garage, bucket_id, .. + } = ctx; + let mut chunker = StreamChunker::new(body, garage.config.block_size); let (first_block_opt, existing_object) = try_join!( chunker.next(), - garage - .object_table - .get(&bucket.id, key) - .map_err(Error::from), + garage.object_table.get(bucket_id, key).map_err(Error::from), )?; let first_block = first_block_opt.unwrap_or_default(); @@ -114,7 +104,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( content_sha256, )?; - check_quotas(&garage, bucket, size, existing_object.as_ref()).await?; + check_quotas(ctx, size, existing_object.as_ref()).await?; let object_version = ObjectVersion { uuid: version_uuid, @@ -129,7 +119,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( )), }; - let object = Object::new(bucket.id, key.into(), vec![object_version]); + let object = Object::new(*bucket_id, key.into(), vec![object_version]); garage.object_table.insert(&object).await?; return Ok((version_uuid, data_md5sum_hex)); @@ -140,7 +130,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( // before everything is finished (cleanup is done using the Drop trait). let mut interrupted_cleanup = InterruptedCleanup(Some(InterruptedCleanupInner { garage: garage.clone(), - bucket_id: bucket.id, + bucket_id: *bucket_id, key: key.into(), version_uuid, version_timestamp, @@ -156,7 +146,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( multipart: false, }, }; - let object = Object::new(bucket.id, key.into(), vec![object_version.clone()]); + let object = Object::new(*bucket_id, key.into(), vec![object_version.clone()]); garage.object_table.insert(&object).await?; // Initialize corresponding entry in version table @@ -166,7 +156,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( let version = Version::new( version_uuid, VersionBacklink::Object { - bucket_id: bucket.id, + bucket_id: *bucket_id, key: key.into(), }, false, @@ -175,7 +165,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( // Transfer data and verify checksum let (total_size, data_md5sum, data_sha256sum, first_block_hash) = - read_and_put_blocks(&garage, &version, 1, first_block, &mut chunker).await?; + read_and_put_blocks(ctx, &version, 1, first_block, &mut chunker).await?; ensure_checksum_matches( data_md5sum.as_slice(), @@ -184,7 +174,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( content_sha256, )?; - check_quotas(&garage, bucket, total_size, existing_object.as_ref()).await?; + check_quotas(ctx, total_size, existing_object.as_ref()).await?; // Save final object state, marked as Complete let md5sum_hex = hex::encode(data_md5sum); @@ -196,7 +186,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( }, first_block_hash, )); - let object = Object::new(bucket.id, key.into(), vec![object_version]); + let object = Object::new(*bucket_id, key.into(), vec![object_version]); garage.object_table.insert(&object).await?; // We were not interrupted, everything went fine. @@ -235,12 +225,18 @@ pub(crate) fn ensure_checksum_matches( /// Check that inserting this object with this size doesn't exceed bucket quotas pub(crate) async fn check_quotas( - garage: &Arc<Garage>, - bucket: &Bucket, + ctx: &ReqCtx, size: u64, prev_object: Option<&Object>, ) -> Result<(), Error> { - let quotas = bucket.state.as_option().unwrap().quotas.get(); + let ReqCtx { + garage, + bucket_id, + bucket_params, + .. + } = ctx; + + let quotas = bucket_params.quotas.get(); if quotas.max_objects.is_none() && quotas.max_size.is_none() { return Ok(()); }; @@ -248,7 +244,7 @@ pub(crate) async fn check_quotas( let counters = garage .object_counter_table .table - .get(&bucket.id, &EmptyKey) + .get(bucket_id, &EmptyKey) .await?; let counters = counters @@ -292,7 +288,7 @@ pub(crate) async fn check_quotas( } pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + Unpin>( - garage: &Garage, + ctx: &ReqCtx, version: &Version, part_number: u64, first_block: Bytes, @@ -417,7 +413,7 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + let offset = written_bytes; written_bytes += block.len() as u64; write_futs.push_back(put_block_and_meta( - garage, + ctx, version, part_number, offset, @@ -447,7 +443,7 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + } async fn put_block_and_meta( - garage: &Garage, + ctx: &ReqCtx, version: &Version, part_number: u64, offset: u64, @@ -455,6 +451,8 @@ async fn put_block_and_meta( block: Bytes, order_tag: OrderTag, ) -> Result<(), GarageError> { + let ReqCtx { garage, .. } = ctx; + let mut version = version.clone(); version.blocks.put( VersionBlockKey { |