aboutsummaryrefslogtreecommitdiff
path: root/src/garage
diff options
context:
space:
mode:
authorAlex <alex@adnab.me>2023-06-09 15:34:09 +0000
committerAlex <alex@adnab.me>2023-06-09 15:34:09 +0000
commit0a06fda0da35f4018c39bf6aec90e55bdf42d241 (patch)
treed2b758400f336b63e236c431a965d191954322a8 /src/garage
parente7e164a280dfc1c4adf9d6da6f3b2a9674eca4bd (diff)
parent3d477906d4ff418de10973db7bd3e940f2e47b2d (diff)
downloadgarage-0a06fda0da35f4018c39bf6aec90e55bdf42d241.tar.gz
garage-0a06fda0da35f4018c39bf6aec90e55bdf42d241.zip
Merge pull request 'Fix #204 (full Multipart Uploads semantics)' (#553) from nlnet-task1 into next
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/553
Diffstat (limited to 'src/garage')
-rw-r--r--src/garage/admin/block.rs106
-rw-r--r--src/garage/admin/bucket.rs10
-rw-r--r--src/garage/admin/mod.rs3
-rw-r--r--src/garage/cli/cmd.rs6
-rw-r--r--src/garage/cli/structs.rs13
-rw-r--r--src/garage/cli/util.rs66
-rw-r--r--src/garage/repair/online.rs226
-rw-r--r--src/garage/tests/s3/multipart.rs223
8 files changed, 500 insertions, 153 deletions
diff --git a/src/garage/admin/block.rs b/src/garage/admin/block.rs
index e9e3ff96..c4a45738 100644
--- a/src/garage/admin/block.rs
+++ b/src/garage/admin/block.rs
@@ -34,6 +34,7 @@ impl AdminRpcHandler {
.get_range(&hash, None, None, 10000, Default::default())
.await?;
let mut versions = vec![];
+ let mut uploads = vec![];
for br in block_refs {
if let Some(v) = self
.garage
@@ -41,6 +42,11 @@ impl AdminRpcHandler {
.get(&br.version, &EmptyKey)
.await?
{
+ if let VersionBacklink::MultipartUpload { upload_id } = &v.backlink {
+ if let Some(u) = self.garage.mpu_table.get(upload_id, &EmptyKey).await? {
+ uploads.push(u);
+ }
+ }
versions.push(Ok(v));
} else {
versions.push(Err(br.version));
@@ -50,6 +56,7 @@ impl AdminRpcHandler {
hash,
refcount,
versions,
+ uploads,
})
}
@@ -93,6 +100,7 @@ impl AdminRpcHandler {
}
let mut obj_dels = 0;
+ let mut mpu_dels = 0;
let mut ver_dels = 0;
for hash in blocks {
@@ -105,56 +113,80 @@ impl AdminRpcHandler {
.await?;
for br in block_refs {
- let version = match self
+ if let Some(version) = self
.garage
.version_table
.get(&br.version, &EmptyKey)
.await?
{
- Some(v) => v,
- None => continue,
- };
+ self.handle_block_purge_version_backlink(
+ &version,
+ &mut obj_dels,
+ &mut mpu_dels,
+ )
+ .await?;
- if let Some(object) = self
- .garage
- .object_table
- .get(&version.bucket_id, &version.key)
- .await?
- {
- let ov = object.versions().iter().rev().find(|v| v.is_complete());
- if let Some(ov) = ov {
- if ov.uuid == br.version {
- let del_uuid = gen_uuid();
- let deleted_object = Object::new(
- version.bucket_id,
- version.key.clone(),
- vec![ObjectVersion {
- uuid: del_uuid,
- timestamp: ov.timestamp + 1,
- state: ObjectVersionState::Complete(
- ObjectVersionData::DeleteMarker,
- ),
- }],
- );
- self.garage.object_table.insert(&deleted_object).await?;
- obj_dels += 1;
- }
+ if !version.deleted.get() {
+ let deleted_version = Version::new(version.uuid, version.backlink, true);
+ self.garage.version_table.insert(&deleted_version).await?;
+ ver_dels += 1;
}
}
-
- if !version.deleted.get() {
- let deleted_version =
- Version::new(version.uuid, version.bucket_id, version.key.clone(), true);
- self.garage.version_table.insert(&deleted_version).await?;
- ver_dels += 1;
- }
}
}
+
Ok(AdminRpc::Ok(format!(
- "{} blocks were purged: {} object deletion markers added, {} versions marked deleted",
+ "Purged {} blocks, {} versions, {} objects, {} multipart uploads",
blocks.len(),
+ ver_dels,
obj_dels,
- ver_dels
+ mpu_dels,
)))
}
+
+ async fn handle_block_purge_version_backlink(
+ &self,
+ version: &Version,
+ obj_dels: &mut usize,
+ mpu_dels: &mut usize,
+ ) -> Result<(), Error> {
+ let (bucket_id, key, ov_id) = match &version.backlink {
+ VersionBacklink::Object { bucket_id, key } => (*bucket_id, key.clone(), version.uuid),
+ VersionBacklink::MultipartUpload { upload_id } => {
+ if let Some(mut mpu) = self.garage.mpu_table.get(upload_id, &EmptyKey).await? {
+ if !mpu.deleted.get() {
+ mpu.parts.clear();
+ mpu.deleted.set();
+ self.garage.mpu_table.insert(&mpu).await?;
+ *mpu_dels += 1;
+ }
+ (mpu.bucket_id, mpu.key.clone(), *upload_id)
+ } else {
+ return Ok(());
+ }
+ }
+ };
+
+ if let Some(object) = self.garage.object_table.get(&bucket_id, &key).await? {
+ let ov = object.versions().iter().rev().find(|v| v.is_complete());
+ if let Some(ov) = ov {
+ if ov.uuid == ov_id {
+ let del_uuid = gen_uuid();
+ let deleted_object = Object::new(
+ bucket_id,
+ key,
+ vec![ObjectVersion {
+ uuid: del_uuid,
+ timestamp: ov.timestamp + 1,
+ state: ObjectVersionState::Complete(ObjectVersionData::DeleteMarker),
+ }],
+ );
+ self.garage.object_table.insert(&deleted_object).await?;
+ *obj_dels += 1;
+ }
+ }
+ }
+
+ Ok(())
+ }
}
diff --git a/src/garage/admin/bucket.rs b/src/garage/admin/bucket.rs
index 11bb8730..0781cb8b 100644
--- a/src/garage/admin/bucket.rs
+++ b/src/garage/admin/bucket.rs
@@ -73,6 +73,15 @@ impl AdminRpcHandler {
.map(|x| x.filtered_values(&self.garage.system.ring.borrow()))
.unwrap_or_default();
+ let mpu_counters = self
+ .garage
+ .mpu_counter_table
+ .table
+ .get(&bucket_id, &EmptyKey)
+ .await?
+ .map(|x| x.filtered_values(&self.garage.system.ring.borrow()))
+ .unwrap_or_default();
+
let mut relevant_keys = HashMap::new();
for (k, _) in bucket
.state
@@ -112,6 +121,7 @@ impl AdminRpcHandler {
bucket,
relevant_keys,
counters,
+ mpu_counters,
})
}
diff --git a/src/garage/admin/mod.rs b/src/garage/admin/mod.rs
index 93f6dd08..33c21eba 100644
--- a/src/garage/admin/mod.rs
+++ b/src/garage/admin/mod.rs
@@ -27,6 +27,7 @@ use garage_model::garage::Garage;
use garage_model::helper::error::{Error, OkOrBadRequest};
use garage_model::key_table::*;
use garage_model::migrate::Migrate;
+use garage_model::s3::mpu_table::MultipartUpload;
use garage_model::s3::version_table::Version;
use crate::cli::*;
@@ -52,6 +53,7 @@ pub enum AdminRpc {
bucket: Bucket,
relevant_keys: HashMap<String, Key>,
counters: HashMap<String, i64>,
+ mpu_counters: HashMap<String, i64>,
},
KeyList(Vec<(String, String)>),
KeyInfo(Key, HashMap<Uuid, Bucket>),
@@ -66,6 +68,7 @@ pub enum AdminRpc {
hash: Hash,
refcount: u64,
versions: Vec<Result<Version, Uuid>>,
+ uploads: Vec<MultipartUpload>,
},
}
diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs
index 905b14d3..045f050c 100644
--- a/src/garage/cli/cmd.rs
+++ b/src/garage/cli/cmd.rs
@@ -190,8 +190,9 @@ pub async fn cmd_admin(
bucket,
relevant_keys,
counters,
+ mpu_counters,
} => {
- print_bucket_info(&bucket, &relevant_keys, &counters);
+ print_bucket_info(&bucket, &relevant_keys, &counters, &mpu_counters);
}
AdminRpc::KeyList(kl) => {
print_key_list(kl);
@@ -215,8 +216,9 @@ pub async fn cmd_admin(
hash,
refcount,
versions,
+ uploads,
} => {
- print_block_info(hash, refcount, versions);
+ print_block_info(hash, refcount, versions, uploads);
}
r => {
error!("Unexpected response: {:?}", r);
diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs
index 986592ae..5dc99a0d 100644
--- a/src/garage/cli/structs.rs
+++ b/src/garage/cli/structs.rs
@@ -443,19 +443,22 @@ pub struct RepairOpt {
#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
pub enum RepairWhat {
- /// Only do a full sync of metadata tables
+ /// Do a full sync of metadata tables
#[structopt(name = "tables", version = garage_version())]
Tables,
- /// Only repair (resync/rebalance) the set of stored blocks
+ /// Repair (resync/rebalance) the set of stored blocks
#[structopt(name = "blocks", version = garage_version())]
Blocks,
- /// Only redo the propagation of object deletions to the version table (slow)
+ /// Repropagate object deletions to the version table
#[structopt(name = "versions", version = garage_version())]
Versions,
- /// Only redo the propagation of version deletions to the block ref table (extremely slow)
+ /// Repropagate object deletions to the multipart upload table
+ #[structopt(name = "mpu", version = garage_version())]
+ MultipartUploads,
+ /// Repropagate version deletions to the block ref table
#[structopt(name = "block_refs", version = garage_version())]
BlockRefs,
- /// Verify integrity of all blocks on disc (extremely slow, i/o intensive)
+ /// Verify integrity of all blocks on disc
#[structopt(name = "scrub", version = garage_version())]
Scrub {
#[structopt(subcommand)]
diff --git a/src/garage/cli/util.rs b/src/garage/cli/util.rs
index 2c6be2f4..d87f9eab 100644
--- a/src/garage/cli/util.rs
+++ b/src/garage/cli/util.rs
@@ -12,8 +12,9 @@ use garage_block::manager::BlockResyncErrorInfo;
use garage_model::bucket_table::*;
use garage_model::key_table::*;
-use garage_model::s3::object_table::{BYTES, OBJECTS, UNFINISHED_UPLOADS};
-use garage_model::s3::version_table::Version;
+use garage_model::s3::mpu_table::{self, MultipartUpload};
+use garage_model::s3::object_table;
+use garage_model::s3::version_table::*;
use crate::cli::structs::WorkerListOpt;
@@ -135,6 +136,7 @@ pub fn print_bucket_info(
bucket: &Bucket,
relevant_keys: &HashMap<String, Key>,
counters: &HashMap<String, i64>,
+ mpu_counters: &HashMap<String, i64>,
) {
let key_name = |k| {
relevant_keys
@@ -148,7 +150,7 @@ pub fn print_bucket_info(
Deletable::Deleted => println!("Bucket is deleted."),
Deletable::Present(p) => {
let size =
- bytesize::ByteSize::b(counters.get(BYTES).cloned().unwrap_or_default() as u64);
+ bytesize::ByteSize::b(*counters.get(object_table::BYTES).unwrap_or(&0) as u64);
println!(
"\nSize: {} ({})",
size.to_string_as(true),
@@ -156,14 +158,22 @@ pub fn print_bucket_info(
);
println!(
"Objects: {}",
- counters.get(OBJECTS).cloned().unwrap_or_default()
+ *counters.get(object_table::OBJECTS).unwrap_or(&0)
+ );
+ println!(
+ "Unfinished uploads (multipart and non-multipart): {}",
+ *counters.get(object_table::UNFINISHED_UPLOADS).unwrap_or(&0)
);
println!(
"Unfinished multipart uploads: {}",
- counters
- .get(UNFINISHED_UPLOADS)
- .cloned()
- .unwrap_or_default()
+ *mpu_counters.get(mpu_table::UPLOADS).unwrap_or(&0)
+ );
+ let mpu_size =
+ bytesize::ByteSize::b(*mpu_counters.get(mpu_table::BYTES).unwrap_or(&0) as u64);
+ println!(
+ "Size of unfinished multipart uploads: {} ({})",
+ mpu_size.to_string_as(true),
+ mpu_size.to_string_as(false),
);
println!("\nWebsite access: {}", p.website_config.get().is_some());
@@ -385,29 +395,49 @@ pub fn print_block_error_list(el: Vec<BlockResyncErrorInfo>) {
format_table(table);
}
-pub fn print_block_info(hash: Hash, refcount: u64, versions: Vec<Result<Version, Uuid>>) {
+pub fn print_block_info(
+ hash: Hash,
+ refcount: u64,
+ versions: Vec<Result<Version, Uuid>>,
+ uploads: Vec<MultipartUpload>,
+) {
println!("Block hash: {}", hex::encode(hash.as_slice()));
println!("Refcount: {}", refcount);
println!();
- let mut table = vec!["Version\tBucket\tKey\tDeleted".into()];
+ let mut table = vec!["Version\tBucket\tKey\tMPU\tDeleted".into()];
let mut nondeleted_count = 0;
for v in versions.iter() {
match v {
Ok(ver) => {
- table.push(format!(
- "{:?}\t{:?}\t{}\t{:?}",
- ver.uuid,
- ver.bucket_id,
- ver.key,
- ver.deleted.get()
- ));
+ match &ver.backlink {
+ VersionBacklink::Object { bucket_id, key } => {
+ table.push(format!(
+ "{:?}\t{:?}\t{}\t\t{:?}",
+ ver.uuid,
+ bucket_id,
+ key,
+ ver.deleted.get()
+ ));
+ }
+ VersionBacklink::MultipartUpload { upload_id } => {
+ let upload = uploads.iter().find(|x| x.upload_id == *upload_id);
+ table.push(format!(
+ "{:?}\t{:?}\t{}\t{:?}\t{:?}",
+ ver.uuid,
+ upload.map(|u| u.bucket_id).unwrap_or_default(),
+ upload.map(|u| u.key.as_str()).unwrap_or_default(),
+ upload_id,
+ ver.deleted.get()
+ ));
+ }
+ }
if !ver.deleted.get() {
nondeleted_count += 1;
}
}
Err(vh) => {
- table.push(format!("{:?}\t\t\tyes", vh));
+ table.push(format!("{:?}\t\t\t\tyes", vh));
}
}
}
diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs
index 0e14ed51..abfaf9f9 100644
--- a/src/garage/repair/online.rs
+++ b/src/garage/repair/online.rs
@@ -5,11 +5,16 @@ use async_trait::async_trait;
use tokio::sync::watch;
use garage_block::repair::ScrubWorkerCommand;
+
use garage_model::garage::Garage;
use garage_model::s3::block_ref_table::*;
+use garage_model::s3::mpu_table::*;
use garage_model::s3::object_table::*;
use garage_model::s3::version_table::*;
+
+use garage_table::replication::*;
use garage_table::*;
+
use garage_util::background::*;
use garage_util::error::Error;
use garage_util::migrate::Migrate;
@@ -32,11 +37,15 @@ pub async fn launch_online_repair(
}
RepairWhat::Versions => {
info!("Repairing the versions table");
- bg.spawn_worker(RepairVersionsWorker::new(garage.clone()));
+ bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairVersions));
+ }
+ RepairWhat::MultipartUploads => {
+ info!("Repairing the multipart uploads table");
+ bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairMpu));
}
RepairWhat::BlockRefs => {
info!("Repairing the block refs table");
- bg.spawn_worker(RepairBlockrefsWorker::new(garage.clone()));
+ bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairBlockRefs));
}
RepairWhat::Blocks => {
info!("Repairing the stored blocks");
@@ -67,70 +76,70 @@ pub async fn launch_online_repair(
// ----
-struct RepairVersionsWorker {
+#[async_trait]
+trait TableRepair: Send + Sync + 'static {
+ type T: TableSchema;
+
+ fn table(garage: &Garage) -> &Table<Self::T, TableShardedReplication>;
+
+ async fn process(
+ &mut self,
+ garage: &Garage,
+ entry: <<Self as TableRepair>::T as TableSchema>::E,
+ ) -> Result<bool, Error>;
+}
+
+struct TableRepairWorker<T: TableRepair> {
garage: Arc<Garage>,
pos: Vec<u8>,
counter: usize,
+ repairs: usize,
+ inner: T,
}
-impl RepairVersionsWorker {
- fn new(garage: Arc<Garage>) -> Self {
+impl<R: TableRepair> TableRepairWorker<R> {
+ fn new(garage: Arc<Garage>, inner: R) -> Self {
Self {
garage,
+ inner,
pos: vec![],
counter: 0,
+ repairs: 0,
}
}
}
#[async_trait]
-impl Worker for RepairVersionsWorker {
+impl<R: TableRepair> Worker for TableRepairWorker<R> {
fn name(&self) -> String {
- "Version repair worker".into()
+ format!("{} repair worker", R::T::TABLE_NAME)
}
fn status(&self) -> WorkerStatus {
WorkerStatus {
- progress: Some(self.counter.to_string()),
+ progress: Some(format!("{} ({})", self.counter, self.repairs)),
..Default::default()
}
}
async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
- let (item_bytes, next_pos) = match self.garage.version_table.data.store.get_gt(&self.pos)? {
+ let (item_bytes, next_pos) = match R::table(&self.garage).data.store.get_gt(&self.pos)? {
Some((k, v)) => (v, k),
None => {
- info!("repair_versions: finished, done {}", self.counter);
+ info!(
+ "{}: finished, done {}, fixed {}",
+ self.name(),
+ self.counter,
+ self.repairs
+ );
return Ok(WorkerState::Done);
}
};
- let version = Version::decode(&item_bytes).ok_or_message("Cannot decode Version")?;
- if !version.deleted.get() {
- let object = self
- .garage
- .object_table
- .get(&version.bucket_id, &version.key)
- .await?;
- let version_exists = match object {
- Some(o) => o
- .versions()
- .iter()
- .any(|x| x.uuid == version.uuid && x.state != ObjectVersionState::Aborted),
- None => false,
- };
- if !version_exists {
- info!("Repair versions: marking version as deleted: {:?}", version);
- self.garage
- .version_table
- .insert(&Version::new(
- version.uuid,
- version.bucket_id,
- version.key,
- true,
- ))
- .await?;
- }
+ let entry = <R::T as TableSchema>::E::decode(&item_bytes)
+ .ok_or_message("Cannot decode table entry")?;
+ if self.inner.process(&self.garage, entry).await? {
+ self.repairs += 1;
}
self.counter += 1;
@@ -146,77 +155,124 @@ impl Worker for RepairVersionsWorker {
// ----
-struct RepairBlockrefsWorker {
- garage: Arc<Garage>,
- pos: Vec<u8>,
- counter: usize,
-}
+struct RepairVersions;
-impl RepairBlockrefsWorker {
- fn new(garage: Arc<Garage>) -> Self {
- Self {
- garage,
- pos: vec![],
- counter: 0,
+#[async_trait]
+impl TableRepair for RepairVersions {
+ type T = VersionTable;
+
+ fn table(garage: &Garage) -> &Table<Self::T, TableShardedReplication> {
+ &garage.version_table
+ }
+
+ async fn process(&mut self, garage: &Garage, version: Version) -> Result<bool, Error> {
+ if !version.deleted.get() {
+ let ref_exists = match &version.backlink {
+ VersionBacklink::Object { bucket_id, key } => garage
+ .object_table
+ .get(bucket_id, key)
+ .await?
+ .map(|o| {
+ o.versions().iter().any(|x| {
+ x.uuid == version.uuid && x.state != ObjectVersionState::Aborted
+ })
+ })
+ .unwrap_or(false),
+ VersionBacklink::MultipartUpload { upload_id } => garage
+ .mpu_table
+ .get(upload_id, &EmptyKey)
+ .await?
+ .map(|u| !u.deleted.get())
+ .unwrap_or(false),
+ };
+
+ if !ref_exists {
+ info!("Repair versions: marking version as deleted: {:?}", version);
+ garage
+ .version_table
+ .insert(&Version::new(version.uuid, version.backlink, true))
+ .await?;
+ return Ok(true);
+ }
}
+
+ Ok(false)
}
}
+// ----
+
+struct RepairBlockRefs;
+
#[async_trait]
-impl Worker for RepairBlockrefsWorker {
- fn name(&self) -> String {
- "Block refs repair worker".into()
- }
+impl TableRepair for RepairBlockRefs {
+ type T = BlockRefTable;
- fn status(&self) -> WorkerStatus {
- WorkerStatus {
- progress: Some(self.counter.to_string()),
- ..Default::default()
- }
+ fn table(garage: &Garage) -> &Table<Self::T, TableShardedReplication> {
+ &garage.block_ref_table
}
- async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
- let (item_bytes, next_pos) =
- match self.garage.block_ref_table.data.store.get_gt(&self.pos)? {
- Some((k, v)) => (v, k),
- None => {
- info!("repair_block_ref: finished, done {}", self.counter);
- return Ok(WorkerState::Done);
- }
- };
-
- let block_ref = BlockRef::decode(&item_bytes).ok_or_message("Cannot decode BlockRef")?;
+ async fn process(&mut self, garage: &Garage, mut block_ref: BlockRef) -> Result<bool, Error> {
if !block_ref.deleted.get() {
- let version = self
- .garage
+ let ref_exists = garage
.version_table
.get(&block_ref.version, &EmptyKey)
- .await?;
- // The version might not exist if it has been GC'ed
- let ref_exists = version.map(|v| !v.deleted.get()).unwrap_or(false);
+ .await?
+ .map(|v| !v.deleted.get())
+ .unwrap_or(false);
+
if !ref_exists {
info!(
"Repair block ref: marking block_ref as deleted: {:?}",
block_ref
);
- self.garage
- .block_ref_table
- .insert(&BlockRef {
- block: block_ref.block,
- version: block_ref.version,
- deleted: true.into(),
- })
- .await?;
+ block_ref.deleted.set();
+ garage.block_ref_table.insert(&block_ref).await?;
+ return Ok(true);
}
}
- self.counter += 1;
- self.pos = next_pos;
+ Ok(false)
+ }
+}
- Ok(WorkerState::Busy)
+// ----
+
+struct RepairMpu;
+
+#[async_trait]
+impl TableRepair for RepairMpu {
+ type T = MultipartUploadTable;
+
+ fn table(garage: &Garage) -> &Table<Self::T, TableShardedReplication> {
+ &garage.mpu_table
}
- async fn wait_for_work(&mut self) -> WorkerState {
- unreachable!()
+ async fn process(&mut self, garage: &Garage, mut mpu: MultipartUpload) -> Result<bool, Error> {
+ if !mpu.deleted.get() {
+ let ref_exists = garage
+ .object_table
+ .get(&mpu.bucket_id, &mpu.key)
+ .await?
+ .map(|o| {
+ o.versions()
+ .iter()
+ .any(|x| x.uuid == mpu.upload_id && x.is_uploading(Some(true)))
+ })
+ .unwrap_or(false);
+
+ if !ref_exists {
+ info!(
+ "Repair multipart uploads: marking mpu as deleted: {:?}",
+ mpu
+ );
+ mpu.parts.clear();
+ mpu.deleted.set();
+ garage.mpu_table.insert(&mpu).await?;
+ return Ok(true);
+ }
+ }
+
+ Ok(false)
}
}
diff --git a/src/garage/tests/s3/multipart.rs b/src/garage/tests/s3/multipart.rs
index 895a2993..8ae6b66e 100644
--- a/src/garage/tests/s3/multipart.rs
+++ b/src/garage/tests/s3/multipart.rs
@@ -6,6 +6,190 @@ const SZ_5MB: usize = 5 * 1024 * 1024;
const SZ_10MB: usize = 10 * 1024 * 1024;
#[tokio::test]
+async fn test_multipart_upload() {
+ let ctx = common::context();
+ let bucket = ctx.create_bucket("testmpu");
+
+ let u1 = vec![0x11; SZ_5MB];
+ let u2 = vec![0x22; SZ_5MB];
+ let u3 = vec![0x33; SZ_5MB];
+ let u4 = vec![0x44; SZ_5MB];
+ let u5 = vec![0x55; SZ_5MB];
+
+ let up = ctx
+ .client
+ .create_multipart_upload()
+ .bucket(&bucket)
+ .key("a")
+ .send()
+ .await
+ .unwrap();
+ assert!(up.upload_id.is_some());
+
+ let uid = up.upload_id.as_ref().unwrap();
+
+ let p3 = ctx
+ .client
+ .upload_part()
+ .bucket(&bucket)
+ .key("a")
+ .upload_id(uid)
+ .part_number(3)
+ .body(ByteStream::from(u3.clone()))
+ .send()
+ .await
+ .unwrap();
+
+ let _p1 = ctx
+ .client
+ .upload_part()
+ .bucket(&bucket)
+ .key("a")
+ .upload_id(uid)
+ .part_number(1)
+ .body(ByteStream::from(u1))
+ .send()
+ .await
+ .unwrap();
+
+ let _p4 = ctx
+ .client
+ .upload_part()
+ .bucket(&bucket)
+ .key("a")
+ .upload_id(uid)
+ .part_number(4)
+ .body(ByteStream::from(u4))
+ .send()
+ .await
+ .unwrap();
+
+ let p1bis = ctx
+ .client
+ .upload_part()
+ .bucket(&bucket)
+ .key("a")
+ .upload_id(uid)
+ .part_number(1)
+ .body(ByteStream::from(u2.clone()))
+ .send()
+ .await
+ .unwrap();
+
+ let p6 = ctx
+ .client
+ .upload_part()
+ .bucket(&bucket)
+ .key("a")
+ .upload_id(uid)
+ .part_number(6)
+ .body(ByteStream::from(u5.clone()))
+ .send()
+ .await
+ .unwrap();
+
+ {
+ let r = ctx
+ .client
+ .list_parts()
+ .bucket(&bucket)
+ .key("a")
+ .upload_id(uid)
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(r.parts.unwrap().len(), 4);
+ }
+
+ let cmp = CompletedMultipartUpload::builder()
+ .parts(
+ CompletedPart::builder()
+ .part_number(1)
+ .e_tag(p1bis.e_tag.unwrap())
+ .build(),
+ )
+ .parts(
+ CompletedPart::builder()
+ .part_number(3)
+ .e_tag(p3.e_tag.unwrap())
+ .build(),
+ )
+ .parts(
+ CompletedPart::builder()
+ .part_number(6)
+ .e_tag(p6.e_tag.unwrap())
+ .build(),
+ )
+ .build();
+
+ ctx.client
+ .complete_multipart_upload()
+ .bucket(&bucket)
+ .key("a")
+ .upload_id(uid)
+ .multipart_upload(cmp)
+ .send()
+ .await
+ .unwrap();
+
+ // The multipart upload must not appear anymore
+ assert!(ctx
+ .client
+ .list_parts()
+ .bucket(&bucket)
+ .key("a")
+ .upload_id(uid)
+ .send()
+ .await
+ .is_err());
+
+ {
+ // The object must appear as a regular object
+ let r = ctx
+ .client
+ .head_object()
+ .bucket(&bucket)
+ .key("a")
+ .send()
+ .await
+ .unwrap();
+
+ assert_eq!(r.content_length, (SZ_5MB * 3) as i64);
+ }
+
+ {
+ let o = ctx
+ .client
+ .get_object()
+ .bucket(&bucket)
+ .key("a")
+ .send()
+ .await
+ .unwrap();
+
+ assert_bytes_eq!(o.body, &[&u2[..], &u3[..], &u5[..]].concat());
+ }
+
+ {
+ for (part_number, data) in [(1, &u2), (2, &u3), (3, &u5)] {
+ let o = ctx
+ .client
+ .get_object()
+ .bucket(&bucket)
+ .key("a")
+ .part_number(part_number)
+ .send()
+ .await
+ .unwrap();
+
+ eprintln!("get_object with part_number = {}", part_number);
+ assert_eq!(o.content_length, SZ_5MB as i64);
+ assert_bytes_eq!(o.body, data);
+ }
+ }
+}
+
+#[tokio::test]
async fn test_uploadlistpart() {
let ctx = common::context();
let bucket = ctx.create_bucket("uploadpart");
@@ -65,7 +249,8 @@ async fn test_uploadlistpart() {
let ps = r.parts.unwrap();
assert_eq!(ps.len(), 1);
- let fp = ps.iter().find(|x| x.part_number == 2).unwrap();
+ assert_eq!(ps[0].part_number, 2);
+ let fp = &ps[0];
assert!(fp.last_modified.is_some());
assert_eq!(
fp.e_tag.as_ref().unwrap(),
@@ -100,13 +285,24 @@ async fn test_uploadlistpart() {
let ps = r.parts.unwrap();
assert_eq!(ps.len(), 2);
- let fp = ps.iter().find(|x| x.part_number == 1).unwrap();
+
+ assert_eq!(ps[0].part_number, 1);
+ let fp = &ps[0];
assert!(fp.last_modified.is_some());
assert_eq!(
fp.e_tag.as_ref().unwrap(),
"\"3c484266f9315485694556e6c693bfa2\""
);
assert_eq!(fp.size, SZ_5MB as i64);
+
+ assert_eq!(ps[1].part_number, 2);
+ let sp = &ps[1];
+ assert!(sp.last_modified.is_some());
+ assert_eq!(
+ sp.e_tag.as_ref().unwrap(),
+ "\"3366bb9dcf710d6801b5926467d02e19\""
+ );
+ assert_eq!(sp.size, SZ_5MB as i64);
}
{
@@ -123,12 +319,19 @@ async fn test_uploadlistpart() {
.unwrap();
assert!(r.part_number_marker.is_none());
- assert!(r.next_part_number_marker.is_some());
+ assert_eq!(r.next_part_number_marker.as_deref(), Some("1"));
assert_eq!(r.max_parts, 1_i32);
assert!(r.is_truncated);
assert_eq!(r.key.unwrap(), "a");
assert_eq!(r.upload_id.unwrap().as_str(), uid.as_str());
- assert_eq!(r.parts.unwrap().len(), 1);
+ let parts = r.parts.unwrap();
+ assert_eq!(parts.len(), 1);
+ let fp = &parts[0];
+ assert_eq!(fp.part_number, 1);
+ assert_eq!(
+ fp.e_tag.as_ref().unwrap(),
+ "\"3c484266f9315485694556e6c693bfa2\""
+ );
let r2 = ctx
.client
@@ -147,10 +350,18 @@ async fn test_uploadlistpart() {
r.next_part_number_marker.as_ref().unwrap()
);
assert_eq!(r2.max_parts, 1_i32);
- assert!(r2.is_truncated);
assert_eq!(r2.key.unwrap(), "a");
assert_eq!(r2.upload_id.unwrap().as_str(), uid.as_str());
- assert_eq!(r2.parts.unwrap().len(), 1);
+ let parts = r2.parts.unwrap();
+ assert_eq!(parts.len(), 1);
+ let fp = &parts[0];
+ assert_eq!(fp.part_number, 2);
+ assert_eq!(
+ fp.e_tag.as_ref().unwrap(),
+ "\"3366bb9dcf710d6801b5926467d02e19\""
+ );
+ //assert!(r2.is_truncated); // WHY? (this was the test before)
+ assert!(!r2.is_truncated);
}
let cmp = CompletedMultipartUpload::builder()