aboutsummaryrefslogtreecommitdiff
path: root/src/garage
diff options
context:
space:
mode:
Diffstat (limited to 'src/garage')
-rw-r--r--src/garage/Cargo.toml2
-rw-r--r--src/garage/admin/block.rs106
-rw-r--r--src/garage/admin/bucket.rs10
-rw-r--r--src/garage/admin/key.rs21
-rw-r--r--src/garage/admin/mod.rs5
-rw-r--r--src/garage/cli/cmd.rs8
-rw-r--r--src/garage/cli/layout.rs226
-rw-r--r--src/garage/cli/structs.rs50
-rw-r--r--src/garage/cli/util.rs66
-rw-r--r--src/garage/main.rs3
-rw-r--r--src/garage/repair/online.rs232
-rw-r--r--src/garage/tests/common/garage.rs7
-rw-r--r--src/garage/tests/k2v/item.rs9
-rw-r--r--src/garage/tests/s3/multipart.rs223
14 files changed, 721 insertions, 247 deletions
diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml
index 35d87a3e..88729aaf 100644
--- a/src/garage/Cargo.toml
+++ b/src/garage/Cargo.toml
@@ -78,7 +78,7 @@ k2v-client.workspace = true
[features]
-default = [ "bundled-libs", "metrics", "sled", "k2v" ]
+default = [ "bundled-libs", "metrics", "sled", "lmdb", "sqlite", "k2v" ]
k2v = [ "garage_util/k2v", "garage_api/k2v" ]
diff --git a/src/garage/admin/block.rs b/src/garage/admin/block.rs
index e9e3ff96..c4a45738 100644
--- a/src/garage/admin/block.rs
+++ b/src/garage/admin/block.rs
@@ -34,6 +34,7 @@ impl AdminRpcHandler {
.get_range(&hash, None, None, 10000, Default::default())
.await?;
let mut versions = vec![];
+ let mut uploads = vec![];
for br in block_refs {
if let Some(v) = self
.garage
@@ -41,6 +42,11 @@ impl AdminRpcHandler {
.get(&br.version, &EmptyKey)
.await?
{
+ if let VersionBacklink::MultipartUpload { upload_id } = &v.backlink {
+ if let Some(u) = self.garage.mpu_table.get(upload_id, &EmptyKey).await? {
+ uploads.push(u);
+ }
+ }
versions.push(Ok(v));
} else {
versions.push(Err(br.version));
@@ -50,6 +56,7 @@ impl AdminRpcHandler {
hash,
refcount,
versions,
+ uploads,
})
}
@@ -93,6 +100,7 @@ impl AdminRpcHandler {
}
let mut obj_dels = 0;
+ let mut mpu_dels = 0;
let mut ver_dels = 0;
for hash in blocks {
@@ -105,56 +113,80 @@ impl AdminRpcHandler {
.await?;
for br in block_refs {
- let version = match self
+ if let Some(version) = self
.garage
.version_table
.get(&br.version, &EmptyKey)
.await?
{
- Some(v) => v,
- None => continue,
- };
+ self.handle_block_purge_version_backlink(
+ &version,
+ &mut obj_dels,
+ &mut mpu_dels,
+ )
+ .await?;
- if let Some(object) = self
- .garage
- .object_table
- .get(&version.bucket_id, &version.key)
- .await?
- {
- let ov = object.versions().iter().rev().find(|v| v.is_complete());
- if let Some(ov) = ov {
- if ov.uuid == br.version {
- let del_uuid = gen_uuid();
- let deleted_object = Object::new(
- version.bucket_id,
- version.key.clone(),
- vec![ObjectVersion {
- uuid: del_uuid,
- timestamp: ov.timestamp + 1,
- state: ObjectVersionState::Complete(
- ObjectVersionData::DeleteMarker,
- ),
- }],
- );
- self.garage.object_table.insert(&deleted_object).await?;
- obj_dels += 1;
- }
+ if !version.deleted.get() {
+ let deleted_version = Version::new(version.uuid, version.backlink, true);
+ self.garage.version_table.insert(&deleted_version).await?;
+ ver_dels += 1;
}
}
-
- if !version.deleted.get() {
- let deleted_version =
- Version::new(version.uuid, version.bucket_id, version.key.clone(), true);
- self.garage.version_table.insert(&deleted_version).await?;
- ver_dels += 1;
- }
}
}
+
Ok(AdminRpc::Ok(format!(
- "{} blocks were purged: {} object deletion markers added, {} versions marked deleted",
+ "Purged {} blocks, {} versions, {} objects, {} multipart uploads",
blocks.len(),
+ ver_dels,
obj_dels,
- ver_dels
+ mpu_dels,
)))
}
+
+ async fn handle_block_purge_version_backlink(
+ &self,
+ version: &Version,
+ obj_dels: &mut usize,
+ mpu_dels: &mut usize,
+ ) -> Result<(), Error> {
+ let (bucket_id, key, ov_id) = match &version.backlink {
+ VersionBacklink::Object { bucket_id, key } => (*bucket_id, key.clone(), version.uuid),
+ VersionBacklink::MultipartUpload { upload_id } => {
+ if let Some(mut mpu) = self.garage.mpu_table.get(upload_id, &EmptyKey).await? {
+ if !mpu.deleted.get() {
+ mpu.parts.clear();
+ mpu.deleted.set();
+ self.garage.mpu_table.insert(&mpu).await?;
+ *mpu_dels += 1;
+ }
+ (mpu.bucket_id, mpu.key.clone(), *upload_id)
+ } else {
+ return Ok(());
+ }
+ }
+ };
+
+ if let Some(object) = self.garage.object_table.get(&bucket_id, &key).await? {
+ let ov = object.versions().iter().rev().find(|v| v.is_complete());
+ if let Some(ov) = ov {
+ if ov.uuid == ov_id {
+ let del_uuid = gen_uuid();
+ let deleted_object = Object::new(
+ bucket_id,
+ key,
+ vec![ObjectVersion {
+ uuid: del_uuid,
+ timestamp: ov.timestamp + 1,
+ state: ObjectVersionState::Complete(ObjectVersionData::DeleteMarker),
+ }],
+ );
+ self.garage.object_table.insert(&deleted_object).await?;
+ *obj_dels += 1;
+ }
+ }
+ }
+
+ Ok(())
+ }
}
diff --git a/src/garage/admin/bucket.rs b/src/garage/admin/bucket.rs
index 11bb8730..0781cb8b 100644
--- a/src/garage/admin/bucket.rs
+++ b/src/garage/admin/bucket.rs
@@ -73,6 +73,15 @@ impl AdminRpcHandler {
.map(|x| x.filtered_values(&self.garage.system.ring.borrow()))
.unwrap_or_default();
+ let mpu_counters = self
+ .garage
+ .mpu_counter_table
+ .table
+ .get(&bucket_id, &EmptyKey)
+ .await?
+ .map(|x| x.filtered_values(&self.garage.system.ring.borrow()))
+ .unwrap_or_default();
+
let mut relevant_keys = HashMap::new();
for (k, _) in bucket
.state
@@ -112,6 +121,7 @@ impl AdminRpcHandler {
bucket,
relevant_keys,
counters,
+ mpu_counters,
})
}
diff --git a/src/garage/admin/key.rs b/src/garage/admin/key.rs
index cab13bcf..1c92670c 100644
--- a/src/garage/admin/key.rs
+++ b/src/garage/admin/key.rs
@@ -2,7 +2,7 @@ use std::collections::HashMap;
use garage_table::*;
-use garage_model::helper::error::Error;
+use garage_model::helper::error::*;
use garage_model::key_table::*;
use crate::cli::*;
@@ -14,7 +14,7 @@ impl AdminRpcHandler {
match cmd {
KeyOperation::List => self.handle_list_keys().await,
KeyOperation::Info(query) => self.handle_key_info(query).await,
- KeyOperation::New(query) => self.handle_create_key(query).await,
+ KeyOperation::Create(query) => self.handle_create_key(query).await,
KeyOperation::Rename(query) => self.handle_rename_key(query).await,
KeyOperation::Delete(query) => self.handle_delete_key(query).await,
KeyOperation::Allow(query) => self.handle_allow_key(query).await,
@@ -41,12 +41,17 @@ impl AdminRpcHandler {
Ok(AdminRpc::KeyList(key_ids))
}
- async fn handle_key_info(&self, query: &KeyOpt) -> Result<AdminRpc, Error> {
- let key = self
+ async fn handle_key_info(&self, query: &KeyInfoOpt) -> Result<AdminRpc, Error> {
+ let mut key = self
.garage
.key_helper()
.get_existing_matching_key(&query.key_pattern)
.await?;
+
+ if !query.show_secret {
+ key.state.as_option_mut().unwrap().secret_key = "(redacted)".into();
+ }
+
self.key_info_result(key).await
}
@@ -118,11 +123,17 @@ impl AdminRpcHandler {
}
async fn handle_import_key(&self, query: &KeyImportOpt) -> Result<AdminRpc, Error> {
+ if !query.yes {
+ return Err(Error::BadRequest("This command is intended to re-import keys that were previously generated by Garage. If you want to create a new key, use `garage key new` instead. Add the --yes flag if you really want to re-import a key.".to_string()));
+ }
+
let prev_key = self.garage.key_table.get(&EmptyKey, &query.key_id).await?;
if prev_key.is_some() {
return Err(Error::BadRequest(format!("Key {} already exists in data store. Even if it is deleted, we can't let you create a new key with the same ID. Sorry.", query.key_id)));
}
- let imported_key = Key::import(&query.key_id, &query.secret_key, &query.name);
+
+ let imported_key = Key::import(&query.key_id, &query.secret_key, &query.name)
+ .ok_or_bad_request("Invalid key format")?;
self.garage.key_table.insert(&imported_key).await?;
self.key_info_result(imported_key).await
diff --git a/src/garage/admin/mod.rs b/src/garage/admin/mod.rs
index 2709f08a..b6f9c426 100644
--- a/src/garage/admin/mod.rs
+++ b/src/garage/admin/mod.rs
@@ -28,6 +28,7 @@ use garage_model::garage::Garage;
use garage_model::helper::error::{Error, OkOrBadRequest};
use garage_model::key_table::*;
use garage_model::migrate::Migrate;
+use garage_model::s3::mpu_table::MultipartUpload;
use garage_model::s3::version_table::Version;
use crate::cli::*;
@@ -53,6 +54,7 @@ pub enum AdminRpc {
bucket: Bucket,
relevant_keys: HashMap<String, Key>,
counters: HashMap<String, i64>,
+ mpu_counters: HashMap<String, i64>,
},
KeyList(Vec<(String, String)>),
KeyInfo(Key, HashMap<Uuid, Bucket>),
@@ -67,6 +69,7 @@ pub enum AdminRpc {
hash: Hash,
refcount: u64,
versions: Vec<Result<Version, Uuid>>,
+ uploads: Vec<MultipartUpload>,
},
}
@@ -274,7 +277,7 @@ impl AdminRpcHandler {
// Gather storage node and free space statistics
let layout = &self.garage.system.ring.borrow().layout;
let mut node_partition_count = HashMap::<Uuid, u64>::new();
- for short_id in layout.ring_assignation_data.iter() {
+ for short_id in layout.ring_assignment_data.iter() {
let id = layout.node_id_vec[*short_id as usize];
*node_partition_count.entry(id).or_default() += 1;
}
diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs
index cb7a898c..48359614 100644
--- a/src/garage/cli/cmd.rs
+++ b/src/garage/cli/cmd.rs
@@ -85,7 +85,7 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) ->
));
}
_ => {
- let new_role = match layout.staging.get(&adv.id) {
+ let new_role = match layout.staging_roles.get(&adv.id) {
Some(NodeRoleV(Some(_))) => "(pending)",
_ => "NO ROLE ASSIGNED",
};
@@ -190,8 +190,9 @@ pub async fn cmd_admin(
bucket,
relevant_keys,
counters,
+ mpu_counters,
} => {
- print_bucket_info(&bucket, &relevant_keys, &counters);
+ print_bucket_info(&bucket, &relevant_keys, &counters, &mpu_counters);
}
AdminRpc::KeyList(kl) => {
print_key_list(kl);
@@ -215,8 +216,9 @@ pub async fn cmd_admin(
hash,
refcount,
versions,
+ uploads,
} => {
- print_block_info(hash, refcount, versions);
+ print_block_info(hash, refcount, versions, uploads);
}
r => {
error!("Unexpected response: {:?}", r);
diff --git a/src/garage/cli/layout.rs b/src/garage/cli/layout.rs
index dc5315a1..ce2b11e0 100644
--- a/src/garage/cli/layout.rs
+++ b/src/garage/cli/layout.rs
@@ -1,3 +1,5 @@
+use bytesize::ByteSize;
+
use format_table::format_table;
use garage_util::crdt::Crdt;
use garage_util::error::*;
@@ -14,8 +16,8 @@ pub async fn cli_layout_command_dispatch(
rpc_host: NodeID,
) -> Result<(), Error> {
match cmd {
- LayoutOperation::Assign(configure_opt) => {
- cmd_assign_role(system_rpc_endpoint, rpc_host, configure_opt).await
+ LayoutOperation::Assign(assign_opt) => {
+ cmd_assign_role(system_rpc_endpoint, rpc_host, assign_opt).await
}
LayoutOperation::Remove(remove_opt) => {
cmd_remove_role(system_rpc_endpoint, rpc_host, remove_opt).await
@@ -27,6 +29,9 @@ pub async fn cli_layout_command_dispatch(
LayoutOperation::Revert(revert_opt) => {
cmd_revert_layout(system_rpc_endpoint, rpc_host, revert_opt).await
}
+ LayoutOperation::Config(config_opt) => {
+ cmd_config_layout(system_rpc_endpoint, rpc_host, config_opt).await
+ }
}
}
@@ -60,14 +65,14 @@ pub async fn cmd_assign_role(
.collect::<Result<Vec<_>, _>>()?;
let mut roles = layout.roles.clone();
- roles.merge(&layout.staging);
+ roles.merge(&layout.staging_roles);
for replaced in args.replace.iter() {
let replaced_node = find_matching_node(layout.node_ids().iter().cloned(), replaced)?;
match roles.get(&replaced_node) {
Some(NodeRoleV(Some(_))) => {
layout
- .staging
+ .staging_roles
.merge(&roles.update_mutator(replaced_node, NodeRoleV(None)));
}
_ => {
@@ -83,7 +88,7 @@ pub async fn cmd_assign_role(
return Err(Error::Message(
"-c and -g are mutually exclusive, please configure node either with c>0 to act as a storage node or with -g to act as a gateway node".into()));
}
- if args.capacity == Some(0) {
+ if args.capacity == Some(ByteSize::b(0)) {
return Err(Error::Message("Invalid capacity value: 0".into()));
}
@@ -91,7 +96,7 @@ pub async fn cmd_assign_role(
let new_entry = match roles.get(&added_node) {
Some(NodeRoleV(Some(old))) => {
let capacity = match args.capacity {
- Some(c) => Some(c),
+ Some(c) => Some(c.as_u64()),
None if args.gateway => None,
None => old.capacity,
};
@@ -108,7 +113,7 @@ pub async fn cmd_assign_role(
}
_ => {
let capacity = match args.capacity {
- Some(c) => Some(c),
+ Some(c) => Some(c.as_u64()),
None if args.gateway => None,
None => return Err(Error::Message(
"Please specify a capacity with the -c flag, or set node explicitly as gateway with -g".into())),
@@ -125,7 +130,7 @@ pub async fn cmd_assign_role(
};
layout
- .staging
+ .staging_roles
.merge(&roles.update_mutator(added_node, NodeRoleV(Some(new_entry))));
}
@@ -145,13 +150,13 @@ pub async fn cmd_remove_role(
let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
let mut roles = layout.roles.clone();
- roles.merge(&layout.staging);
+ roles.merge(&layout.staging_roles);
let deleted_node =
find_matching_node(roles.items().iter().map(|(id, _, _)| *id), &args.node_id)?;
layout
- .staging
+ .staging_roles
.merge(&roles.update_mutator(deleted_node, NodeRoleV(None)));
send_layout(rpc_cli, rpc_host, layout).await?;
@@ -166,40 +171,45 @@ pub async fn cmd_show_layout(
rpc_cli: &Endpoint<SystemRpc, ()>,
rpc_host: NodeID,
) -> Result<(), Error> {
- let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
+ let layout = fetch_layout(rpc_cli, rpc_host).await?;
println!("==== CURRENT CLUSTER LAYOUT ====");
- if !print_cluster_layout(&layout) {
- println!("No nodes currently have a role in the cluster.");
- println!("See `garage status` to view available nodes.");
- }
+ print_cluster_layout(&layout, "No nodes currently have a role in the cluster.\nSee `garage status` to view available nodes.");
println!();
println!("Current cluster layout version: {}", layout.version);
- if print_staging_role_changes(&layout) {
- layout.roles.merge(&layout.staging);
-
- println!();
- println!("==== NEW CLUSTER LAYOUT AFTER APPLYING CHANGES ====");
- if !print_cluster_layout(&layout) {
- println!("No nodes have a role in the new layout.");
- }
- println!();
+ let has_role_changes = print_staging_role_changes(&layout);
+ if has_role_changes {
+ let v = layout.version;
+ let res_apply = layout.apply_staged_changes(Some(v + 1));
// this will print the stats of what partitions
// will move around when we apply
- if layout.calculate_partition_assignation() {
- println!("To enact the staged role changes, type:");
- println!();
- println!(" garage layout apply --version {}", layout.version + 1);
- println!();
- println!(
- "You can also revert all proposed changes with: garage layout revert --version {}",
- layout.version + 1
- );
- } else {
- println!("Not enough nodes have an assigned role to maintain enough copies of data.");
- println!("This new layout cannot yet be applied.");
+ match res_apply {
+ Ok((layout, msg)) => {
+ println!();
+ println!("==== NEW CLUSTER LAYOUT AFTER APPLYING CHANGES ====");
+ print_cluster_layout(&layout, "No nodes have a role in the new layout.");
+ println!();
+
+ for line in msg.iter() {
+ println!("{}", line);
+ }
+ println!("To enact the staged role changes, type:");
+ println!();
+ println!(" garage layout apply --version {}", v + 1);
+ println!();
+ println!(
+ "You can also revert all proposed changes with: garage layout revert --version {}",
+ v + 1)
+ }
+ Err(e) => {
+ println!("Error while trying to compute the assignment: {}", e);
+ println!("This new layout cannot yet be applied.");
+ println!(
+ "You can also revert all proposed changes with: garage layout revert --version {}",
+ v + 1)
+ }
}
}
@@ -213,11 +223,14 @@ pub async fn cmd_apply_layout(
) -> Result<(), Error> {
let layout = fetch_layout(rpc_cli, rpc_host).await?;
- let layout = layout.apply_staged_changes(apply_opt.version)?;
+ let (layout, msg) = layout.apply_staged_changes(apply_opt.version)?;
+ for line in msg.iter() {
+ println!("{}", line);
+ }
send_layout(rpc_cli, rpc_host, layout).await?;
- println!("New cluster layout with updated role assignation has been applied in cluster.");
+ println!("New cluster layout with updated role assignment has been applied in cluster.");
println!("Data will now be moved around between nodes accordingly.");
Ok(())
@@ -238,6 +251,52 @@ pub async fn cmd_revert_layout(
Ok(())
}
+pub async fn cmd_config_layout(
+ rpc_cli: &Endpoint<SystemRpc, ()>,
+ rpc_host: NodeID,
+ config_opt: ConfigLayoutOpt,
+) -> Result<(), Error> {
+ let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
+
+ let mut did_something = false;
+ match config_opt.redundancy {
+ None => (),
+ Some(r_str) => {
+ let r = r_str
+ .parse::<ZoneRedundancy>()
+ .ok_or_message("invalid zone redundancy value")?;
+ if let ZoneRedundancy::AtLeast(r_int) = r {
+ if r_int > layout.replication_factor {
+ return Err(Error::Message(format!(
+ "The zone redundancy must be smaller or equal to the \
+ replication factor ({}).",
+ layout.replication_factor
+ )));
+ } else if r_int < 1 {
+ return Err(Error::Message(
+ "The zone redundancy must be at least 1.".into(),
+ ));
+ }
+ }
+
+ layout
+ .staging_parameters
+ .update(LayoutParameters { zone_redundancy: r });
+ println!("The zone redundancy parameter has been set to '{}'.", r);
+ did_something = true;
+ }
+ }
+
+ if !did_something {
+ return Err(Error::Message(
+ "Please specify an action for `garage layout config`".into(),
+ ));
+ }
+
+ send_layout(rpc_cli, rpc_host, layout).await?;
+ Ok(())
+}
+
// --- utility ---
pub async fn fetch_layout(
@@ -268,59 +327,84 @@ pub async fn send_layout(
Ok(())
}
-pub fn print_cluster_layout(layout: &ClusterLayout) -> bool {
- let mut table = vec!["ID\tTags\tZone\tCapacity".to_string()];
+pub fn print_cluster_layout(layout: &ClusterLayout, empty_msg: &str) {
+ let mut table = vec!["ID\tTags\tZone\tCapacity\tUsable capacity".to_string()];
for (id, _, role) in layout.roles.items().iter() {
let role = match &role.0 {
Some(r) => r,
_ => continue,
};
let tags = role.tags.join(",");
- table.push(format!(
- "{:?}\t{}\t{}\t{}",
- id,
- tags,
- role.zone,
- role.capacity_string()
- ));
+ let usage = layout.get_node_usage(id).unwrap_or(0);
+ let capacity = layout.get_node_capacity(id).unwrap_or(0);
+ if capacity > 0 {
+ table.push(format!(
+ "{:?}\t{}\t{}\t{}\t{} ({:.1}%)",
+ id,
+ tags,
+ role.zone,
+ role.capacity_string(),
+ ByteSize::b(usage as u64 * layout.partition_size).to_string_as(false),
+ (100.0 * usage as f32 * layout.partition_size as f32) / (capacity as f32)
+ ));
+ } else {
+ table.push(format!(
+ "{:?}\t{}\t{}\t{}",
+ id,
+ tags,
+ role.zone,
+ role.capacity_string()
+ ));
+ };
}
- if table.len() == 1 {
- false
- } else {
+ if table.len() > 1 {
format_table(table);
- true
+ println!();
+ println!("Zone redundancy: {}", layout.parameters.zone_redundancy);
+ } else {
+ println!("{}", empty_msg);
}
}
pub fn print_staging_role_changes(layout: &ClusterLayout) -> bool {
- let has_changes = layout
- .staging
+ let has_role_changes = layout
+ .staging_roles
.items()
.iter()
.any(|(k, _, v)| layout.roles.get(k) != Some(v));
+ let has_layout_changes = *layout.staging_parameters.get() != layout.parameters;
- if has_changes {
+ if has_role_changes || has_layout_changes {
println!();
println!("==== STAGED ROLE CHANGES ====");
- let mut table = vec!["ID\tTags\tZone\tCapacity".to_string()];
- for (id, _, role) in layout.staging.items().iter() {
- if layout.roles.get(id) == Some(role) {
- continue;
- }
- if let Some(role) = &role.0 {
- let tags = role.tags.join(",");
- table.push(format!(
- "{:?}\t{}\t{}\t{}",
- id,
- tags,
- role.zone,
- role.capacity_string()
- ));
- } else {
- table.push(format!("{:?}\tREMOVED", id));
+ if has_role_changes {
+ let mut table = vec!["ID\tTags\tZone\tCapacity".to_string()];
+ for (id, _, role) in layout.staging_roles.items().iter() {
+ if layout.roles.get(id) == Some(role) {
+ continue;
+ }
+ if let Some(role) = &role.0 {
+ let tags = role.tags.join(",");
+ table.push(format!(
+ "{:?}\t{}\t{}\t{}",
+ id,
+ tags,
+ role.zone,
+ role.capacity_string()
+ ));
+ } else {
+ table.push(format!("{:?}\tREMOVED", id));
+ }
}
+ format_table(table);
+ println!();
+ }
+ if has_layout_changes {
+ println!(
+ "Zone redundancy: {}",
+ layout.staging_parameters.get().zone_redundancy
+ );
}
- format_table(table);
true
} else {
false
diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs
index 6e585e53..c4ebeb1a 100644
--- a/src/garage/cli/structs.rs
+++ b/src/garage/cli/structs.rs
@@ -17,7 +17,7 @@ pub enum Command {
#[structopt(name = "node", version = garage_version())]
Node(NodeOperation),
- /// Operations on the assignation of node roles in the cluster layout
+ /// Operations on the assignment of node roles in the cluster layout
#[structopt(name = "layout", version = garage_version())]
Layout(LayoutOperation),
@@ -91,6 +91,10 @@ pub enum LayoutOperation {
#[structopt(name = "remove", version = garage_version())]
Remove(RemoveRoleOpt),
+ /// Configure parameters value for the layout computation
+ #[structopt(name = "config", version = garage_version())]
+ Config(ConfigLayoutOpt),
+
/// Show roles currently assigned to nodes and changes staged for commit
#[structopt(name = "show", version = garage_version())]
Show,
@@ -114,9 +118,9 @@ pub struct AssignRoleOpt {
#[structopt(short = "z", long = "zone")]
pub(crate) zone: Option<String>,
- /// Capacity (in relative terms, use 1 to represent your smallest server)
+ /// Storage capacity, in bytes (supported suffixes: B, KB, MB, GB, TB, PB)
#[structopt(short = "c", long = "capacity")]
- pub(crate) capacity: Option<u32>,
+ pub(crate) capacity: Option<bytesize::ByteSize>,
/// Gateway-only node
#[structopt(short = "g", long = "gateway")]
@@ -138,6 +142,13 @@ pub struct RemoveRoleOpt {
}
#[derive(StructOpt, Debug)]
+pub struct ConfigLayoutOpt {
+ /// Zone redundancy parameter ('none'/'max' or integer)
+ #[structopt(short = "r", long = "redundancy")]
+ pub(crate) redundancy: Option<String>,
+}
+
+#[derive(StructOpt, Debug)]
pub struct ApplyLayoutOpt {
/// Version number of new configuration: this command will fail if
/// it is not exactly 1 + the previous configuration's version
@@ -317,11 +328,11 @@ pub enum KeyOperation {
/// Get key info
#[structopt(name = "info", version = garage_version())]
- Info(KeyOpt),
+ Info(KeyInfoOpt),
/// Create new key
- #[structopt(name = "new", version = garage_version())]
- New(KeyNewOpt),
+ #[structopt(name = "create", version = garage_version())]
+ Create(KeyNewOpt),
/// Rename key
#[structopt(name = "rename", version = garage_version())]
@@ -345,15 +356,18 @@ pub enum KeyOperation {
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
-pub struct KeyOpt {
+pub struct KeyInfoOpt {
/// ID or name of the key
pub key_pattern: String,
+ /// Whether to display the secret key
+ #[structopt(long = "show-secret")]
+ pub show_secret: bool,
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
pub struct KeyNewOpt {
/// Name of the key
- #[structopt(long = "name", default_value = "Unnamed key")]
+ #[structopt(default_value = "Unnamed key")]
pub name: String,
}
@@ -397,6 +411,10 @@ pub struct KeyImportOpt {
/// Key name
#[structopt(short = "n", default_value = "Imported key")]
pub name: String,
+
+ /// Confirm key import
+ #[structopt(long = "yes")]
+ pub yes: bool,
}
#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
@@ -432,24 +450,30 @@ pub struct RepairOpt {
#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
pub enum RepairWhat {
- /// Only do a full sync of metadata tables
+ /// Do a full sync of metadata tables
#[structopt(name = "tables", version = garage_version())]
Tables,
- /// Only repair (resync/rebalance) the set of stored blocks
+ /// Repair (resync/rebalance) the set of stored blocks
#[structopt(name = "blocks", version = garage_version())]
Blocks,
- /// Only redo the propagation of object deletions to the version table (slow)
+ /// Repropagate object deletions to the version table
#[structopt(name = "versions", version = garage_version())]
Versions,
- /// Only redo the propagation of version deletions to the block ref table (extremely slow)
+ /// Repropagate object deletions to the multipart upload table
+ #[structopt(name = "mpu", version = garage_version())]
+ MultipartUploads,
+ /// Repropagate version deletions to the block ref table
#[structopt(name = "block_refs", version = garage_version())]
BlockRefs,
- /// Verify integrity of all blocks on disc (extremely slow, i/o intensive)
+ /// Verify integrity of all blocks on disc
#[structopt(name = "scrub", version = garage_version())]
Scrub {
#[structopt(subcommand)]
cmd: ScrubCmd,
},
+ /// Rebalance data blocks among storage locations
+ #[structopt(name = "rebalance", version = garage_version())]
+ Rebalance,
}
#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
diff --git a/src/garage/cli/util.rs b/src/garage/cli/util.rs
index 1140cf22..2232d395 100644
--- a/src/garage/cli/util.rs
+++ b/src/garage/cli/util.rs
@@ -12,8 +12,9 @@ use garage_block::manager::BlockResyncErrorInfo;
use garage_model::bucket_table::*;
use garage_model::key_table::*;
-use garage_model::s3::object_table::{BYTES, OBJECTS, UNFINISHED_UPLOADS};
-use garage_model::s3::version_table::Version;
+use garage_model::s3::mpu_table::{self, MultipartUpload};
+use garage_model::s3::object_table;
+use garage_model::s3::version_table::*;
use crate::cli::structs::WorkerListOpt;
@@ -135,6 +136,7 @@ pub fn print_bucket_info(
bucket: &Bucket,
relevant_keys: &HashMap<String, Key>,
counters: &HashMap<String, i64>,
+ mpu_counters: &HashMap<String, i64>,
) {
let key_name = |k| {
relevant_keys
@@ -148,7 +150,7 @@ pub fn print_bucket_info(
Deletable::Deleted => println!("Bucket is deleted."),
Deletable::Present(p) => {
let size =
- bytesize::ByteSize::b(counters.get(BYTES).cloned().unwrap_or_default() as u64);
+ bytesize::ByteSize::b(*counters.get(object_table::BYTES).unwrap_or(&0) as u64);
println!(
"\nSize: {} ({})",
size.to_string_as(true),
@@ -156,14 +158,22 @@ pub fn print_bucket_info(
);
println!(
"Objects: {}",
- counters.get(OBJECTS).cloned().unwrap_or_default()
+ *counters.get(object_table::OBJECTS).unwrap_or(&0)
+ );
+ println!(
+ "Unfinished uploads (multipart and non-multipart): {}",
+ *counters.get(object_table::UNFINISHED_UPLOADS).unwrap_or(&0)
);
println!(
"Unfinished multipart uploads: {}",
- counters
- .get(UNFINISHED_UPLOADS)
- .cloned()
- .unwrap_or_default()
+ *mpu_counters.get(mpu_table::UPLOADS).unwrap_or(&0)
+ );
+ let mpu_size =
+ bytesize::ByteSize::b(*mpu_counters.get(mpu_table::BYTES).unwrap_or(&0) as u64);
+ println!(
+ "Size of unfinished multipart uploads: {} ({})",
+ mpu_size.to_string_as(true),
+ mpu_size.to_string_as(false),
);
println!("\nWebsite access: {}", p.website_config.get().is_some());
@@ -390,29 +400,49 @@ pub fn print_block_error_list(el: Vec<BlockResyncErrorInfo>) {
format_table(table);
}
-pub fn print_block_info(hash: Hash, refcount: u64, versions: Vec<Result<Version, Uuid>>) {
+pub fn print_block_info(
+ hash: Hash,
+ refcount: u64,
+ versions: Vec<Result<Version, Uuid>>,
+ uploads: Vec<MultipartUpload>,
+) {
println!("Block hash: {}", hex::encode(hash.as_slice()));
println!("Refcount: {}", refcount);
println!();
- let mut table = vec!["Version\tBucket\tKey\tDeleted".into()];
+ let mut table = vec!["Version\tBucket\tKey\tMPU\tDeleted".into()];
let mut nondeleted_count = 0;
for v in versions.iter() {
match v {
Ok(ver) => {
- table.push(format!(
- "{:?}\t{:?}\t{}\t{:?}",
- ver.uuid,
- ver.bucket_id,
- ver.key,
- ver.deleted.get()
- ));
+ match &ver.backlink {
+ VersionBacklink::Object { bucket_id, key } => {
+ table.push(format!(
+ "{:?}\t{:?}\t{}\t\t{:?}",
+ ver.uuid,
+ bucket_id,
+ key,
+ ver.deleted.get()
+ ));
+ }
+ VersionBacklink::MultipartUpload { upload_id } => {
+ let upload = uploads.iter().find(|x| x.upload_id == *upload_id);
+ table.push(format!(
+ "{:?}\t{:?}\t{}\t{:?}\t{:?}",
+ ver.uuid,
+ upload.map(|u| u.bucket_id).unwrap_or_default(),
+ upload.map(|u| u.key.as_str()).unwrap_or_default(),
+ upload_id,
+ ver.deleted.get()
+ ));
+ }
+ }
if !ver.deleted.get() {
nondeleted_count += 1;
}
}
Err(vh) => {
- table.push(format!("{:?}\t\t\tyes", vh));
+ table.push(format!("{:?}\t\t\t\tyes", vh));
}
}
}
diff --git a/src/garage/main.rs b/src/garage/main.rs
index e8aee892..3d07208c 100644
--- a/src/garage/main.rs
+++ b/src/garage/main.rs
@@ -17,6 +17,9 @@ compile_error!("Either bundled-libs or system-libs Cargo feature must be enabled
#[cfg(all(feature = "bundled-libs", feature = "system-libs"))]
compile_error!("Only one of bundled-libs and system-libs Cargo features must be enabled");
+#[cfg(not(any(feature = "lmdb", feature = "sled", feature = "sqlite")))]
+compile_error!("Must activate the Cargo feature for at least one DB engine: lmdb, sled or sqlite.");
+
use std::net::SocketAddr;
use std::path::PathBuf;
diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs
index 0e14ed51..9e4de873 100644
--- a/src/garage/repair/online.rs
+++ b/src/garage/repair/online.rs
@@ -5,11 +5,16 @@ use async_trait::async_trait;
use tokio::sync::watch;
use garage_block::repair::ScrubWorkerCommand;
+
use garage_model::garage::Garage;
use garage_model::s3::block_ref_table::*;
+use garage_model::s3::mpu_table::*;
use garage_model::s3::object_table::*;
use garage_model::s3::version_table::*;
+
+use garage_table::replication::*;
use garage_table::*;
+
use garage_util::background::*;
use garage_util::error::Error;
use garage_util::migrate::Migrate;
@@ -32,11 +37,15 @@ pub async fn launch_online_repair(
}
RepairWhat::Versions => {
info!("Repairing the versions table");
- bg.spawn_worker(RepairVersionsWorker::new(garage.clone()));
+ bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairVersions));
+ }
+ RepairWhat::MultipartUploads => {
+ info!("Repairing the multipart uploads table");
+ bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairMpu));
}
RepairWhat::BlockRefs => {
info!("Repairing the block refs table");
- bg.spawn_worker(RepairBlockrefsWorker::new(garage.clone()));
+ bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairBlockRefs));
}
RepairWhat::Blocks => {
info!("Repairing the stored blocks");
@@ -61,76 +70,82 @@ pub async fn launch_online_repair(
info!("Sending command to scrub worker: {:?}", cmd);
garage.block_manager.send_scrub_command(cmd).await?;
}
+ RepairWhat::Rebalance => {
+ info!("Rebalancing the stored blocks among storage locations");
+ bg.spawn_worker(garage_block::repair::RebalanceWorker::new(
+ garage.block_manager.clone(),
+ ));
+ }
}
Ok(())
}
// ----
-struct RepairVersionsWorker {
+#[async_trait]
+trait TableRepair: Send + Sync + 'static {
+ type T: TableSchema;
+
+ fn table(garage: &Garage) -> &Table<Self::T, TableShardedReplication>;
+
+ async fn process(
+ &mut self,
+ garage: &Garage,
+ entry: <<Self as TableRepair>::T as TableSchema>::E,
+ ) -> Result<bool, Error>;
+}
+
+struct TableRepairWorker<T: TableRepair> {
garage: Arc<Garage>,
pos: Vec<u8>,
counter: usize,
+ repairs: usize,
+ inner: T,
}
-impl RepairVersionsWorker {
- fn new(garage: Arc<Garage>) -> Self {
+impl<R: TableRepair> TableRepairWorker<R> {
+ fn new(garage: Arc<Garage>, inner: R) -> Self {
Self {
garage,
+ inner,
pos: vec![],
counter: 0,
+ repairs: 0,
}
}
}
#[async_trait]
-impl Worker for RepairVersionsWorker {
+impl<R: TableRepair> Worker for TableRepairWorker<R> {
fn name(&self) -> String {
- "Version repair worker".into()
+ format!("{} repair worker", R::T::TABLE_NAME)
}
fn status(&self) -> WorkerStatus {
WorkerStatus {
- progress: Some(self.counter.to_string()),
+ progress: Some(format!("{} ({})", self.counter, self.repairs)),
..Default::default()
}
}
async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
- let (item_bytes, next_pos) = match self.garage.version_table.data.store.get_gt(&self.pos)? {
+ let (item_bytes, next_pos) = match R::table(&self.garage).data.store.get_gt(&self.pos)? {
Some((k, v)) => (v, k),
None => {
- info!("repair_versions: finished, done {}", self.counter);
+ info!(
+ "{}: finished, done {}, fixed {}",
+ self.name(),
+ self.counter,
+ self.repairs
+ );
return Ok(WorkerState::Done);
}
};
- let version = Version::decode(&item_bytes).ok_or_message("Cannot decode Version")?;
- if !version.deleted.get() {
- let object = self
- .garage
- .object_table
- .get(&version.bucket_id, &version.key)
- .await?;
- let version_exists = match object {
- Some(o) => o
- .versions()
- .iter()
- .any(|x| x.uuid == version.uuid && x.state != ObjectVersionState::Aborted),
- None => false,
- };
- if !version_exists {
- info!("Repair versions: marking version as deleted: {:?}", version);
- self.garage
- .version_table
- .insert(&Version::new(
- version.uuid,
- version.bucket_id,
- version.key,
- true,
- ))
- .await?;
- }
+ let entry = <R::T as TableSchema>::E::decode(&item_bytes)
+ .ok_or_message("Cannot decode table entry")?;
+ if self.inner.process(&self.garage, entry).await? {
+ self.repairs += 1;
}
self.counter += 1;
@@ -146,77 +161,124 @@ impl Worker for RepairVersionsWorker {
// ----
-struct RepairBlockrefsWorker {
- garage: Arc<Garage>,
- pos: Vec<u8>,
- counter: usize,
-}
+struct RepairVersions;
-impl RepairBlockrefsWorker {
- fn new(garage: Arc<Garage>) -> Self {
- Self {
- garage,
- pos: vec![],
- counter: 0,
+#[async_trait]
+impl TableRepair for RepairVersions {
+ type T = VersionTable;
+
+ fn table(garage: &Garage) -> &Table<Self::T, TableShardedReplication> {
+ &garage.version_table
+ }
+
+ async fn process(&mut self, garage: &Garage, version: Version) -> Result<bool, Error> {
+ if !version.deleted.get() {
+ let ref_exists = match &version.backlink {
+ VersionBacklink::Object { bucket_id, key } => garage
+ .object_table
+ .get(bucket_id, key)
+ .await?
+ .map(|o| {
+ o.versions().iter().any(|x| {
+ x.uuid == version.uuid && x.state != ObjectVersionState::Aborted
+ })
+ })
+ .unwrap_or(false),
+ VersionBacklink::MultipartUpload { upload_id } => garage
+ .mpu_table
+ .get(upload_id, &EmptyKey)
+ .await?
+ .map(|u| !u.deleted.get())
+ .unwrap_or(false),
+ };
+
+ if !ref_exists {
+ info!("Repair versions: marking version as deleted: {:?}", version);
+ garage
+ .version_table
+ .insert(&Version::new(version.uuid, version.backlink, true))
+ .await?;
+ return Ok(true);
+ }
}
+
+ Ok(false)
}
}
+// ----
+
+struct RepairBlockRefs;
+
#[async_trait]
-impl Worker for RepairBlockrefsWorker {
- fn name(&self) -> String {
- "Block refs repair worker".into()
- }
+impl TableRepair for RepairBlockRefs {
+ type T = BlockRefTable;
- fn status(&self) -> WorkerStatus {
- WorkerStatus {
- progress: Some(self.counter.to_string()),
- ..Default::default()
- }
+ fn table(garage: &Garage) -> &Table<Self::T, TableShardedReplication> {
+ &garage.block_ref_table
}
- async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
- let (item_bytes, next_pos) =
- match self.garage.block_ref_table.data.store.get_gt(&self.pos)? {
- Some((k, v)) => (v, k),
- None => {
- info!("repair_block_ref: finished, done {}", self.counter);
- return Ok(WorkerState::Done);
- }
- };
-
- let block_ref = BlockRef::decode(&item_bytes).ok_or_message("Cannot decode BlockRef")?;
+ async fn process(&mut self, garage: &Garage, mut block_ref: BlockRef) -> Result<bool, Error> {
if !block_ref.deleted.get() {
- let version = self
- .garage
+ let ref_exists = garage
.version_table
.get(&block_ref.version, &EmptyKey)
- .await?;
- // The version might not exist if it has been GC'ed
- let ref_exists = version.map(|v| !v.deleted.get()).unwrap_or(false);
+ .await?
+ .map(|v| !v.deleted.get())
+ .unwrap_or(false);
+
if !ref_exists {
info!(
"Repair block ref: marking block_ref as deleted: {:?}",
block_ref
);
- self.garage
- .block_ref_table
- .insert(&BlockRef {
- block: block_ref.block,
- version: block_ref.version,
- deleted: true.into(),
- })
- .await?;
+ block_ref.deleted.set();
+ garage.block_ref_table.insert(&block_ref).await?;
+ return Ok(true);
}
}
- self.counter += 1;
- self.pos = next_pos;
+ Ok(false)
+ }
+}
- Ok(WorkerState::Busy)
+// ----
+
+struct RepairMpu;
+
+#[async_trait]
+impl TableRepair for RepairMpu {
+ type T = MultipartUploadTable;
+
+ fn table(garage: &Garage) -> &Table<Self::T, TableShardedReplication> {
+ &garage.mpu_table
}
- async fn wait_for_work(&mut self) -> WorkerState {
- unreachable!()
+ async fn process(&mut self, garage: &Garage, mut mpu: MultipartUpload) -> Result<bool, Error> {
+ if !mpu.deleted.get() {
+ let ref_exists = garage
+ .object_table
+ .get(&mpu.bucket_id, &mpu.key)
+ .await?
+ .map(|o| {
+ o.versions()
+ .iter()
+ .any(|x| x.uuid == mpu.upload_id && x.is_uploading(Some(true)))
+ })
+ .unwrap_or(false);
+
+ if !ref_exists {
+ info!(
+ "Repair multipart uploads: marking mpu as deleted: {:?}",
+ mpu
+ );
+ mpu.parts.clear();
+ mpu.deleted.set();
+ garage.mpu_table.insert(&mpu).await?;
+ return Ok(true);
+ }
+ }
+
+ Ok(false)
}
}
diff --git a/src/garage/tests/common/garage.rs b/src/garage/tests/common/garage.rs
index 8aaf6f5b..d1f0867a 100644
--- a/src/garage/tests/common/garage.rs
+++ b/src/garage/tests/common/garage.rs
@@ -52,6 +52,7 @@ impl Instance {
r#"
metadata_dir = "{path}/meta"
data_dir = "{path}/data"
+db_engine = "lmdb"
replication_mode = "1"
@@ -141,7 +142,7 @@ api_bind_addr = "127.0.0.1:{admin_port}"
self.command()
.args(["layout", "assign"])
.arg(node_short_id)
- .args(["-c", "1", "-z", "unzonned"])
+ .args(["-c", "1G", "-z", "unzonned"])
.quiet()
.expect_success_status("Could not assign garage node layout");
self.command()
@@ -186,9 +187,9 @@ api_bind_addr = "127.0.0.1:{admin_port}"
let mut key = Key::default();
let mut cmd = self.command();
- let base = cmd.args(["key", "new"]);
+ let base = cmd.args(["key", "create"]);
let with_name = match maybe_name {
- Some(name) => base.args(["--name", name]),
+ Some(name) => base.args([name]),
None => base,
};
diff --git a/src/garage/tests/k2v/item.rs b/src/garage/tests/k2v/item.rs
index 25d9cce4..20add889 100644
--- a/src/garage/tests/k2v/item.rs
+++ b/src/garage/tests/k2v/item.rs
@@ -44,6 +44,7 @@ async fn test_items_and_indices() {
let content = format!("{}: hello world", sk).into_bytes();
let content2 = format!("{}: hello universe", sk).into_bytes();
let content3 = format!("{}: concurrent value", sk).into_bytes();
+ eprintln!("test iteration {}: {}", i, sk);
// Put initially, no causality token
let res = ctx
@@ -89,7 +90,7 @@ async fn test_items_and_indices() {
assert_eq!(res_body, content);
// ReadIndex -- now there should be some stuff
- tokio::time::sleep(Duration::from_secs(1)).await;
+ tokio::time::sleep(Duration::from_millis(100)).await;
let res = ctx
.k2v
.request
@@ -158,7 +159,7 @@ async fn test_items_and_indices() {
assert_eq!(res_body, content2);
// ReadIndex -- now there should be some stuff
- tokio::time::sleep(Duration::from_secs(1)).await;
+ tokio::time::sleep(Duration::from_millis(100)).await;
let res = ctx
.k2v
.request
@@ -230,7 +231,7 @@ async fn test_items_and_indices() {
);
// ReadIndex -- now there should be some stuff
- tokio::time::sleep(Duration::from_secs(1)).await;
+ tokio::time::sleep(Duration::from_millis(100)).await;
let res = ctx
.k2v
.request
@@ -299,7 +300,7 @@ async fn test_items_and_indices() {
assert_eq!(res.status(), StatusCode::NO_CONTENT);
// ReadIndex -- now there should be some stuff
- tokio::time::sleep(Duration::from_secs(1)).await;
+ tokio::time::sleep(Duration::from_millis(100)).await;
let res = ctx
.k2v
.request
diff --git a/src/garage/tests/s3/multipart.rs b/src/garage/tests/s3/multipart.rs
index aeff94b4..09ae5e5b 100644
--- a/src/garage/tests/s3/multipart.rs
+++ b/src/garage/tests/s3/multipart.rs
@@ -6,6 +6,190 @@ const SZ_5MB: usize = 5 * 1024 * 1024;
const SZ_10MB: usize = 10 * 1024 * 1024;
#[tokio::test]
+async fn test_multipart_upload() {
+ let ctx = common::context();
+ let bucket = ctx.create_bucket("testmpu");
+
+ let u1 = vec![0x11; SZ_5MB];
+ let u2 = vec![0x22; SZ_5MB];
+ let u3 = vec![0x33; SZ_5MB];
+ let u4 = vec![0x44; SZ_5MB];
+ let u5 = vec![0x55; SZ_5MB];
+
+ let up = ctx
+ .client
+ .create_multipart_upload()
+ .bucket(&bucket)
+ .key("a")
+ .send()
+ .await
+ .unwrap();
+ assert!(up.upload_id.is_some());
+
+ let uid = up.upload_id.as_ref().unwrap();
+
+ let p3 = ctx
+ .client
+ .upload_part()
+ .bucket(&bucket)
+ .key("a")
+ .upload_id(uid)
+ .part_number(3)
+ .body(ByteStream::from(u3.clone()))
+ .send()
+ .await
+ .unwrap();
+
+ let _p1 = ctx
+ .client
+ .upload_part()
+ .bucket(&bucket)
+ .key("a")
+ .upload_id(uid)
+ .part_number(1)
+ .body(ByteStream::from(u1))
+ .send()
+ .await
+ .unwrap();
+
+ let _p4 = ctx
+ .client
+ .upload_part()
+ .bucket(&bucket)
+ .key("a")
+ .upload_id(uid)
+ .part_number(4)
+ .body(ByteStream::from(u4))
+ .send()
+ .await
+ .unwrap();
+
+ let p1bis = ctx
+ .client
+ .upload_part()
+ .bucket(&bucket)
+ .key("a")
+ .upload_id(uid)
+ .part_number(1)
+ .body(ByteStream::from(u2.clone()))
+ .send()
+ .await
+ .unwrap();
+
+ let p6 = ctx
+ .client
+ .upload_part()
+ .bucket(&bucket)
+ .key("a")
+ .upload_id(uid)
+ .part_number(6)
+ .body(ByteStream::from(u5.clone()))
+ .send()
+ .await
+ .unwrap();
+
+ {
+ let r = ctx
+ .client
+ .list_parts()
+ .bucket(&bucket)
+ .key("a")
+ .upload_id(uid)
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(r.parts.unwrap().len(), 4);
+ }
+
+ let cmp = CompletedMultipartUpload::builder()
+ .parts(
+ CompletedPart::builder()
+ .part_number(1)
+ .e_tag(p1bis.e_tag.unwrap())
+ .build(),
+ )
+ .parts(
+ CompletedPart::builder()
+ .part_number(3)
+ .e_tag(p3.e_tag.unwrap())
+ .build(),
+ )
+ .parts(
+ CompletedPart::builder()
+ .part_number(6)
+ .e_tag(p6.e_tag.unwrap())
+ .build(),
+ )
+ .build();
+
+ ctx.client
+ .complete_multipart_upload()
+ .bucket(&bucket)
+ .key("a")
+ .upload_id(uid)
+ .multipart_upload(cmp)
+ .send()
+ .await
+ .unwrap();
+
+ // The multipart upload must not appear anymore
+ assert!(ctx
+ .client
+ .list_parts()
+ .bucket(&bucket)
+ .key("a")
+ .upload_id(uid)
+ .send()
+ .await
+ .is_err());
+
+ {
+ // The object must appear as a regular object
+ let r = ctx
+ .client
+ .head_object()
+ .bucket(&bucket)
+ .key("a")
+ .send()
+ .await
+ .unwrap();
+
+ assert_eq!(r.content_length, (SZ_5MB * 3) as i64);
+ }
+
+ {
+ let o = ctx
+ .client
+ .get_object()
+ .bucket(&bucket)
+ .key("a")
+ .send()
+ .await
+ .unwrap();
+
+ assert_bytes_eq!(o.body, &[&u2[..], &u3[..], &u5[..]].concat());
+ }
+
+ {
+ for (part_number, data) in [(1, &u2), (2, &u3), (3, &u5)] {
+ let o = ctx
+ .client
+ .get_object()
+ .bucket(&bucket)
+ .key("a")
+ .part_number(part_number)
+ .send()
+ .await
+ .unwrap();
+
+ eprintln!("get_object with part_number = {}", part_number);
+ assert_eq!(o.content_length, SZ_5MB as i64);
+ assert_bytes_eq!(o.body, data);
+ }
+ }
+}
+
+#[tokio::test]
async fn test_uploadlistpart() {
let ctx = common::context();
let bucket = ctx.create_bucket("uploadpart");
@@ -65,7 +249,8 @@ async fn test_uploadlistpart() {
let ps = r.parts.unwrap();
assert_eq!(ps.len(), 1);
- let fp = ps.iter().find(|x| x.part_number == 2).unwrap();
+ assert_eq!(ps[0].part_number, 2);
+ let fp = &ps[0];
assert!(fp.last_modified.is_some());
assert_eq!(
fp.e_tag.as_ref().unwrap(),
@@ -100,13 +285,24 @@ async fn test_uploadlistpart() {
let ps = r.parts.unwrap();
assert_eq!(ps.len(), 2);
- let fp = ps.iter().find(|x| x.part_number == 1).unwrap();
+
+ assert_eq!(ps[0].part_number, 1);
+ let fp = &ps[0];
assert!(fp.last_modified.is_some());
assert_eq!(
fp.e_tag.as_ref().unwrap(),
"\"3c484266f9315485694556e6c693bfa2\""
);
assert_eq!(fp.size, SZ_5MB as i64);
+
+ assert_eq!(ps[1].part_number, 2);
+ let sp = &ps[1];
+ assert!(sp.last_modified.is_some());
+ assert_eq!(
+ sp.e_tag.as_ref().unwrap(),
+ "\"3366bb9dcf710d6801b5926467d02e19\""
+ );
+ assert_eq!(sp.size, SZ_5MB as i64);
}
{
@@ -123,12 +319,19 @@ async fn test_uploadlistpart() {
.unwrap();
assert!(r.part_number_marker.is_none());
- assert!(r.next_part_number_marker.is_some());
+ assert_eq!(r.next_part_number_marker.as_deref(), Some("1"));
assert_eq!(r.max_parts, 1_i32);
assert!(r.is_truncated);
assert_eq!(r.key.unwrap(), "a");
assert_eq!(r.upload_id.unwrap().as_str(), uid.as_str());
- assert_eq!(r.parts.unwrap().len(), 1);
+ let parts = r.parts.unwrap();
+ assert_eq!(parts.len(), 1);
+ let fp = &parts[0];
+ assert_eq!(fp.part_number, 1);
+ assert_eq!(
+ fp.e_tag.as_ref().unwrap(),
+ "\"3c484266f9315485694556e6c693bfa2\""
+ );
let r2 = ctx
.client
@@ -147,10 +350,18 @@ async fn test_uploadlistpart() {
r.next_part_number_marker.as_ref().unwrap()
);
assert_eq!(r2.max_parts, 1_i32);
- assert!(r2.is_truncated);
assert_eq!(r2.key.unwrap(), "a");
assert_eq!(r2.upload_id.unwrap().as_str(), uid.as_str());
- assert_eq!(r2.parts.unwrap().len(), 1);
+ let parts = r2.parts.unwrap();
+ assert_eq!(parts.len(), 1);
+ let fp = &parts[0];
+ assert_eq!(fp.part_number, 2);
+ assert_eq!(
+ fp.e_tag.as_ref().unwrap(),
+ "\"3366bb9dcf710d6801b5926467d02e19\""
+ );
+ //assert!(r2.is_truncated); // WHY? (this was the test before)
+ assert!(!r2.is_truncated);
}
let cmp = CompletedMultipartUpload::builder()