aboutsummaryrefslogtreecommitdiff
path: root/src/garage
diff options
context:
space:
mode:
authorMendes <mendes.oulamara@pm.me>2022-10-04 18:14:49 +0200
committerMendes <mendes.oulamara@pm.me>2022-10-04 18:14:49 +0200
commit829f815a897b04986559910bbcbf53625adcdf20 (patch)
tree6db3c27cff2aded754a641d1f2b05c83be701267 /src/garage
parent99f96b9564c9c841dc6c56f1255a6e70ff884d46 (diff)
parenta096ced35562bd0a8877a1ee2f755be1edafe343 (diff)
downloadgarage-829f815a897b04986559910bbcbf53625adcdf20.tar.gz
garage-829f815a897b04986559910bbcbf53625adcdf20.zip
Merge remote-tracking branch 'origin/main' into optimal-layout
Diffstat (limited to 'src/garage')
-rw-r--r--src/garage/Cargo.toml66
-rw-r--r--src/garage/admin.rs269
-rw-r--r--src/garage/cli/cmd.rs30
-rw-r--r--src/garage/cli/layout.rs79
-rw-r--r--src/garage/cli/structs.rs202
-rw-r--r--src/garage/cli/util.rs137
-rw-r--r--src/garage/main.rs78
-rw-r--r--src/garage/repair.rs149
-rw-r--r--src/garage/repair/mod.rs2
-rw-r--r--src/garage/repair/offline.rs55
-rw-r--r--src/garage/repair/online.rs215
-rw-r--r--src/garage/server.rs188
-rw-r--r--src/garage/tests/bucket.rs8
-rw-r--r--src/garage/tests/common/client.rs2
-rw-r--r--src/garage/tests/common/custom_requester.rs55
-rw-r--r--src/garage/tests/common/garage.rs34
-rw-r--r--src/garage/tests/common/mod.rs11
-rw-r--r--src/garage/tests/k2v/batch.rs612
-rw-r--r--src/garage/tests/k2v/errorcodes.rs141
-rw-r--r--src/garage/tests/k2v/item.rs725
-rw-r--r--src/garage/tests/k2v/mod.rs18
-rw-r--r--src/garage/tests/k2v/poll.rs98
-rw-r--r--src/garage/tests/k2v/simple.rs40
-rw-r--r--src/garage/tests/lib.rs11
-rw-r--r--src/garage/tests/s3/list.rs (renamed from src/garage/tests/list.rs)0
-rw-r--r--src/garage/tests/s3/mod.rs6
-rw-r--r--src/garage/tests/s3/multipart.rs (renamed from src/garage/tests/multipart.rs)0
-rw-r--r--src/garage/tests/s3/objects.rs (renamed from src/garage/tests/objects.rs)9
-rw-r--r--src/garage/tests/s3/simple.rs (renamed from src/garage/tests/simple.rs)0
-rw-r--r--src/garage/tests/s3/streaming_signature.rs (renamed from src/garage/tests/streaming_signature.rs)0
-rw-r--r--src/garage/tests/s3/website.rs (renamed from src/garage/tests/website.rs)32
-rw-r--r--src/garage/tracing_setup.rs37
32 files changed, 2811 insertions, 498 deletions
diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml
index 59f402ff..5ce40ff2 100644
--- a/src/garage/Cargo.toml
+++ b/src/garage/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "garage"
-version = "0.7.0"
+version = "0.8.0"
authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license = "AGPL-3.0"
@@ -21,25 +21,25 @@ path = "tests/lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
-garage_api = { version = "0.7.0", path = "../api" }
-garage_model = { version = "0.7.0", path = "../model" }
-garage_rpc = { version = "0.7.0", path = "../rpc" }
-garage_table = { version = "0.7.0", path = "../table" }
-garage_util = { version = "0.7.0", path = "../util" }
-garage_web = { version = "0.7.0", path = "../web" }
-garage_admin = { version = "0.7.0", path = "../admin" }
+garage_db = { version = "0.8.0", path = "../db" }
+garage_api = { version = "0.8.0", path = "../api" }
+garage_block = { version = "0.8.0", path = "../block" }
+garage_model = { version = "0.8.0", path = "../model" }
+garage_rpc = { version = "0.8.0", path = "../rpc" }
+garage_table = { version = "0.8.0", path = "../table" }
+garage_util = { version = "0.8.0", path = "../util" }
+garage_web = { version = "0.8.0", path = "../web" }
bytes = "1.0"
-git-version = "0.3.4"
+bytesize = "1.1"
+timeago = "0.3"
hex = "0.4"
tracing = { version = "0.1.30", features = ["log-always"] }
-pretty_env_logger = "0.4"
+tracing-subscriber = { version = "0.3", features = ["env-filter"] }
rand = "0.8"
async-trait = "0.1.7"
sodiumoxide = { version = "0.2.5-0", package = "kuska-sodiumoxide" }
-sled = "0.34"
-
rmp-serde = "0.15"
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
serde_bytes = "0.11"
@@ -50,16 +50,48 @@ futures = "0.3"
futures-util = "0.3"
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
-#netapp = { version = "0.3.2", git = "https://git.deuxfleurs.fr/lx/netapp" }
-#netapp = { version = "0.4", path = "../../../netapp" }
-netapp = "0.4"
+netapp = "0.5"
+
+opentelemetry = { version = "0.17", features = [ "rt-tokio" ] }
+opentelemetry-prometheus = { version = "0.10", optional = true }
+opentelemetry-otlp = { version = "0.10", optional = true }
+prometheus = { version = "0.13", optional = true }
[dev-dependencies]
aws-sdk-s3 = "0.8"
chrono = "0.4"
http = "0.2"
-hmac = "0.10"
+hmac = "0.12"
hyper = { version = "0.14", features = ["client", "http1", "runtime"] }
-sha2 = "0.9"
+sha2 = "0.10"
static_init = "1.0"
+assert-json-diff = "2.0"
+serde_json = "1.0"
+base64 = "0.13"
+
+
+[features]
+default = [ "bundled-libs", "metrics", "sled" ]
+
+k2v = [ "garage_util/k2v", "garage_api/k2v" ]
+
+# Database engines, Sled is still our default even though we don't like it
+sled = [ "garage_model/sled" ]
+lmdb = [ "garage_model/lmdb" ]
+sqlite = [ "garage_model/sqlite" ]
+
+# Automatic registration and discovery via Kubernetes API
+kubernetes-discovery = [ "garage_rpc/kubernetes-discovery" ]
+# Prometheus exporter (/metrics endpoint).
+metrics = [ "garage_api/metrics", "opentelemetry-prometheus", "prometheus" ]
+# Exporter for the OpenTelemetry Collector.
+telemetry-otlp = [ "opentelemetry-otlp" ]
+
+# NOTE: bundled-libs and system-libs should be treat as mutually exclusive;
+# exactly one of them should be enabled.
+
+# Use bundled libsqlite instead of linking against system-provided.
+bundled-libs = [ "garage_db/bundled-libs" ]
+# Link against system-provided libsodium and libzstd.
+system-libs = [ "garage_block/system-libs", "garage_rpc/system-libs", "sodiumoxide/use-pkg-config" ]
diff --git a/src/garage/admin.rs b/src/garage/admin.rs
index 0b20bb20..802a8261 100644
--- a/src/garage/admin.rs
+++ b/src/garage/admin.rs
@@ -15,34 +15,45 @@ use garage_table::*;
use garage_rpc::*;
+use garage_block::repair::ScrubWorkerCommand;
+
use garage_model::bucket_alias_table::*;
use garage_model::bucket_table::*;
use garage_model::garage::Garage;
use garage_model::helper::error::{Error, OkOrBadRequest};
use garage_model::key_table::*;
use garage_model::migrate::Migrate;
-use garage_model::object_table::ObjectFilter;
use garage_model::permission::*;
use crate::cli::*;
-use crate::repair::Repair;
+use crate::repair::online::launch_online_repair;
pub const ADMIN_RPC_PATH: &str = "garage/admin_rpc.rs/Rpc";
#[derive(Debug, Serialize, Deserialize)]
+#[allow(clippy::large_enum_variant)]
pub enum AdminRpc {
BucketOperation(BucketOperation),
KeyOperation(KeyOperation),
LaunchRepair(RepairOpt),
Migrate(MigrateOpt),
Stats(StatsOpt),
+ Worker(WorkerOpt),
// Replies
Ok(String),
BucketList(Vec<Bucket>),
- BucketInfo(Bucket, HashMap<String, Key>),
+ BucketInfo {
+ bucket: Bucket,
+ relevant_keys: HashMap<String, Key>,
+ counters: HashMap<String, i64>,
+ },
KeyList(Vec<(String, String)>),
KeyInfo(Key, HashMap<Uuid, Bucket>),
+ WorkerList(
+ HashMap<usize, garage_util::background::WorkerInfo>,
+ WorkerListOpt,
+ ),
}
impl Rpc for AdminRpc {
@@ -73,6 +84,7 @@ impl AdminRpcHandler {
BucketOperation::Allow(query) => self.handle_bucket_allow(query).await,
BucketOperation::Deny(query) => self.handle_bucket_deny(query).await,
BucketOperation::Website(query) => self.handle_bucket_website(query).await,
+ BucketOperation::SetQuotas(query) => self.handle_bucket_set_quotas(query).await,
}
}
@@ -80,8 +92,15 @@ impl AdminRpcHandler {
let buckets = self
.garage
.bucket_table
- .get_range(&EmptyKey, None, Some(DeletedFilter::NotDeleted), 10000)
+ .get_range(
+ &EmptyKey,
+ None,
+ Some(DeletedFilter::NotDeleted),
+ 10000,
+ EnumerationOrder::Forward,
+ )
.await?;
+
Ok(AdminRpc::BucketList(buckets))
}
@@ -99,6 +118,15 @@ impl AdminRpcHandler {
.get_existing_bucket(bucket_id)
.await?;
+ let counters = self
+ .garage
+ .object_counter_table
+ .table
+ .get(&bucket_id, &EmptyKey)
+ .await?
+ .map(|x| x.filtered_values(&self.garage.system.ring.borrow()))
+ .unwrap_or_default();
+
let mut relevant_keys = HashMap::new();
for (k, _) in bucket
.state
@@ -134,7 +162,11 @@ impl AdminRpcHandler {
}
}
- Ok(AdminRpc::BucketInfo(bucket, relevant_keys))
+ Ok(AdminRpc::BucketInfo {
+ bucket,
+ relevant_keys,
+ counters,
+ })
}
#[allow(clippy::ptr_arg)]
@@ -207,12 +239,7 @@ impl AdminRpcHandler {
}
// Check bucket is empty
- let objects = self
- .garage
- .object_table
- .get_range(&bucket_id, None, Some(ObjectFilter::IsData), 10)
- .await?;
- if !objects.is_empty() {
+ if !helper.is_bucket_empty(bucket_id).await? {
return Err(Error::BadRequest(format!(
"Bucket {} is not empty",
query.name
@@ -249,6 +276,7 @@ impl AdminRpcHandler {
async fn handle_alias_bucket(&self, query: &AliasBucketOpt) -> Result<AdminRpc, Error> {
let helper = self.garage.bucket_helper();
+ let key_helper = self.garage.key_helper();
let bucket_id = helper
.resolve_global_bucket_name(&query.existing_bucket)
@@ -256,7 +284,7 @@ impl AdminRpcHandler {
.ok_or_bad_request("Bucket not found")?;
if let Some(key_pattern) = &query.local {
- let key = helper.get_existing_matching_key(key_pattern).await?;
+ let key = key_helper.get_existing_matching_key(key_pattern).await?;
helper
.set_local_bucket_alias(bucket_id, &key.key_id, &query.new_name)
@@ -278,9 +306,10 @@ impl AdminRpcHandler {
async fn handle_unalias_bucket(&self, query: &UnaliasBucketOpt) -> Result<AdminRpc, Error> {
let helper = self.garage.bucket_helper();
+ let key_helper = self.garage.key_helper();
if let Some(key_pattern) = &query.local {
- let key = helper.get_existing_matching_key(key_pattern).await?;
+ let key = key_helper.get_existing_matching_key(key_pattern).await?;
let bucket_id = key
.state
@@ -319,12 +348,15 @@ impl AdminRpcHandler {
async fn handle_bucket_allow(&self, query: &PermBucketOpt) -> Result<AdminRpc, Error> {
let helper = self.garage.bucket_helper();
+ let key_helper = self.garage.key_helper();
let bucket_id = helper
.resolve_global_bucket_name(&query.bucket)
.await?
.ok_or_bad_request("Bucket not found")?;
- let key = helper.get_existing_matching_key(&query.key_pattern).await?;
+ let key = key_helper
+ .get_existing_matching_key(&query.key_pattern)
+ .await?;
let allow_read = query.read || key.allow_read(&bucket_id);
let allow_write = query.write || key.allow_write(&bucket_id);
@@ -351,12 +383,15 @@ impl AdminRpcHandler {
async fn handle_bucket_deny(&self, query: &PermBucketOpt) -> Result<AdminRpc, Error> {
let helper = self.garage.bucket_helper();
+ let key_helper = self.garage.key_helper();
let bucket_id = helper
.resolve_global_bucket_name(&query.bucket)
.await?
.ok_or_bad_request("Bucket not found")?;
- let key = helper.get_existing_matching_key(&query.key_pattern).await?;
+ let key = key_helper
+ .get_existing_matching_key(&query.key_pattern)
+ .await?;
let allow_read = !query.read && key.allow_read(&bucket_id);
let allow_write = !query.write && key.allow_write(&bucket_id);
@@ -423,6 +458,60 @@ impl AdminRpcHandler {
Ok(AdminRpc::Ok(msg))
}
+ async fn handle_bucket_set_quotas(&self, query: &SetQuotasOpt) -> Result<AdminRpc, Error> {
+ let bucket_id = self
+ .garage
+ .bucket_helper()
+ .resolve_global_bucket_name(&query.bucket)
+ .await?
+ .ok_or_bad_request("Bucket not found")?;
+
+ let mut bucket = self
+ .garage
+ .bucket_helper()
+ .get_existing_bucket(bucket_id)
+ .await?;
+ let bucket_state = bucket.state.as_option_mut().unwrap();
+
+ if query.max_size.is_none() && query.max_objects.is_none() {
+ return Err(Error::BadRequest(
+ "You must specify either --max-size or --max-objects (or both) for this command to do something.".to_string(),
+ ));
+ }
+
+ let mut quotas = bucket_state.quotas.get().clone();
+
+ match query.max_size.as_ref().map(String::as_ref) {
+ Some("none") => quotas.max_size = None,
+ Some(v) => {
+ let bs = v
+ .parse::<bytesize::ByteSize>()
+ .ok_or_bad_request(format!("Invalid size specified: {}", v))?;
+ quotas.max_size = Some(bs.as_u64());
+ }
+ _ => (),
+ }
+
+ match query.max_objects.as_ref().map(String::as_ref) {
+ Some("none") => quotas.max_objects = None,
+ Some(v) => {
+ let mo = v
+ .parse::<u64>()
+ .ok_or_bad_request(format!("Invalid number specified: {}", v))?;
+ quotas.max_objects = Some(mo);
+ }
+ _ => (),
+ }
+
+ bucket_state.quotas.update(quotas);
+ self.garage.bucket_table.insert(&bucket).await?;
+
+ Ok(AdminRpc::Ok(format!(
+ "Quotas updated for {}",
+ &query.bucket
+ )))
+ }
+
async fn handle_key_cmd(&self, cmd: &KeyOperation) -> Result<AdminRpc, Error> {
match cmd {
KeyOperation::List => self.handle_list_keys().await,
@@ -445,6 +534,7 @@ impl AdminRpcHandler {
None,
Some(KeyFilter::Deleted(DeletedFilter::NotDeleted)),
10000,
+ EnumerationOrder::Forward,
)
.await?
.iter()
@@ -456,7 +546,7 @@ impl AdminRpcHandler {
async fn handle_key_info(&self, query: &KeyOpt) -> Result<AdminRpc, Error> {
let key = self
.garage
- .bucket_helper()
+ .key_helper()
.get_existing_matching_key(&query.key_pattern)
.await?;
self.key_info_result(key).await
@@ -471,7 +561,7 @@ impl AdminRpcHandler {
async fn handle_rename_key(&self, query: &KeyRenameOpt) -> Result<AdminRpc, Error> {
let mut key = self
.garage
- .bucket_helper()
+ .key_helper()
.get_existing_matching_key(&query.key_pattern)
.await?;
key.params_mut()
@@ -483,9 +573,11 @@ impl AdminRpcHandler {
}
async fn handle_delete_key(&self, query: &KeyDeleteOpt) -> Result<AdminRpc, Error> {
- let helper = self.garage.bucket_helper();
+ let key_helper = self.garage.key_helper();
- let mut key = helper.get_existing_matching_key(&query.key_pattern).await?;
+ let mut key = key_helper
+ .get_existing_matching_key(&query.key_pattern)
+ .await?;
if !query.yes {
return Err(Error::BadRequest(
@@ -493,32 +585,7 @@ impl AdminRpcHandler {
));
}
- let state = key.state.as_option_mut().unwrap();
-
- // --- done checking, now commit ---
- // (the step at unset_local_bucket_alias will fail if a bucket
- // does not have another alias, the deletion will be
- // interrupted in the middle if that happens)
-
- // 1. Delete local aliases
- for (alias, _, to) in state.local_aliases.items().iter() {
- if let Some(bucket_id) = to {
- helper
- .unset_local_bucket_alias(*bucket_id, &key.key_id, alias)
- .await?;
- }
- }
-
- // 2. Remove permissions on all authorized buckets
- for (ab_id, _auth) in state.authorized_buckets.items().iter() {
- helper
- .set_bucket_key_permissions(*ab_id, &key.key_id, BucketKeyPerm::NO_PERMISSIONS)
- .await?;
- }
-
- // 3. Actually delete key
- key.state = Deletable::delete();
- self.garage.key_table.insert(&key).await?;
+ key_helper.delete_key(&mut key).await?;
Ok(AdminRpc::Ok(format!(
"Key {} was deleted successfully.",
@@ -529,7 +596,7 @@ impl AdminRpcHandler {
async fn handle_allow_key(&self, query: &KeyPermOpt) -> Result<AdminRpc, Error> {
let mut key = self
.garage
- .bucket_helper()
+ .key_helper()
.get_existing_matching_key(&query.key_pattern)
.await?;
if query.create_bucket {
@@ -542,7 +609,7 @@ impl AdminRpcHandler {
async fn handle_deny_key(&self, query: &KeyPermOpt) -> Result<AdminRpc, Error> {
let mut key = self
.garage
- .bucket_helper()
+ .key_helper()
.get_existing_matching_key(&query.key_pattern)
.await?;
if query.create_bucket {
@@ -616,7 +683,7 @@ impl AdminRpcHandler {
.endpoint
.call(
&node,
- &AdminRpc::LaunchRepair(opt_to_send.clone()),
+ AdminRpc::LaunchRepair(opt_to_send.clone()),
PRIO_NORMAL,
)
.await;
@@ -633,15 +700,7 @@ impl AdminRpcHandler {
)))
}
} else {
- let repair = Repair {
- garage: self.garage.clone(),
- };
- self.garage
- .system
- .background
- .spawn_worker("Repair worker".into(), move |must_exit| async move {
- repair.repair_worker(opt, must_exit).await
- });
+ launch_online_repair(self.garage.clone(), opt).await;
Ok(AdminRpc::Ok(format!(
"Repair launched on {:?}",
self.garage.system.id
@@ -664,7 +723,7 @@ impl AdminRpcHandler {
let node_id = (*node).into();
match self
.endpoint
- .call(&node_id, &AdminRpc::Stats(opt), PRIO_NORMAL)
+ .call(&node_id, AdminRpc::Stats(opt), PRIO_NORMAL)
.await?
{
Ok(AdminRpc::Ok(s)) => writeln!(&mut ret, "{}", s).unwrap(),
@@ -674,22 +733,22 @@ impl AdminRpcHandler {
}
Ok(AdminRpc::Ok(ret))
} else {
- Ok(AdminRpc::Ok(self.gather_stats_local(opt)))
+ Ok(AdminRpc::Ok(self.gather_stats_local(opt)?))
}
}
- fn gather_stats_local(&self, opt: StatsOpt) -> String {
+ fn gather_stats_local(&self, opt: StatsOpt) -> Result<String, Error> {
let mut ret = String::new();
writeln!(
&mut ret,
- "\nGarage version: {}",
- option_env!("GIT_VERSION").unwrap_or(git_version::git_version!(
- prefix = "git:",
- cargo_prefix = "cargo:",
- fallback = "unknown"
- ))
+ "\nGarage version: {} [features: {}]",
+ garage_util::version::garage_version(),
+ garage_util::version::garage_features()
+ .map(|list| list.join(", "))
+ .unwrap_or_else(|| "(unknown)".into()),
)
.unwrap();
+ writeln!(&mut ret, "\nDatabase engine: {}", self.garage.db.engine()).unwrap();
// Gather ring statistics
let ring = self.garage.system.ring.borrow().clone();
@@ -707,59 +766,108 @@ impl AdminRpcHandler {
writeln!(&mut ret, " {:?} {}", n, c).unwrap();
}
- self.gather_table_stats(&mut ret, &self.garage.bucket_table, &opt);
- self.gather_table_stats(&mut ret, &self.garage.key_table, &opt);
- self.gather_table_stats(&mut ret, &self.garage.object_table, &opt);
- self.gather_table_stats(&mut ret, &self.garage.version_table, &opt);
- self.gather_table_stats(&mut ret, &self.garage.block_ref_table, &opt);
+ self.gather_table_stats(&mut ret, &self.garage.bucket_table, &opt)?;
+ self.gather_table_stats(&mut ret, &self.garage.key_table, &opt)?;
+ self.gather_table_stats(&mut ret, &self.garage.object_table, &opt)?;
+ self.gather_table_stats(&mut ret, &self.garage.version_table, &opt)?;
+ self.gather_table_stats(&mut ret, &self.garage.block_ref_table, &opt)?;
writeln!(&mut ret, "\nBlock manager stats:").unwrap();
if opt.detailed {
writeln!(
&mut ret,
" number of RC entries (~= number of blocks): {}",
- self.garage.block_manager.rc_len()
+ self.garage.block_manager.rc_len()?
)
.unwrap();
}
writeln!(
&mut ret,
" resync queue length: {}",
- self.garage.block_manager.resync_queue_len()
+ self.garage.block_manager.resync.queue_len()?
)
.unwrap();
writeln!(
&mut ret,
" blocks with resync errors: {}",
- self.garage.block_manager.resync_errors_len()
+ self.garage.block_manager.resync.errors_len()?
)
.unwrap();
- ret
+ Ok(ret)
}
- fn gather_table_stats<F, R>(&self, to: &mut String, t: &Arc<Table<F, R>>, opt: &StatsOpt)
+ fn gather_table_stats<F, R>(
+ &self,
+ to: &mut String,
+ t: &Arc<Table<F, R>>,
+ opt: &StatsOpt,
+ ) -> Result<(), Error>
where
F: TableSchema + 'static,
R: TableReplication + 'static,
{
writeln!(to, "\nTable stats for {}", F::TABLE_NAME).unwrap();
if opt.detailed {
- writeln!(to, " number of items: {}", t.data.store.len()).unwrap();
+ writeln!(
+ to,
+ " number of items: {}",
+ t.data.store.len().map_err(GarageError::from)?
+ )
+ .unwrap();
writeln!(
to,
" Merkle tree size: {}",
- t.merkle_updater.merkle_tree_len()
+ t.merkle_updater.merkle_tree_len()?
)
.unwrap();
}
writeln!(
to,
" Merkle updater todo queue length: {}",
- t.merkle_updater.todo_len()
+ t.merkle_updater.todo_len()?
)
.unwrap();
- writeln!(to, " GC todo queue length: {}", t.data.gc_todo_len()).unwrap();
+ writeln!(to, " GC todo queue length: {}", t.data.gc_todo_len()?).unwrap();
+
+ Ok(())
+ }
+
+ // ----
+
+ async fn handle_worker_cmd(&self, opt: WorkerOpt) -> Result<AdminRpc, Error> {
+ match opt.cmd {
+ WorkerCmd::List { opt } => {
+ let workers = self.garage.background.get_worker_info();
+ Ok(AdminRpc::WorkerList(workers, opt))
+ }
+ WorkerCmd::Set { opt } => match opt {
+ WorkerSetCmd::ScrubTranquility { tranquility } => {
+ let scrub_command = ScrubWorkerCommand::SetTranquility(tranquility);
+ self.garage
+ .block_manager
+ .send_scrub_command(scrub_command)
+ .await;
+ Ok(AdminRpc::Ok("Scrub tranquility updated".into()))
+ }
+ WorkerSetCmd::ResyncNWorkers { n_workers } => {
+ self.garage
+ .block_manager
+ .resync
+ .set_n_workers(n_workers)
+ .await?;
+ Ok(AdminRpc::Ok("Number of resync workers updated".into()))
+ }
+ WorkerSetCmd::ResyncTranquility { tranquility } => {
+ self.garage
+ .block_manager
+ .resync
+ .set_tranquility(tranquility)
+ .await?;
+ Ok(AdminRpc::Ok("Resync tranquility updated".into()))
+ }
+ },
+ }
}
}
@@ -776,6 +884,7 @@ impl EndpointHandler<AdminRpc> for AdminRpcHandler {
AdminRpc::Migrate(opt) => self.handle_migrate(opt.clone()).await,
AdminRpc::LaunchRepair(opt) => self.handle_launch_repair(opt.clone()).await,
AdminRpc::Stats(opt) => self.handle_stats(opt.clone()).await,
+ AdminRpc::Worker(opt) => self.handle_worker_cmd(opt.clone()).await,
m => Err(GarageError::unexpected_rpc_message(m).into()),
}
}
diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs
index a90277a0..c8b96489 100644
--- a/src/garage/cli/cmd.rs
+++ b/src/garage/cli/cmd.rs
@@ -1,6 +1,8 @@
use std::collections::HashSet;
+use std::time::Duration;
use garage_util::error::*;
+use garage_util::formater::format_table;
use garage_rpc::layout::*;
use garage_rpc::system::*;
@@ -38,13 +40,14 @@ pub async fn cli_command_dispatch(
cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::LaunchRepair(ro)).await
}
Command::Stats(so) => cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::Stats(so)).await,
+ Command::Worker(wo) => cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::Worker(wo)).await,
_ => unreachable!(),
}
}
pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) -> Result<(), Error> {
let status = match rpc_cli
- .call(&rpc_host, &SystemRpc::GetKnownNodes, PRIO_NORMAL)
+ .call(&rpc_host, SystemRpc::GetKnownNodes, PRIO_NORMAL)
.await??
{
SystemRpc::ReturnKnownNodes(nodes) => nodes,
@@ -85,19 +88,21 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) ->
format_table(healthy_nodes);
let status_keys = status.iter().map(|adv| adv.id).collect::<HashSet<_>>();
- let failure_case_1 = status.iter().any(|adv| !adv.is_up);
+ let failure_case_1 = status
+ .iter()
+ .any(|adv| !adv.is_up && matches!(layout.roles.get(&adv.id), Some(NodeRoleV(Some(_)))));
let failure_case_2 = layout
.roles
.items()
.iter()
- .filter(|(_, _, v)| v.0.is_some())
- .any(|(id, _, _)| !status_keys.contains(id));
+ .any(|(id, _, v)| !status_keys.contains(id) && v.0.is_some());
if failure_case_1 || failure_case_2 {
println!("\n==== FAILED NODES ====");
let mut failed_nodes =
vec!["ID\tHostname\tAddress\tTags\tZone\tCapacity\tLast seen".to_string()];
for adv in status.iter().filter(|adv| !adv.is_up) {
if let Some(NodeRoleV(Some(cfg))) = layout.roles.get(&adv.id) {
+ let tf = timeago::Formatter::new();
failed_nodes.push(format!(
"{id:?}\t{host}\t{addr}\t[{tags}]\t{zone}\t{capacity}\t{last_seen}",
id = adv.id,
@@ -108,7 +113,7 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) ->
capacity = cfg.capacity_string(),
last_seen = adv
.last_seen_secs_ago
- .map(|s| format!("{}s ago", s))
+ .map(|s| tf.convert(Duration::from_secs(s)))
.unwrap_or_else(|| "never seen".into()),
));
}
@@ -144,7 +149,7 @@ pub async fn cmd_connect(
args: ConnectNodeOpt,
) -> Result<(), Error> {
match rpc_cli
- .call(&rpc_host, &SystemRpc::Connect(args.node), PRIO_NORMAL)
+ .call(&rpc_host, SystemRpc::Connect(args.node), PRIO_NORMAL)
.await??
{
SystemRpc::Ok => {
@@ -160,15 +165,19 @@ pub async fn cmd_admin(
rpc_host: NodeID,
args: AdminRpc,
) -> Result<(), HelperError> {
- match rpc_cli.call(&rpc_host, &args, PRIO_NORMAL).await?? {
+ match rpc_cli.call(&rpc_host, args, PRIO_NORMAL).await?? {
AdminRpc::Ok(msg) => {
println!("{}", msg);
}
AdminRpc::BucketList(bl) => {
print_bucket_list(bl);
}
- AdminRpc::BucketInfo(bucket, rk) => {
- print_bucket_info(&bucket, &rk);
+ AdminRpc::BucketInfo {
+ bucket,
+ relevant_keys,
+ counters,
+ } => {
+ print_bucket_info(&bucket, &relevant_keys, &counters);
}
AdminRpc::KeyList(kl) => {
print_key_list(kl);
@@ -176,6 +185,9 @@ pub async fn cmd_admin(
AdminRpc::KeyInfo(key, rb) => {
print_key_info(&key, &rb);
}
+ AdminRpc::WorkerList(wi, wlo) => {
+ print_worker_info(wi, wlo);
+ }
r => {
error!("Unexpected response: {:?}", r);
}
diff --git a/src/garage/cli/layout.rs b/src/garage/cli/layout.rs
index e76f7737..3884bb92 100644
--- a/src/garage/cli/layout.rs
+++ b/src/garage/cli/layout.rs
@@ -1,6 +1,6 @@
use garage_util::crdt::Crdt;
-use garage_util::data::*;
use garage_util::error::*;
+use garage_util::formater::format_table;
use garage_rpc::layout::*;
use garage_rpc::system::*;
@@ -36,21 +36,29 @@ pub async fn cmd_assign_role(
args: AssignRoleOpt,
) -> Result<(), Error> {
let status = match rpc_cli
- .call(&rpc_host, &SystemRpc::GetKnownNodes, PRIO_NORMAL)
+ .call(&rpc_host, SystemRpc::GetKnownNodes, PRIO_NORMAL)
.await??
{
SystemRpc::ReturnKnownNodes(nodes) => nodes,
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
};
+ let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
+
let added_nodes = args
.node_ids
.iter()
- .map(|node_id| find_matching_node(status.iter().map(|adv| adv.id), node_id))
+ .map(|node_id| {
+ find_matching_node(
+ status
+ .iter()
+ .map(|adv| adv.id)
+ .chain(layout.node_ids().iter().cloned()),
+ node_id,
+ )
+ })
.collect::<Result<Vec<_>, _>>()?;
- let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
-
let mut roles = layout.roles.clone();
roles.merge(&layout.staging);
@@ -203,31 +211,9 @@ pub async fn cmd_apply_layout(
rpc_host: NodeID,
apply_opt: ApplyLayoutOpt,
) -> Result<(), Error> {
- let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
-
- match apply_opt.version {
- None => {
- println!("Please pass the --version flag to ensure that you are writing the correct version of the cluster layout.");
- println!("To know the correct value of the --version flag, invoke `garage layout show` and review the proposed changes.");
- return Err(Error::Message("--version flag is missing".into()));
- }
- Some(v) => {
- if v != layout.version + 1 {
- return Err(Error::Message("Invalid value of --version flag".into()));
- }
- }
- }
-
- layout.roles.merge(&layout.staging);
-
- if !layout.calculate_partition_assignation() {
- return Err(Error::Message("Could not calculate new assignation of partitions to nodes. This can happen if there are less nodes than the desired number of copies of your data (see the replication_mode configuration parameter).".into()));
- }
+ let layout = fetch_layout(rpc_cli, rpc_host).await?;
- layout.staging.clear();
- layout.staging_hash = blake2sum(&rmp_to_vec_all_named(&layout.staging).unwrap()[..]);
-
- layout.version += 1;
+ let layout = layout.apply_staged_changes(apply_opt.version)?;
send_layout(rpc_cli, rpc_host, layout).await?;
@@ -242,25 +228,9 @@ pub async fn cmd_revert_layout(
rpc_host: NodeID,
revert_opt: RevertLayoutOpt,
) -> Result<(), Error> {
- let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
-
- match revert_opt.version {
- None => {
- println!("Please pass the --version flag to ensure that you are writing the correct version of the cluster layout.");
- println!("To know the correct value of the --version flag, invoke `garage layout show` and review the proposed changes.");
- return Err(Error::Message("--version flag is missing".into()));
- }
- Some(v) => {
- if v != layout.version + 1 {
- return Err(Error::Message("Invalid value of --version flag".into()));
- }
- }
- }
-
- layout.staging.clear();
- layout.staging_hash = blake2sum(&rmp_to_vec_all_named(&layout.staging).unwrap()[..]);
+ let layout = fetch_layout(rpc_cli, rpc_host).await?;
- layout.version += 1;
+ let layout = layout.revert_staged_changes(revert_opt.version)?;
send_layout(rpc_cli, rpc_host, layout).await?;
@@ -275,7 +245,7 @@ pub async fn fetch_layout(
rpc_host: NodeID,
) -> Result<ClusterLayout, Error> {
match rpc_cli
- .call(&rpc_host, &SystemRpc::PullClusterLayout, PRIO_NORMAL)
+ .call(&rpc_host, SystemRpc::PullClusterLayout, PRIO_NORMAL)
.await??
{
SystemRpc::AdvertiseClusterLayout(t) => Ok(t),
@@ -291,7 +261,7 @@ pub async fn send_layout(
rpc_cli
.call(
&rpc_host,
- &SystemRpc::AdvertiseClusterLayout(layout),
+ SystemRpc::AdvertiseClusterLayout(layout),
PRIO_NORMAL,
)
.await??;
@@ -323,11 +293,20 @@ pub fn print_cluster_layout(layout: &ClusterLayout) -> bool {
}
pub fn print_staging_role_changes(layout: &ClusterLayout) -> bool {
- if !layout.staging.items().is_empty() {
+ let has_changes = layout
+ .staging
+ .items()
+ .iter()
+ .any(|(k, _, v)| layout.roles.get(k) != Some(v));
+
+ if has_changes {
println!();
println!("==== STAGED ROLE CHANGES ====");
let mut table = vec!["ID\tTags\tZone\tCapacity".to_string()];
for (id, _, role) in layout.staging.items().iter() {
+ if layout.roles.get(id) == Some(role) {
+ continue;
+ }
if let Some(role) = &role.0 {
let tags = role.tags.join(",");
table.push(format!(
diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs
index a0c49aeb..06548e89 100644
--- a/src/garage/cli/structs.rs
+++ b/src/garage/cli/structs.rs
@@ -1,55 +1,65 @@
use serde::{Deserialize, Serialize};
-
use structopt::StructOpt;
+use garage_util::version::garage_version;
+
#[derive(StructOpt, Debug)]
pub enum Command {
/// Run Garage server
- #[structopt(name = "server")]
+ #[structopt(name = "server", version = garage_version())]
Server,
/// Get network status
- #[structopt(name = "status")]
+ #[structopt(name = "status", version = garage_version())]
Status,
/// Operations on individual Garage nodes
- #[structopt(name = "node")]
+ #[structopt(name = "node", version = garage_version())]
Node(NodeOperation),
/// Operations on the assignation of node roles in the cluster layout
- #[structopt(name = "layout")]
+ #[structopt(name = "layout", version = garage_version())]
Layout(LayoutOperation),
/// Operations on buckets
- #[structopt(name = "bucket")]
+ #[structopt(name = "bucket", version = garage_version())]
Bucket(BucketOperation),
/// Operations on S3 access keys
- #[structopt(name = "key")]
+ #[structopt(name = "key", version = garage_version())]
Key(KeyOperation),
/// Run migrations from previous Garage version
/// (DO NOT USE WITHOUT READING FULL DOCUMENTATION)
- #[structopt(name = "migrate")]
+ #[structopt(name = "migrate", version = garage_version())]
Migrate(MigrateOpt),
- /// Start repair of node data
- #[structopt(name = "repair")]
+ /// Start repair of node data on remote node
+ #[structopt(name = "repair", version = garage_version())]
Repair(RepairOpt),
+ /// Offline reparation of node data (these repairs must be run offline
+ /// directly on the server node)
+ #[structopt(name = "offline-repair", version = garage_version())]
+ OfflineRepair(OfflineRepairOpt),
+
/// Gather node statistics
- #[structopt(name = "stats")]
+ #[structopt(name = "stats", version = garage_version())]
Stats(StatsOpt),
+
+ /// Manage background workers
+ #[structopt(name = "worker", version = garage_version())]
+ Worker(WorkerOpt),
}
#[derive(StructOpt, Debug)]
pub enum NodeOperation {
/// Print identifier (public key) of this Garage node
- #[structopt(name = "id")]
+ #[structopt(name = "id", version = garage_version())]
NodeId(NodeIdOpt),
/// Connect to Garage node that is currently isolated from the system
- #[structopt(name = "connect")]
+ #[structopt(name = "connect", version = garage_version())]
Connect(ConnectNodeOpt),
}
@@ -70,23 +80,23 @@ pub struct ConnectNodeOpt {
#[derive(StructOpt, Debug)]
pub enum LayoutOperation {
/// Assign role to Garage node
- #[structopt(name = "assign")]
+ #[structopt(name = "assign", version = garage_version())]
Assign(AssignRoleOpt),
/// Remove role from Garage cluster node
- #[structopt(name = "remove")]
+ #[structopt(name = "remove", version = garage_version())]
Remove(RemoveRoleOpt),
/// Show roles currently assigned to nodes and changes staged for commit
- #[structopt(name = "show")]
+ #[structopt(name = "show", version = garage_version())]
Show,
/// Apply staged changes to cluster layout
- #[structopt(name = "apply")]
+ #[structopt(name = "apply", version = garage_version())]
Apply(ApplyLayoutOpt),
/// Revert staged changes to cluster layout
- #[structopt(name = "revert")]
+ #[structopt(name = "revert", version = garage_version())]
Revert(RevertLayoutOpt),
}
@@ -141,40 +151,44 @@ pub struct RevertLayoutOpt {
#[derive(Serialize, Deserialize, StructOpt, Debug)]
pub enum BucketOperation {
/// List buckets
- #[structopt(name = "list")]
+ #[structopt(name = "list", version = garage_version())]
List,
/// Get bucket info
- #[structopt(name = "info")]
+ #[structopt(name = "info", version = garage_version())]
Info(BucketOpt),
/// Create bucket
- #[structopt(name = "create")]
+ #[structopt(name = "create", version = garage_version())]
Create(BucketOpt),
/// Delete bucket
- #[structopt(name = "delete")]
+ #[structopt(name = "delete", version = garage_version())]
Delete(DeleteBucketOpt),
/// Alias bucket under new name
- #[structopt(name = "alias")]
+ #[structopt(name = "alias", version = garage_version())]
Alias(AliasBucketOpt),
/// Remove bucket alias
- #[structopt(name = "unalias")]
+ #[structopt(name = "unalias", version = garage_version())]
Unalias(UnaliasBucketOpt),
/// Allow key to read or write to bucket
- #[structopt(name = "allow")]
+ #[structopt(name = "allow", version = garage_version())]
Allow(PermBucketOpt),
/// Deny key from reading or writing to bucket
- #[structopt(name = "deny")]
+ #[structopt(name = "deny", version = garage_version())]
Deny(PermBucketOpt),
/// Expose as website or not
- #[structopt(name = "website")]
+ #[structopt(name = "website", version = garage_version())]
Website(WebsiteOpt),
+
+ /// Set the quotas for this bucket
+ #[structopt(name = "set-quotas", version = garage_version())]
+ SetQuotas(SetQuotasOpt),
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
@@ -262,37 +276,52 @@ pub struct PermBucketOpt {
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
+pub struct SetQuotasOpt {
+ /// Bucket name
+ pub bucket: String,
+
+ /// Set a maximum size for the bucket (specify a size e.g. in MiB or GiB,
+ /// or `none` for no size restriction)
+ #[structopt(long = "max-size")]
+ pub max_size: Option<String>,
+
+ /// Set a maximum number of objects for the bucket (or `none` for no restriction)
+ #[structopt(long = "max-objects")]
+ pub max_objects: Option<String>,
+}
+
+#[derive(Serialize, Deserialize, StructOpt, Debug)]
pub enum KeyOperation {
/// List keys
- #[structopt(name = "list")]
+ #[structopt(name = "list", version = garage_version())]
List,
/// Get key info
- #[structopt(name = "info")]
+ #[structopt(name = "info", version = garage_version())]
Info(KeyOpt),
/// Create new key
- #[structopt(name = "new")]
+ #[structopt(name = "new", version = garage_version())]
New(KeyNewOpt),
/// Rename key
- #[structopt(name = "rename")]
+ #[structopt(name = "rename", version = garage_version())]
Rename(KeyRenameOpt),
/// Delete key
- #[structopt(name = "delete")]
+ #[structopt(name = "delete", version = garage_version())]
Delete(KeyDeleteOpt),
/// Set permission flags for key
- #[structopt(name = "allow")]
+ #[structopt(name = "allow", version = garage_version())]
Allow(KeyPermOpt),
/// Unset permission flags for key
- #[structopt(name = "deny")]
+ #[structopt(name = "deny", version = garage_version())]
Deny(KeyPermOpt),
/// Import key
- #[structopt(name = "import")]
+ #[structopt(name = "import", version = garage_version())]
Import(KeyImportOpt),
}
@@ -364,7 +393,7 @@ pub struct MigrateOpt {
#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
pub enum MigrateWhat {
/// Migrate buckets and permissions from v0.5.0
- #[structopt(name = "buckets050")]
+ #[structopt(name = "buckets050", version = garage_version())]
Buckets050,
}
@@ -385,27 +414,69 @@ pub struct RepairOpt {
#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
pub enum RepairWhat {
/// Only do a full sync of metadata tables
- #[structopt(name = "tables")]
+ #[structopt(name = "tables", version = garage_version())]
Tables,
/// Only repair (resync/rebalance) the set of stored blocks
- #[structopt(name = "blocks")]
+ #[structopt(name = "blocks", version = garage_version())]
Blocks,
/// Only redo the propagation of object deletions to the version table (slow)
- #[structopt(name = "versions")]
+ #[structopt(name = "versions", version = garage_version())]
Versions,
/// Only redo the propagation of version deletions to the block ref table (extremely slow)
- #[structopt(name = "block_refs")]
+ #[structopt(name = "block_refs", version = garage_version())]
BlockRefs,
/// Verify integrity of all blocks on disc (extremely slow, i/o intensive)
- #[structopt(name = "scrub")]
+ #[structopt(name = "scrub", version = garage_version())]
Scrub {
- /// Tranquility factor (see tranquilizer documentation)
- #[structopt(name = "tranquility", default_value = "2")]
+ #[structopt(subcommand)]
+ cmd: ScrubCmd,
+ },
+}
+
+#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
+pub enum ScrubCmd {
+ /// Start scrub
+ #[structopt(name = "start", version = garage_version())]
+ Start,
+ /// Pause scrub (it will resume automatically after 24 hours)
+ #[structopt(name = "pause", version = garage_version())]
+ Pause,
+ /// Resume paused scrub
+ #[structopt(name = "resume", version = garage_version())]
+ Resume,
+ /// Cancel scrub in progress
+ #[structopt(name = "cancel", version = garage_version())]
+ Cancel,
+ /// Set tranquility level for in-progress and future scrubs
+ #[structopt(name = "set-tranquility", version = garage_version())]
+ SetTranquility {
+ #[structopt()]
tranquility: u32,
},
}
#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
+pub struct OfflineRepairOpt {
+ /// Confirm the launch of the repair operation
+ #[structopt(long = "yes")]
+ pub yes: bool,
+
+ #[structopt(subcommand)]
+ pub what: OfflineRepairWhat,
+}
+
+#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
+pub enum OfflineRepairWhat {
+ /// Repair K2V item counters
+ #[cfg(feature = "k2v")]
+ #[structopt(name = "k2v_item_counters", version = garage_version())]
+ K2VItemCounters,
+ /// Repair object counters
+ #[structopt(name = "object_counters", version = garage_version())]
+ ObjectCounters,
+}
+
+#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
pub struct StatsOpt {
/// Gather statistics from all nodes
#[structopt(short = "a", long = "all-nodes")]
@@ -415,3 +486,48 @@ pub struct StatsOpt {
#[structopt(short = "d", long = "detailed")]
pub detailed: bool,
}
+
+#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
+pub struct WorkerOpt {
+ #[structopt(subcommand)]
+ pub cmd: WorkerCmd,
+}
+
+#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
+pub enum WorkerCmd {
+ /// List all workers on Garage node
+ #[structopt(name = "list", version = garage_version())]
+ List {
+ #[structopt(flatten)]
+ opt: WorkerListOpt,
+ },
+ /// Set worker parameter
+ #[structopt(name = "set", version = garage_version())]
+ Set {
+ #[structopt(subcommand)]
+ opt: WorkerSetCmd,
+ },
+}
+
+#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone, Copy)]
+pub struct WorkerListOpt {
+ /// Show only busy workers
+ #[structopt(short = "b", long = "busy")]
+ pub busy: bool,
+ /// Show only workers with errors
+ #[structopt(short = "e", long = "errors")]
+ pub errors: bool,
+}
+
+#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
+pub enum WorkerSetCmd {
+ /// Set tranquility of scrub operations
+ #[structopt(name = "scrub-tranquility", version = garage_version())]
+ ScrubTranquility { tranquility: u32 },
+ /// Set number of concurrent block resync workers
+ #[structopt(name = "resync-n-workers", version = garage_version())]
+ ResyncNWorkers { n_workers: usize },
+ /// Set tranquility of block resync operations
+ #[structopt(name = "resync-tranquility", version = garage_version())]
+ ResyncTranquility { tranquility: u32 },
+}
diff --git a/src/garage/cli/util.rs b/src/garage/cli/util.rs
index 7d496507..396938ae 100644
--- a/src/garage/cli/util.rs
+++ b/src/garage/cli/util.rs
@@ -1,11 +1,18 @@
use std::collections::HashMap;
+use std::time::Duration;
+use garage_util::background::*;
use garage_util::crdt::*;
use garage_util::data::Uuid;
use garage_util::error::*;
+use garage_util::formater::format_table;
+use garage_util::time::*;
use garage_model::bucket_table::*;
use garage_model::key_table::*;
+use garage_model::s3::object_table::{BYTES, OBJECTS, UNFINISHED_UPLOADS};
+
+use crate::cli::structs::WorkerListOpt;
pub fn print_bucket_list(bl: Vec<Bucket>) {
println!("List of buckets:");
@@ -28,11 +35,12 @@ pub fn print_bucket_list(bl: Vec<Bucket>) {
[((k, n), _, _)] => format!("{}:{}", k, n),
s => format!("[{} local aliases]", s.len()),
};
+
table.push(format!(
"\t{}\t{}\t{}",
aliases.join(","),
local_aliases_n,
- hex::encode(bucket.id)
+ hex::encode(bucket.id),
));
}
format_table(table);
@@ -120,7 +128,11 @@ pub fn print_key_info(key: &Key, relevant_buckets: &HashMap<Uuid, Bucket>) {
}
}
-pub fn print_bucket_info(bucket: &Bucket, relevant_keys: &HashMap<String, Key>) {
+pub fn print_bucket_info(
+ bucket: &Bucket,
+ relevant_keys: &HashMap<String, Key>,
+ counters: &HashMap<String, i64>,
+) {
let key_name = |k| {
relevant_keys
.get(k)
@@ -132,7 +144,42 @@ pub fn print_bucket_info(bucket: &Bucket, relevant_keys: &HashMap<String, Key>)
match &bucket.state {
Deletable::Deleted => println!("Bucket is deleted."),
Deletable::Present(p) => {
- println!("Website access: {}", p.website_config.get().is_some());
+ let size =
+ bytesize::ByteSize::b(counters.get(BYTES).cloned().unwrap_or_default() as u64);
+ println!(
+ "\nSize: {} ({})",
+ size.to_string_as(true),
+ size.to_string_as(false)
+ );
+ println!(
+ "Objects: {}",
+ counters.get(OBJECTS).cloned().unwrap_or_default()
+ );
+ println!(
+ "Unfinished multipart uploads: {}",
+ counters
+ .get(UNFINISHED_UPLOADS)
+ .cloned()
+ .unwrap_or_default()
+ );
+
+ println!("\nWebsite access: {}", p.website_config.get().is_some());
+
+ let quotas = p.quotas.get();
+ if quotas.max_size.is_some() || quotas.max_objects.is_some() {
+ println!("\nQuotas:");
+ if let Some(ms) = quotas.max_size {
+ let ms = bytesize::ByteSize::b(ms);
+ println!(
+ " maximum size: {} ({})",
+ ms.to_string_as(true),
+ ms.to_string_as(false)
+ );
+ }
+ if let Some(mo) = quotas.max_objects {
+ println!(" maximum number of objects: {}", mo);
+ }
+ }
println!("\nGlobal aliases:");
for (alias, _, active) in p.aliases.items().iter() {
@@ -173,42 +220,13 @@ pub fn print_bucket_info(bucket: &Bucket, relevant_keys: &HashMap<String, Key>)
};
}
-pub fn format_table(data: Vec<String>) {
- let data = data
- .iter()
- .map(|s| s.split('\t').collect::<Vec<_>>())
- .collect::<Vec<_>>();
-
- let columns = data.iter().map(|row| row.len()).fold(0, std::cmp::max);
- let mut column_size = vec![0; columns];
-
- let mut out = String::new();
-
- for row in data.iter() {
- for (i, col) in row.iter().enumerate() {
- column_size[i] = std::cmp::max(column_size[i], col.chars().count());
- }
- }
-
- for row in data.iter() {
- for (col, col_len) in row[..row.len() - 1].iter().zip(column_size.iter()) {
- out.push_str(col);
- (0..col_len - col.chars().count() + 2).for_each(|_| out.push(' '));
- }
- out.push_str(row[row.len() - 1]);
- out.push('\n');
- }
-
- print!("{}", out);
-}
-
pub fn find_matching_node(
cand: impl std::iter::Iterator<Item = Uuid>,
pattern: &str,
) -> Result<Uuid, Error> {
let mut candidates = vec![];
for c in cand {
- if hex::encode(&c).starts_with(&pattern) {
+ if hex::encode(&c).starts_with(&pattern) && !candidates.contains(&c) {
candidates.push(c);
}
}
@@ -222,3 +240,56 @@ pub fn find_matching_node(
Ok(candidates[0])
}
}
+
+pub fn print_worker_info(wi: HashMap<usize, WorkerInfo>, wlo: WorkerListOpt) {
+ let mut wi = wi.into_iter().collect::<Vec<_>>();
+ wi.sort_by_key(|(tid, info)| {
+ (
+ match info.state {
+ WorkerState::Busy | WorkerState::Throttled(_) => 0,
+ WorkerState::Idle => 1,
+ WorkerState::Done => 2,
+ },
+ *tid,
+ )
+ });
+
+ let mut table = vec![];
+ for (tid, info) in wi.iter() {
+ if wlo.busy && !matches!(info.state, WorkerState::Busy | WorkerState::Throttled(_)) {
+ continue;
+ }
+ if wlo.errors && info.errors == 0 {
+ continue;
+ }
+
+ table.push(format!("{}\t{}\t{}", tid, info.state, info.name));
+ if let Some(i) = &info.info {
+ table.push(format!("\t\t {}", i));
+ }
+ let tf = timeago::Formatter::new();
+ let (err_ago, err_msg) = info
+ .last_error
+ .as_ref()
+ .map(|(m, t)| {
+ (
+ tf.convert(Duration::from_millis(now_msec() - t)),
+ m.as_str(),
+ )
+ })
+ .unwrap_or(("(?) ago".into(), "(?)"));
+ if info.consecutive_errors > 0 {
+ table.push(format!(
+ "\t\t {} consecutive errors ({} total), last {}",
+ info.consecutive_errors, info.errors, err_ago,
+ ));
+ table.push(format!("\t\t {}", err_msg));
+ } else if info.errors > 0 {
+ table.push(format!("\t\t ({} errors, last {})", info.errors, err_ago,));
+ if wlo.errors {
+ table.push(format!("\t\t {}", err_msg));
+ }
+ }
+ }
+ format_table(table);
+}
diff --git a/src/garage/main.rs b/src/garage/main.rs
index e898e680..e5cba553 100644
--- a/src/garage/main.rs
+++ b/src/garage/main.rs
@@ -8,6 +8,14 @@ mod admin;
mod cli;
mod repair;
mod server;
+#[cfg(feature = "telemetry-otlp")]
+mod tracing_setup;
+
+#[cfg(not(any(feature = "bundled-libs", feature = "system-libs")))]
+compile_error!("Either bundled-libs or system-libs Cargo feature must be enabled");
+
+#[cfg(all(feature = "bundled-libs", feature = "system-libs"))]
+compile_error!("Only one of bundled-libs and system-libs Cargo features must be enabled");
use std::net::SocketAddr;
use std::path::PathBuf;
@@ -28,7 +36,10 @@ use admin::*;
use cli::*;
#[derive(StructOpt, Debug)]
-#[structopt(name = "garage")]
+#[structopt(
+ name = "garage",
+ about = "S3-compatible object store for self-hosted geo-distributed deployments"
+)]
struct Opt {
/// Host to connect to for admin operations, in the format:
/// <public-key>@<ip>:<port>
@@ -57,20 +68,56 @@ async fn main() {
if std::env::var("RUST_LOG").is_err() {
std::env::set_var("RUST_LOG", "netapp=info,garage=info")
}
- pretty_env_logger::init();
+ tracing_subscriber::fmt()
+ .with_writer(std::io::stderr)
+ .with_env_filter(tracing_subscriber::filter::EnvFilter::from_default_env())
+ .init();
sodiumoxide::init().expect("Unable to init sodiumoxide");
- let opt = Opt::from_args();
+ // Abort on panic (same behavior as in Go)
+ std::panic::set_hook(Box::new(|panic_info| {
+ error!("{}", panic_info.to_string());
+ std::process::abort();
+ }));
+
+ // Initialize version and features info
+ let features = &[
+ #[cfg(feature = "k2v")]
+ "k2v",
+ #[cfg(feature = "sled")]
+ "sled",
+ #[cfg(feature = "lmdb")]
+ "lmdb",
+ #[cfg(feature = "sqlite")]
+ "sqlite",
+ #[cfg(feature = "kubernetes-discovery")]
+ "kubernetes-discovery",
+ #[cfg(feature = "metrics")]
+ "metrics",
+ #[cfg(feature = "telemetry-otlp")]
+ "telemetry-otlp",
+ #[cfg(feature = "bundled-libs")]
+ "bundled-libs",
+ #[cfg(feature = "system-libs")]
+ "system-libs",
+ ][..];
+ if let Some(git_version) = option_env!("GIT_VERSION") {
+ garage_util::version::init_version(git_version);
+ }
+ garage_util::version::init_features(features);
+
+ // Parse arguments
+ let version = format!(
+ "{} [features: {}]",
+ garage_util::version::garage_version(),
+ features.join(", ")
+ );
+ let opt = Opt::from_clap(&Opt::clap().version(version.as_str()).get_matches());
let res = match opt.cmd {
- Command::Server => {
- // Abort on panic (same behavior as in Go)
- std::panic::set_hook(Box::new(|panic_info| {
- error!("{}", panic_info.to_string());
- std::process::abort();
- }));
-
- server::run_server(opt.config_file).await
+ Command::Server => server::run_server(opt.config_file).await,
+ Command::OfflineRepair(repair_opt) => {
+ repair::offline::offline_repair(opt.config_file, repair_opt).await
}
Command::Node(NodeOperation::NodeId(node_id_opt)) => {
node_id_command(opt.config_file, node_id_opt.quiet)
@@ -115,7 +162,13 @@ async fn cli_command(opt: Opt) -> Result<(), Error> {
} else {
let node_id = garage_rpc::system::read_node_id(&config.as_ref().unwrap().metadata_dir)
.err_context(READ_KEY_ERROR)?;
- if let Some(a) = config.as_ref().and_then(|c| c.rpc_public_addr) {
+ if let Some(a) = config.as_ref().and_then(|c| c.rpc_public_addr.as_ref()) {
+ use std::net::ToSocketAddrs;
+ let a = a
+ .to_socket_addrs()
+ .ok_or_message("unable to resolve rpc_public_addr specified in config file")?
+ .next()
+ .ok_or_message("unable to resolve rpc_public_addr specified in config file")?;
(node_id, a)
} else {
let default_addr = SocketAddr::new(
@@ -141,6 +194,7 @@ async fn cli_command(opt: Opt) -> Result<(), Error> {
match cli_command_dispatch(opt.cmd, &system_rpc_endpoint, &admin_rpc_endpoint, id).await {
Err(HelperError::Internal(i)) => Err(Error::Message(format!("Internal error: {}", i))),
Err(HelperError::BadRequest(b)) => Err(Error::Message(b)),
+ Err(e) => Err(Error::Message(format!("{}", e))),
Ok(x) => Ok(x),
}
}
diff --git a/src/garage/repair.rs b/src/garage/repair.rs
deleted file mode 100644
index 3666ca8f..00000000
--- a/src/garage/repair.rs
+++ /dev/null
@@ -1,149 +0,0 @@
-use std::sync::Arc;
-
-use tokio::sync::watch;
-
-use garage_model::block_ref_table::*;
-use garage_model::garage::Garage;
-use garage_model::object_table::*;
-use garage_model::version_table::*;
-use garage_table::*;
-use garage_util::error::Error;
-
-use crate::*;
-
-pub struct Repair {
- pub garage: Arc<Garage>,
-}
-
-impl Repair {
- pub async fn repair_worker(&self, opt: RepairOpt, must_exit: watch::Receiver<bool>) {
- if let Err(e) = self.repair_worker_aux(opt, must_exit).await {
- warn!("Repair worker failed with error: {}", e);
- }
- }
-
- async fn repair_worker_aux(
- &self,
- opt: RepairOpt,
- must_exit: watch::Receiver<bool>,
- ) -> Result<(), Error> {
- match opt.what {
- RepairWhat::Tables => {
- info!("Launching a full sync of tables");
- self.garage.bucket_table.syncer.add_full_sync();
- self.garage.object_table.syncer.add_full_sync();
- self.garage.version_table.syncer.add_full_sync();
- self.garage.block_ref_table.syncer.add_full_sync();
- self.garage.key_table.syncer.add_full_sync();
- }
- RepairWhat::Versions => {
- info!("Repairing the versions table");
- self.repair_versions(&must_exit).await?;
- }
- RepairWhat::BlockRefs => {
- info!("Repairing the block refs table");
- self.repair_block_ref(&must_exit).await?;
- }
- RepairWhat::Blocks => {
- info!("Repairing the stored blocks");
- self.garage
- .block_manager
- .repair_data_store(&must_exit)
- .await?;
- }
- RepairWhat::Scrub { tranquility } => {
- info!("Verifying integrity of stored blocks");
- self.garage
- .block_manager
- .scrub_data_store(&must_exit, tranquility)
- .await?;
- }
- }
- Ok(())
- }
-
- async fn repair_versions(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
- let mut pos = vec![];
-
- while let Some((item_key, item_bytes)) =
- self.garage.version_table.data.store.get_gt(&pos)?
- {
- pos = item_key.to_vec();
-
- let version = rmp_serde::decode::from_read_ref::<_, Version>(item_bytes.as_ref())?;
- if version.deleted.get() {
- continue;
- }
- let object = self
- .garage
- .object_table
- .get(&version.bucket_id, &version.key)
- .await?;
- let version_exists = match object {
- Some(o) => o
- .versions()
- .iter()
- .any(|x| x.uuid == version.uuid && x.state != ObjectVersionState::Aborted),
- None => false,
- };
- if !version_exists {
- info!("Repair versions: marking version as deleted: {:?}", version);
- self.garage
- .version_table
- .insert(&Version::new(
- version.uuid,
- version.bucket_id,
- version.key,
- true,
- ))
- .await?;
- }
-
- if *must_exit.borrow() {
- break;
- }
- }
- Ok(())
- }
-
- async fn repair_block_ref(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
- let mut pos = vec![];
-
- while let Some((item_key, item_bytes)) =
- self.garage.block_ref_table.data.store.get_gt(&pos)?
- {
- pos = item_key.to_vec();
-
- let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(item_bytes.as_ref())?;
- if block_ref.deleted.get() {
- continue;
- }
- let version = self
- .garage
- .version_table
- .get(&block_ref.version, &EmptyKey)
- .await?;
- // The version might not exist if it has been GC'ed
- let ref_exists = version.map(|v| !v.deleted.get()).unwrap_or(false);
- if !ref_exists {
- info!(
- "Repair block ref: marking block_ref as deleted: {:?}",
- block_ref
- );
- self.garage
- .block_ref_table
- .insert(&BlockRef {
- block: block_ref.block,
- version: block_ref.version,
- deleted: true.into(),
- })
- .await?;
- }
-
- if *must_exit.borrow() {
- break;
- }
- }
- Ok(())
- }
-}
diff --git a/src/garage/repair/mod.rs b/src/garage/repair/mod.rs
new file mode 100644
index 00000000..4699ace5
--- /dev/null
+++ b/src/garage/repair/mod.rs
@@ -0,0 +1,2 @@
+pub mod offline;
+pub mod online;
diff --git a/src/garage/repair/offline.rs b/src/garage/repair/offline.rs
new file mode 100644
index 00000000..7760a8bd
--- /dev/null
+++ b/src/garage/repair/offline.rs
@@ -0,0 +1,55 @@
+use std::path::PathBuf;
+
+use tokio::sync::watch;
+
+use garage_util::background::*;
+use garage_util::config::*;
+use garage_util::error::*;
+
+use garage_model::garage::Garage;
+
+use crate::cli::structs::*;
+
+pub async fn offline_repair(config_file: PathBuf, opt: OfflineRepairOpt) -> Result<(), Error> {
+ if !opt.yes {
+ return Err(Error::Message(
+ "Please add the --yes flag to launch repair operation".into(),
+ ));
+ }
+
+ info!("Loading configuration...");
+ let config = read_config(config_file)?;
+
+ info!("Initializing background runner...");
+ let (done_tx, done_rx) = watch::channel(false);
+ let (background, await_background_done) = BackgroundRunner::new(16, done_rx);
+
+ info!("Initializing Garage main data store...");
+ let garage = Garage::new(config.clone(), background)?;
+
+ info!("Launching repair operation...");
+ match opt.what {
+ #[cfg(feature = "k2v")]
+ OfflineRepairWhat::K2VItemCounters => {
+ garage
+ .k2v
+ .counter_table
+ .offline_recount_all(&garage.k2v.item_table)?;
+ }
+ OfflineRepairWhat::ObjectCounters => {
+ garage
+ .object_counter_table
+ .offline_recount_all(&garage.object_table)?;
+ }
+ }
+
+ info!("Repair operation finished, shutting down Garage internals...");
+ done_tx.send(true).unwrap();
+ drop(garage);
+
+ await_background_done.await?;
+
+ info!("Cleaning up...");
+
+ Ok(())
+}
diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs
new file mode 100644
index 00000000..e33cf097
--- /dev/null
+++ b/src/garage/repair/online.rs
@@ -0,0 +1,215 @@
+use std::sync::Arc;
+use std::time::Duration;
+
+use async_trait::async_trait;
+use tokio::sync::watch;
+
+use garage_block::repair::ScrubWorkerCommand;
+use garage_model::garage::Garage;
+use garage_model::s3::block_ref_table::*;
+use garage_model::s3::object_table::*;
+use garage_model::s3::version_table::*;
+use garage_table::*;
+use garage_util::background::*;
+use garage_util::error::Error;
+
+use crate::*;
+
+pub async fn launch_online_repair(garage: Arc<Garage>, opt: RepairOpt) {
+ match opt.what {
+ RepairWhat::Tables => {
+ info!("Launching a full sync of tables");
+ garage.bucket_table.syncer.add_full_sync();
+ garage.object_table.syncer.add_full_sync();
+ garage.version_table.syncer.add_full_sync();
+ garage.block_ref_table.syncer.add_full_sync();
+ garage.key_table.syncer.add_full_sync();
+ }
+ RepairWhat::Versions => {
+ info!("Repairing the versions table");
+ garage
+ .background
+ .spawn_worker(RepairVersionsWorker::new(garage.clone()));
+ }
+ RepairWhat::BlockRefs => {
+ info!("Repairing the block refs table");
+ garage
+ .background
+ .spawn_worker(RepairBlockrefsWorker::new(garage.clone()));
+ }
+ RepairWhat::Blocks => {
+ info!("Repairing the stored blocks");
+ garage
+ .background
+ .spawn_worker(garage_block::repair::RepairWorker::new(
+ garage.block_manager.clone(),
+ ));
+ }
+ RepairWhat::Scrub { cmd } => {
+ let cmd = match cmd {
+ ScrubCmd::Start => ScrubWorkerCommand::Start,
+ ScrubCmd::Pause => ScrubWorkerCommand::Pause(Duration::from_secs(3600 * 24)),
+ ScrubCmd::Resume => ScrubWorkerCommand::Resume,
+ ScrubCmd::Cancel => ScrubWorkerCommand::Cancel,
+ ScrubCmd::SetTranquility { tranquility } => {
+ ScrubWorkerCommand::SetTranquility(tranquility)
+ }
+ };
+ info!("Sending command to scrub worker: {:?}", cmd);
+ garage.block_manager.send_scrub_command(cmd).await;
+ }
+ }
+}
+
+// ----
+
+struct RepairVersionsWorker {
+ garage: Arc<Garage>,
+ pos: Vec<u8>,
+ counter: usize,
+}
+
+impl RepairVersionsWorker {
+ fn new(garage: Arc<Garage>) -> Self {
+ Self {
+ garage,
+ pos: vec![],
+ counter: 0,
+ }
+ }
+}
+
+#[async_trait]
+impl Worker for RepairVersionsWorker {
+ fn name(&self) -> String {
+ "Version repair worker".into()
+ }
+
+ fn info(&self) -> Option<String> {
+ Some(format!("{} items done", self.counter))
+ }
+
+ async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
+ let item_bytes = match self.garage.version_table.data.store.get_gt(&self.pos)? {
+ Some((k, v)) => {
+ self.pos = k;
+ v
+ }
+ None => {
+ info!("repair_versions: finished, done {}", self.counter);
+ return Ok(WorkerState::Done);
+ }
+ };
+
+ self.counter += 1;
+
+ let version = rmp_serde::decode::from_read_ref::<_, Version>(&item_bytes)?;
+ if !version.deleted.get() {
+ let object = self
+ .garage
+ .object_table
+ .get(&version.bucket_id, &version.key)
+ .await?;
+ let version_exists = match object {
+ Some(o) => o
+ .versions()
+ .iter()
+ .any(|x| x.uuid == version.uuid && x.state != ObjectVersionState::Aborted),
+ None => false,
+ };
+ if !version_exists {
+ info!("Repair versions: marking version as deleted: {:?}", version);
+ self.garage
+ .version_table
+ .insert(&Version::new(
+ version.uuid,
+ version.bucket_id,
+ version.key,
+ true,
+ ))
+ .await?;
+ }
+ }
+
+ Ok(WorkerState::Busy)
+ }
+
+ async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerState {
+ unreachable!()
+ }
+}
+
+// ----
+
+struct RepairBlockrefsWorker {
+ garage: Arc<Garage>,
+ pos: Vec<u8>,
+ counter: usize,
+}
+
+impl RepairBlockrefsWorker {
+ fn new(garage: Arc<Garage>) -> Self {
+ Self {
+ garage,
+ pos: vec![],
+ counter: 0,
+ }
+ }
+}
+
+#[async_trait]
+impl Worker for RepairBlockrefsWorker {
+ fn name(&self) -> String {
+ "Block refs repair worker".into()
+ }
+
+ fn info(&self) -> Option<String> {
+ Some(format!("{} items done", self.counter))
+ }
+
+ async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
+ let item_bytes = match self.garage.block_ref_table.data.store.get_gt(&self.pos)? {
+ Some((k, v)) => {
+ self.pos = k;
+ v
+ }
+ None => {
+ info!("repair_block_ref: finished, done {}", self.counter);
+ return Ok(WorkerState::Done);
+ }
+ };
+
+ self.counter += 1;
+
+ let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(&item_bytes)?;
+ if !block_ref.deleted.get() {
+ let version = self
+ .garage
+ .version_table
+ .get(&block_ref.version, &EmptyKey)
+ .await?;
+ // The version might not exist if it has been GC'ed
+ let ref_exists = version.map(|v| !v.deleted.get()).unwrap_or(false);
+ if !ref_exists {
+ info!(
+ "Repair block ref: marking block_ref as deleted: {:?}",
+ block_ref
+ );
+ self.garage
+ .block_ref_table
+ .insert(&BlockRef {
+ block: block_ref.block,
+ version: block_ref.version,
+ deleted: true.into(),
+ })
+ .await?;
+ }
+ }
+
+ Ok(WorkerState::Busy)
+ }
+
+ async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerState {
+ unreachable!()
+ }
+}
diff --git a/src/garage/server.rs b/src/garage/server.rs
index 58c9e782..d4099a97 100644
--- a/src/garage/server.rs
+++ b/src/garage/server.rs
@@ -6,13 +6,17 @@ use garage_util::background::*;
use garage_util::config::*;
use garage_util::error::Error;
-use garage_admin::metrics::*;
-use garage_admin::tracing_setup::*;
-use garage_api::run_api_server;
+use garage_api::admin::api_server::AdminApiServer;
+use garage_api::s3::api_server::S3ApiServer;
use garage_model::garage::Garage;
-use garage_web::run_web_server;
+use garage_web::WebServer;
+
+#[cfg(feature = "k2v")]
+use garage_api::k2v::api_server::K2VApiServer;
use crate::admin::*;
+#[cfg(feature = "telemetry-otlp")]
+use crate::tracing_setup::*;
async fn wait_from(mut chan: watch::Receiver<bool>) {
while !*chan.borrow() {
@@ -24,79 +28,124 @@ async fn wait_from(mut chan: watch::Receiver<bool>) {
pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
info!("Loading configuration...");
- let config = read_config(config_file).expect("Unable to read config file");
+ let config = read_config(config_file)?;
- info!("Opening database...");
- let mut db_path = config.metadata_dir.clone();
- db_path.push("db");
- let db = sled::Config::default()
- .path(&db_path)
- .cache_capacity(config.sled_cache_capacity)
- .flush_every_ms(Some(config.sled_flush_every_ms))
- .open()
- .expect("Unable to open sled DB");
+ // ---- Initialize Garage internals ----
- info!("Initialize admin web server and metric backend...");
- let admin_server_init = AdminServer::init();
+ #[cfg(feature = "metrics")]
+ let metrics_exporter = opentelemetry_prometheus::exporter().init();
info!("Initializing background runner...");
- let watch_cancel = netapp::util::watch_ctrl_c();
+ let watch_cancel = watch_shutdown_signal();
let (background, await_background_done) = BackgroundRunner::new(16, watch_cancel.clone());
info!("Initializing Garage main data store...");
- let garage = Garage::new(config.clone(), db, background);
+ let garage = Garage::new(config.clone(), background)?;
+
+ if config.admin.trace_sink.is_some() {
+ info!("Initialize tracing...");
- info!("Initialize tracing...");
- if let Some(export_to) = config.admin.trace_sink {
- init_tracing(&export_to, garage.system.id)?;
+ #[cfg(feature = "telemetry-otlp")]
+ init_tracing(config.admin.trace_sink.as_ref().unwrap(), garage.system.id)?;
+
+ #[cfg(not(feature = "telemetry-otlp"))]
+ error!("Garage was built without OTLP exporter, admin.trace_sink is ignored.");
}
+ info!("Initialize Admin API server and metrics collector...");
+ let admin_server = AdminApiServer::new(
+ garage.clone(),
+ #[cfg(feature = "metrics")]
+ metrics_exporter,
+ );
+
+ info!("Launching internal Garage cluster communications...");
let run_system = tokio::spawn(garage.system.clone().run(watch_cancel.clone()));
info!("Create admin RPC handler...");
AdminRpcHandler::new(garage.clone());
- info!("Initializing API server...");
- let api_server = tokio::spawn(run_api_server(
- garage.clone(),
- wait_from(watch_cancel.clone()),
- ));
+ // ---- Launch public-facing API servers ----
+
+ let mut servers = vec![];
+
+ if let Some(s3_bind_addr) = &config.s3_api.api_bind_addr {
+ info!("Initializing S3 API server...");
+ servers.push((
+ "S3 API",
+ tokio::spawn(S3ApiServer::run(
+ garage.clone(),
+ *s3_bind_addr,
+ config.s3_api.s3_region.clone(),
+ wait_from(watch_cancel.clone()),
+ )),
+ ));
+ }
- info!("Initializing web server...");
- let web_server = tokio::spawn(run_web_server(
- garage.clone(),
- wait_from(watch_cancel.clone()),
- ));
-
- let admin_server = if let Some(admin_bind_addr) = config.admin.api_bind_addr {
- info!("Configure and run admin web server...");
- Some(tokio::spawn(
- admin_server_init.run(admin_bind_addr, wait_from(watch_cancel.clone())),
- ))
- } else {
- None
- };
+ if config.k2v_api.is_some() {
+ #[cfg(feature = "k2v")]
+ {
+ info!("Initializing K2V API server...");
+ servers.push((
+ "K2V API",
+ tokio::spawn(K2VApiServer::run(
+ garage.clone(),
+ config.k2v_api.as_ref().unwrap().api_bind_addr,
+ config.s3_api.s3_region.clone(),
+ wait_from(watch_cancel.clone()),
+ )),
+ ));
+ }
+ #[cfg(not(feature = "k2v"))]
+ error!("K2V is not enabled in this build, cannot start K2V API server");
+ }
- // Stuff runs
+ if let Some(web_config) = &config.s3_web {
+ info!("Initializing web server...");
+ servers.push((
+ "Web",
+ tokio::spawn(WebServer::run(
+ garage.clone(),
+ web_config.bind_addr,
+ web_config.root_domain.clone(),
+ wait_from(watch_cancel.clone()),
+ )),
+ ));
+ }
- // When a cancel signal is sent, stuff stops
- if let Err(e) = api_server.await? {
- warn!("API server exited with error: {}", e);
+ if let Some(admin_bind_addr) = &config.admin.api_bind_addr {
+ info!("Launching Admin API server...");
+ servers.push((
+ "Admin",
+ tokio::spawn(admin_server.run(*admin_bind_addr, wait_from(watch_cancel.clone()))),
+ ));
}
- if let Err(e) = web_server.await? {
- warn!("Web server exited with error: {}", e);
+
+ #[cfg(not(feature = "metrics"))]
+ if config.admin.metrics_token.is_some() {
+ warn!("This Garage version is built without the metrics feature");
}
- if let Some(a) = admin_server {
- if let Err(e) = a.await? {
- warn!("Admin web server exited with error: {}", e);
+
+ // Stuff runs
+
+ // When a cancel signal is sent, stuff stops
+
+ // Collect stuff
+ for (desc, join_handle) in servers {
+ if let Err(e) = join_handle.await? {
+ error!("{} server exited with error: {}", desc, e);
+ } else {
+ info!("{} server exited without error.", desc);
}
}
// Remove RPC handlers for system to break reference cycles
garage.system.netapp.drop_all_handlers();
+ opentelemetry::global::shutdown_tracer_provider();
// Await for netapp RPC system to end
run_system.await?;
+ info!("Netapp exited");
// Drop all references so that stuff can terminate properly
drop(garage);
@@ -108,3 +157,44 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
Ok(())
}
+
+#[cfg(unix)]
+fn watch_shutdown_signal() -> watch::Receiver<bool> {
+ use tokio::signal::unix::*;
+
+ let (send_cancel, watch_cancel) = watch::channel(false);
+ tokio::spawn(async move {
+ let mut sigint = signal(SignalKind::interrupt()).expect("Failed to install SIGINT handler");
+ let mut sigterm =
+ signal(SignalKind::terminate()).expect("Failed to install SIGTERM handler");
+ let mut sighup = signal(SignalKind::hangup()).expect("Failed to install SIGHUP handler");
+ tokio::select! {
+ _ = sigint.recv() => info!("Received SIGINT, shutting down."),
+ _ = sigterm.recv() => info!("Received SIGTERM, shutting down."),
+ _ = sighup.recv() => info!("Received SIGHUP, shutting down."),
+ }
+ send_cancel.send(true).unwrap();
+ });
+ watch_cancel
+}
+
+#[cfg(windows)]
+fn watch_shutdown_signal() -> watch::Receiver<bool> {
+ use tokio::signal::windows::*;
+
+ let (send_cancel, watch_cancel) = watch::channel(false);
+ tokio::spawn(async move {
+ let mut sigint = ctrl_c().expect("Failed to install Ctrl-C handler");
+ let mut sigclose = ctrl_close().expect("Failed to install Ctrl-Close handler");
+ let mut siglogoff = ctrl_logoff().expect("Failed to install Ctrl-Logoff handler");
+ let mut sigsdown = ctrl_shutdown().expect("Failed to install Ctrl-Shutdown handler");
+ tokio::select! {
+ _ = sigint.recv() => info!("Received Ctrl-C, shutting down."),
+ _ = sigclose.recv() => info!("Received Ctrl-Close, shutting down."),
+ _ = siglogoff.recv() => info!("Received Ctrl-Logoff, shutting down."),
+ _ = sigsdown.recv() => info!("Received Ctrl-Shutdown, shutting down."),
+ }
+ send_cancel.send(true).unwrap();
+ });
+ watch_cancel
+}
diff --git a/src/garage/tests/bucket.rs b/src/garage/tests/bucket.rs
index ff5cc8da..b32af068 100644
--- a/src/garage/tests/bucket.rs
+++ b/src/garage/tests/bucket.rs
@@ -29,8 +29,7 @@ async fn test_bucket_all() {
.unwrap()
.iter()
.filter(|x| x.name.as_ref().is_some())
- .find(|x| x.name.as_ref().unwrap() == "hello")
- .is_some());
+ .any(|x| x.name.as_ref().unwrap() == "hello"));
}
{
// Get its location
@@ -75,13 +74,12 @@ async fn test_bucket_all() {
{
// Check bucket is deleted with List buckets
let r = ctx.client.list_buckets().send().await.unwrap();
- assert!(r
+ assert!(!r
.buckets
.as_ref()
.unwrap()
.iter()
.filter(|x| x.name.as_ref().is_some())
- .find(|x| x.name.as_ref().unwrap() == "hello")
- .is_none());
+ .any(|x| x.name.as_ref().unwrap() == "hello"));
}
}
diff --git a/src/garage/tests/common/client.rs b/src/garage/tests/common/client.rs
index c5ddc6e5..212588b5 100644
--- a/src/garage/tests/common/client.rs
+++ b/src/garage/tests/common/client.rs
@@ -10,7 +10,7 @@ pub fn build_client(instance: &Instance) -> Client {
None,
"garage-integ-test",
);
- let endpoint = Endpoint::immutable(instance.uri());
+ let endpoint = Endpoint::immutable(instance.s3_uri());
let config = Config::builder()
.region(super::REGION)
diff --git a/src/garage/tests/common/custom_requester.rs b/src/garage/tests/common/custom_requester.rs
index 580691a1..1700cc90 100644
--- a/src/garage/tests/common/custom_requester.rs
+++ b/src/garage/tests/common/custom_requester.rs
@@ -17,14 +17,25 @@ use garage_api::signature;
pub struct CustomRequester {
key: Key,
uri: Uri,
+ service: &'static str,
client: Client<HttpConnector>,
}
impl CustomRequester {
- pub fn new(instance: &Instance) -> Self {
+ pub fn new_s3(instance: &Instance) -> Self {
CustomRequester {
key: instance.key.clone(),
- uri: instance.uri(),
+ uri: instance.s3_uri(),
+ service: "s3",
+ client: Client::new(),
+ }
+ }
+
+ pub fn new_k2v(instance: &Instance) -> Self {
+ CustomRequester {
+ key: instance.key.clone(),
+ uri: instance.k2v_uri(),
+ service: "k2v",
client: Client::new(),
}
}
@@ -32,6 +43,7 @@ impl CustomRequester {
pub fn builder(&self, bucket: String) -> RequestBuilder<'_> {
RequestBuilder {
requester: self,
+ service: self.service,
bucket,
method: Method::GET,
path: String::new(),
@@ -47,6 +59,7 @@ impl CustomRequester {
pub struct RequestBuilder<'a> {
requester: &'a CustomRequester,
+ service: &'static str,
bucket: String,
method: Method,
path: String,
@@ -59,13 +72,17 @@ pub struct RequestBuilder<'a> {
}
impl<'a> RequestBuilder<'a> {
+ pub fn service(&mut self, service: &'static str) -> &mut Self {
+ self.service = service;
+ self
+ }
pub fn method(&mut self, method: Method) -> &mut Self {
self.method = method;
self
}
- pub fn path(&mut self, path: String) -> &mut Self {
- self.path = path;
+ pub fn path(&mut self, path: impl ToString) -> &mut Self {
+ self.path = path.to_string();
self
}
@@ -74,16 +91,38 @@ impl<'a> RequestBuilder<'a> {
self
}
+ pub fn query_param<T, U>(&mut self, param: T, value: Option<U>) -> &mut Self
+ where
+ T: ToString,
+ U: ToString,
+ {
+ self.query_params
+ .insert(param.to_string(), value.as_ref().map(ToString::to_string));
+ self
+ }
+
pub fn signed_headers(&mut self, signed_headers: HashMap<String, String>) -> &mut Self {
self.signed_headers = signed_headers;
self
}
+ pub fn signed_header(&mut self, name: impl ToString, value: impl ToString) -> &mut Self {
+ self.signed_headers
+ .insert(name.to_string(), value.to_string());
+ self
+ }
+
pub fn unsigned_headers(&mut self, unsigned_headers: HashMap<String, String>) -> &mut Self {
self.unsigned_headers = unsigned_headers;
self
}
+ pub fn unsigned_header(&mut self, name: impl ToString, value: impl ToString) -> &mut Self {
+ self.unsigned_headers
+ .insert(name.to_string(), value.to_string());
+ self
+ }
+
pub fn body(&mut self, body: Vec<u8>) -> &mut Self {
self.body = body;
self
@@ -106,24 +145,24 @@ impl<'a> RequestBuilder<'a> {
let query = query_param_to_string(&self.query_params);
let (host, path) = if self.vhost_style {
(
- format!("{}.s3.garage", self.bucket),
+ format!("{}.{}.garage", self.bucket, self.service),
format!("{}{}", self.path, query),
)
} else {
(
- "s3.garage".to_owned(),
+ format!("{}.garage", self.service),
format!("{}/{}{}", self.bucket, self.path, query),
)
};
let uri = format!("{}{}", self.requester.uri, path);
let now = Utc::now();
- let scope = signature::compute_scope(&now, super::REGION.as_ref());
+ let scope = signature::compute_scope(&now, super::REGION.as_ref(), self.service);
let mut signer = signature::signing_hmac(
&now,
&self.requester.key.secret,
super::REGION.as_ref(),
- "s3",
+ self.service,
)
.unwrap();
let streaming_signer = signer.clone();
diff --git a/src/garage/tests/common/garage.rs b/src/garage/tests/common/garage.rs
index 88c51501..44d727f9 100644
--- a/src/garage/tests/common/garage.rs
+++ b/src/garage/tests/common/garage.rs
@@ -22,7 +22,9 @@ pub struct Instance {
process: process::Child,
pub path: PathBuf,
pub key: Key,
- pub api_port: u16,
+ pub s3_port: u16,
+ pub k2v_port: u16,
+ pub web_port: u16,
}
impl Instance {
@@ -58,9 +60,12 @@ rpc_secret = "{secret}"
[s3_api]
s3_region = "{region}"
-api_bind_addr = "127.0.0.1:{api_port}"
+api_bind_addr = "127.0.0.1:{s3_port}"
root_domain = ".s3.garage"
+[k2v_api]
+api_bind_addr = "127.0.0.1:{k2v_port}"
+
[s3_web]
bind_addr = "127.0.0.1:{web_port}"
root_domain = ".web.garage"
@@ -72,10 +77,11 @@ api_bind_addr = "127.0.0.1:{admin_port}"
path = path.display(),
secret = GARAGE_TEST_SECRET,
region = super::REGION,
- api_port = port,
- rpc_port = port + 1,
- web_port = port + 2,
- admin_port = port + 3,
+ s3_port = port,
+ k2v_port = port + 1,
+ rpc_port = port + 2,
+ web_port = port + 3,
+ admin_port = port + 4,
);
fs::write(path.join("config.toml"), config).expect("Could not write garage config file");
@@ -88,7 +94,7 @@ api_bind_addr = "127.0.0.1:{admin_port}"
.arg("server")
.stdout(stdout)
.stderr(stderr)
- .env("RUST_LOG", "garage=info,garage_api=debug")
+ .env("RUST_LOG", "garage=info,garage_api=trace")
.spawn()
.expect("Could not start garage");
@@ -96,7 +102,9 @@ api_bind_addr = "127.0.0.1:{admin_port}"
process: child,
path,
key: Key::default(),
- api_port: port,
+ s3_port: port,
+ k2v_port: port + 1,
+ web_port: port + 3,
}
}
@@ -147,8 +155,14 @@ api_bind_addr = "127.0.0.1:{admin_port}"
String::from_utf8(output.stdout).unwrap()
}
- pub fn uri(&self) -> http::Uri {
- format!("http://127.0.0.1:{api_port}", api_port = self.api_port)
+ pub fn s3_uri(&self) -> http::Uri {
+ format!("http://127.0.0.1:{s3_port}", s3_port = self.s3_port)
+ .parse()
+ .expect("Could not build garage endpoint URI")
+ }
+
+ pub fn k2v_uri(&self) -> http::Uri {
+ format!("http://127.0.0.1:{k2v_port}", k2v_port = self.k2v_port)
.parse()
.expect("Could not build garage endpoint URI")
}
diff --git a/src/garage/tests/common/mod.rs b/src/garage/tests/common/mod.rs
index 8f88c731..28874b02 100644
--- a/src/garage/tests/common/mod.rs
+++ b/src/garage/tests/common/mod.rs
@@ -17,18 +17,27 @@ pub struct Context {
pub garage: &'static garage::Instance,
pub client: Client,
pub custom_request: CustomRequester,
+ pub k2v: K2VContext,
+}
+
+pub struct K2VContext {
+ pub request: CustomRequester,
}
impl Context {
fn new() -> Self {
let garage = garage::instance();
let client = client::build_client(garage);
- let custom_request = CustomRequester::new(garage);
+ let custom_request = CustomRequester::new_s3(garage);
+ let k2v_request = CustomRequester::new_k2v(garage);
Context {
garage,
client,
custom_request,
+ k2v: K2VContext {
+ request: k2v_request,
+ },
}
}
diff --git a/src/garage/tests/k2v/batch.rs b/src/garage/tests/k2v/batch.rs
new file mode 100644
index 00000000..acae1910
--- /dev/null
+++ b/src/garage/tests/k2v/batch.rs
@@ -0,0 +1,612 @@
+use std::collections::HashMap;
+
+use crate::common;
+
+use assert_json_diff::assert_json_eq;
+use serde_json::json;
+
+use super::json_body;
+use hyper::Method;
+
+#[tokio::test]
+async fn test_batch() {
+ let ctx = common::context();
+ let bucket = ctx.create_bucket("test-k2v-batch");
+
+ let mut values = HashMap::new();
+ values.insert("a", "initial test 1");
+ values.insert("b", "initial test 2");
+ values.insert("c", "initial test 3");
+ values.insert("d.1", "initial test 4");
+ values.insert("d.2", "initial test 5");
+ values.insert("e", "initial test 6");
+ let mut ct = HashMap::new();
+
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .body(
+ format!(
+ r#"[
+ {{"pk": "root", "sk": "a", "ct": null, "v": "{}"}},
+ {{"pk": "root", "sk": "b", "ct": null, "v": "{}"}},
+ {{"pk": "root", "sk": "c", "ct": null, "v": "{}"}},
+ {{"pk": "root", "sk": "d.1", "ct": null, "v": "{}"}},
+ {{"pk": "root", "sk": "d.2", "ct": null, "v": "{}"}},
+ {{"pk": "root", "sk": "e", "ct": null, "v": "{}"}}
+ ]"#,
+ base64::encode(values.get(&"a").unwrap()),
+ base64::encode(values.get(&"b").unwrap()),
+ base64::encode(values.get(&"c").unwrap()),
+ base64::encode(values.get(&"d.1").unwrap()),
+ base64::encode(values.get(&"d.2").unwrap()),
+ base64::encode(values.get(&"e").unwrap()),
+ )
+ .into_bytes(),
+ )
+ .method(Method::POST)
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+
+ for sk in ["a", "b", "c", "d.1", "d.2", "e"] {
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some(sk))
+ .signed_header("accept", "*/*")
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+ assert_eq!(
+ res.headers().get("content-type").unwrap().to_str().unwrap(),
+ "application/octet-stream"
+ );
+ ct.insert(
+ sk,
+ res.headers()
+ .get("x-garage-causality-token")
+ .unwrap()
+ .to_str()
+ .unwrap()
+ .to_string(),
+ );
+ let res_body = hyper::body::to_bytes(res.into_body())
+ .await
+ .unwrap()
+ .to_vec();
+ assert_eq!(res_body, values.get(sk).unwrap().as_bytes());
+ }
+
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .query_param("search", Option::<&str>::None)
+ .body(
+ br#"[
+ {"partitionKey": "root"},
+ {"partitionKey": "root", "start": "c"},
+ {"partitionKey": "root", "start": "c", "end": "dynamite"},
+ {"partitionKey": "root", "start": "c", "reverse": true, "end": "a"},
+ {"partitionKey": "root", "start": "c", "reverse": true, "end": "azerty"},
+ {"partitionKey": "root", "limit": 1},
+ {"partitionKey": "root", "prefix": "d"}
+ ]"#
+ .to_vec(),
+ )
+ .method(Method::POST)
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+ let json_res = json_body(res).await;
+ assert_json_eq!(
+ json_res,
+ json!([
+ {
+ "partitionKey": "root",
+ "prefix": null,
+ "start": null,
+ "end": null,
+ "limit": null,
+ "reverse": false,
+ "conflictsOnly": false,
+ "tombstones": false,
+ "singleItem": false,
+ "items": [
+ {"sk": "a", "ct": ct.get("a").unwrap(), "v": [base64::encode(values.get("a").unwrap())]},
+ {"sk": "b", "ct": ct.get("b").unwrap(), "v": [base64::encode(values.get("b").unwrap())]},
+ {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap())]},
+ {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1").unwrap())]},
+ {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap())]},
+ {"sk": "e", "ct": ct.get("e").unwrap(), "v": [base64::encode(values.get("e").unwrap())]}
+ ],
+ "more": false,
+ "nextStart": null,
+ },
+ {
+ "partitionKey": "root",
+ "prefix": null,
+ "start": "c",
+ "end": null,
+ "limit": null,
+ "reverse": false,
+ "conflictsOnly": false,
+ "tombstones": false,
+ "singleItem": false,
+ "items": [
+ {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap())]},
+ {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1").unwrap())]},
+ {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap())]},
+ {"sk": "e", "ct": ct.get("e").unwrap(), "v": [base64::encode(values.get("e").unwrap())]}
+ ],
+ "more": false,
+ "nextStart": null,
+ },
+ {
+ "partitionKey": "root",
+ "prefix": null,
+ "start": "c",
+ "end": "dynamite",
+ "limit": null,
+ "reverse": false,
+ "conflictsOnly": false,
+ "tombstones": false,
+ "singleItem": false,
+ "items": [
+ {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap())]},
+ {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1").unwrap())]},
+ {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap())]},
+ ],
+ "more": false,
+ "nextStart": null,
+ },
+ {
+ "partitionKey": "root",
+ "prefix": null,
+ "start": "c",
+ "end": "a",
+ "limit": null,
+ "reverse": true,
+ "conflictsOnly": false,
+ "tombstones": false,
+ "singleItem": false,
+ "items": [
+ {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap())]},
+ {"sk": "b", "ct": ct.get("b").unwrap(), "v": [base64::encode(values.get("b").unwrap())]},
+ ],
+ "more": false,
+ "nextStart": null,
+ },
+ {
+ "partitionKey": "root",
+ "prefix": null,
+ "start": "c",
+ "end": "azerty",
+ "limit": null,
+ "reverse": true,
+ "conflictsOnly": false,
+ "tombstones": false,
+ "singleItem": false,
+ "items": [
+ {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap())]},
+ {"sk": "b", "ct": ct.get("b").unwrap(), "v": [base64::encode(values.get("b").unwrap())]},
+ ],
+ "more": false,
+ "nextStart": null,
+ },
+ {
+ "partitionKey": "root",
+ "prefix": null,
+ "start": null,
+ "end": null,
+ "limit": 1,
+ "reverse": false,
+ "conflictsOnly": false,
+ "tombstones": false,
+ "singleItem": false,
+ "items": [
+ {"sk": "a", "ct": ct.get("a").unwrap(), "v": [base64::encode(values.get("a").unwrap())]}
+ ],
+ "more": true,
+ "nextStart": "b",
+ },
+ {
+ "partitionKey": "root",
+ "prefix": "d",
+ "start": null,
+ "end": null,
+ "limit": null,
+ "reverse": false,
+ "conflictsOnly": false,
+ "tombstones": false,
+ "singleItem": false,
+ "items": [
+ {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1").unwrap())]},
+ {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap())]}
+ ],
+ "more": false,
+ "nextStart": null,
+ },
+ ])
+ );
+
+ // Insert some new values
+ values.insert("c'", "new test 3");
+ values.insert("d.1'", "new test 4");
+ values.insert("d.2'", "new test 5");
+
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .body(
+ format!(
+ r#"[
+ {{"pk": "root", "sk": "b", "ct": "{}", "v": null}},
+ {{"pk": "root", "sk": "c", "ct": null, "v": "{}"}},
+ {{"pk": "root", "sk": "d.1", "ct": "{}", "v": "{}"}},
+ {{"pk": "root", "sk": "d.2", "ct": null, "v": "{}"}}
+ ]"#,
+ ct.get(&"b").unwrap(),
+ base64::encode(values.get(&"c'").unwrap()),
+ ct.get(&"d.1").unwrap(),
+ base64::encode(values.get(&"d.1'").unwrap()),
+ base64::encode(values.get(&"d.2'").unwrap()),
+ )
+ .into_bytes(),
+ )
+ .method(Method::POST)
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+
+ for sk in ["b", "c", "d.1", "d.2"] {
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some(sk))
+ .signed_header("accept", "*/*")
+ .send()
+ .await
+ .unwrap();
+ if sk == "b" {
+ assert_eq!(res.status(), 204);
+ } else {
+ assert_eq!(res.status(), 200);
+ }
+ ct.insert(
+ sk,
+ res.headers()
+ .get("x-garage-causality-token")
+ .unwrap()
+ .to_str()
+ .unwrap()
+ .to_string(),
+ );
+ }
+
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .query_param("search", Option::<&str>::None)
+ .body(
+ br#"[
+ {"partitionKey": "root"},
+ {"partitionKey": "root", "prefix": "d"},
+ {"partitionKey": "root", "prefix": "d.", "end": "d.2"},
+ {"partitionKey": "root", "prefix": "d.", "limit": 1},
+ {"partitionKey": "root", "prefix": "d.", "start": "d.2", "limit": 1},
+ {"partitionKey": "root", "prefix": "d.", "reverse": true},
+ {"partitionKey": "root", "prefix": "d.", "start": "d.2", "reverse": true},
+ {"partitionKey": "root", "prefix": "d.", "limit": 2}
+ ]"#
+ .to_vec(),
+ )
+ .method(Method::POST)
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+ let json_res = json_body(res).await;
+ assert_json_eq!(
+ json_res,
+ json!([
+ {
+ "partitionKey": "root",
+ "prefix": null,
+ "start": null,
+ "end": null,
+ "limit": null,
+ "reverse": false,
+ "conflictsOnly": false,
+ "tombstones": false,
+ "singleItem": false,
+ "items": [
+ {"sk": "a", "ct": ct.get("a").unwrap(), "v": [base64::encode(values.get("a").unwrap())]},
+ {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap()), base64::encode(values.get("c'").unwrap())]},
+ {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]},
+ {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap()), base64::encode(values.get("d.2'").unwrap())]},
+ {"sk": "e", "ct": ct.get("e").unwrap(), "v": [base64::encode(values.get("e").unwrap())]}
+ ],
+ "more": false,
+ "nextStart": null,
+ },
+ {
+ "partitionKey": "root",
+ "prefix": "d",
+ "start": null,
+ "end": null,
+ "limit": null,
+ "reverse": false,
+ "conflictsOnly": false,
+ "tombstones": false,
+ "singleItem": false,
+ "items": [
+ {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]},
+ {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap()), base64::encode(values.get("d.2'").unwrap())]},
+ ],
+ "more": false,
+ "nextStart": null,
+ },
+ {
+ "partitionKey": "root",
+ "prefix": "d.",
+ "start": null,
+ "end": "d.2",
+ "limit": null,
+ "reverse": false,
+ "conflictsOnly": false,
+ "tombstones": false,
+ "singleItem": false,
+ "items": [
+ {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]},
+ ],
+ "more": false,
+ "nextStart": null,
+ },
+ {
+ "partitionKey": "root",
+ "prefix": "d.",
+ "start": null,
+ "end": null,
+ "limit": 1,
+ "reverse": false,
+ "conflictsOnly": false,
+ "tombstones": false,
+ "singleItem": false,
+ "items": [
+ {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]},
+ ],
+ "more": true,
+ "nextStart": "d.2",
+ },
+ {
+ "partitionKey": "root",
+ "prefix": "d.",
+ "start": "d.2",
+ "end": null,
+ "limit": 1,
+ "reverse": false,
+ "conflictsOnly": false,
+ "tombstones": false,
+ "singleItem": false,
+ "items": [
+ {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap()), base64::encode(values.get("d.2'").unwrap())]},
+ ],
+ "more": false,
+ "nextStart": null,
+ },
+ {
+ "partitionKey": "root",
+ "prefix": "d.",
+ "start": null,
+ "end": null,
+ "limit": null,
+ "reverse": true,
+ "conflictsOnly": false,
+ "tombstones": false,
+ "singleItem": false,
+ "items": [
+ {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap()), base64::encode(values.get("d.2'").unwrap())]},
+ {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]},
+ ],
+ "more": false,
+ "nextStart": null,
+ },
+ {
+ "partitionKey": "root",
+ "prefix": "d.",
+ "start": "d.2",
+ "end": null,
+ "limit": null,
+ "reverse": true,
+ "conflictsOnly": false,
+ "tombstones": false,
+ "singleItem": false,
+ "items": [
+ {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap()), base64::encode(values.get("d.2'").unwrap())]},
+ {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]},
+ ],
+ "more": false,
+ "nextStart": null,
+ },
+ {
+ "partitionKey": "root",
+ "prefix": "d.",
+ "start": null,
+ "end": null,
+ "limit": 2,
+ "reverse": false,
+ "conflictsOnly": false,
+ "tombstones": false,
+ "singleItem": false,
+ "items": [
+ {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]},
+ {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap()), base64::encode(values.get("d.2'").unwrap())]},
+ ],
+ "more": false,
+ "nextStart": null,
+ },
+ ])
+ );
+
+ // Test DeleteBatch
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .query_param("delete", Option::<&str>::None)
+ .body(
+ br#"[
+ {"partitionKey": "root", "start": "a", "end": "c"},
+ {"partitionKey": "root", "prefix": "d"}
+ ]"#
+ .to_vec(),
+ )
+ .method(Method::POST)
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+ let json_res = json_body(res).await;
+ assert_json_eq!(
+ json_res,
+ json!([
+ {
+ "partitionKey": "root",
+ "prefix": null,
+ "start": "a",
+ "end": "c",
+ "singleItem": false,
+ "deletedItems": 1,
+ },
+ {
+ "partitionKey": "root",
+ "prefix": "d",
+ "start": null,
+ "end": null,
+ "singleItem": false,
+ "deletedItems": 2,
+ },
+ ])
+ );
+
+	// Refresh our causality tokens for the items we just deleted (now tombstones)
+ for sk in ["a", "b", "d.1", "d.2"] {
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some(sk))
+ .signed_header("accept", "application/octet-stream")
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 204);
+ assert_eq!(
+ res.headers().get("content-type").unwrap().to_str().unwrap(),
+ "application/octet-stream"
+ );
+ ct.insert(
+ sk,
+ res.headers()
+ .get("x-garage-causality-token")
+ .unwrap()
+ .to_str()
+ .unwrap()
+ .to_string(),
+ );
+ }
+
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .query_param("search", Option::<&str>::None)
+ .body(
+ br#"[
+ {"partitionKey": "root"},
+ {"partitionKey": "root", "reverse": true},
+ {"partitionKey": "root", "tombstones": true}
+ ]"#
+ .to_vec(),
+ )
+ .method(Method::POST)
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+ let json_res = json_body(res).await;
+ assert_json_eq!(
+ json_res,
+ json!([
+ {
+ "partitionKey": "root",
+ "prefix": null,
+ "start": null,
+ "end": null,
+ "limit": null,
+ "reverse": false,
+ "conflictsOnly": false,
+ "tombstones": false,
+ "singleItem": false,
+ "items": [
+ {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap()), base64::encode(values.get("c'").unwrap())]},
+ {"sk": "e", "ct": ct.get("e").unwrap(), "v": [base64::encode(values.get("e").unwrap())]}
+ ],
+ "more": false,
+ "nextStart": null,
+ },
+ {
+ "partitionKey": "root",
+ "prefix": null,
+ "start": null,
+ "end": null,
+ "limit": null,
+ "reverse": true,
+ "conflictsOnly": false,
+ "tombstones": false,
+ "singleItem": false,
+ "items": [
+ {"sk": "e", "ct": ct.get("e").unwrap(), "v": [base64::encode(values.get("e").unwrap())]},
+ {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap()), base64::encode(values.get("c'").unwrap())]},
+ ],
+ "more": false,
+ "nextStart": null,
+ },
+ {
+ "partitionKey": "root",
+ "prefix": null,
+ "start": null,
+ "end": null,
+ "limit": null,
+ "reverse": false,
+ "conflictsOnly": false,
+ "tombstones": true,
+ "singleItem": false,
+ "items": [
+ {"sk": "a", "ct": ct.get("a").unwrap(), "v": [null]},
+ {"sk": "b", "ct": ct.get("b").unwrap(), "v": [null]},
+ {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap()), base64::encode(values.get("c'").unwrap())]},
+ {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [null]},
+ {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [null]},
+ {"sk": "e", "ct": ct.get("e").unwrap(), "v": [base64::encode(values.get("e").unwrap())]},
+ ],
+ "more": false,
+ "nextStart": null,
+ },
+ ])
+ );
+}
diff --git a/src/garage/tests/k2v/errorcodes.rs b/src/garage/tests/k2v/errorcodes.rs
new file mode 100644
index 00000000..2fcc45bc
--- /dev/null
+++ b/src/garage/tests/k2v/errorcodes.rs
@@ -0,0 +1,141 @@
+use crate::common;
+
+use hyper::Method;
+
+#[tokio::test]
+async fn test_error_codes() {
+ let ctx = common::context();
+ let bucket = ctx.create_bucket("test-k2v-error-codes");
+
+ // Regular insert should work (code 200)
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .method(Method::PUT)
+ .path("root")
+ .query_param("sort_key", Some("test1"))
+ .body(b"Hello, world!".to_vec())
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+
+ // Insert with trash causality token: invalid request
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .method(Method::PUT)
+ .path("root")
+ .query_param("sort_key", Some("test1"))
+ .signed_header("x-garage-causality-token", "tra$sh")
+ .body(b"Hello, world!".to_vec())
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 400);
+
+ // Search without partition key: invalid request
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .query_param("search", Option::<&str>::None)
+ .body(
+ br#"[
+ {},
+ ]"#
+ .to_vec(),
+ )
+ .method(Method::POST)
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 400);
+
+ // Search with start that is not in prefix: invalid request
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .query_param("search", Option::<&str>::None)
+ .body(
+ br#"[
+ {"partition_key": "root", "prefix": "a", "start": "bx"},
+ ]"#
+ .to_vec(),
+ )
+ .method(Method::POST)
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 400);
+
+ // Search with invalid json: 400
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .query_param("search", Option::<&str>::None)
+ .body(
+ br#"[
+ {"partition_key": "root"
+ ]"#
+ .to_vec(),
+ )
+ .method(Method::POST)
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 400);
+
+ // Batch insert with invalid causality token: 400
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .body(
+ br#"[
+ {"pk": "root", "sk": "a", "ct": "tra$h", "v": "aGVsbG8sIHdvcmxkCg=="}
+ ]"#
+ .to_vec(),
+ )
+ .method(Method::POST)
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 400);
+
+ // Batch insert with invalid data: 400
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .body(
+ br#"[
+ {"pk": "root", "sk": "a", "ct": null, "v": "aGVsbG8sIHdvcmx$Cg=="}
+ ]"#
+ .to_vec(),
+ )
+ .method(Method::POST)
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 400);
+
+ // Poll with invalid causality token: 400
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("test1"))
+ .query_param("causality_token", Some("tra$h"))
+ .query_param("timeout", Some("10"))
+ .signed_header("accept", "application/octet-stream")
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 400);
+}
diff --git a/src/garage/tests/k2v/item.rs b/src/garage/tests/k2v/item.rs
new file mode 100644
index 00000000..32537336
--- /dev/null
+++ b/src/garage/tests/k2v/item.rs
@@ -0,0 +1,725 @@
+use std::time::Duration;
+
+use crate::common;
+
+use assert_json_diff::assert_json_eq;
+use serde_json::json;
+
+use super::json_body;
+use hyper::Method;
+
+#[tokio::test]
+async fn test_items_and_indices() {
+ let ctx = common::context();
+ let bucket = ctx.create_bucket("test-k2v-item-and-index");
+
+ // ReadIndex -- there should be nothing
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .send()
+ .await
+ .unwrap();
+ let res_body = json_body(res).await;
+ assert_json_eq!(
+ res_body,
+ json!({
+ "prefix": null,
+ "start": null,
+ "end": null,
+ "limit": null,
+ "reverse": false,
+ "partitionKeys": [],
+ "more": false,
+ "nextStart": null
+ })
+ );
+
+ let content2_len = "_: hello universe".len();
+ let content3_len = "_: concurrent value".len();
+
+ for (i, sk) in ["a", "b", "c", "d"].iter().enumerate() {
+ let content = format!("{}: hello world", sk).into_bytes();
+ let content2 = format!("{}: hello universe", sk).into_bytes();
+ let content3 = format!("{}: concurrent value", sk).into_bytes();
+
+ // Put initially, no causality token
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some(sk))
+ .body(content.clone())
+ .method(Method::PUT)
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+
+ // Get value back
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some(sk))
+ .signed_header("accept", "*/*")
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+ assert_eq!(
+ res.headers().get("content-type").unwrap().to_str().unwrap(),
+ "application/octet-stream"
+ );
+ let ct = res
+ .headers()
+ .get("x-garage-causality-token")
+ .unwrap()
+ .to_str()
+ .unwrap()
+ .to_string();
+ let res_body = hyper::body::to_bytes(res.into_body())
+ .await
+ .unwrap()
+ .to_vec();
+ assert_eq!(res_body, content);
+
+ // ReadIndex -- now there should be some stuff
+ tokio::time::sleep(Duration::from_secs(1)).await;
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .send()
+ .await
+ .unwrap();
+ let res_body = json_body(res).await;
+ assert_json_eq!(
+ res_body,
+ json!({
+ "prefix": null,
+ "start": null,
+ "end": null,
+ "limit": null,
+ "reverse": false,
+ "partitionKeys": [
+ {
+ "pk": "root",
+ "entries": i+1,
+ "conflicts": i,
+ "values": i+i+1,
+ "bytes": i*(content2.len() + content3.len()) + content.len(),
+ }
+ ],
+ "more": false,
+ "nextStart": null
+ })
+ );
+
+ // Put again, this time with causality token
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some(sk))
+ .signed_header("x-garage-causality-token", ct.clone())
+ .body(content2.clone())
+ .method(Method::PUT)
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+
+ // Get value back
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some(sk))
+ .signed_header("accept", "*/*")
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+ assert_eq!(
+ res.headers().get("content-type").unwrap().to_str().unwrap(),
+ "application/octet-stream"
+ );
+ let res_body = hyper::body::to_bytes(res.into_body())
+ .await
+ .unwrap()
+ .to_vec();
+ assert_eq!(res_body, content2);
+
+ // ReadIndex -- now there should be some stuff
+ tokio::time::sleep(Duration::from_secs(1)).await;
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .send()
+ .await
+ .unwrap();
+ let res_body = json_body(res).await;
+ assert_json_eq!(
+ res_body,
+ json!({
+ "prefix": null,
+ "start": null,
+ "end": null,
+ "limit": null,
+ "reverse": false,
+ "partitionKeys": [
+ {
+ "pk": "root",
+ "entries": i+1,
+ "conflicts": i,
+ "values": i+i+1,
+ "bytes": i*content3.len() + (i+1)*content2.len(),
+ }
+ ],
+ "more": false,
+ "nextStart": null
+ })
+ );
+
+ // Put again with same CT, now we have concurrent values
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some(sk))
+ .signed_header("x-garage-causality-token", ct.clone())
+ .body(content3.clone())
+ .method(Method::PUT)
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+
+ // Get value back
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some(sk))
+ .signed_header("accept", "*/*")
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+ assert_eq!(
+ res.headers().get("content-type").unwrap().to_str().unwrap(),
+ "application/json"
+ );
+ let res_json = json_body(res).await;
+ assert_json_eq!(
+ res_json,
+ [base64::encode(&content2), base64::encode(&content3)]
+ );
+
+ // ReadIndex -- now there should be some stuff
+ tokio::time::sleep(Duration::from_secs(1)).await;
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .send()
+ .await
+ .unwrap();
+ let res_body = json_body(res).await;
+ assert_json_eq!(
+ res_body,
+ json!({
+ "prefix": null,
+ "start": null,
+ "end": null,
+ "limit": null,
+ "reverse": false,
+ "partitionKeys": [
+ {
+ "pk": "root",
+ "entries": i+1,
+ "conflicts": i+1,
+ "values": 2*(i+1),
+ "bytes": (i+1)*(content2.len() + content3.len()),
+ }
+ ],
+ "more": false,
+ "nextStart": null
+ })
+ );
+ }
+
+ // Now delete things
+ for (i, sk) in ["a", "b", "c", "d"].iter().enumerate() {
+ // Get value back (we just need the CT)
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some(sk))
+ .signed_header("accept", "*/*")
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+ let ct = res
+ .headers()
+ .get("x-garage-causality-token")
+ .unwrap()
+ .to_str()
+ .unwrap()
+ .to_string();
+
+ // Delete it
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .method(Method::DELETE)
+ .path("root")
+ .query_param("sort_key", Some(sk))
+ .signed_header("x-garage-causality-token", ct)
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 204);
+
+ // ReadIndex -- now there should be some stuff
+ tokio::time::sleep(Duration::from_secs(1)).await;
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .send()
+ .await
+ .unwrap();
+ let res_body = json_body(res).await;
+ if i < 3 {
+ assert_json_eq!(
+ res_body,
+ json!({
+ "prefix": null,
+ "start": null,
+ "end": null,
+ "limit": null,
+ "reverse": false,
+ "partitionKeys": [
+ {
+ "pk": "root",
+ "entries": 3-i,
+ "conflicts": 3-i,
+ "values": 2*(3-i),
+ "bytes": (3-i)*(content2_len + content3_len),
+ }
+ ],
+ "more": false,
+ "nextStart": null
+ })
+ );
+ } else {
+ assert_json_eq!(
+ res_body,
+ json!({
+ "prefix": null,
+ "start": null,
+ "end": null,
+ "limit": null,
+ "reverse": false,
+ "partitionKeys": [],
+ "more": false,
+ "nextStart": null
+ })
+ );
+ }
+ }
+}
+
+#[tokio::test]
+async fn test_item_return_format() {
+ let ctx = common::context();
+ let bucket = ctx.create_bucket("test-k2v-item-return-format");
+
+ let single_value = b"A single value".to_vec();
+ let concurrent_value = b"A concurrent value".to_vec();
+
+ // -- Test with a single value --
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("v1"))
+ .body(single_value.clone())
+ .method(Method::PUT)
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+
+ // f0: either
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("v1"))
+ .signed_header("accept", "*/*")
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+ assert_eq!(
+ res.headers().get("content-type").unwrap().to_str().unwrap(),
+ "application/octet-stream"
+ );
+ let ct = res
+ .headers()
+ .get("x-garage-causality-token")
+ .unwrap()
+ .to_str()
+ .unwrap()
+ .to_string();
+ let res_body = hyper::body::to_bytes(res.into_body())
+ .await
+ .unwrap()
+ .to_vec();
+ assert_eq!(res_body, single_value);
+
+ // f1: not specified
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("v1"))
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+ assert_eq!(
+ res.headers().get("content-type").unwrap().to_str().unwrap(),
+ "application/json"
+ );
+ let res_body = json_body(res).await;
+ assert_json_eq!(res_body, json!([base64::encode(&single_value)]));
+
+ // f2: binary
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("v1"))
+ .signed_header("accept", "application/octet-stream")
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+ assert_eq!(
+ res.headers().get("content-type").unwrap().to_str().unwrap(),
+ "application/octet-stream"
+ );
+ let res_body = hyper::body::to_bytes(res.into_body())
+ .await
+ .unwrap()
+ .to_vec();
+ assert_eq!(res_body, single_value);
+
+ // f3: json
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("v1"))
+ .signed_header("accept", "application/json")
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+ assert_eq!(
+ res.headers().get("content-type").unwrap().to_str().unwrap(),
+ "application/json"
+ );
+ let res_body = json_body(res).await;
+ assert_json_eq!(res_body, json!([base64::encode(&single_value)]));
+
+ // -- Test with a second, concurrent value --
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("v1"))
+ .body(concurrent_value.clone())
+ .method(Method::PUT)
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+
+ // f0: either
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("v1"))
+ .signed_header("accept", "*/*")
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+ assert_eq!(
+ res.headers().get("content-type").unwrap().to_str().unwrap(),
+ "application/json"
+ );
+ let res_body = json_body(res).await;
+ assert_json_eq!(
+ res_body,
+ json!([
+ base64::encode(&single_value),
+ base64::encode(&concurrent_value)
+ ])
+ );
+
+ // f1: not specified
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("v1"))
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+ assert_eq!(
+ res.headers().get("content-type").unwrap().to_str().unwrap(),
+ "application/json"
+ );
+ let res_body = json_body(res).await;
+ assert_json_eq!(
+ res_body,
+ json!([
+ base64::encode(&single_value),
+ base64::encode(&concurrent_value)
+ ])
+ );
+
+ // f2: binary
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("v1"))
+ .signed_header("accept", "application/octet-stream")
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 409); // CONFLICT
+
+ // f3: json
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("v1"))
+ .signed_header("accept", "application/json")
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+ assert_eq!(
+ res.headers().get("content-type").unwrap().to_str().unwrap(),
+ "application/json"
+ );
+ let res_body = json_body(res).await;
+ assert_json_eq!(
+ res_body,
+ json!([
+ base64::encode(&single_value),
+ base64::encode(&concurrent_value)
+ ])
+ );
+
+ // -- Delete first value, concurrently with second insert --
+ // -- (we now have a concurrent value and a deletion) --
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("v1"))
+ .method(Method::DELETE)
+ .signed_header("x-garage-causality-token", ct)
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 204);
+
+ // f0: either
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("v1"))
+ .signed_header("accept", "*/*")
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+ assert_eq!(
+ res.headers().get("content-type").unwrap().to_str().unwrap(),
+ "application/json"
+ );
+ let res_body = json_body(res).await;
+ assert_json_eq!(res_body, json!([base64::encode(&concurrent_value), null]));
+
+ // f1: not specified
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("v1"))
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+ assert_eq!(
+ res.headers().get("content-type").unwrap().to_str().unwrap(),
+ "application/json"
+ );
+ let ct = res
+ .headers()
+ .get("x-garage-causality-token")
+ .unwrap()
+ .to_str()
+ .unwrap()
+ .to_string();
+ let res_body = json_body(res).await;
+ assert_json_eq!(res_body, json!([base64::encode(&concurrent_value), null]));
+
+ // f2: binary
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("v1"))
+ .signed_header("accept", "application/octet-stream")
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 409); // CONFLICT
+
+ // f3: json
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("v1"))
+ .signed_header("accept", "application/json")
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+ assert_eq!(
+ res.headers().get("content-type").unwrap().to_str().unwrap(),
+ "application/json"
+ );
+ let res_body = json_body(res).await;
+ assert_json_eq!(res_body, json!([base64::encode(&concurrent_value), null]));
+
+ // -- Delete everything --
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("v1"))
+ .method(Method::DELETE)
+ .signed_header("x-garage-causality-token", ct)
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 204);
+
+ // f0: either
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("v1"))
+ .signed_header("accept", "*/*")
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 204); // NO CONTENT
+
+ // f1: not specified
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("v1"))
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+ assert_eq!(
+ res.headers().get("content-type").unwrap().to_str().unwrap(),
+ "application/json"
+ );
+ let res_body = json_body(res).await;
+ assert_json_eq!(res_body, json!([null]));
+
+ // f2: binary
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("v1"))
+ .signed_header("accept", "application/octet-stream")
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 204); // NO CONTENT
+
+ // f3: json
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("v1"))
+ .signed_header("accept", "application/json")
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+ assert_eq!(
+ res.headers().get("content-type").unwrap().to_str().unwrap(),
+ "application/json"
+ );
+ let res_body = json_body(res).await;
+ assert_json_eq!(res_body, json!([null]));
+}
diff --git a/src/garage/tests/k2v/mod.rs b/src/garage/tests/k2v/mod.rs
new file mode 100644
index 00000000..a009460e
--- /dev/null
+++ b/src/garage/tests/k2v/mod.rs
@@ -0,0 +1,18 @@
+pub mod batch;
+pub mod errorcodes;
+pub mod item;
+pub mod poll;
+pub mod simple;
+
+use hyper::{Body, Response};
+
+pub async fn json_body(res: Response<Body>) -> serde_json::Value {
+ let res_body: serde_json::Value = serde_json::from_slice(
+ &hyper::body::to_bytes(res.into_body())
+ .await
+ .unwrap()
+ .to_vec()[..],
+ )
+ .unwrap();
+ res_body
+}
diff --git a/src/garage/tests/k2v/poll.rs b/src/garage/tests/k2v/poll.rs
new file mode 100644
index 00000000..70dc0410
--- /dev/null
+++ b/src/garage/tests/k2v/poll.rs
@@ -0,0 +1,98 @@
+use hyper::Method;
+use std::time::Duration;
+
+use crate::common;
+
+#[tokio::test]
+async fn test_poll() {
+ let ctx = common::context();
+ let bucket = ctx.create_bucket("test-k2v-poll");
+
+ // Write initial value
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .method(Method::PUT)
+ .path("root")
+ .query_param("sort_key", Some("test1"))
+ .body(b"Initial value".to_vec())
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+
+ // Retrieve initial value to get its causality token
+ let res2 = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("test1"))
+ .signed_header("accept", "application/octet-stream")
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res2.status(), 200);
+ let ct = res2
+ .headers()
+ .get("x-garage-causality-token")
+ .unwrap()
+ .to_str()
+ .unwrap()
+ .to_string();
+
+ let res2_body = hyper::body::to_bytes(res2.into_body())
+ .await
+ .unwrap()
+ .to_vec();
+ assert_eq!(res2_body, b"Initial value");
+
+ // Start poll operation
+ let poll = {
+ let bucket = bucket.clone();
+ let ct = ct.clone();
+ tokio::spawn(async move {
+ let ctx = common::context();
+ ctx.k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("test1"))
+ .query_param("causality_token", Some(ct))
+ .query_param("timeout", Some("10"))
+ .signed_header("accept", "application/octet-stream")
+ .send()
+ .await
+ })
+ };
+
+ // Write new value that supersedes initial one
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .method(Method::PUT)
+ .path("root")
+ .query_param("sort_key", Some("test1"))
+ .signed_header("x-garage-causality-token", ct)
+ .body(b"New value".to_vec())
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+
+ // Check poll finishes with correct value
+ let poll_res = tokio::select! {
+ _ = tokio::time::sleep(Duration::from_secs(10)) => panic!("poll did not terminate in time"),
+ res = poll => res.unwrap().unwrap(),
+ };
+
+ assert_eq!(poll_res.status(), 200);
+
+ let poll_res_body = hyper::body::to_bytes(poll_res.into_body())
+ .await
+ .unwrap()
+ .to_vec();
+ assert_eq!(poll_res_body, b"New value");
+}
diff --git a/src/garage/tests/k2v/simple.rs b/src/garage/tests/k2v/simple.rs
new file mode 100644
index 00000000..ae9a8674
--- /dev/null
+++ b/src/garage/tests/k2v/simple.rs
@@ -0,0 +1,40 @@
+use crate::common;
+
+use hyper::Method;
+
+#[tokio::test]
+async fn test_simple() {
+ let ctx = common::context();
+ let bucket = ctx.create_bucket("test-k2v-simple");
+
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .method(Method::PUT)
+ .path("root")
+ .query_param("sort_key", Some("test1"))
+ .body(b"Hello, world!".to_vec())
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+
+ let res2 = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("test1"))
+ .signed_header("accept", "application/octet-stream")
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res2.status(), 200);
+
+ let res2_body = hyper::body::to_bytes(res2.into_body())
+ .await
+ .unwrap()
+ .to_vec();
+ assert_eq!(res2_body, b"Hello, world!");
+}
diff --git a/src/garage/tests/lib.rs b/src/garage/tests/lib.rs
index 8799c395..87be1327 100644
--- a/src/garage/tests/lib.rs
+++ b/src/garage/tests/lib.rs
@@ -3,9 +3,8 @@ mod common;
mod admin;
mod bucket;
-mod list;
-mod multipart;
-mod objects;
-mod simple;
-mod streaming_signature;
-mod website;
+
+mod s3;
+
+#[cfg(feature = "k2v")]
+mod k2v;
diff --git a/src/garage/tests/list.rs b/src/garage/tests/s3/list.rs
index bb03f250..bb03f250 100644
--- a/src/garage/tests/list.rs
+++ b/src/garage/tests/s3/list.rs
diff --git a/src/garage/tests/s3/mod.rs b/src/garage/tests/s3/mod.rs
new file mode 100644
index 00000000..623eb665
--- /dev/null
+++ b/src/garage/tests/s3/mod.rs
@@ -0,0 +1,6 @@
+mod list;
+mod multipart;
+mod objects;
+mod simple;
+mod streaming_signature;
+mod website;
diff --git a/src/garage/tests/multipart.rs b/src/garage/tests/s3/multipart.rs
index 895a2993..895a2993 100644
--- a/src/garage/tests/multipart.rs
+++ b/src/garage/tests/s3/multipart.rs
diff --git a/src/garage/tests/objects.rs b/src/garage/tests/s3/objects.rs
index e1175b81..65f9e867 100644
--- a/src/garage/tests/objects.rs
+++ b/src/garage/tests/s3/objects.rs
@@ -263,4 +263,13 @@ async fn test_deleteobject() {
.unwrap();
assert!(l.contents.is_none());
+
+ // Deleting a non-existing object shouldn't be a problem
+ ctx.client
+ .delete_object()
+ .bucket(&bucket)
+ .key("l-0")
+ .send()
+ .await
+ .unwrap();
}
diff --git a/src/garage/tests/simple.rs b/src/garage/tests/s3/simple.rs
index f54ae9ac..f54ae9ac 100644
--- a/src/garage/tests/simple.rs
+++ b/src/garage/tests/s3/simple.rs
diff --git a/src/garage/tests/streaming_signature.rs b/src/garage/tests/s3/streaming_signature.rs
index c68f7dfc..c68f7dfc 100644
--- a/src/garage/tests/streaming_signature.rs
+++ b/src/garage/tests/s3/streaming_signature.rs
diff --git a/src/garage/tests/website.rs b/src/garage/tests/s3/website.rs
index 963d11ea..0570ac6a 100644
--- a/src/garage/tests/website.rs
+++ b/src/garage/tests/s3/website.rs
@@ -35,10 +35,7 @@ async fn test_website() {
let req = || {
Request::builder()
.method("GET")
- .uri(format!(
- "http://127.0.0.1:{}/",
- common::garage::DEFAULT_PORT + 2
- ))
+ .uri(format!("http://127.0.0.1:{}/", ctx.garage.web_port))
.header("Host", format!("{}.web.garage", BCKT_NAME))
.body(Body::empty())
.unwrap()
@@ -170,10 +167,7 @@ async fn test_website_s3_api() {
{
let req = Request::builder()
.method("GET")
- .uri(format!(
- "http://127.0.0.1:{}/site/",
- common::garage::DEFAULT_PORT + 2
- ))
+ .uri(format!("http://127.0.0.1:{}/site/", ctx.garage.web_port))
.header("Host", format!("{}.web.garage", BCKT_NAME))
.header("Origin", "https://example.com")
.body(Body::empty())
@@ -198,7 +192,7 @@ async fn test_website_s3_api() {
.method("GET")
.uri(format!(
"http://127.0.0.1:{}/wrong.html",
- common::garage::DEFAULT_PORT + 2
+ ctx.garage.web_port
))
.header("Host", format!("{}.web.garage", BCKT_NAME))
.body(Body::empty())
@@ -217,10 +211,7 @@ async fn test_website_s3_api() {
{
let req = Request::builder()
.method("OPTIONS")
- .uri(format!(
- "http://127.0.0.1:{}/site/",
- common::garage::DEFAULT_PORT + 2
- ))
+ .uri(format!("http://127.0.0.1:{}/site/", ctx.garage.web_port))
.header("Host", format!("{}.web.garage", BCKT_NAME))
.header("Origin", "https://example.com")
.header("Access-Control-Request-Method", "PUT")
@@ -244,10 +235,7 @@ async fn test_website_s3_api() {
{
let req = Request::builder()
.method("OPTIONS")
- .uri(format!(
- "http://127.0.0.1:{}/site/",
- common::garage::DEFAULT_PORT + 2
- ))
+ .uri(format!("http://127.0.0.1:{}/site/", ctx.garage.web_port))
.header("Host", format!("{}.web.garage", BCKT_NAME))
.header("Origin", "https://example.com")
.header("Access-Control-Request-Method", "DELETE")
@@ -288,10 +276,7 @@ async fn test_website_s3_api() {
{
let req = Request::builder()
.method("OPTIONS")
- .uri(format!(
- "http://127.0.0.1:{}/site/",
- common::garage::DEFAULT_PORT + 2
- ))
+ .uri(format!("http://127.0.0.1:{}/site/", ctx.garage.web_port))
.header("Host", format!("{}.web.garage", BCKT_NAME))
.header("Origin", "https://example.com")
.header("Access-Control-Request-Method", "PUT")
@@ -319,10 +304,7 @@ async fn test_website_s3_api() {
{
let req = Request::builder()
.method("GET")
- .uri(format!(
- "http://127.0.0.1:{}/site/",
- common::garage::DEFAULT_PORT + 2
- ))
+ .uri(format!("http://127.0.0.1:{}/site/", ctx.garage.web_port))
.header("Host", format!("{}.web.garage", BCKT_NAME))
.body(Body::empty())
.unwrap();
diff --git a/src/garage/tracing_setup.rs b/src/garage/tracing_setup.rs
new file mode 100644
index 00000000..55fc4094
--- /dev/null
+++ b/src/garage/tracing_setup.rs
@@ -0,0 +1,37 @@
+use std::time::Duration;
+
+use opentelemetry::sdk::{
+ trace::{self, IdGenerator, Sampler},
+ Resource,
+};
+use opentelemetry::KeyValue;
+use opentelemetry_otlp::WithExportConfig;
+
+use garage_util::data::*;
+use garage_util::error::*;
+
+pub fn init_tracing(export_to: &str, node_id: Uuid) -> Result<(), Error> {
+ let node_id = hex::encode(&node_id.as_slice()[..8]);
+
+ opentelemetry_otlp::new_pipeline()
+ .tracing()
+ .with_exporter(
+ opentelemetry_otlp::new_exporter()
+ .tonic()
+ .with_endpoint(export_to)
+ .with_timeout(Duration::from_secs(3)),
+ )
+ .with_trace_config(
+ trace::config()
+ .with_id_generator(IdGenerator::default())
+ .with_sampler(Sampler::AlwaysOn)
+ .with_resource(Resource::new(vec![
+ KeyValue::new("service.name", "garage"),
+ KeyValue::new("service.instance.id", node_id),
+ ])),
+ )
+ .install_batch(opentelemetry::runtime::Tokio)
+ .ok_or_message("Unable to initialize tracing")?;
+
+ Ok(())
+}