aboutsummaryrefslogtreecommitdiff
path: root/src/api/s3/put.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/api/s3/put.rs')
-rw-r--r--src/api/s3/put.rs68
1 files changed, 33 insertions, 35 deletions
diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs
index 489f1136..36523b30 100644
--- a/src/api/s3/put.rs
+++ b/src/api/s3/put.rs
@@ -28,7 +28,6 @@ use garage_util::error::Error as GarageError;
use garage_util::time::*;
use garage_block::manager::INLINE_THRESHOLD;
-use garage_model::bucket_table::Bucket;
use garage_model::garage::Garage;
use garage_model::index_counter::CountedItem;
use garage_model::s3::block_ref_table::*;
@@ -42,9 +41,8 @@ use crate::s3::error::*;
const PUT_BLOCKS_MAX_PARALLEL: usize = 3;
pub async fn handle_put(
- garage: Arc<Garage>,
+ ctx: ReqCtx,
req: Request<ReqBody>,
- bucket: &Bucket,
key: &String,
content_sha256: Option<Hash>,
) -> Result<Response<ResBody>, Error> {
@@ -59,35 +57,27 @@ pub async fn handle_put(
let stream = body_stream(req.into_body());
- save_stream(
- garage,
- headers,
- stream,
- bucket,
- key,
- content_md5,
- content_sha256,
- )
- .await
- .map(|(uuid, md5)| put_response(uuid, md5))
+ save_stream(&ctx, headers, stream, key, content_md5, content_sha256)
+ .await
+ .map(|(uuid, md5)| put_response(uuid, md5))
}
pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
- garage: Arc<Garage>,
+ ctx: &ReqCtx,
headers: ObjectVersionHeaders,
body: S,
- bucket: &Bucket,
key: &String,
content_md5: Option<String>,
content_sha256: Option<FixedBytes32>,
) -> Result<(Uuid, String), Error> {
+ let ReqCtx {
+ garage, bucket_id, ..
+ } = ctx;
+
let mut chunker = StreamChunker::new(body, garage.config.block_size);
let (first_block_opt, existing_object) = try_join!(
chunker.next(),
- garage
- .object_table
- .get(&bucket.id, key)
- .map_err(Error::from),
+ garage.object_table.get(bucket_id, key).map_err(Error::from),
)?;
let first_block = first_block_opt.unwrap_or_default();
@@ -114,7 +104,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
content_sha256,
)?;
- check_quotas(&garage, bucket, size, existing_object.as_ref()).await?;
+ check_quotas(ctx, size, existing_object.as_ref()).await?;
let object_version = ObjectVersion {
uuid: version_uuid,
@@ -129,7 +119,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
)),
};
- let object = Object::new(bucket.id, key.into(), vec![object_version]);
+ let object = Object::new(*bucket_id, key.into(), vec![object_version]);
garage.object_table.insert(&object).await?;
return Ok((version_uuid, data_md5sum_hex));
@@ -140,7 +130,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
// before everything is finished (cleanup is done using the Drop trait).
let mut interrupted_cleanup = InterruptedCleanup(Some(InterruptedCleanupInner {
garage: garage.clone(),
- bucket_id: bucket.id,
+ bucket_id: *bucket_id,
key: key.into(),
version_uuid,
version_timestamp,
@@ -156,7 +146,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
multipart: false,
},
};
- let object = Object::new(bucket.id, key.into(), vec![object_version.clone()]);
+ let object = Object::new(*bucket_id, key.into(), vec![object_version.clone()]);
garage.object_table.insert(&object).await?;
// Initialize corresponding entry in version table
@@ -166,7 +156,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
let version = Version::new(
version_uuid,
VersionBacklink::Object {
- bucket_id: bucket.id,
+ bucket_id: *bucket_id,
key: key.into(),
},
false,
@@ -175,7 +165,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
// Transfer data and verify checksum
let (total_size, data_md5sum, data_sha256sum, first_block_hash) =
- read_and_put_blocks(&garage, &version, 1, first_block, &mut chunker).await?;
+ read_and_put_blocks(ctx, &version, 1, first_block, &mut chunker).await?;
ensure_checksum_matches(
data_md5sum.as_slice(),
@@ -184,7 +174,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
content_sha256,
)?;
- check_quotas(&garage, bucket, total_size, existing_object.as_ref()).await?;
+ check_quotas(ctx, total_size, existing_object.as_ref()).await?;
// Save final object state, marked as Complete
let md5sum_hex = hex::encode(data_md5sum);
@@ -196,7 +186,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
},
first_block_hash,
));
- let object = Object::new(bucket.id, key.into(), vec![object_version]);
+ let object = Object::new(*bucket_id, key.into(), vec![object_version]);
garage.object_table.insert(&object).await?;
// We were not interrupted, everything went fine.
@@ -235,12 +225,18 @@ pub(crate) fn ensure_checksum_matches(
/// Check that inserting this object with this size doesn't exceed bucket quotas
pub(crate) async fn check_quotas(
- garage: &Arc<Garage>,
- bucket: &Bucket,
+ ctx: &ReqCtx,
size: u64,
prev_object: Option<&Object>,
) -> Result<(), Error> {
- let quotas = bucket.state.as_option().unwrap().quotas.get();
+ let ReqCtx {
+ garage,
+ bucket_id,
+ bucket_params,
+ ..
+ } = ctx;
+
+ let quotas = bucket_params.quotas.get();
if quotas.max_objects.is_none() && quotas.max_size.is_none() {
return Ok(());
};
@@ -248,7 +244,7 @@ pub(crate) async fn check_quotas(
let counters = garage
.object_counter_table
.table
- .get(&bucket.id, &EmptyKey)
+ .get(bucket_id, &EmptyKey)
.await?;
let counters = counters
@@ -292,7 +288,7 @@ pub(crate) async fn check_quotas(
}
pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
- garage: &Garage,
+ ctx: &ReqCtx,
version: &Version,
part_number: u64,
first_block: Bytes,
@@ -417,7 +413,7 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> +
let offset = written_bytes;
written_bytes += block.len() as u64;
write_futs.push_back(put_block_and_meta(
- garage,
+ ctx,
version,
part_number,
offset,
@@ -447,7 +443,7 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> +
}
async fn put_block_and_meta(
- garage: &Garage,
+ ctx: &ReqCtx,
version: &Version,
part_number: u64,
offset: u64,
@@ -455,6 +451,8 @@ async fn put_block_and_meta(
block: Bytes,
order_tag: OrderTag,
) -> Result<(), GarageError> {
+ let ReqCtx { garage, .. } = ctx;
+
let mut version = version.clone();
version.blocks.put(
VersionBlockKey {