diff options
author | Alex <alex@adnab.me> | 2023-06-09 15:34:09 +0000 |
---|---|---|
committer | Alex <alex@adnab.me> | 2023-06-09 15:34:09 +0000 |
commit | 0a06fda0da35f4018c39bf6aec90e55bdf42d241 (patch) | |
tree | d2b758400f336b63e236c431a965d191954322a8 /src/garage | |
parent | e7e164a280dfc1c4adf9d6da6f3b2a9674eca4bd (diff) | |
parent | 3d477906d4ff418de10973db7bd3e940f2e47b2d (diff) | |
download | garage-0a06fda0da35f4018c39bf6aec90e55bdf42d241.tar.gz garage-0a06fda0da35f4018c39bf6aec90e55bdf42d241.zip |
Merge pull request 'Fix #204 (full Multipart Uploads semantics)' (#553) from nlnet-task1 into next
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/553
Diffstat (limited to 'src/garage')
-rw-r--r-- | src/garage/admin/block.rs | 106 | ||||
-rw-r--r-- | src/garage/admin/bucket.rs | 10 | ||||
-rw-r--r-- | src/garage/admin/mod.rs | 3 | ||||
-rw-r--r-- | src/garage/cli/cmd.rs | 6 | ||||
-rw-r--r-- | src/garage/cli/structs.rs | 13 | ||||
-rw-r--r-- | src/garage/cli/util.rs | 66 | ||||
-rw-r--r-- | src/garage/repair/online.rs | 226 | ||||
-rw-r--r-- | src/garage/tests/s3/multipart.rs | 223 |
8 files changed, 500 insertions, 153 deletions
diff --git a/src/garage/admin/block.rs b/src/garage/admin/block.rs index e9e3ff96..c4a45738 100644 --- a/src/garage/admin/block.rs +++ b/src/garage/admin/block.rs @@ -34,6 +34,7 @@ impl AdminRpcHandler { .get_range(&hash, None, None, 10000, Default::default()) .await?; let mut versions = vec![]; + let mut uploads = vec![]; for br in block_refs { if let Some(v) = self .garage @@ -41,6 +42,11 @@ impl AdminRpcHandler { .get(&br.version, &EmptyKey) .await? { + if let VersionBacklink::MultipartUpload { upload_id } = &v.backlink { + if let Some(u) = self.garage.mpu_table.get(upload_id, &EmptyKey).await? { + uploads.push(u); + } + } versions.push(Ok(v)); } else { versions.push(Err(br.version)); @@ -50,6 +56,7 @@ impl AdminRpcHandler { hash, refcount, versions, + uploads, }) } @@ -93,6 +100,7 @@ impl AdminRpcHandler { } let mut obj_dels = 0; + let mut mpu_dels = 0; let mut ver_dels = 0; for hash in blocks { @@ -105,56 +113,80 @@ impl AdminRpcHandler { .await?; for br in block_refs { - let version = match self + if let Some(version) = self .garage .version_table .get(&br.version, &EmptyKey) .await? { - Some(v) => v, - None => continue, - }; + self.handle_block_purge_version_backlink( + &version, + &mut obj_dels, + &mut mpu_dels, + ) + .await?; - if let Some(object) = self - .garage - .object_table - .get(&version.bucket_id, &version.key) - .await? - { - let ov = object.versions().iter().rev().find(|v| v.is_complete()); - if let Some(ov) = ov { - if ov.uuid == br.version { - let del_uuid = gen_uuid(); - let deleted_object = Object::new( - version.bucket_id, - version.key.clone(), - vec![ObjectVersion { - uuid: del_uuid, - timestamp: ov.timestamp + 1, - state: ObjectVersionState::Complete( - ObjectVersionData::DeleteMarker, - ), - }], - ); - self.garage.object_table.insert(&deleted_object).await?; - obj_dels += 1; - } + if !version.deleted.get() { + let deleted_version = Version::new(version.uuid, version.backlink, true); + self.garage.version_table.insert(&deleted_version).await?; + ver_dels += 1; } } - - if !version.deleted.get() { - let deleted_version = - Version::new(version.uuid, version.bucket_id, version.key.clone(), true); - self.garage.version_table.insert(&deleted_version).await?; - ver_dels += 1; - } } } + Ok(AdminRpc::Ok(format!( - "{} blocks were purged: {} object deletion markers added, {} versions marked deleted", + "Purged {} blocks, {} versions, {} objects, {} multipart uploads", blocks.len(), + ver_dels, obj_dels, - ver_dels + mpu_dels, ))) } + + async fn handle_block_purge_version_backlink( + &self, + version: &Version, + obj_dels: &mut usize, + mpu_dels: &mut usize, + ) -> Result<(), Error> { + let (bucket_id, key, ov_id) = match &version.backlink { + VersionBacklink::Object { bucket_id, key } => (*bucket_id, key.clone(), version.uuid), + VersionBacklink::MultipartUpload { upload_id } => { + if let Some(mut mpu) = self.garage.mpu_table.get(upload_id, &EmptyKey).await? { + if !mpu.deleted.get() { + mpu.parts.clear(); + mpu.deleted.set(); + self.garage.mpu_table.insert(&mpu).await?; + *mpu_dels += 1; + } + (mpu.bucket_id, mpu.key.clone(), *upload_id) + } else { + return Ok(()); + } + } + }; + + if let Some(object) = self.garage.object_table.get(&bucket_id, &key).await? { + let ov = object.versions().iter().rev().find(|v| v.is_complete()); + if let Some(ov) = ov { + if ov.uuid == ov_id { + let del_uuid = gen_uuid(); + let deleted_object = Object::new( + bucket_id, + key, + vec![ObjectVersion { + uuid: del_uuid, + timestamp: ov.timestamp + 1, + state: ObjectVersionState::Complete(ObjectVersionData::DeleteMarker), + }], + ); + self.garage.object_table.insert(&deleted_object).await?; + *obj_dels += 1; + } + } + } + + Ok(()) + } } diff --git a/src/garage/admin/bucket.rs b/src/garage/admin/bucket.rs index 11bb8730..0781cb8b 100644 --- a/src/garage/admin/bucket.rs +++ b/src/garage/admin/bucket.rs @@ -73,6 +73,15 @@ impl AdminRpcHandler { .map(|x| x.filtered_values(&self.garage.system.ring.borrow())) .unwrap_or_default(); + let mpu_counters = self + .garage + .mpu_counter_table + .table + .get(&bucket_id, &EmptyKey) + .await? + .map(|x| x.filtered_values(&self.garage.system.ring.borrow())) + .unwrap_or_default(); + let mut relevant_keys = HashMap::new(); for (k, _) in bucket .state @@ -112,6 +121,7 @@ impl AdminRpcHandler { bucket, relevant_keys, counters, + mpu_counters, }) } diff --git a/src/garage/admin/mod.rs b/src/garage/admin/mod.rs index 93f6dd08..33c21eba 100644 --- a/src/garage/admin/mod.rs +++ b/src/garage/admin/mod.rs @@ -27,6 +27,7 @@ use garage_model::garage::Garage; use garage_model::helper::error::{Error, OkOrBadRequest}; use garage_model::key_table::*; use garage_model::migrate::Migrate; +use garage_model::s3::mpu_table::MultipartUpload; use garage_model::s3::version_table::Version; use crate::cli::*; @@ -52,6 +53,7 @@ pub enum AdminRpc { bucket: Bucket, relevant_keys: HashMap<String, Key>, counters: HashMap<String, i64>, + mpu_counters: HashMap<String, i64>, }, KeyList(Vec<(String, String)>), KeyInfo(Key, HashMap<Uuid, Bucket>), @@ -66,6 +68,7 @@ pub enum AdminRpc { hash: Hash, refcount: u64, versions: Vec<Result<Version, Uuid>>, + uploads: Vec<MultipartUpload>, }, } diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs index 905b14d3..045f050c 100644 --- a/src/garage/cli/cmd.rs +++ b/src/garage/cli/cmd.rs @@ -190,8 +190,9 @@ pub async fn cmd_admin( bucket, relevant_keys, counters, + mpu_counters, } => { - print_bucket_info(&bucket, &relevant_keys, &counters); + print_bucket_info(&bucket, &relevant_keys, &counters, &mpu_counters); } AdminRpc::KeyList(kl) => { print_key_list(kl); @@ -215,8 +216,9 @@ pub async fn cmd_admin( hash, refcount, versions, + uploads, } => { - print_block_info(hash, refcount, versions); + print_block_info(hash, refcount, versions, uploads); } r => { error!("Unexpected response: {:?}", r); diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs index 986592ae..5dc99a0d 100644 --- a/src/garage/cli/structs.rs +++ b/src/garage/cli/structs.rs @@ -443,19 +443,22 @@ pub struct RepairOpt { #[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)] pub enum RepairWhat { - /// Only do a full sync of metadata tables + /// Do a full sync of metadata tables #[structopt(name = "tables", version = garage_version())] Tables, - /// Only repair (resync/rebalance) the set of stored blocks + /// Repair (resync/rebalance) the set of stored blocks #[structopt(name = "blocks", version = garage_version())] Blocks, - /// Only redo the propagation of object deletions to the version table (slow) + /// Repropagate object deletions to the version table #[structopt(name = "versions", version = garage_version())] Versions, - /// Only redo the propagation of version deletions to the block ref table (extremely slow) + /// Repropagate object deletions to the multipart upload table + #[structopt(name = "mpu", version = garage_version())] + MultipartUploads, + /// Repropagate version deletions to the block ref table #[structopt(name = "block_refs", version = garage_version())] BlockRefs, - /// Verify integrity of all blocks on disc (extremely slow, i/o intensive) + /// Verify integrity of all blocks on disc #[structopt(name = "scrub", version = garage_version())] Scrub { #[structopt(subcommand)] diff --git a/src/garage/cli/util.rs b/src/garage/cli/util.rs index 2c6be2f4..d87f9eab 100644 --- a/src/garage/cli/util.rs +++ b/src/garage/cli/util.rs @@ -12,8 +12,9 @@ use garage_block::manager::BlockResyncErrorInfo; use garage_model::bucket_table::*; use garage_model::key_table::*; -use garage_model::s3::object_table::{BYTES, OBJECTS, UNFINISHED_UPLOADS}; -use garage_model::s3::version_table::Version; +use garage_model::s3::mpu_table::{self, MultipartUpload}; +use garage_model::s3::object_table; +use garage_model::s3::version_table::*; use crate::cli::structs::WorkerListOpt; @@ -135,6 +136,7 @@ pub fn print_bucket_info( bucket: &Bucket, relevant_keys: &HashMap<String, Key>, counters: &HashMap<String, i64>, + mpu_counters: &HashMap<String, i64>, ) { let key_name = |k| { relevant_keys @@ -148,7 +150,7 @@ pub fn print_bucket_info( Deletable::Deleted => println!("Bucket is deleted."), Deletable::Present(p) => { let size = - bytesize::ByteSize::b(counters.get(BYTES).cloned().unwrap_or_default() as u64); + bytesize::ByteSize::b(*counters.get(object_table::BYTES).unwrap_or(&0) as u64); println!( "\nSize: {} ({})", size.to_string_as(true), @@ -156,14 +158,22 @@ pub fn print_bucket_info( ); println!( "Objects: {}", - counters.get(OBJECTS).cloned().unwrap_or_default() + *counters.get(object_table::OBJECTS).unwrap_or(&0) + ); + println!( + "Unfinished uploads (multipart and non-multipart): {}", + *counters.get(object_table::UNFINISHED_UPLOADS).unwrap_or(&0) ); println!( "Unfinished multipart uploads: {}", - counters - .get(UNFINISHED_UPLOADS) - .cloned() - .unwrap_or_default() + *mpu_counters.get(mpu_table::UPLOADS).unwrap_or(&0) + ); + let mpu_size = + bytesize::ByteSize::b(*mpu_counters.get(mpu_table::BYTES).unwrap_or(&0) as u64); + println!( + "Size of unfinished multipart uploads: {} ({})", + mpu_size.to_string_as(true), + mpu_size.to_string_as(false), ); println!("\nWebsite access: {}", p.website_config.get().is_some()); @@ -385,29 +395,49 @@ pub fn print_block_error_list(el: Vec<BlockResyncErrorInfo>) { format_table(table); } -pub fn print_block_info(hash: Hash, refcount: u64, versions: Vec<Result<Version, Uuid>>) { +pub fn print_block_info( + hash: Hash, + refcount: u64, + versions: Vec<Result<Version, Uuid>>, + uploads: Vec<MultipartUpload>, +) { println!("Block hash: {}", hex::encode(hash.as_slice())); println!("Refcount: {}", refcount); println!(); - let mut table = vec!["Version\tBucket\tKey\tDeleted".into()]; + let mut table = vec!["Version\tBucket\tKey\tMPU\tDeleted".into()]; let mut nondeleted_count = 0; for v in versions.iter() { match v { Ok(ver) => { - table.push(format!( - "{:?}\t{:?}\t{}\t{:?}", - ver.uuid, - ver.bucket_id, - ver.key, - ver.deleted.get() - )); + match &ver.backlink { + VersionBacklink::Object { bucket_id, key } => { + table.push(format!( + "{:?}\t{:?}\t{}\t\t{:?}", + ver.uuid, + bucket_id, + key, + ver.deleted.get() + )); + } + VersionBacklink::MultipartUpload { upload_id } => { + let upload = uploads.iter().find(|x| x.upload_id == *upload_id); + table.push(format!( + "{:?}\t{:?}\t{}\t{:?}\t{:?}", + ver.uuid, + upload.map(|u| u.bucket_id).unwrap_or_default(), + upload.map(|u| u.key.as_str()).unwrap_or_default(), + upload_id, + ver.deleted.get() + )); + } + } if !ver.deleted.get() { nondeleted_count += 1; } } Err(vh) => { - table.push(format!("{:?}\t\t\tyes", vh)); + table.push(format!("{:?}\t\t\t\tyes", vh)); } } } diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs index 0e14ed51..abfaf9f9 100644 --- a/src/garage/repair/online.rs +++ b/src/garage/repair/online.rs @@ -5,11 +5,16 @@ use async_trait::async_trait; use tokio::sync::watch; use garage_block::repair::ScrubWorkerCommand; + use garage_model::garage::Garage; use garage_model::s3::block_ref_table::*; +use garage_model::s3::mpu_table::*; use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; + +use garage_table::replication::*; use garage_table::*; + use garage_util::background::*; use garage_util::error::Error; use garage_util::migrate::Migrate; @@ -32,11 +37,15 @@ pub async fn launch_online_repair( } RepairWhat::Versions => { info!("Repairing the versions table"); - bg.spawn_worker(RepairVersionsWorker::new(garage.clone())); + bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairVersions)); + } + RepairWhat::MultipartUploads => { + info!("Repairing the multipart uploads table"); + bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairMpu)); } RepairWhat::BlockRefs => { info!("Repairing the block refs table"); - bg.spawn_worker(RepairBlockrefsWorker::new(garage.clone())); + bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairBlockRefs)); } RepairWhat::Blocks => { info!("Repairing the stored blocks"); @@ -67,70 +76,70 @@ pub async fn launch_online_repair( // ---- -struct RepairVersionsWorker { +#[async_trait] +trait TableRepair: Send + Sync + 'static { + type T: TableSchema; + + fn table(garage: &Garage) -> &Table<Self::T, TableShardedReplication>; + + async fn process( + &mut self, + garage: &Garage, + entry: <<Self as TableRepair>::T as TableSchema>::E, + ) -> Result<bool, Error>; +} + +struct TableRepairWorker<T: TableRepair> { garage: Arc<Garage>, pos: Vec<u8>, counter: usize, + repairs: usize, + inner: T, } -impl RepairVersionsWorker { - fn new(garage: Arc<Garage>) -> Self { +impl<R: TableRepair> TableRepairWorker<R> { + fn new(garage: Arc<Garage>, inner: R) -> Self { Self { garage, + inner, pos: vec![], counter: 0, + repairs: 0, } } } #[async_trait] -impl Worker for RepairVersionsWorker { +impl<R: TableRepair> Worker for TableRepairWorker<R> { fn name(&self) -> String { - "Version repair worker".into() + format!("{} repair worker", R::T::TABLE_NAME) } fn status(&self) -> WorkerStatus { WorkerStatus { - progress: Some(self.counter.to_string()), + progress: Some(format!("{} ({})", self.counter, self.repairs)), ..Default::default() } } async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> { - let (item_bytes, next_pos) = match self.garage.version_table.data.store.get_gt(&self.pos)? { + let (item_bytes, next_pos) = match R::table(&self.garage).data.store.get_gt(&self.pos)? { Some((k, v)) => (v, k), None => { - info!("repair_versions: finished, done {}", self.counter); + info!( + "{}: finished, done {}, fixed {}", + self.name(), + self.counter, + self.repairs + ); return Ok(WorkerState::Done); } }; - let version = Version::decode(&item_bytes).ok_or_message("Cannot decode Version")?; - if !version.deleted.get() { - let object = self - .garage - .object_table - .get(&version.bucket_id, &version.key) - .await?; - let version_exists = match object { - Some(o) => o - .versions() - .iter() - .any(|x| x.uuid == version.uuid && x.state != ObjectVersionState::Aborted), - None => false, - }; - if !version_exists { - info!("Repair versions: marking version as deleted: {:?}", version); - self.garage - .version_table - .insert(&Version::new( - version.uuid, - version.bucket_id, - version.key, - true, - )) - .await?; - } + let entry = <R::T as TableSchema>::E::decode(&item_bytes) + .ok_or_message("Cannot decode table entry")?; + if self.inner.process(&self.garage, entry).await? { + self.repairs += 1; } self.counter += 1; @@ -146,77 +155,124 @@ impl Worker for RepairVersionsWorker { // ---- -struct RepairBlockrefsWorker { - garage: Arc<Garage>, - pos: Vec<u8>, - counter: usize, -} +struct RepairVersions; -impl RepairBlockrefsWorker { - fn new(garage: Arc<Garage>) -> Self { - Self { - garage, - pos: vec![], - counter: 0, +#[async_trait] +impl TableRepair for RepairVersions { + type T = VersionTable; + + fn table(garage: &Garage) -> &Table<Self::T, TableShardedReplication> { + &garage.version_table + } + + async fn process(&mut self, garage: &Garage, version: Version) -> Result<bool, Error> { + if !version.deleted.get() { + let ref_exists = match &version.backlink { + VersionBacklink::Object { bucket_id, key } => garage + .object_table + .get(bucket_id, key) + .await? + .map(|o| { + o.versions().iter().any(|x| { + x.uuid == version.uuid && x.state != ObjectVersionState::Aborted + }) + }) + .unwrap_or(false), + VersionBacklink::MultipartUpload { upload_id } => garage + .mpu_table + .get(upload_id, &EmptyKey) + .await? + .map(|u| !u.deleted.get()) + .unwrap_or(false), + }; + + if !ref_exists { + info!("Repair versions: marking version as deleted: {:?}", version); + garage + .version_table + .insert(&Version::new(version.uuid, version.backlink, true)) + .await?; + return Ok(true); + } } + + Ok(false) } } +// ---- + +struct RepairBlockRefs; + #[async_trait] -impl Worker for RepairBlockrefsWorker { - fn name(&self) -> String { - "Block refs repair worker".into() - } +impl TableRepair for RepairBlockRefs { + type T = BlockRefTable; - fn status(&self) -> WorkerStatus { - WorkerStatus { - progress: Some(self.counter.to_string()), - ..Default::default() - } + fn table(garage: &Garage) -> &Table<Self::T, TableShardedReplication> { + &garage.block_ref_table } - async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> { - let (item_bytes, next_pos) = - match self.garage.block_ref_table.data.store.get_gt(&self.pos)? { - Some((k, v)) => (v, k), - None => { - info!("repair_block_ref: finished, done {}", self.counter); - return Ok(WorkerState::Done); - } - }; - - let block_ref = BlockRef::decode(&item_bytes).ok_or_message("Cannot decode BlockRef")?; + async fn process(&mut self, garage: &Garage, mut block_ref: BlockRef) -> Result<bool, Error> { if !block_ref.deleted.get() { - let version = self - .garage + let ref_exists = garage .version_table .get(&block_ref.version, &EmptyKey) - .await?; - // The version might not exist if it has been GC'ed - let ref_exists = version.map(|v| !v.deleted.get()).unwrap_or(false); + .await? + .map(|v| !v.deleted.get()) + .unwrap_or(false); + if !ref_exists { info!( "Repair block ref: marking block_ref as deleted: {:?}", block_ref ); - self.garage - .block_ref_table - .insert(&BlockRef { - block: block_ref.block, - version: block_ref.version, - deleted: true.into(), - }) - .await?; + block_ref.deleted.set(); + garage.block_ref_table.insert(&block_ref).await?; + return Ok(true); } } - self.counter += 1; - self.pos = next_pos; + Ok(false) + } +} - Ok(WorkerState::Busy) +// ---- + +struct RepairMpu; + +#[async_trait] +impl TableRepair for RepairMpu { + type T = MultipartUploadTable; + + fn table(garage: &Garage) -> &Table<Self::T, TableShardedReplication> { + &garage.mpu_table } - async fn wait_for_work(&mut self) -> WorkerState { - unreachable!() + async fn process(&mut self, garage: &Garage, mut mpu: MultipartUpload) -> Result<bool, Error> { + if !mpu.deleted.get() { + let ref_exists = garage + .object_table + .get(&mpu.bucket_id, &mpu.key) + .await? + .map(|o| { + o.versions() + .iter() + .any(|x| x.uuid == mpu.upload_id && x.is_uploading(Some(true))) + }) + .unwrap_or(false); + + if !ref_exists { + info!( + "Repair multipart uploads: marking mpu as deleted: {:?}", + mpu + ); + mpu.parts.clear(); + mpu.deleted.set(); + garage.mpu_table.insert(&mpu).await?; + return Ok(true); + } + } + + Ok(false) } } diff --git a/src/garage/tests/s3/multipart.rs b/src/garage/tests/s3/multipart.rs index 895a2993..8ae6b66e 100644 --- a/src/garage/tests/s3/multipart.rs +++ b/src/garage/tests/s3/multipart.rs @@ -6,6 +6,190 @@ const SZ_5MB: usize = 5 * 1024 * 1024; const SZ_10MB: usize = 10 * 1024 * 1024; #[tokio::test] +async fn test_multipart_upload() { + let ctx = common::context(); + let bucket = ctx.create_bucket("testmpu"); + + let u1 = vec![0x11; SZ_5MB]; + let u2 = vec![0x22; SZ_5MB]; + let u3 = vec![0x33; SZ_5MB]; + let u4 = vec![0x44; SZ_5MB]; + let u5 = vec![0x55; SZ_5MB]; + + let up = ctx + .client + .create_multipart_upload() + .bucket(&bucket) + .key("a") + .send() + .await + .unwrap(); + assert!(up.upload_id.is_some()); + + let uid = up.upload_id.as_ref().unwrap(); + + let p3 = ctx + .client + .upload_part() + .bucket(&bucket) + .key("a") + .upload_id(uid) + .part_number(3) + .body(ByteStream::from(u3.clone())) + .send() + .await + .unwrap(); + + let _p1 = ctx + .client + .upload_part() + .bucket(&bucket) + .key("a") + .upload_id(uid) + .part_number(1) + .body(ByteStream::from(u1)) + .send() + .await + .unwrap(); + + let _p4 = ctx + .client + .upload_part() + .bucket(&bucket) + .key("a") + .upload_id(uid) + .part_number(4) + .body(ByteStream::from(u4)) + .send() + .await + .unwrap(); + + let p1bis = ctx + .client + .upload_part() + .bucket(&bucket) + .key("a") + .upload_id(uid) + .part_number(1) + .body(ByteStream::from(u2.clone())) + .send() + .await + .unwrap(); + + let p6 = ctx + .client + .upload_part() + .bucket(&bucket) + .key("a") + .upload_id(uid) + .part_number(6) + .body(ByteStream::from(u5.clone())) + .send() + .await + .unwrap(); + + { + let r = ctx + .client + .list_parts() + .bucket(&bucket) + .key("a") + .upload_id(uid) + .send() + .await + .unwrap(); + assert_eq!(r.parts.unwrap().len(), 4); + } + + let cmp = CompletedMultipartUpload::builder() + .parts( + CompletedPart::builder() + .part_number(1) + .e_tag(p1bis.e_tag.unwrap()) + .build(), + ) + .parts( + CompletedPart::builder() + .part_number(3) + .e_tag(p3.e_tag.unwrap()) + .build(), + ) + .parts( + CompletedPart::builder() + .part_number(6) + .e_tag(p6.e_tag.unwrap()) + .build(), + ) + .build(); + + ctx.client + .complete_multipart_upload() + .bucket(&bucket) + .key("a") + .upload_id(uid) + .multipart_upload(cmp) + .send() + .await + .unwrap(); + + // The multipart upload must not appear anymore + assert!(ctx + .client + .list_parts() + .bucket(&bucket) + .key("a") + .upload_id(uid) + .send() + .await + .is_err()); + + { + // The object must appear as a regular object + let r = ctx + .client + .head_object() + .bucket(&bucket) + .key("a") + .send() + .await + .unwrap(); + + assert_eq!(r.content_length, (SZ_5MB * 3) as i64); + } + + { + let o = ctx + .client + .get_object() + .bucket(&bucket) + .key("a") + .send() + .await + .unwrap(); + + assert_bytes_eq!(o.body, &[&u2[..], &u3[..], &u5[..]].concat()); + } + + { + for (part_number, data) in [(1, &u2), (2, &u3), (3, &u5)] { + let o = ctx + .client + .get_object() + .bucket(&bucket) + .key("a") + .part_number(part_number) + .send() + .await + .unwrap(); + + eprintln!("get_object with part_number = {}", part_number); + assert_eq!(o.content_length, SZ_5MB as i64); + assert_bytes_eq!(o.body, data); + } + } +} + +#[tokio::test] async fn test_uploadlistpart() { let ctx = common::context(); let bucket = ctx.create_bucket("uploadpart"); @@ -65,7 +249,8 @@ async fn test_uploadlistpart() { let ps = r.parts.unwrap(); assert_eq!(ps.len(), 1); - let fp = ps.iter().find(|x| x.part_number == 2).unwrap(); + assert_eq!(ps[0].part_number, 2); + let fp = &ps[0]; assert!(fp.last_modified.is_some()); assert_eq!( fp.e_tag.as_ref().unwrap(), @@ -100,13 +285,24 @@ async fn test_uploadlistpart() { let ps = r.parts.unwrap(); assert_eq!(ps.len(), 2); - let fp = ps.iter().find(|x| x.part_number == 1).unwrap(); + + assert_eq!(ps[0].part_number, 1); + let fp = &ps[0]; assert!(fp.last_modified.is_some()); assert_eq!( fp.e_tag.as_ref().unwrap(), "\"3c484266f9315485694556e6c693bfa2\"" ); assert_eq!(fp.size, SZ_5MB as i64); + + assert_eq!(ps[1].part_number, 2); + let sp = &ps[1]; + assert!(sp.last_modified.is_some()); + assert_eq!( + sp.e_tag.as_ref().unwrap(), + "\"3366bb9dcf710d6801b5926467d02e19\"" + ); + assert_eq!(sp.size, SZ_5MB as i64); } { @@ -123,12 +319,19 @@ async fn test_uploadlistpart() { .unwrap(); assert!(r.part_number_marker.is_none()); - assert!(r.next_part_number_marker.is_some()); + assert_eq!(r.next_part_number_marker.as_deref(), Some("1")); assert_eq!(r.max_parts, 1_i32); assert!(r.is_truncated); assert_eq!(r.key.unwrap(), "a"); assert_eq!(r.upload_id.unwrap().as_str(), uid.as_str()); - assert_eq!(r.parts.unwrap().len(), 1); + let parts = r.parts.unwrap(); + assert_eq!(parts.len(), 1); + let fp = &parts[0]; + assert_eq!(fp.part_number, 1); + assert_eq!( + fp.e_tag.as_ref().unwrap(), + "\"3c484266f9315485694556e6c693bfa2\"" + ); let r2 = ctx .client @@ -147,10 +350,18 @@ async fn test_uploadlistpart() { r.next_part_number_marker.as_ref().unwrap() ); assert_eq!(r2.max_parts, 1_i32); - assert!(r2.is_truncated); assert_eq!(r2.key.unwrap(), "a"); assert_eq!(r2.upload_id.unwrap().as_str(), uid.as_str()); - assert_eq!(r2.parts.unwrap().len(), 1); + let parts = r2.parts.unwrap(); + assert_eq!(parts.len(), 1); + let fp = &parts[0]; + assert_eq!(fp.part_number, 2); + assert_eq!( + fp.e_tag.as_ref().unwrap(), + "\"3366bb9dcf710d6801b5926467d02e19\"" + ); + //assert!(r2.is_truncated); // WHY? (this was the test before) + assert!(!r2.is_truncated); } let cmp = CompletedMultipartUpload::builder() |