aboutsummaryrefslogtreecommitdiff
path: root/src/garage
diff options
context:
space:
mode:
Diffstat (limited to 'src/garage')
-rw-r--r--src/garage/Cargo.toml27
-rw-r--r--src/garage/admin.rs1298
-rw-r--r--src/garage/admin/block.rs160
-rw-r--r--src/garage/admin/bucket.rs496
-rw-r--r--src/garage/admin/key.rs149
-rw-r--r--src/garage/admin/mod.rs538
-rw-r--r--src/garage/cli/cmd.rs2
-rw-r--r--src/garage/cli/layout.rs2
-rw-r--r--src/garage/cli/structs.rs2
-rw-r--r--src/garage/cli/util.rs9
-rw-r--r--src/garage/main.rs6
-rw-r--r--src/garage/tests/bucket.rs8
-rw-r--r--src/garage/tests/common/client.rs11
-rw-r--r--src/garage/tests/common/custom_requester.rs1
-rw-r--r--src/garage/tests/common/mod.rs19
-rw-r--r--src/garage/tests/k2v_client/mod.rs1
-rw-r--r--src/garage/tests/k2v_client/simple.rs60
-rw-r--r--src/garage/tests/lib.rs2
-rw-r--r--src/garage/tests/s3/multipart.rs4
-rw-r--r--src/garage/tests/s3/objects.rs4
-rw-r--r--src/garage/tests/s3/simple.rs2
-rw-r--r--src/garage/tests/s3/website.rs51
22 files changed, 1497 insertions, 1355 deletions
diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml
index 0cbdf890..a6187729 100644
--- a/src/garage/Cargo.toml
+++ b/src/garage/Cargo.toml
@@ -21,19 +21,20 @@ path = "tests/lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
-garage_db = { version = "0.8.2", path = "../db" }
-garage_api = { version = "0.8.2", path = "../api" }
-garage_block = { version = "0.8.2", path = "../block" }
-garage_model = { version = "0.8.2", path = "../model" }
-garage_rpc = { version = "0.8.2", path = "../rpc" }
-garage_table = { version = "0.8.2", path = "../table" }
-garage_util = { version = "0.8.2", path = "../util" }
-garage_web = { version = "0.8.2", path = "../web" }
+format_table.workspace = true
+garage_db.workspace = true
+garage_api.workspace = true
+garage_block.workspace = true
+garage_model.workspace = true
+garage_rpc.workspace = true
+garage_table.workspace = true
+garage_util.workspace = true
+garage_web.workspace = true
backtrace = "0.3"
bytes = "1.0"
-bytesize = "1.1"
-timeago = "0.4"
+bytesize = "1.2"
+timeago = { version = "0.4", default-features = false }
parse_duration = "2.1"
hex = "0.4"
tracing = { version = "0.1" }
@@ -41,6 +42,7 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] }
rand = "0.8"
async-trait = "0.1.7"
sodiumoxide = { version = "0.2.5-0", package = "kuska-sodiumoxide" }
+git-version = "0.3.4"
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
serde_bytes = "0.11"
@@ -59,7 +61,8 @@ opentelemetry-otlp = { version = "0.10", optional = true }
prometheus = { version = "0.13", optional = true }
[dev-dependencies]
-aws-sdk-s3 = "0.19"
+aws-config = "0.55.2"
+aws-sdk-s3 = "0.28"
chrono = "0.4"
http = "0.2"
hmac = "0.12"
@@ -71,6 +74,8 @@ assert-json-diff = "2.0"
serde_json = "1.0"
base64 = "0.21"
+k2v-client.workspace = true
+
[features]
default = [ "bundled-libs", "metrics", "sled", "k2v" ]
diff --git a/src/garage/admin.rs b/src/garage/admin.rs
deleted file mode 100644
index 34141cb2..00000000
--- a/src/garage/admin.rs
+++ /dev/null
@@ -1,1298 +0,0 @@
-use std::collections::HashMap;
-use std::fmt::Write;
-use std::sync::Arc;
-
-use async_trait::async_trait;
-use serde::{Deserialize, Serialize};
-
-use garage_util::background::BackgroundRunner;
-use garage_util::crdt::*;
-use garage_util::data::*;
-use garage_util::error::Error as GarageError;
-use garage_util::formater::format_table_to_string;
-use garage_util::time::*;
-
-use garage_table::replication::*;
-use garage_table::*;
-
-use garage_rpc::ring::PARTITION_BITS;
-use garage_rpc::*;
-
-use garage_block::manager::BlockResyncErrorInfo;
-
-use garage_model::bucket_alias_table::*;
-use garage_model::bucket_table::*;
-use garage_model::garage::Garage;
-use garage_model::helper::error::{Error, OkOrBadRequest};
-use garage_model::key_table::*;
-use garage_model::migrate::Migrate;
-use garage_model::permission::*;
-use garage_model::s3::object_table::*;
-use garage_model::s3::version_table::Version;
-
-use crate::cli::*;
-use crate::repair::online::launch_online_repair;
-
-pub const ADMIN_RPC_PATH: &str = "garage/admin_rpc.rs/Rpc";
-
-#[derive(Debug, Serialize, Deserialize)]
-#[allow(clippy::large_enum_variant)]
-pub enum AdminRpc {
- BucketOperation(BucketOperation),
- KeyOperation(KeyOperation),
- LaunchRepair(RepairOpt),
- Migrate(MigrateOpt),
- Stats(StatsOpt),
- Worker(WorkerOperation),
- BlockOperation(BlockOperation),
-
- // Replies
- Ok(String),
- BucketList(Vec<Bucket>),
- BucketInfo {
- bucket: Bucket,
- relevant_keys: HashMap<String, Key>,
- counters: HashMap<String, i64>,
- },
- KeyList(Vec<(String, String)>),
- KeyInfo(Key, HashMap<Uuid, Bucket>),
- WorkerList(
- HashMap<usize, garage_util::background::WorkerInfo>,
- WorkerListOpt,
- ),
- WorkerVars(Vec<(Uuid, String, String)>),
- WorkerInfo(usize, garage_util::background::WorkerInfo),
- BlockErrorList(Vec<BlockResyncErrorInfo>),
- BlockInfo {
- hash: Hash,
- refcount: u64,
- versions: Vec<Result<Version, Uuid>>,
- },
-}
-
-impl Rpc for AdminRpc {
- type Response = Result<AdminRpc, Error>;
-}
-
-pub struct AdminRpcHandler {
- garage: Arc<Garage>,
- background: Arc<BackgroundRunner>,
- endpoint: Arc<Endpoint<AdminRpc, Self>>,
-}
-
-impl AdminRpcHandler {
- pub fn new(garage: Arc<Garage>, background: Arc<BackgroundRunner>) -> Arc<Self> {
- let endpoint = garage.system.netapp.endpoint(ADMIN_RPC_PATH.into());
- let admin = Arc::new(Self {
- garage,
- background,
- endpoint,
- });
- admin.endpoint.set_handler(admin.clone());
- admin
- }
-
- // ================ BUCKET COMMANDS ====================
-
- async fn handle_bucket_cmd(&self, cmd: &BucketOperation) -> Result<AdminRpc, Error> {
- match cmd {
- BucketOperation::List => self.handle_list_buckets().await,
- BucketOperation::Info(query) => self.handle_bucket_info(query).await,
- BucketOperation::Create(query) => self.handle_create_bucket(&query.name).await,
- BucketOperation::Delete(query) => self.handle_delete_bucket(query).await,
- BucketOperation::Alias(query) => self.handle_alias_bucket(query).await,
- BucketOperation::Unalias(query) => self.handle_unalias_bucket(query).await,
- BucketOperation::Allow(query) => self.handle_bucket_allow(query).await,
- BucketOperation::Deny(query) => self.handle_bucket_deny(query).await,
- BucketOperation::Website(query) => self.handle_bucket_website(query).await,
- BucketOperation::SetQuotas(query) => self.handle_bucket_set_quotas(query).await,
- BucketOperation::CleanupIncompleteUploads(query) => {
- self.handle_bucket_cleanup_incomplete_uploads(query).await
- }
- }
- }
-
- async fn handle_list_buckets(&self) -> Result<AdminRpc, Error> {
- let buckets = self
- .garage
- .bucket_table
- .get_range(
- &EmptyKey,
- None,
- Some(DeletedFilter::NotDeleted),
- 10000,
- EnumerationOrder::Forward,
- )
- .await?;
-
- Ok(AdminRpc::BucketList(buckets))
- }
-
- async fn handle_bucket_info(&self, query: &BucketOpt) -> Result<AdminRpc, Error> {
- let bucket_id = self
- .garage
- .bucket_helper()
- .resolve_global_bucket_name(&query.name)
- .await?
- .ok_or_bad_request("Bucket not found")?;
-
- let bucket = self
- .garage
- .bucket_helper()
- .get_existing_bucket(bucket_id)
- .await?;
-
- let counters = self
- .garage
- .object_counter_table
- .table
- .get(&bucket_id, &EmptyKey)
- .await?
- .map(|x| x.filtered_values(&self.garage.system.ring.borrow()))
- .unwrap_or_default();
-
- let mut relevant_keys = HashMap::new();
- for (k, _) in bucket
- .state
- .as_option()
- .unwrap()
- .authorized_keys
- .items()
- .iter()
- {
- if let Some(key) = self
- .garage
- .key_table
- .get(&EmptyKey, k)
- .await?
- .filter(|k| !k.is_deleted())
- {
- relevant_keys.insert(k.clone(), key);
- }
- }
- for ((k, _), _, _) in bucket
- .state
- .as_option()
- .unwrap()
- .local_aliases
- .items()
- .iter()
- {
- if relevant_keys.contains_key(k) {
- continue;
- }
- if let Some(key) = self.garage.key_table.get(&EmptyKey, k).await? {
- relevant_keys.insert(k.clone(), key);
- }
- }
-
- Ok(AdminRpc::BucketInfo {
- bucket,
- relevant_keys,
- counters,
- })
- }
-
- #[allow(clippy::ptr_arg)]
- async fn handle_create_bucket(&self, name: &String) -> Result<AdminRpc, Error> {
- if !is_valid_bucket_name(name) {
- return Err(Error::BadRequest(format!(
- "{}: {}",
- name, INVALID_BUCKET_NAME_MESSAGE
- )));
- }
-
- if let Some(alias) = self.garage.bucket_alias_table.get(&EmptyKey, name).await? {
- if alias.state.get().is_some() {
- return Err(Error::BadRequest(format!("Bucket {} already exists", name)));
- }
- }
-
- // ---- done checking, now commit ----
-
- let bucket = Bucket::new();
- self.garage.bucket_table.insert(&bucket).await?;
-
- self.garage
- .bucket_helper()
- .set_global_bucket_alias(bucket.id, name)
- .await?;
-
- Ok(AdminRpc::Ok(format!("Bucket {} was created.", name)))
- }
-
- async fn handle_delete_bucket(&self, query: &DeleteBucketOpt) -> Result<AdminRpc, Error> {
- let helper = self.garage.bucket_helper();
-
- let bucket_id = helper
- .resolve_global_bucket_name(&query.name)
- .await?
- .ok_or_bad_request("Bucket not found")?;
-
- // Get the alias, but keep in minde here the bucket name
- // given in parameter can also be directly the bucket's ID.
- // In that case bucket_alias will be None, and
- // we can still delete the bucket if it has zero aliases
- // (a condition which we try to prevent but that could still happen somehow).
- // We just won't try to delete an alias entry because there isn't one.
- let bucket_alias = self
- .garage
- .bucket_alias_table
- .get(&EmptyKey, &query.name)
- .await?;
-
- // Check bucket doesn't have other aliases
- let mut bucket = helper.get_existing_bucket(bucket_id).await?;
- let bucket_state = bucket.state.as_option().unwrap();
- if bucket_state
- .aliases
- .items()
- .iter()
- .filter(|(_, _, active)| *active)
- .any(|(name, _, _)| name != &query.name)
- {
- return Err(Error::BadRequest(format!("Bucket {} still has other global aliases. Use `bucket unalias` to delete them one by one.", query.name)));
- }
- if bucket_state
- .local_aliases
- .items()
- .iter()
- .any(|(_, _, active)| *active)
- {
- return Err(Error::BadRequest(format!("Bucket {} still has other local aliases. Use `bucket unalias` to delete them one by one.", query.name)));
- }
-
- // Check bucket is empty
- if !helper.is_bucket_empty(bucket_id).await? {
- return Err(Error::BadRequest(format!(
- "Bucket {} is not empty",
- query.name
- )));
- }
-
- if !query.yes {
- return Err(Error::BadRequest(
- "Add --yes flag to really perform this operation".to_string(),
- ));
- }
-
- // --- done checking, now commit ---
- // 1. delete authorization from keys that had access
- for (key_id, _) in bucket.authorized_keys() {
- helper
- .set_bucket_key_permissions(bucket.id, key_id, BucketKeyPerm::NO_PERMISSIONS)
- .await?;
- }
-
- // 2. delete bucket alias
- if bucket_alias.is_some() {
- helper
- .purge_global_bucket_alias(bucket_id, &query.name)
- .await?;
- }
-
- // 3. delete bucket
- bucket.state = Deletable::delete();
- self.garage.bucket_table.insert(&bucket).await?;
-
- Ok(AdminRpc::Ok(format!("Bucket {} was deleted.", query.name)))
- }
-
- async fn handle_alias_bucket(&self, query: &AliasBucketOpt) -> Result<AdminRpc, Error> {
- let helper = self.garage.bucket_helper();
- let key_helper = self.garage.key_helper();
-
- let bucket_id = helper
- .resolve_global_bucket_name(&query.existing_bucket)
- .await?
- .ok_or_bad_request("Bucket not found")?;
-
- if let Some(key_pattern) = &query.local {
- let key = key_helper.get_existing_matching_key(key_pattern).await?;
-
- helper
- .set_local_bucket_alias(bucket_id, &key.key_id, &query.new_name)
- .await?;
- Ok(AdminRpc::Ok(format!(
- "Alias {} now points to bucket {:?} in namespace of key {}",
- query.new_name, bucket_id, key.key_id
- )))
- } else {
- helper
- .set_global_bucket_alias(bucket_id, &query.new_name)
- .await?;
- Ok(AdminRpc::Ok(format!(
- "Alias {} now points to bucket {:?}",
- query.new_name, bucket_id
- )))
- }
- }
-
- async fn handle_unalias_bucket(&self, query: &UnaliasBucketOpt) -> Result<AdminRpc, Error> {
- let helper = self.garage.bucket_helper();
- let key_helper = self.garage.key_helper();
-
- if let Some(key_pattern) = &query.local {
- let key = key_helper.get_existing_matching_key(key_pattern).await?;
-
- let bucket_id = key
- .state
- .as_option()
- .unwrap()
- .local_aliases
- .get(&query.name)
- .cloned()
- .flatten()
- .ok_or_bad_request("Bucket not found")?;
-
- helper
- .unset_local_bucket_alias(bucket_id, &key.key_id, &query.name)
- .await?;
-
- Ok(AdminRpc::Ok(format!(
- "Alias {} no longer points to bucket {:?} in namespace of key {}",
- &query.name, bucket_id, key.key_id
- )))
- } else {
- let bucket_id = helper
- .resolve_global_bucket_name(&query.name)
- .await?
- .ok_or_bad_request("Bucket not found")?;
-
- helper
- .unset_global_bucket_alias(bucket_id, &query.name)
- .await?;
-
- Ok(AdminRpc::Ok(format!(
- "Alias {} no longer points to bucket {:?}",
- &query.name, bucket_id
- )))
- }
- }
-
- async fn handle_bucket_allow(&self, query: &PermBucketOpt) -> Result<AdminRpc, Error> {
- let helper = self.garage.bucket_helper();
- let key_helper = self.garage.key_helper();
-
- let bucket_id = helper
- .resolve_global_bucket_name(&query.bucket)
- .await?
- .ok_or_bad_request("Bucket not found")?;
- let key = key_helper
- .get_existing_matching_key(&query.key_pattern)
- .await?;
-
- let allow_read = query.read || key.allow_read(&bucket_id);
- let allow_write = query.write || key.allow_write(&bucket_id);
- let allow_owner = query.owner || key.allow_owner(&bucket_id);
-
- helper
- .set_bucket_key_permissions(
- bucket_id,
- &key.key_id,
- BucketKeyPerm {
- timestamp: now_msec(),
- allow_read,
- allow_write,
- allow_owner,
- },
- )
- .await?;
-
- Ok(AdminRpc::Ok(format!(
- "New permissions for {} on {}: read {}, write {}, owner {}.",
- &key.key_id, &query.bucket, allow_read, allow_write, allow_owner
- )))
- }
-
- async fn handle_bucket_deny(&self, query: &PermBucketOpt) -> Result<AdminRpc, Error> {
- let helper = self.garage.bucket_helper();
- let key_helper = self.garage.key_helper();
-
- let bucket_id = helper
- .resolve_global_bucket_name(&query.bucket)
- .await?
- .ok_or_bad_request("Bucket not found")?;
- let key = key_helper
- .get_existing_matching_key(&query.key_pattern)
- .await?;
-
- let allow_read = !query.read && key.allow_read(&bucket_id);
- let allow_write = !query.write && key.allow_write(&bucket_id);
- let allow_owner = !query.owner && key.allow_owner(&bucket_id);
-
- helper
- .set_bucket_key_permissions(
- bucket_id,
- &key.key_id,
- BucketKeyPerm {
- timestamp: now_msec(),
- allow_read,
- allow_write,
- allow_owner,
- },
- )
- .await?;
-
- Ok(AdminRpc::Ok(format!(
- "New permissions for {} on {}: read {}, write {}, owner {}.",
- &key.key_id, &query.bucket, allow_read, allow_write, allow_owner
- )))
- }
-
- async fn handle_bucket_website(&self, query: &WebsiteOpt) -> Result<AdminRpc, Error> {
- let bucket_id = self
- .garage
- .bucket_helper()
- .resolve_global_bucket_name(&query.bucket)
- .await?
- .ok_or_bad_request("Bucket not found")?;
-
- let mut bucket = self
- .garage
- .bucket_helper()
- .get_existing_bucket(bucket_id)
- .await?;
- let bucket_state = bucket.state.as_option_mut().unwrap();
-
- if !(query.allow ^ query.deny) {
- return Err(Error::BadRequest(
- "You must specify exactly one flag, either --allow or --deny".to_string(),
- ));
- }
-
- let website = if query.allow {
- Some(WebsiteConfig {
- index_document: query.index_document.clone(),
- error_document: query.error_document.clone(),
- })
- } else {
- None
- };
-
- bucket_state.website_config.update(website);
- self.garage.bucket_table.insert(&bucket).await?;
-
- let msg = if query.allow {
- format!("Website access allowed for {}", &query.bucket)
- } else {
- format!("Website access denied for {}", &query.bucket)
- };
-
- Ok(AdminRpc::Ok(msg))
- }
-
- async fn handle_bucket_set_quotas(&self, query: &SetQuotasOpt) -> Result<AdminRpc, Error> {
- let bucket_id = self
- .garage
- .bucket_helper()
- .resolve_global_bucket_name(&query.bucket)
- .await?
- .ok_or_bad_request("Bucket not found")?;
-
- let mut bucket = self
- .garage
- .bucket_helper()
- .get_existing_bucket(bucket_id)
- .await?;
- let bucket_state = bucket.state.as_option_mut().unwrap();
-
- if query.max_size.is_none() && query.max_objects.is_none() {
- return Err(Error::BadRequest(
- "You must specify either --max-size or --max-objects (or both) for this command to do something.".to_string(),
- ));
- }
-
- let mut quotas = bucket_state.quotas.get().clone();
-
- match query.max_size.as_ref().map(String::as_ref) {
- Some("none") => quotas.max_size = None,
- Some(v) => {
- let bs = v
- .parse::<bytesize::ByteSize>()
- .ok_or_bad_request(format!("Invalid size specified: {}", v))?;
- quotas.max_size = Some(bs.as_u64());
- }
- _ => (),
- }
-
- match query.max_objects.as_ref().map(String::as_ref) {
- Some("none") => quotas.max_objects = None,
- Some(v) => {
- let mo = v
- .parse::<u64>()
- .ok_or_bad_request(format!("Invalid number specified: {}", v))?;
- quotas.max_objects = Some(mo);
- }
- _ => (),
- }
-
- bucket_state.quotas.update(quotas);
- self.garage.bucket_table.insert(&bucket).await?;
-
- Ok(AdminRpc::Ok(format!(
- "Quotas updated for {}",
- &query.bucket
- )))
- }
-
- async fn handle_bucket_cleanup_incomplete_uploads(
- &self,
- query: &CleanupIncompleteUploadsOpt,
- ) -> Result<AdminRpc, Error> {
- let mut bucket_ids = vec![];
- for b in query.buckets.iter() {
- bucket_ids.push(
- self.garage
- .bucket_helper()
- .resolve_global_bucket_name(b)
- .await?
- .ok_or_bad_request(format!("Bucket not found: {}", b))?,
- );
- }
-
- let duration = parse_duration::parse::parse(&query.older_than)
- .ok_or_bad_request("Invalid duration passed for --older-than parameter")?;
-
- let mut ret = String::new();
- for bucket in bucket_ids {
- let count = self
- .garage
- .bucket_helper()
- .cleanup_incomplete_uploads(&bucket, duration)
- .await?;
- writeln!(
- &mut ret,
- "Bucket {:?}: {} incomplete uploads aborted",
- bucket, count
- )
- .unwrap();
- }
-
- Ok(AdminRpc::Ok(ret))
- }
-
- // ================ KEY COMMANDS ====================
-
- async fn handle_key_cmd(&self, cmd: &KeyOperation) -> Result<AdminRpc, Error> {
- match cmd {
- KeyOperation::List => self.handle_list_keys().await,
- KeyOperation::Info(query) => self.handle_key_info(query).await,
- KeyOperation::New(query) => self.handle_create_key(query).await,
- KeyOperation::Rename(query) => self.handle_rename_key(query).await,
- KeyOperation::Delete(query) => self.handle_delete_key(query).await,
- KeyOperation::Allow(query) => self.handle_allow_key(query).await,
- KeyOperation::Deny(query) => self.handle_deny_key(query).await,
- KeyOperation::Import(query) => self.handle_import_key(query).await,
- }
- }
-
- async fn handle_list_keys(&self) -> Result<AdminRpc, Error> {
- let key_ids = self
- .garage
- .key_table
- .get_range(
- &EmptyKey,
- None,
- Some(KeyFilter::Deleted(DeletedFilter::NotDeleted)),
- 10000,
- EnumerationOrder::Forward,
- )
- .await?
- .iter()
- .map(|k| (k.key_id.to_string(), k.params().unwrap().name.get().clone()))
- .collect::<Vec<_>>();
- Ok(AdminRpc::KeyList(key_ids))
- }
-
- async fn handle_key_info(&self, query: &KeyOpt) -> Result<AdminRpc, Error> {
- let key = self
- .garage
- .key_helper()
- .get_existing_matching_key(&query.key_pattern)
- .await?;
- self.key_info_result(key).await
- }
-
- async fn handle_create_key(&self, query: &KeyNewOpt) -> Result<AdminRpc, Error> {
- let key = Key::new(&query.name);
- self.garage.key_table.insert(&key).await?;
- self.key_info_result(key).await
- }
-
- async fn handle_rename_key(&self, query: &KeyRenameOpt) -> Result<AdminRpc, Error> {
- let mut key = self
- .garage
- .key_helper()
- .get_existing_matching_key(&query.key_pattern)
- .await?;
- key.params_mut()
- .unwrap()
- .name
- .update(query.new_name.clone());
- self.garage.key_table.insert(&key).await?;
- self.key_info_result(key).await
- }
-
- async fn handle_delete_key(&self, query: &KeyDeleteOpt) -> Result<AdminRpc, Error> {
- let key_helper = self.garage.key_helper();
-
- let mut key = key_helper
- .get_existing_matching_key(&query.key_pattern)
- .await?;
-
- if !query.yes {
- return Err(Error::BadRequest(
- "Add --yes flag to really perform this operation".to_string(),
- ));
- }
-
- key_helper.delete_key(&mut key).await?;
-
- Ok(AdminRpc::Ok(format!(
- "Key {} was deleted successfully.",
- key.key_id
- )))
- }
-
- async fn handle_allow_key(&self, query: &KeyPermOpt) -> Result<AdminRpc, Error> {
- let mut key = self
- .garage
- .key_helper()
- .get_existing_matching_key(&query.key_pattern)
- .await?;
- if query.create_bucket {
- key.params_mut().unwrap().allow_create_bucket.update(true);
- }
- self.garage.key_table.insert(&key).await?;
- self.key_info_result(key).await
- }
-
- async fn handle_deny_key(&self, query: &KeyPermOpt) -> Result<AdminRpc, Error> {
- let mut key = self
- .garage
- .key_helper()
- .get_existing_matching_key(&query.key_pattern)
- .await?;
- if query.create_bucket {
- key.params_mut().unwrap().allow_create_bucket.update(false);
- }
- self.garage.key_table.insert(&key).await?;
- self.key_info_result(key).await
- }
-
- async fn handle_import_key(&self, query: &KeyImportOpt) -> Result<AdminRpc, Error> {
- let prev_key = self.garage.key_table.get(&EmptyKey, &query.key_id).await?;
- if prev_key.is_some() {
- return Err(Error::BadRequest(format!("Key {} already exists in data store. Even if it is deleted, we can't let you create a new key with the same ID. Sorry.", query.key_id)));
- }
- let imported_key = Key::import(&query.key_id, &query.secret_key, &query.name);
- self.garage.key_table.insert(&imported_key).await?;
-
- self.key_info_result(imported_key).await
- }
-
- async fn key_info_result(&self, key: Key) -> Result<AdminRpc, Error> {
- let mut relevant_buckets = HashMap::new();
-
- for (id, _) in key
- .state
- .as_option()
- .unwrap()
- .authorized_buckets
- .items()
- .iter()
- {
- if let Some(b) = self.garage.bucket_table.get(&EmptyKey, id).await? {
- relevant_buckets.insert(*id, b);
- }
- }
-
- Ok(AdminRpc::KeyInfo(key, relevant_buckets))
- }
-
- // ================ MIGRATION COMMANDS ====================
-
- async fn handle_migrate(self: &Arc<Self>, opt: MigrateOpt) -> Result<AdminRpc, Error> {
- if !opt.yes {
- return Err(Error::BadRequest(
- "Please provide the --yes flag to initiate migration operation.".to_string(),
- ));
- }
-
- let m = Migrate {
- garage: self.garage.clone(),
- };
- match opt.what {
- MigrateWhat::Buckets050 => m.migrate_buckets050().await,
- }?;
- Ok(AdminRpc::Ok("Migration successfull.".into()))
- }
-
- // ================ REPAIR COMMANDS ====================
-
- async fn handle_launch_repair(self: &Arc<Self>, opt: RepairOpt) -> Result<AdminRpc, Error> {
- if !opt.yes {
- return Err(Error::BadRequest(
- "Please provide the --yes flag to initiate repair operations.".to_string(),
- ));
- }
- if opt.all_nodes {
- let mut opt_to_send = opt.clone();
- opt_to_send.all_nodes = false;
-
- let mut failures = vec![];
- let ring = self.garage.system.ring.borrow().clone();
- for node in ring.layout.node_ids().iter() {
- let node = (*node).into();
- let resp = self
- .endpoint
- .call(
- &node,
- AdminRpc::LaunchRepair(opt_to_send.clone()),
- PRIO_NORMAL,
- )
- .await;
- if !matches!(resp, Ok(Ok(_))) {
- failures.push(node);
- }
- }
- if failures.is_empty() {
- Ok(AdminRpc::Ok("Repair launched on all nodes".to_string()))
- } else {
- Err(Error::BadRequest(format!(
- "Could not launch repair on nodes: {:?} (launched successfully on other nodes)",
- failures
- )))
- }
- } else {
- launch_online_repair(&self.garage, &self.background, opt).await?;
- Ok(AdminRpc::Ok(format!(
- "Repair launched on {:?}",
- self.garage.system.id
- )))
- }
- }
-
- // ================ STATS COMMANDS ====================
-
- async fn handle_stats(&self, opt: StatsOpt) -> Result<AdminRpc, Error> {
- if opt.all_nodes {
- let mut ret = String::new();
- let ring = self.garage.system.ring.borrow().clone();
-
- for node in ring.layout.node_ids().iter() {
- let mut opt = opt.clone();
- opt.all_nodes = false;
- opt.skip_global = true;
-
- writeln!(&mut ret, "\n======================").unwrap();
- writeln!(&mut ret, "Stats for node {:?}:", node).unwrap();
-
- let node_id = (*node).into();
- match self
- .endpoint
- .call(&node_id, AdminRpc::Stats(opt), PRIO_NORMAL)
- .await
- {
- Ok(Ok(AdminRpc::Ok(s))) => writeln!(&mut ret, "{}", s).unwrap(),
- Ok(Ok(x)) => writeln!(&mut ret, "Bad answer: {:?}", x).unwrap(),
- Ok(Err(e)) => writeln!(&mut ret, "Remote error: {}", e).unwrap(),
- Err(e) => writeln!(&mut ret, "Network error: {}", e).unwrap(),
- }
- }
-
- writeln!(&mut ret, "\n======================").unwrap();
- write!(
- &mut ret,
- "Cluster statistics:\n\n{}",
- self.gather_cluster_stats()
- )
- .unwrap();
-
- Ok(AdminRpc::Ok(ret))
- } else {
- Ok(AdminRpc::Ok(self.gather_stats_local(opt)?))
- }
- }
-
- fn gather_stats_local(&self, opt: StatsOpt) -> Result<String, Error> {
- let mut ret = String::new();
- writeln!(
- &mut ret,
- "\nGarage version: {} [features: {}]\nRust compiler version: {}",
- garage_util::version::garage_version(),
- garage_util::version::garage_features()
- .map(|list| list.join(", "))
- .unwrap_or_else(|| "(unknown)".into()),
- garage_util::version::rust_version(),
- )
- .unwrap();
-
- writeln!(&mut ret, "\nDatabase engine: {}", self.garage.db.engine()).unwrap();
-
- // Gather table statistics
- let mut table = vec![" Table\tItems\tMklItems\tMklTodo\tGcTodo".into()];
- table.push(self.gather_table_stats(&self.garage.bucket_table, opt.detailed)?);
- table.push(self.gather_table_stats(&self.garage.key_table, opt.detailed)?);
- table.push(self.gather_table_stats(&self.garage.object_table, opt.detailed)?);
- table.push(self.gather_table_stats(&self.garage.version_table, opt.detailed)?);
- table.push(self.gather_table_stats(&self.garage.block_ref_table, opt.detailed)?);
- write!(
- &mut ret,
- "\nTable stats:\n{}",
- format_table_to_string(table)
- )
- .unwrap();
-
- // Gather block manager statistics
- writeln!(&mut ret, "\nBlock manager stats:").unwrap();
- let rc_len = if opt.detailed {
- self.garage.block_manager.rc_len()?.to_string()
- } else {
- self.garage
- .block_manager
- .rc_fast_len()?
- .map(|x| x.to_string())
- .unwrap_or_else(|| "NC".into())
- };
-
- writeln!(
- &mut ret,
- " number of RC entries (~= number of blocks): {}",
- rc_len
- )
- .unwrap();
- writeln!(
- &mut ret,
- " resync queue length: {}",
- self.garage.block_manager.resync.queue_len()?
- )
- .unwrap();
- writeln!(
- &mut ret,
- " blocks with resync errors: {}",
- self.garage.block_manager.resync.errors_len()?
- )
- .unwrap();
-
- if !opt.detailed {
- writeln!(&mut ret, "\nIf values are missing above (marked as NC), consider adding the --detailed flag (this will be slow).").unwrap();
- }
-
- if !opt.skip_global {
- write!(&mut ret, "\n{}", self.gather_cluster_stats()).unwrap();
- }
-
- Ok(ret)
- }
-
- fn gather_cluster_stats(&self) -> String {
- let mut ret = String::new();
-
- // Gather storage node and free space statistics
- let layout = &self.garage.system.ring.borrow().layout;
- let mut node_partition_count = HashMap::<Uuid, u64>::new();
- for short_id in layout.ring_assignation_data.iter() {
- let id = layout.node_id_vec[*short_id as usize];
- *node_partition_count.entry(id).or_default() += 1;
- }
- let node_info = self
- .garage
- .system
- .get_known_nodes()
- .into_iter()
- .map(|n| (n.id, n))
- .collect::<HashMap<_, _>>();
-
- let mut table = vec![" ID\tHostname\tZone\tCapacity\tPart.\tDataAvail\tMetaAvail".into()];
- for (id, parts) in node_partition_count.iter() {
- let info = node_info.get(id);
- let status = info.map(|x| &x.status);
- let role = layout.roles.get(id).and_then(|x| x.0.as_ref());
- let hostname = status.map(|x| x.hostname.as_str()).unwrap_or("?");
- let zone = role.map(|x| x.zone.as_str()).unwrap_or("?");
- let capacity = role
- .map(|x| x.capacity_string())
- .unwrap_or_else(|| "?".into());
- let avail_str = |x| match x {
- Some((avail, total)) => {
- let pct = (avail as f64) / (total as f64) * 100.;
- let avail = bytesize::ByteSize::b(avail);
- let total = bytesize::ByteSize::b(total);
- format!("{}/{} ({:.1}%)", avail, total, pct)
- }
- None => "?".into(),
- };
- let data_avail = avail_str(status.and_then(|x| x.data_disk_avail));
- let meta_avail = avail_str(status.and_then(|x| x.meta_disk_avail));
- table.push(format!(
- " {:?}\t{}\t{}\t{}\t{}\t{}\t{}",
- id, hostname, zone, capacity, parts, data_avail, meta_avail
- ));
- }
- write!(
- &mut ret,
- "Storage nodes:\n{}",
- format_table_to_string(table)
- )
- .unwrap();
-
- let meta_part_avail = node_partition_count
- .iter()
- .filter_map(|(id, parts)| {
- node_info
- .get(id)
- .and_then(|x| x.status.meta_disk_avail)
- .map(|c| c.0 / *parts)
- })
- .collect::<Vec<_>>();
- let data_part_avail = node_partition_count
- .iter()
- .filter_map(|(id, parts)| {
- node_info
- .get(id)
- .and_then(|x| x.status.data_disk_avail)
- .map(|c| c.0 / *parts)
- })
- .collect::<Vec<_>>();
- if !meta_part_avail.is_empty() && !data_part_avail.is_empty() {
- let meta_avail =
- bytesize::ByteSize(meta_part_avail.iter().min().unwrap() * (1 << PARTITION_BITS));
- let data_avail =
- bytesize::ByteSize(data_part_avail.iter().min().unwrap() * (1 << PARTITION_BITS));
- writeln!(
- &mut ret,
- "\nEstimated available storage space cluster-wide (might be lower in practice):"
- )
- .unwrap();
- if meta_part_avail.len() < node_partition_count.len()
- || data_part_avail.len() < node_partition_count.len()
- {
- writeln!(&mut ret, " data: < {}", data_avail).unwrap();
- writeln!(&mut ret, " metadata: < {}", meta_avail).unwrap();
- writeln!(&mut ret, "A precise estimate could not be given as information is missing for some storage nodes.").unwrap();
- } else {
- writeln!(&mut ret, " data: {}", data_avail).unwrap();
- writeln!(&mut ret, " metadata: {}", meta_avail).unwrap();
- }
- }
-
- ret
- }
-
- fn gather_table_stats<F, R>(
- &self,
- t: &Arc<Table<F, R>>,
- detailed: bool,
- ) -> Result<String, Error>
- where
- F: TableSchema + 'static,
- R: TableReplication + 'static,
- {
- let (data_len, mkl_len) = if detailed {
- (
- t.data.store.len().map_err(GarageError::from)?.to_string(),
- t.merkle_updater.merkle_tree_len()?.to_string(),
- )
- } else {
- (
- t.data
- .store
- .fast_len()
- .map_err(GarageError::from)?
- .map(|x| x.to_string())
- .unwrap_or_else(|| "NC".into()),
- t.merkle_updater
- .merkle_tree_fast_len()?
- .map(|x| x.to_string())
- .unwrap_or_else(|| "NC".into()),
- )
- };
-
- Ok(format!(
- " {}\t{}\t{}\t{}\t{}",
- F::TABLE_NAME,
- data_len,
- mkl_len,
- t.merkle_updater.todo_len()?,
- t.data.gc_todo_len()?
- ))
- }
-
- // ================ WORKER COMMANDS ====================
-
- async fn handle_worker_cmd(&self, cmd: &WorkerOperation) -> Result<AdminRpc, Error> {
- match cmd {
- WorkerOperation::List { opt } => {
- let workers = self.background.get_worker_info();
- Ok(AdminRpc::WorkerList(workers, *opt))
- }
- WorkerOperation::Info { tid } => {
- let info = self
- .background
- .get_worker_info()
- .get(tid)
- .ok_or_bad_request(format!("No worker with TID {}", tid))?
- .clone();
- Ok(AdminRpc::WorkerInfo(*tid, info))
- }
- WorkerOperation::Get {
- all_nodes,
- variable,
- } => self.handle_get_var(*all_nodes, variable).await,
- WorkerOperation::Set {
- all_nodes,
- variable,
- value,
- } => self.handle_set_var(*all_nodes, variable, value).await,
- }
- }
-
- async fn handle_get_var(
- &self,
- all_nodes: bool,
- variable: &Option<String>,
- ) -> Result<AdminRpc, Error> {
- if all_nodes {
- let mut ret = vec![];
- let ring = self.garage.system.ring.borrow().clone();
- for node in ring.layout.node_ids().iter() {
- let node = (*node).into();
- match self
- .endpoint
- .call(
- &node,
- AdminRpc::Worker(WorkerOperation::Get {
- all_nodes: false,
- variable: variable.clone(),
- }),
- PRIO_NORMAL,
- )
- .await??
- {
- AdminRpc::WorkerVars(v) => ret.extend(v),
- m => return Err(GarageError::unexpected_rpc_message(m).into()),
- }
- }
- Ok(AdminRpc::WorkerVars(ret))
- } else {
- #[allow(clippy::collapsible_else_if)]
- if let Some(v) = variable {
- Ok(AdminRpc::WorkerVars(vec![(
- self.garage.system.id,
- v.clone(),
- self.garage.bg_vars.get(v)?,
- )]))
- } else {
- let mut vars = self.garage.bg_vars.get_all();
- vars.sort();
- Ok(AdminRpc::WorkerVars(
- vars.into_iter()
- .map(|(k, v)| (self.garage.system.id, k.to_string(), v))
- .collect(),
- ))
- }
- }
- }
-
- async fn handle_set_var(
- &self,
- all_nodes: bool,
- variable: &str,
- value: &str,
- ) -> Result<AdminRpc, Error> {
- if all_nodes {
- let mut ret = vec![];
- let ring = self.garage.system.ring.borrow().clone();
- for node in ring.layout.node_ids().iter() {
- let node = (*node).into();
- match self
- .endpoint
- .call(
- &node,
- AdminRpc::Worker(WorkerOperation::Set {
- all_nodes: false,
- variable: variable.to_string(),
- value: value.to_string(),
- }),
- PRIO_NORMAL,
- )
- .await??
- {
- AdminRpc::WorkerVars(v) => ret.extend(v),
- m => return Err(GarageError::unexpected_rpc_message(m).into()),
- }
- }
- Ok(AdminRpc::WorkerVars(ret))
- } else {
- self.garage.bg_vars.set(variable, value)?;
- Ok(AdminRpc::WorkerVars(vec![(
- self.garage.system.id,
- variable.to_string(),
- value.to_string(),
- )]))
- }
- }
-
- // ================ BLOCK COMMANDS ====================
-
- async fn handle_block_cmd(&self, cmd: &BlockOperation) -> Result<AdminRpc, Error> {
- match cmd {
- BlockOperation::ListErrors => Ok(AdminRpc::BlockErrorList(
- self.garage.block_manager.list_resync_errors()?,
- )),
- BlockOperation::Info { hash } => {
- let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?;
- let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?;
- let refcount = self.garage.block_manager.get_block_rc(&hash)?;
- let block_refs = self
- .garage
- .block_ref_table
- .get_range(&hash, None, None, 10000, Default::default())
- .await?;
- let mut versions = vec![];
- for br in block_refs {
- if let Some(v) = self
- .garage
- .version_table
- .get(&br.version, &EmptyKey)
- .await?
- {
- versions.push(Ok(v));
- } else {
- versions.push(Err(br.version));
- }
- }
- Ok(AdminRpc::BlockInfo {
- hash,
- refcount,
- versions,
- })
- }
- BlockOperation::RetryNow { all, blocks } => {
- if *all {
- if !blocks.is_empty() {
- return Err(Error::BadRequest(
- "--all was specified, cannot also specify blocks".into(),
- ));
- }
- let blocks = self.garage.block_manager.list_resync_errors()?;
- for b in blocks.iter() {
- self.garage.block_manager.resync.clear_backoff(&b.hash)?;
- }
- Ok(AdminRpc::Ok(format!(
- "{} blocks returned in queue for a retry now (check logs to see results)",
- blocks.len()
- )))
- } else {
- for hash in blocks {
- let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?;
- let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?;
- self.garage.block_manager.resync.clear_backoff(&hash)?;
- }
- Ok(AdminRpc::Ok(format!(
- "{} blocks returned in queue for a retry now (check logs to see results)",
- blocks.len()
- )))
- }
- }
- BlockOperation::Purge { yes, blocks } => {
- if !yes {
- return Err(Error::BadRequest(
- "Pass the --yes flag to confirm block purge operation.".into(),
- ));
- }
-
- let mut obj_dels = 0;
- let mut ver_dels = 0;
-
- for hash in blocks {
- let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?;
- let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?;
- let block_refs = self
- .garage
- .block_ref_table
- .get_range(&hash, None, None, 10000, Default::default())
- .await?;
-
- for br in block_refs {
- let version = match self
- .garage
- .version_table
- .get(&br.version, &EmptyKey)
- .await?
- {
- Some(v) => v,
- None => continue,
- };
-
- if let Some(object) = self
- .garage
- .object_table
- .get(&version.bucket_id, &version.key)
- .await?
- {
- let ov = object.versions().iter().rev().find(|v| v.is_complete());
- if let Some(ov) = ov {
- if ov.uuid == br.version {
- let del_uuid = gen_uuid();
- let deleted_object = Object::new(
- version.bucket_id,
- version.key.clone(),
- vec![ObjectVersion {
- uuid: del_uuid,
- timestamp: ov.timestamp + 1,
- state: ObjectVersionState::Complete(
- ObjectVersionData::DeleteMarker,
- ),
- }],
- );
- self.garage.object_table.insert(&deleted_object).await?;
- obj_dels += 1;
- }
- }
- }
-
- if !version.deleted.get() {
- let deleted_version = Version::new(
- version.uuid,
- version.bucket_id,
- version.key.clone(),
- true,
- );
- self.garage.version_table.insert(&deleted_version).await?;
- ver_dels += 1;
- }
- }
- }
- Ok(AdminRpc::Ok(format!(
- "{} blocks were purged: {} object deletion markers added, {} versions marked deleted",
- blocks.len(),
- obj_dels,
- ver_dels
- )))
- }
- }
- }
-}
-
-#[async_trait]
-impl EndpointHandler<AdminRpc> for AdminRpcHandler {
- async fn handle(
- self: &Arc<Self>,
- message: &AdminRpc,
- _from: NodeID,
- ) -> Result<AdminRpc, Error> {
- match message {
- AdminRpc::BucketOperation(bo) => self.handle_bucket_cmd(bo).await,
- AdminRpc::KeyOperation(ko) => self.handle_key_cmd(ko).await,
- AdminRpc::Migrate(opt) => self.handle_migrate(opt.clone()).await,
- AdminRpc::LaunchRepair(opt) => self.handle_launch_repair(opt.clone()).await,
- AdminRpc::Stats(opt) => self.handle_stats(opt.clone()).await,
- AdminRpc::Worker(wo) => self.handle_worker_cmd(wo).await,
- AdminRpc::BlockOperation(bo) => self.handle_block_cmd(bo).await,
- m => Err(GarageError::unexpected_rpc_message(m).into()),
- }
- }
-}
diff --git a/src/garage/admin/block.rs b/src/garage/admin/block.rs
new file mode 100644
index 00000000..e9e3ff96
--- /dev/null
+++ b/src/garage/admin/block.rs
@@ -0,0 +1,160 @@
+use garage_util::data::*;
+
+use garage_table::*;
+
+use garage_model::helper::error::{Error, OkOrBadRequest};
+use garage_model::s3::object_table::*;
+use garage_model::s3::version_table::*;
+
+use crate::cli::*;
+
+use super::*;
+
+impl AdminRpcHandler {
+ pub(super) async fn handle_block_cmd(&self, cmd: &BlockOperation) -> Result<AdminRpc, Error> {
+ match cmd {
+ BlockOperation::ListErrors => Ok(AdminRpc::BlockErrorList(
+ self.garage.block_manager.list_resync_errors()?,
+ )),
+ BlockOperation::Info { hash } => self.handle_block_info(hash).await,
+ BlockOperation::RetryNow { all, blocks } => {
+ self.handle_block_retry_now(*all, blocks).await
+ }
+ BlockOperation::Purge { yes, blocks } => self.handle_block_purge(*yes, blocks).await,
+ }
+ }
+
+ async fn handle_block_info(&self, hash: &String) -> Result<AdminRpc, Error> {
+ let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?;
+ let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?;
+ let refcount = self.garage.block_manager.get_block_rc(&hash)?;
+ let block_refs = self
+ .garage
+ .block_ref_table
+ .get_range(&hash, None, None, 10000, Default::default())
+ .await?;
+ let mut versions = vec![];
+ for br in block_refs {
+ if let Some(v) = self
+ .garage
+ .version_table
+ .get(&br.version, &EmptyKey)
+ .await?
+ {
+ versions.push(Ok(v));
+ } else {
+ versions.push(Err(br.version));
+ }
+ }
+ Ok(AdminRpc::BlockInfo {
+ hash,
+ refcount,
+ versions,
+ })
+ }
+
+ async fn handle_block_retry_now(
+ &self,
+ all: bool,
+ blocks: &[String],
+ ) -> Result<AdminRpc, Error> {
+ if all {
+ if !blocks.is_empty() {
+ return Err(Error::BadRequest(
+ "--all was specified, cannot also specify blocks".into(),
+ ));
+ }
+ let blocks = self.garage.block_manager.list_resync_errors()?;
+ for b in blocks.iter() {
+ self.garage.block_manager.resync.clear_backoff(&b.hash)?;
+ }
+ Ok(AdminRpc::Ok(format!(
+ "{} blocks returned in queue for a retry now (check logs to see results)",
+ blocks.len()
+ )))
+ } else {
+ for hash in blocks {
+ let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?;
+ let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?;
+ self.garage.block_manager.resync.clear_backoff(&hash)?;
+ }
+ Ok(AdminRpc::Ok(format!(
+ "{} blocks returned in queue for a retry now (check logs to see results)",
+ blocks.len()
+ )))
+ }
+ }
+
+ async fn handle_block_purge(&self, yes: bool, blocks: &[String]) -> Result<AdminRpc, Error> {
+ if !yes {
+ return Err(Error::BadRequest(
+ "Pass the --yes flag to confirm block purge operation.".into(),
+ ));
+ }
+
+ let mut obj_dels = 0;
+ let mut ver_dels = 0;
+
+ for hash in blocks {
+ let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?;
+ let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?;
+ let block_refs = self
+ .garage
+ .block_ref_table
+ .get_range(&hash, None, None, 10000, Default::default())
+ .await?;
+
+ for br in block_refs {
+ let version = match self
+ .garage
+ .version_table
+ .get(&br.version, &EmptyKey)
+ .await?
+ {
+ Some(v) => v,
+ None => continue,
+ };
+
+ if let Some(object) = self
+ .garage
+ .object_table
+ .get(&version.bucket_id, &version.key)
+ .await?
+ {
+ let ov = object.versions().iter().rev().find(|v| v.is_complete());
+ if let Some(ov) = ov {
+ if ov.uuid == br.version {
+ let del_uuid = gen_uuid();
+ let deleted_object = Object::new(
+ version.bucket_id,
+ version.key.clone(),
+ vec![ObjectVersion {
+ uuid: del_uuid,
+ timestamp: ov.timestamp + 1,
+ state: ObjectVersionState::Complete(
+ ObjectVersionData::DeleteMarker,
+ ),
+ }],
+ );
+ self.garage.object_table.insert(&deleted_object).await?;
+ obj_dels += 1;
+ }
+ }
+ }
+
+ if !version.deleted.get() {
+ let deleted_version =
+ Version::new(version.uuid, version.bucket_id, version.key.clone(), true);
+ self.garage.version_table.insert(&deleted_version).await?;
+ ver_dels += 1;
+ }
+ }
+ }
+ Ok(AdminRpc::Ok(format!(
+ "{} blocks were purged: {} object deletion markers added, {} versions marked deleted",
+ blocks.len(),
+ obj_dels,
+ ver_dels
+ )))
+ }
+}
diff --git a/src/garage/admin/bucket.rs b/src/garage/admin/bucket.rs
new file mode 100644
index 00000000..11bb8730
--- /dev/null
+++ b/src/garage/admin/bucket.rs
@@ -0,0 +1,496 @@
+use std::collections::HashMap;
+use std::fmt::Write;
+
+use garage_util::crdt::*;
+use garage_util::time::*;
+
+use garage_table::*;
+
+use garage_model::bucket_alias_table::*;
+use garage_model::bucket_table::*;
+use garage_model::helper::error::{Error, OkOrBadRequest};
+use garage_model::permission::*;
+
+use crate::cli::*;
+
+use super::*;
+
+impl AdminRpcHandler {
+ pub(super) async fn handle_bucket_cmd(&self, cmd: &BucketOperation) -> Result<AdminRpc, Error> {
+ match cmd {
+ BucketOperation::List => self.handle_list_buckets().await,
+ BucketOperation::Info(query) => self.handle_bucket_info(query).await,
+ BucketOperation::Create(query) => self.handle_create_bucket(&query.name).await,
+ BucketOperation::Delete(query) => self.handle_delete_bucket(query).await,
+ BucketOperation::Alias(query) => self.handle_alias_bucket(query).await,
+ BucketOperation::Unalias(query) => self.handle_unalias_bucket(query).await,
+ BucketOperation::Allow(query) => self.handle_bucket_allow(query).await,
+ BucketOperation::Deny(query) => self.handle_bucket_deny(query).await,
+ BucketOperation::Website(query) => self.handle_bucket_website(query).await,
+ BucketOperation::SetQuotas(query) => self.handle_bucket_set_quotas(query).await,
+ BucketOperation::CleanupIncompleteUploads(query) => {
+ self.handle_bucket_cleanup_incomplete_uploads(query).await
+ }
+ }
+ }
+
+ async fn handle_list_buckets(&self) -> Result<AdminRpc, Error> {
+ let buckets = self
+ .garage
+ .bucket_table
+ .get_range(
+ &EmptyKey,
+ None,
+ Some(DeletedFilter::NotDeleted),
+ 10000,
+ EnumerationOrder::Forward,
+ )
+ .await?;
+
+ Ok(AdminRpc::BucketList(buckets))
+ }
+
+ async fn handle_bucket_info(&self, query: &BucketOpt) -> Result<AdminRpc, Error> {
+ let bucket_id = self
+ .garage
+ .bucket_helper()
+ .resolve_global_bucket_name(&query.name)
+ .await?
+ .ok_or_bad_request("Bucket not found")?;
+
+ let bucket = self
+ .garage
+ .bucket_helper()
+ .get_existing_bucket(bucket_id)
+ .await?;
+
+ let counters = self
+ .garage
+ .object_counter_table
+ .table
+ .get(&bucket_id, &EmptyKey)
+ .await?
+ .map(|x| x.filtered_values(&self.garage.system.ring.borrow()))
+ .unwrap_or_default();
+
+ let mut relevant_keys = HashMap::new();
+ for (k, _) in bucket
+ .state
+ .as_option()
+ .unwrap()
+ .authorized_keys
+ .items()
+ .iter()
+ {
+ if let Some(key) = self
+ .garage
+ .key_table
+ .get(&EmptyKey, k)
+ .await?
+ .filter(|k| !k.is_deleted())
+ {
+ relevant_keys.insert(k.clone(), key);
+ }
+ }
+ for ((k, _), _, _) in bucket
+ .state
+ .as_option()
+ .unwrap()
+ .local_aliases
+ .items()
+ .iter()
+ {
+ if relevant_keys.contains_key(k) {
+ continue;
+ }
+ if let Some(key) = self.garage.key_table.get(&EmptyKey, k).await? {
+ relevant_keys.insert(k.clone(), key);
+ }
+ }
+
+ Ok(AdminRpc::BucketInfo {
+ bucket,
+ relevant_keys,
+ counters,
+ })
+ }
+
+ #[allow(clippy::ptr_arg)]
+ async fn handle_create_bucket(&self, name: &String) -> Result<AdminRpc, Error> {
+ if !is_valid_bucket_name(name) {
+ return Err(Error::BadRequest(format!(
+ "{}: {}",
+ name, INVALID_BUCKET_NAME_MESSAGE
+ )));
+ }
+
+ if let Some(alias) = self.garage.bucket_alias_table.get(&EmptyKey, name).await? {
+ if alias.state.get().is_some() {
+ return Err(Error::BadRequest(format!("Bucket {} already exists", name)));
+ }
+ }
+
+ // ---- done checking, now commit ----
+
+ let bucket = Bucket::new();
+ self.garage.bucket_table.insert(&bucket).await?;
+
+ self.garage
+ .bucket_helper()
+ .set_global_bucket_alias(bucket.id, name)
+ .await?;
+
+ Ok(AdminRpc::Ok(format!("Bucket {} was created.", name)))
+ }
+
+ async fn handle_delete_bucket(&self, query: &DeleteBucketOpt) -> Result<AdminRpc, Error> {
+ let helper = self.garage.bucket_helper();
+
+ let bucket_id = helper
+ .resolve_global_bucket_name(&query.name)
+ .await?
+ .ok_or_bad_request("Bucket not found")?;
+
+ // Get the alias, but keep in minde here the bucket name
+ // given in parameter can also be directly the bucket's ID.
+ // In that case bucket_alias will be None, and
+ // we can still delete the bucket if it has zero aliases
+ // (a condition which we try to prevent but that could still happen somehow).
+ // We just won't try to delete an alias entry because there isn't one.
+ let bucket_alias = self
+ .garage
+ .bucket_alias_table
+ .get(&EmptyKey, &query.name)
+ .await?;
+
+ // Check bucket doesn't have other aliases
+ let mut bucket = helper.get_existing_bucket(bucket_id).await?;
+ let bucket_state = bucket.state.as_option().unwrap();
+ if bucket_state
+ .aliases
+ .items()
+ .iter()
+ .filter(|(_, _, active)| *active)
+ .any(|(name, _, _)| name != &query.name)
+ {
+ return Err(Error::BadRequest(format!("Bucket {} still has other global aliases. Use `bucket unalias` to delete them one by one.", query.name)));
+ }
+ if bucket_state
+ .local_aliases
+ .items()
+ .iter()
+ .any(|(_, _, active)| *active)
+ {
+ return Err(Error::BadRequest(format!("Bucket {} still has other local aliases. Use `bucket unalias` to delete them one by one.", query.name)));
+ }
+
+ // Check bucket is empty
+ if !helper.is_bucket_empty(bucket_id).await? {
+ return Err(Error::BadRequest(format!(
+ "Bucket {} is not empty",
+ query.name
+ )));
+ }
+
+ if !query.yes {
+ return Err(Error::BadRequest(
+ "Add --yes flag to really perform this operation".to_string(),
+ ));
+ }
+
+ // --- done checking, now commit ---
+ // 1. delete authorization from keys that had access
+ for (key_id, _) in bucket.authorized_keys() {
+ helper
+ .set_bucket_key_permissions(bucket.id, key_id, BucketKeyPerm::NO_PERMISSIONS)
+ .await?;
+ }
+
+ // 2. delete bucket alias
+ if bucket_alias.is_some() {
+ helper
+ .purge_global_bucket_alias(bucket_id, &query.name)
+ .await?;
+ }
+
+ // 3. delete bucket
+ bucket.state = Deletable::delete();
+ self.garage.bucket_table.insert(&bucket).await?;
+
+ Ok(AdminRpc::Ok(format!("Bucket {} was deleted.", query.name)))
+ }
+
+ async fn handle_alias_bucket(&self, query: &AliasBucketOpt) -> Result<AdminRpc, Error> {
+ let helper = self.garage.bucket_helper();
+ let key_helper = self.garage.key_helper();
+
+ let bucket_id = helper
+ .resolve_global_bucket_name(&query.existing_bucket)
+ .await?
+ .ok_or_bad_request("Bucket not found")?;
+
+ if let Some(key_pattern) = &query.local {
+ let key = key_helper.get_existing_matching_key(key_pattern).await?;
+
+ helper
+ .set_local_bucket_alias(bucket_id, &key.key_id, &query.new_name)
+ .await?;
+ Ok(AdminRpc::Ok(format!(
+ "Alias {} now points to bucket {:?} in namespace of key {}",
+ query.new_name, bucket_id, key.key_id
+ )))
+ } else {
+ helper
+ .set_global_bucket_alias(bucket_id, &query.new_name)
+ .await?;
+ Ok(AdminRpc::Ok(format!(
+ "Alias {} now points to bucket {:?}",
+ query.new_name, bucket_id
+ )))
+ }
+ }
+
+ async fn handle_unalias_bucket(&self, query: &UnaliasBucketOpt) -> Result<AdminRpc, Error> {
+ let helper = self.garage.bucket_helper();
+ let key_helper = self.garage.key_helper();
+
+ if let Some(key_pattern) = &query.local {
+ let key = key_helper.get_existing_matching_key(key_pattern).await?;
+
+ let bucket_id = key
+ .state
+ .as_option()
+ .unwrap()
+ .local_aliases
+ .get(&query.name)
+ .cloned()
+ .flatten()
+ .ok_or_bad_request("Bucket not found")?;
+
+ helper
+ .unset_local_bucket_alias(bucket_id, &key.key_id, &query.name)
+ .await?;
+
+ Ok(AdminRpc::Ok(format!(
+ "Alias {} no longer points to bucket {:?} in namespace of key {}",
+ &query.name, bucket_id, key.key_id
+ )))
+ } else {
+ let bucket_id = helper
+ .resolve_global_bucket_name(&query.name)
+ .await?
+ .ok_or_bad_request("Bucket not found")?;
+
+ helper
+ .unset_global_bucket_alias(bucket_id, &query.name)
+ .await?;
+
+ Ok(AdminRpc::Ok(format!(
+ "Alias {} no longer points to bucket {:?}",
+ &query.name, bucket_id
+ )))
+ }
+ }
+
+ async fn handle_bucket_allow(&self, query: &PermBucketOpt) -> Result<AdminRpc, Error> {
+ let helper = self.garage.bucket_helper();
+ let key_helper = self.garage.key_helper();
+
+ let bucket_id = helper
+ .resolve_global_bucket_name(&query.bucket)
+ .await?
+ .ok_or_bad_request("Bucket not found")?;
+ let key = key_helper
+ .get_existing_matching_key(&query.key_pattern)
+ .await?;
+
+ let allow_read = query.read || key.allow_read(&bucket_id);
+ let allow_write = query.write || key.allow_write(&bucket_id);
+ let allow_owner = query.owner || key.allow_owner(&bucket_id);
+
+ helper
+ .set_bucket_key_permissions(
+ bucket_id,
+ &key.key_id,
+ BucketKeyPerm {
+ timestamp: now_msec(),
+ allow_read,
+ allow_write,
+ allow_owner,
+ },
+ )
+ .await?;
+
+ Ok(AdminRpc::Ok(format!(
+ "New permissions for {} on {}: read {}, write {}, owner {}.",
+ &key.key_id, &query.bucket, allow_read, allow_write, allow_owner
+ )))
+ }
+
+ async fn handle_bucket_deny(&self, query: &PermBucketOpt) -> Result<AdminRpc, Error> {
+ let helper = self.garage.bucket_helper();
+ let key_helper = self.garage.key_helper();
+
+ let bucket_id = helper
+ .resolve_global_bucket_name(&query.bucket)
+ .await?
+ .ok_or_bad_request("Bucket not found")?;
+ let key = key_helper
+ .get_existing_matching_key(&query.key_pattern)
+ .await?;
+
+ let allow_read = !query.read && key.allow_read(&bucket_id);
+ let allow_write = !query.write && key.allow_write(&bucket_id);
+ let allow_owner = !query.owner && key.allow_owner(&bucket_id);
+
+ helper
+ .set_bucket_key_permissions(
+ bucket_id,
+ &key.key_id,
+ BucketKeyPerm {
+ timestamp: now_msec(),
+ allow_read,
+ allow_write,
+ allow_owner,
+ },
+ )
+ .await?;
+
+ Ok(AdminRpc::Ok(format!(
+ "New permissions for {} on {}: read {}, write {}, owner {}.",
+ &key.key_id, &query.bucket, allow_read, allow_write, allow_owner
+ )))
+ }
+
+ async fn handle_bucket_website(&self, query: &WebsiteOpt) -> Result<AdminRpc, Error> {
+ let bucket_id = self
+ .garage
+ .bucket_helper()
+ .resolve_global_bucket_name(&query.bucket)
+ .await?
+ .ok_or_bad_request("Bucket not found")?;
+
+ let mut bucket = self
+ .garage
+ .bucket_helper()
+ .get_existing_bucket(bucket_id)
+ .await?;
+ let bucket_state = bucket.state.as_option_mut().unwrap();
+
+ if !(query.allow ^ query.deny) {
+ return Err(Error::BadRequest(
+ "You must specify exactly one flag, either --allow or --deny".to_string(),
+ ));
+ }
+
+ let website = if query.allow {
+ Some(WebsiteConfig {
+ index_document: query.index_document.clone(),
+ error_document: query.error_document.clone(),
+ })
+ } else {
+ None
+ };
+
+ bucket_state.website_config.update(website);
+ self.garage.bucket_table.insert(&bucket).await?;
+
+ let msg = if query.allow {
+ format!("Website access allowed for {}", &query.bucket)
+ } else {
+ format!("Website access denied for {}", &query.bucket)
+ };
+
+ Ok(AdminRpc::Ok(msg))
+ }
+
+ async fn handle_bucket_set_quotas(&self, query: &SetQuotasOpt) -> Result<AdminRpc, Error> {
+ let bucket_id = self
+ .garage
+ .bucket_helper()
+ .resolve_global_bucket_name(&query.bucket)
+ .await?
+ .ok_or_bad_request("Bucket not found")?;
+
+ let mut bucket = self
+ .garage
+ .bucket_helper()
+ .get_existing_bucket(bucket_id)
+ .await?;
+ let bucket_state = bucket.state.as_option_mut().unwrap();
+
+ if query.max_size.is_none() && query.max_objects.is_none() {
+ return Err(Error::BadRequest(
+ "You must specify either --max-size or --max-objects (or both) for this command to do something.".to_string(),
+ ));
+ }
+
+ let mut quotas = bucket_state.quotas.get().clone();
+
+ match query.max_size.as_ref().map(String::as_ref) {
+ Some("none") => quotas.max_size = None,
+ Some(v) => {
+ let bs = v
+ .parse::<bytesize::ByteSize>()
+ .ok_or_bad_request(format!("Invalid size specified: {}", v))?;
+ quotas.max_size = Some(bs.as_u64());
+ }
+ _ => (),
+ }
+
+ match query.max_objects.as_ref().map(String::as_ref) {
+ Some("none") => quotas.max_objects = None,
+ Some(v) => {
+ let mo = v
+ .parse::<u64>()
+ .ok_or_bad_request(format!("Invalid number specified: {}", v))?;
+ quotas.max_objects = Some(mo);
+ }
+ _ => (),
+ }
+
+ bucket_state.quotas.update(quotas);
+ self.garage.bucket_table.insert(&bucket).await?;
+
+ Ok(AdminRpc::Ok(format!(
+ "Quotas updated for {}",
+ &query.bucket
+ )))
+ }
+
+ async fn handle_bucket_cleanup_incomplete_uploads(
+ &self,
+ query: &CleanupIncompleteUploadsOpt,
+ ) -> Result<AdminRpc, Error> {
+ let mut bucket_ids = vec![];
+ for b in query.buckets.iter() {
+ bucket_ids.push(
+ self.garage
+ .bucket_helper()
+ .resolve_global_bucket_name(b)
+ .await?
+ .ok_or_bad_request(format!("Bucket not found: {}", b))?,
+ );
+ }
+
+ let duration = parse_duration::parse::parse(&query.older_than)
+ .ok_or_bad_request("Invalid duration passed for --older-than parameter")?;
+
+ let mut ret = String::new();
+ for bucket in bucket_ids {
+ let count = self
+ .garage
+ .bucket_helper()
+ .cleanup_incomplete_uploads(&bucket, duration)
+ .await?;
+ writeln!(
+ &mut ret,
+ "Bucket {:?}: {} incomplete uploads aborted",
+ bucket, count
+ )
+ .unwrap();
+ }
+
+ Ok(AdminRpc::Ok(ret))
+ }
+}
diff --git a/src/garage/admin/key.rs b/src/garage/admin/key.rs
new file mode 100644
index 00000000..cab13bcf
--- /dev/null
+++ b/src/garage/admin/key.rs
@@ -0,0 +1,149 @@
+use std::collections::HashMap;
+
+use garage_table::*;
+
+use garage_model::helper::error::Error;
+use garage_model::key_table::*;
+
+use crate::cli::*;
+
+use super::*;
+
+impl AdminRpcHandler {
+ pub(super) async fn handle_key_cmd(&self, cmd: &KeyOperation) -> Result<AdminRpc, Error> {
+ match cmd {
+ KeyOperation::List => self.handle_list_keys().await,
+ KeyOperation::Info(query) => self.handle_key_info(query).await,
+ KeyOperation::New(query) => self.handle_create_key(query).await,
+ KeyOperation::Rename(query) => self.handle_rename_key(query).await,
+ KeyOperation::Delete(query) => self.handle_delete_key(query).await,
+ KeyOperation::Allow(query) => self.handle_allow_key(query).await,
+ KeyOperation::Deny(query) => self.handle_deny_key(query).await,
+ KeyOperation::Import(query) => self.handle_import_key(query).await,
+ }
+ }
+
+ async fn handle_list_keys(&self) -> Result<AdminRpc, Error> {
+ let key_ids = self
+ .garage
+ .key_table
+ .get_range(
+ &EmptyKey,
+ None,
+ Some(KeyFilter::Deleted(DeletedFilter::NotDeleted)),
+ 10000,
+ EnumerationOrder::Forward,
+ )
+ .await?
+ .iter()
+ .map(|k| (k.key_id.to_string(), k.params().unwrap().name.get().clone()))
+ .collect::<Vec<_>>();
+ Ok(AdminRpc::KeyList(key_ids))
+ }
+
+ async fn handle_key_info(&self, query: &KeyOpt) -> Result<AdminRpc, Error> {
+ let key = self
+ .garage
+ .key_helper()
+ .get_existing_matching_key(&query.key_pattern)
+ .await?;
+ self.key_info_result(key).await
+ }
+
+ async fn handle_create_key(&self, query: &KeyNewOpt) -> Result<AdminRpc, Error> {
+ let key = Key::new(&query.name);
+ self.garage.key_table.insert(&key).await?;
+ self.key_info_result(key).await
+ }
+
+ async fn handle_rename_key(&self, query: &KeyRenameOpt) -> Result<AdminRpc, Error> {
+ let mut key = self
+ .garage
+ .key_helper()
+ .get_existing_matching_key(&query.key_pattern)
+ .await?;
+ key.params_mut()
+ .unwrap()
+ .name
+ .update(query.new_name.clone());
+ self.garage.key_table.insert(&key).await?;
+ self.key_info_result(key).await
+ }
+
+ async fn handle_delete_key(&self, query: &KeyDeleteOpt) -> Result<AdminRpc, Error> {
+ let key_helper = self.garage.key_helper();
+
+ let mut key = key_helper
+ .get_existing_matching_key(&query.key_pattern)
+ .await?;
+
+ if !query.yes {
+ return Err(Error::BadRequest(
+ "Add --yes flag to really perform this operation".to_string(),
+ ));
+ }
+
+ key_helper.delete_key(&mut key).await?;
+
+ Ok(AdminRpc::Ok(format!(
+ "Key {} was deleted successfully.",
+ key.key_id
+ )))
+ }
+
+ async fn handle_allow_key(&self, query: &KeyPermOpt) -> Result<AdminRpc, Error> {
+ let mut key = self
+ .garage
+ .key_helper()
+ .get_existing_matching_key(&query.key_pattern)
+ .await?;
+ if query.create_bucket {
+ key.params_mut().unwrap().allow_create_bucket.update(true);
+ }
+ self.garage.key_table.insert(&key).await?;
+ self.key_info_result(key).await
+ }
+
+ async fn handle_deny_key(&self, query: &KeyPermOpt) -> Result<AdminRpc, Error> {
+ let mut key = self
+ .garage
+ .key_helper()
+ .get_existing_matching_key(&query.key_pattern)
+ .await?;
+ if query.create_bucket {
+ key.params_mut().unwrap().allow_create_bucket.update(false);
+ }
+ self.garage.key_table.insert(&key).await?;
+ self.key_info_result(key).await
+ }
+
+ async fn handle_import_key(&self, query: &KeyImportOpt) -> Result<AdminRpc, Error> {
+ let prev_key = self.garage.key_table.get(&EmptyKey, &query.key_id).await?;
+ if prev_key.is_some() {
+ return Err(Error::BadRequest(format!("Key {} already exists in data store. Even if it is deleted, we can't let you create a new key with the same ID. Sorry.", query.key_id)));
+ }
+ let imported_key = Key::import(&query.key_id, &query.secret_key, &query.name);
+ self.garage.key_table.insert(&imported_key).await?;
+
+ self.key_info_result(imported_key).await
+ }
+
+ async fn key_info_result(&self, key: Key) -> Result<AdminRpc, Error> {
+ let mut relevant_buckets = HashMap::new();
+
+ for (id, _) in key
+ .state
+ .as_option()
+ .unwrap()
+ .authorized_buckets
+ .items()
+ .iter()
+ {
+ if let Some(b) = self.garage.bucket_table.get(&EmptyKey, id).await? {
+ relevant_buckets.insert(*id, b);
+ }
+ }
+
+ Ok(AdminRpc::KeyInfo(key, relevant_buckets))
+ }
+}
diff --git a/src/garage/admin/mod.rs b/src/garage/admin/mod.rs
new file mode 100644
index 00000000..2709f08a
--- /dev/null
+++ b/src/garage/admin/mod.rs
@@ -0,0 +1,538 @@
+mod block;
+mod bucket;
+mod key;
+
+use std::collections::HashMap;
+use std::fmt::Write;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use serde::{Deserialize, Serialize};
+
+use format_table::format_table_to_string;
+
+use garage_util::background::BackgroundRunner;
+use garage_util::data::*;
+use garage_util::error::Error as GarageError;
+
+use garage_table::replication::*;
+use garage_table::*;
+
+use garage_rpc::ring::PARTITION_BITS;
+use garage_rpc::*;
+
+use garage_block::manager::BlockResyncErrorInfo;
+
+use garage_model::bucket_table::*;
+use garage_model::garage::Garage;
+use garage_model::helper::error::{Error, OkOrBadRequest};
+use garage_model::key_table::*;
+use garage_model::migrate::Migrate;
+use garage_model::s3::version_table::Version;
+
+use crate::cli::*;
+use crate::repair::online::launch_online_repair;
+
+pub const ADMIN_RPC_PATH: &str = "garage/admin_rpc.rs/Rpc";
+
+#[derive(Debug, Serialize, Deserialize)]
+#[allow(clippy::large_enum_variant)]
+pub enum AdminRpc {
+ BucketOperation(BucketOperation),
+ KeyOperation(KeyOperation),
+ LaunchRepair(RepairOpt),
+ Migrate(MigrateOpt),
+ Stats(StatsOpt),
+ Worker(WorkerOperation),
+ BlockOperation(BlockOperation),
+
+ // Replies
+ Ok(String),
+ BucketList(Vec<Bucket>),
+ BucketInfo {
+ bucket: Bucket,
+ relevant_keys: HashMap<String, Key>,
+ counters: HashMap<String, i64>,
+ },
+ KeyList(Vec<(String, String)>),
+ KeyInfo(Key, HashMap<Uuid, Bucket>),
+ WorkerList(
+ HashMap<usize, garage_util::background::WorkerInfo>,
+ WorkerListOpt,
+ ),
+ WorkerVars(Vec<(Uuid, String, String)>),
+ WorkerInfo(usize, garage_util::background::WorkerInfo),
+ BlockErrorList(Vec<BlockResyncErrorInfo>),
+ BlockInfo {
+ hash: Hash,
+ refcount: u64,
+ versions: Vec<Result<Version, Uuid>>,
+ },
+}
+
+impl Rpc for AdminRpc {
+ type Response = Result<AdminRpc, Error>;
+}
+
+pub struct AdminRpcHandler {
+ garage: Arc<Garage>,
+ background: Arc<BackgroundRunner>,
+ endpoint: Arc<Endpoint<AdminRpc, Self>>,
+}
+
+impl AdminRpcHandler {
+ pub fn new(garage: Arc<Garage>, background: Arc<BackgroundRunner>) -> Arc<Self> {
+ let endpoint = garage.system.netapp.endpoint(ADMIN_RPC_PATH.into());
+ let admin = Arc::new(Self {
+ garage,
+ background,
+ endpoint,
+ });
+ admin.endpoint.set_handler(admin.clone());
+ admin
+ }
+
+ // ================ MIGRATION COMMANDS ====================
+
+ async fn handle_migrate(self: &Arc<Self>, opt: MigrateOpt) -> Result<AdminRpc, Error> {
+ if !opt.yes {
+ return Err(Error::BadRequest(
+ "Please provide the --yes flag to initiate migration operation.".to_string(),
+ ));
+ }
+
+ let m = Migrate {
+ garage: self.garage.clone(),
+ };
+ match opt.what {
+ MigrateWhat::Buckets050 => m.migrate_buckets050().await,
+ }?;
+ Ok(AdminRpc::Ok("Migration successfull.".into()))
+ }
+
+ // ================ REPAIR COMMANDS ====================
+
+ async fn handle_launch_repair(self: &Arc<Self>, opt: RepairOpt) -> Result<AdminRpc, Error> {
+ if !opt.yes {
+ return Err(Error::BadRequest(
+ "Please provide the --yes flag to initiate repair operations.".to_string(),
+ ));
+ }
+ if opt.all_nodes {
+ let mut opt_to_send = opt.clone();
+ opt_to_send.all_nodes = false;
+
+ let mut failures = vec![];
+ let ring = self.garage.system.ring.borrow().clone();
+ for node in ring.layout.node_ids().iter() {
+ let node = (*node).into();
+ let resp = self
+ .endpoint
+ .call(
+ &node,
+ AdminRpc::LaunchRepair(opt_to_send.clone()),
+ PRIO_NORMAL,
+ )
+ .await;
+ if !matches!(resp, Ok(Ok(_))) {
+ failures.push(node);
+ }
+ }
+ if failures.is_empty() {
+ Ok(AdminRpc::Ok("Repair launched on all nodes".to_string()))
+ } else {
+ Err(Error::BadRequest(format!(
+ "Could not launch repair on nodes: {:?} (launched successfully on other nodes)",
+ failures
+ )))
+ }
+ } else {
+ launch_online_repair(&self.garage, &self.background, opt).await?;
+ Ok(AdminRpc::Ok(format!(
+ "Repair launched on {:?}",
+ self.garage.system.id
+ )))
+ }
+ }
+
+ // ================ STATS COMMANDS ====================
+
+ async fn handle_stats(&self, opt: StatsOpt) -> Result<AdminRpc, Error> {
+ if opt.all_nodes {
+ let mut ret = String::new();
+ let ring = self.garage.system.ring.borrow().clone();
+
+ for node in ring.layout.node_ids().iter() {
+ let mut opt = opt.clone();
+ opt.all_nodes = false;
+ opt.skip_global = true;
+
+ writeln!(&mut ret, "\n======================").unwrap();
+ writeln!(&mut ret, "Stats for node {:?}:", node).unwrap();
+
+ let node_id = (*node).into();
+ match self
+ .endpoint
+ .call(&node_id, AdminRpc::Stats(opt), PRIO_NORMAL)
+ .await
+ {
+ Ok(Ok(AdminRpc::Ok(s))) => writeln!(&mut ret, "{}", s).unwrap(),
+ Ok(Ok(x)) => writeln!(&mut ret, "Bad answer: {:?}", x).unwrap(),
+ Ok(Err(e)) => writeln!(&mut ret, "Remote error: {}", e).unwrap(),
+ Err(e) => writeln!(&mut ret, "Network error: {}", e).unwrap(),
+ }
+ }
+
+ writeln!(&mut ret, "\n======================").unwrap();
+ write!(
+ &mut ret,
+ "Cluster statistics:\n\n{}",
+ self.gather_cluster_stats()
+ )
+ .unwrap();
+
+ Ok(AdminRpc::Ok(ret))
+ } else {
+ Ok(AdminRpc::Ok(self.gather_stats_local(opt)?))
+ }
+ }
+
+ fn gather_stats_local(&self, opt: StatsOpt) -> Result<String, Error> {
+ let mut ret = String::new();
+ writeln!(
+ &mut ret,
+ "\nGarage version: {} [features: {}]\nRust compiler version: {}",
+ garage_util::version::garage_version(),
+ garage_util::version::garage_features()
+ .map(|list| list.join(", "))
+ .unwrap_or_else(|| "(unknown)".into()),
+ garage_util::version::rust_version(),
+ )
+ .unwrap();
+
+ writeln!(&mut ret, "\nDatabase engine: {}", self.garage.db.engine()).unwrap();
+
+ // Gather table statistics
+ let mut table = vec![" Table\tItems\tMklItems\tMklTodo\tGcTodo".into()];
+ table.push(self.gather_table_stats(&self.garage.bucket_table, opt.detailed)?);
+ table.push(self.gather_table_stats(&self.garage.key_table, opt.detailed)?);
+ table.push(self.gather_table_stats(&self.garage.object_table, opt.detailed)?);
+ table.push(self.gather_table_stats(&self.garage.version_table, opt.detailed)?);
+ table.push(self.gather_table_stats(&self.garage.block_ref_table, opt.detailed)?);
+ write!(
+ &mut ret,
+ "\nTable stats:\n{}",
+ format_table_to_string(table)
+ )
+ .unwrap();
+
+ // Gather block manager statistics
+ writeln!(&mut ret, "\nBlock manager stats:").unwrap();
+ let rc_len = if opt.detailed {
+ self.garage.block_manager.rc_len()?.to_string()
+ } else {
+ self.garage
+ .block_manager
+ .rc_fast_len()?
+ .map(|x| x.to_string())
+ .unwrap_or_else(|| "NC".into())
+ };
+
+ writeln!(
+ &mut ret,
+ " number of RC entries (~= number of blocks): {}",
+ rc_len
+ )
+ .unwrap();
+ writeln!(
+ &mut ret,
+ " resync queue length: {}",
+ self.garage.block_manager.resync.queue_len()?
+ )
+ .unwrap();
+ writeln!(
+ &mut ret,
+ " blocks with resync errors: {}",
+ self.garage.block_manager.resync.errors_len()?
+ )
+ .unwrap();
+
+ if !opt.detailed {
+ writeln!(&mut ret, "\nIf values are missing above (marked as NC), consider adding the --detailed flag (this will be slow).").unwrap();
+ }
+
+ if !opt.skip_global {
+ write!(&mut ret, "\n{}", self.gather_cluster_stats()).unwrap();
+ }
+
+ Ok(ret)
+ }
+
+ fn gather_cluster_stats(&self) -> String {
+ let mut ret = String::new();
+
+ // Gather storage node and free space statistics
+ let layout = &self.garage.system.ring.borrow().layout;
+ let mut node_partition_count = HashMap::<Uuid, u64>::new();
+ for short_id in layout.ring_assignation_data.iter() {
+ let id = layout.node_id_vec[*short_id as usize];
+ *node_partition_count.entry(id).or_default() += 1;
+ }
+ let node_info = self
+ .garage
+ .system
+ .get_known_nodes()
+ .into_iter()
+ .map(|n| (n.id, n))
+ .collect::<HashMap<_, _>>();
+
+ let mut table = vec![" ID\tHostname\tZone\tCapacity\tPart.\tDataAvail\tMetaAvail".into()];
+ for (id, parts) in node_partition_count.iter() {
+ let info = node_info.get(id);
+ let status = info.map(|x| &x.status);
+ let role = layout.roles.get(id).and_then(|x| x.0.as_ref());
+ let hostname = status.map(|x| x.hostname.as_str()).unwrap_or("?");
+ let zone = role.map(|x| x.zone.as_str()).unwrap_or("?");
+ let capacity = role
+ .map(|x| x.capacity_string())
+ .unwrap_or_else(|| "?".into());
+ let avail_str = |x| match x {
+ Some((avail, total)) => {
+ let pct = (avail as f64) / (total as f64) * 100.;
+ let avail = bytesize::ByteSize::b(avail);
+ let total = bytesize::ByteSize::b(total);
+ format!("{}/{} ({:.1}%)", avail, total, pct)
+ }
+ None => "?".into(),
+ };
+ let data_avail = avail_str(status.and_then(|x| x.data_disk_avail));
+ let meta_avail = avail_str(status.and_then(|x| x.meta_disk_avail));
+ table.push(format!(
+ " {:?}\t{}\t{}\t{}\t{}\t{}\t{}",
+ id, hostname, zone, capacity, parts, data_avail, meta_avail
+ ));
+ }
+ write!(
+ &mut ret,
+ "Storage nodes:\n{}",
+ format_table_to_string(table)
+ )
+ .unwrap();
+
+ let meta_part_avail = node_partition_count
+ .iter()
+ .filter_map(|(id, parts)| {
+ node_info
+ .get(id)
+ .and_then(|x| x.status.meta_disk_avail)
+ .map(|c| c.0 / *parts)
+ })
+ .collect::<Vec<_>>();
+ let data_part_avail = node_partition_count
+ .iter()
+ .filter_map(|(id, parts)| {
+ node_info
+ .get(id)
+ .and_then(|x| x.status.data_disk_avail)
+ .map(|c| c.0 / *parts)
+ })
+ .collect::<Vec<_>>();
+ if !meta_part_avail.is_empty() && !data_part_avail.is_empty() {
+ let meta_avail =
+ bytesize::ByteSize(meta_part_avail.iter().min().unwrap() * (1 << PARTITION_BITS));
+ let data_avail =
+ bytesize::ByteSize(data_part_avail.iter().min().unwrap() * (1 << PARTITION_BITS));
+ writeln!(
+ &mut ret,
+ "\nEstimated available storage space cluster-wide (might be lower in practice):"
+ )
+ .unwrap();
+ if meta_part_avail.len() < node_partition_count.len()
+ || data_part_avail.len() < node_partition_count.len()
+ {
+ writeln!(&mut ret, " data: < {}", data_avail).unwrap();
+ writeln!(&mut ret, " metadata: < {}", meta_avail).unwrap();
+ writeln!(&mut ret, "A precise estimate could not be given as information is missing for some storage nodes.").unwrap();
+ } else {
+ writeln!(&mut ret, " data: {}", data_avail).unwrap();
+ writeln!(&mut ret, " metadata: {}", meta_avail).unwrap();
+ }
+ }
+
+ ret
+ }
+
+ fn gather_table_stats<F, R>(
+ &self,
+ t: &Arc<Table<F, R>>,
+ detailed: bool,
+ ) -> Result<String, Error>
+ where
+ F: TableSchema + 'static,
+ R: TableReplication + 'static,
+ {
+ let (data_len, mkl_len) = if detailed {
+ (
+ t.data.store.len().map_err(GarageError::from)?.to_string(),
+ t.merkle_updater.merkle_tree_len()?.to_string(),
+ )
+ } else {
+ (
+ t.data
+ .store
+ .fast_len()
+ .map_err(GarageError::from)?
+ .map(|x| x.to_string())
+ .unwrap_or_else(|| "NC".into()),
+ t.merkle_updater
+ .merkle_tree_fast_len()?
+ .map(|x| x.to_string())
+ .unwrap_or_else(|| "NC".into()),
+ )
+ };
+
+ Ok(format!(
+ " {}\t{}\t{}\t{}\t{}",
+ F::TABLE_NAME,
+ data_len,
+ mkl_len,
+ t.merkle_updater.todo_len()?,
+ t.data.gc_todo_len()?
+ ))
+ }
+
+ // ================ WORKER COMMANDS ====================
+
+ async fn handle_worker_cmd(&self, cmd: &WorkerOperation) -> Result<AdminRpc, Error> {
+ match cmd {
+ WorkerOperation::List { opt } => {
+ let workers = self.background.get_worker_info();
+ Ok(AdminRpc::WorkerList(workers, *opt))
+ }
+ WorkerOperation::Info { tid } => {
+ let info = self
+ .background
+ .get_worker_info()
+ .get(tid)
+ .ok_or_bad_request(format!("No worker with TID {}", tid))?
+ .clone();
+ Ok(AdminRpc::WorkerInfo(*tid, info))
+ }
+ WorkerOperation::Get {
+ all_nodes,
+ variable,
+ } => self.handle_get_var(*all_nodes, variable).await,
+ WorkerOperation::Set {
+ all_nodes,
+ variable,
+ value,
+ } => self.handle_set_var(*all_nodes, variable, value).await,
+ }
+ }
+
+ async fn handle_get_var(
+ &self,
+ all_nodes: bool,
+ variable: &Option<String>,
+ ) -> Result<AdminRpc, Error> {
+ if all_nodes {
+ let mut ret = vec![];
+ let ring = self.garage.system.ring.borrow().clone();
+ for node in ring.layout.node_ids().iter() {
+ let node = (*node).into();
+ match self
+ .endpoint
+ .call(
+ &node,
+ AdminRpc::Worker(WorkerOperation::Get {
+ all_nodes: false,
+ variable: variable.clone(),
+ }),
+ PRIO_NORMAL,
+ )
+ .await??
+ {
+ AdminRpc::WorkerVars(v) => ret.extend(v),
+ m => return Err(GarageError::unexpected_rpc_message(m).into()),
+ }
+ }
+ Ok(AdminRpc::WorkerVars(ret))
+ } else {
+ #[allow(clippy::collapsible_else_if)]
+ if let Some(v) = variable {
+ Ok(AdminRpc::WorkerVars(vec![(
+ self.garage.system.id,
+ v.clone(),
+ self.garage.bg_vars.get(v)?,
+ )]))
+ } else {
+ let mut vars = self.garage.bg_vars.get_all();
+ vars.sort();
+ Ok(AdminRpc::WorkerVars(
+ vars.into_iter()
+ .map(|(k, v)| (self.garage.system.id, k.to_string(), v))
+ .collect(),
+ ))
+ }
+ }
+ }
+
+ async fn handle_set_var(
+ &self,
+ all_nodes: bool,
+ variable: &str,
+ value: &str,
+ ) -> Result<AdminRpc, Error> {
+ if all_nodes {
+ let mut ret = vec![];
+ let ring = self.garage.system.ring.borrow().clone();
+ for node in ring.layout.node_ids().iter() {
+ let node = (*node).into();
+ match self
+ .endpoint
+ .call(
+ &node,
+ AdminRpc::Worker(WorkerOperation::Set {
+ all_nodes: false,
+ variable: variable.to_string(),
+ value: value.to_string(),
+ }),
+ PRIO_NORMAL,
+ )
+ .await??
+ {
+ AdminRpc::WorkerVars(v) => ret.extend(v),
+ m => return Err(GarageError::unexpected_rpc_message(m).into()),
+ }
+ }
+ Ok(AdminRpc::WorkerVars(ret))
+ } else {
+ self.garage.bg_vars.set(variable, value)?;
+ Ok(AdminRpc::WorkerVars(vec![(
+ self.garage.system.id,
+ variable.to_string(),
+ value.to_string(),
+ )]))
+ }
+ }
+}
+
+#[async_trait]
+impl EndpointHandler<AdminRpc> for AdminRpcHandler {
+ async fn handle(
+ self: &Arc<Self>,
+ message: &AdminRpc,
+ _from: NodeID,
+ ) -> Result<AdminRpc, Error> {
+ match message {
+ AdminRpc::BucketOperation(bo) => self.handle_bucket_cmd(bo).await,
+ AdminRpc::KeyOperation(ko) => self.handle_key_cmd(ko).await,
+ AdminRpc::Migrate(opt) => self.handle_migrate(opt.clone()).await,
+ AdminRpc::LaunchRepair(opt) => self.handle_launch_repair(opt.clone()).await,
+ AdminRpc::Stats(opt) => self.handle_stats(opt.clone()).await,
+ AdminRpc::Worker(wo) => self.handle_worker_cmd(wo).await,
+ AdminRpc::BlockOperation(bo) => self.handle_block_cmd(bo).await,
+ m => Err(GarageError::unexpected_rpc_message(m).into()),
+ }
+ }
+}
diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs
index 0d735885..cb7a898c 100644
--- a/src/garage/cli/cmd.rs
+++ b/src/garage/cli/cmd.rs
@@ -1,8 +1,8 @@
use std::collections::HashSet;
use std::time::Duration;
+use format_table::format_table;
use garage_util::error::*;
-use garage_util::formater::format_table;
use garage_rpc::layout::*;
use garage_rpc::system::*;
diff --git a/src/garage/cli/layout.rs b/src/garage/cli/layout.rs
index 3884bb92..dc5315a1 100644
--- a/src/garage/cli/layout.rs
+++ b/src/garage/cli/layout.rs
@@ -1,6 +1,6 @@
+use format_table::format_table;
use garage_util::crdt::Crdt;
use garage_util::error::*;
-use garage_util::formater::format_table;
use garage_rpc::layout::*;
use garage_rpc::system::*;
diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs
index 01ae92da..6e585e53 100644
--- a/src/garage/cli/structs.rs
+++ b/src/garage/cli/structs.rs
@@ -216,7 +216,7 @@ pub struct WebsiteOpt {
#[structopt(short = "i", long = "index-document", default_value = "index.html")]
pub index_document: String,
- /// Error document: the optionnal document returned when an error occurs
+ /// Error document: the optional document returned when an error occurs
#[structopt(short = "e", long = "error-document")]
pub error_document: Option<String>,
}
diff --git a/src/garage/cli/util.rs b/src/garage/cli/util.rs
index 2c6be2f4..1140cf22 100644
--- a/src/garage/cli/util.rs
+++ b/src/garage/cli/util.rs
@@ -1,11 +1,11 @@
use std::collections::HashMap;
use std::time::Duration;
+use format_table::format_table;
use garage_util::background::*;
use garage_util::crdt::*;
use garage_util::data::*;
use garage_util::error::*;
-use garage_util::formater::format_table;
use garage_util::time::*;
use garage_block::manager::BlockResyncErrorInfo;
@@ -373,13 +373,18 @@ pub fn print_block_error_list(el: Vec<BlockResyncErrorInfo>) {
let mut table = vec!["Hash\tRC\tErrors\tLast error\tNext try".into()];
for e in el {
+ let next_try = if e.next_try > now {
+ tf2.convert(Duration::from_millis(e.next_try - now))
+ } else {
+ "asap".to_string()
+ };
table.push(format!(
"{}\t{}\t{}\t{}\tin {}",
hex::encode(e.hash.as_slice()),
e.refcount,
e.error_count,
tf.convert(Duration::from_millis(now - e.last_try)),
- tf2.convert(Duration::from_millis(e.next_try - now))
+ next_try
));
}
format_table(table);
diff --git a/src/garage/main.rs b/src/garage/main.rs
index 1ab18bb2..e8aee892 100644
--- a/src/garage/main.rs
+++ b/src/garage/main.rs
@@ -108,6 +108,12 @@ async fn main() {
][..];
if let Some(git_version) = option_env!("GIT_VERSION") {
garage_util::version::init_version(git_version);
+ } else {
+ garage_util::version::init_version(git_version::git_version!(
+ prefix = "git:",
+ cargo_prefix = "cargo:",
+ fallback = "unknown"
+ ));
}
garage_util::version::init_features(features);
diff --git a/src/garage/tests/bucket.rs b/src/garage/tests/bucket.rs
index 0dec3cfa..2dda7e6f 100644
--- a/src/garage/tests/bucket.rs
+++ b/src/garage/tests/bucket.rs
@@ -1,7 +1,6 @@
use crate::common;
use crate::common::ext::CommandExt;
-use aws_sdk_s3::model::BucketLocationConstraint;
-use aws_sdk_s3::output::DeleteBucketOutput;
+use aws_sdk_s3::operation::delete_bucket::DeleteBucketOutput;
#[tokio::test]
async fn test_bucket_all() {
@@ -63,10 +62,7 @@ async fn test_bucket_all() {
.await
.unwrap();
- match r.location_constraint.unwrap() {
- BucketLocationConstraint::Unknown(v) if v.as_str() == "garage-integ-test" => (),
- _ => unreachable!("wrong region"),
- }
+ assert_eq!(r.location_constraint.unwrap().as_str(), "garage-integ-test");
}
{
// (Stub) check GetVersioning
diff --git a/src/garage/tests/common/client.rs b/src/garage/tests/common/client.rs
index e9d4849a..ef4daa5d 100644
--- a/src/garage/tests/common/client.rs
+++ b/src/garage/tests/common/client.rs
@@ -1,15 +1,16 @@
-use aws_sdk_s3::{Client, Config, Credentials, Endpoint};
+use aws_sdk_s3::config::Credentials;
+use aws_sdk_s3::{Client, Config};
-use super::garage::{Instance, Key};
+use super::garage::Key;
+use crate::common::garage::DEFAULT_PORT;
-pub fn build_client(instance: &Instance, key: &Key) -> Client {
+pub fn build_client(key: &Key) -> Client {
let credentials = Credentials::new(&key.id, &key.secret, None, None, "garage-integ-test");
- let endpoint = Endpoint::immutable(instance.s3_uri());
let config = Config::builder()
+ .endpoint_url(format!("http://127.0.0.1:{}", DEFAULT_PORT))
.region(super::REGION)
.credentials_provider(credentials)
- .endpoint_resolver(endpoint)
.build();
Client::from_conf(config)
diff --git a/src/garage/tests/common/custom_requester.rs b/src/garage/tests/common/custom_requester.rs
index 609eda97..4133bb8b 100644
--- a/src/garage/tests/common/custom_requester.rs
+++ b/src/garage/tests/common/custom_requester.rs
@@ -209,6 +209,7 @@ impl<'a> RequestBuilder<'a> {
all_headers.extend(self.unsigned_headers.clone());
let canonical_request = signature::payload::canonical_request(
+ self.service,
&self.method,
&Uri::try_from(&uri).unwrap(),
&all_headers,
diff --git a/src/garage/tests/common/mod.rs b/src/garage/tests/common/mod.rs
index eca3e42b..54efd1ea 100644
--- a/src/garage/tests/common/mod.rs
+++ b/src/garage/tests/common/mod.rs
@@ -1,5 +1,7 @@
-use aws_sdk_s3::{Client, Region};
+use aws_sdk_s3::config::Region;
+use aws_sdk_s3::Client;
use ext::*;
+use k2v_client::K2vClient;
#[macro_use]
pub mod macros;
@@ -31,7 +33,7 @@ impl Context {
fn new() -> Self {
let garage = garage::instance();
let key = garage.key(None);
- let client = client::build_client(garage, &key);
+ let client = client::build_client(&key);
let custom_request = CustomRequester::new_s3(garage, &key);
let k2v_request = CustomRequester::new_k2v(garage, &key);
@@ -68,6 +70,19 @@ impl Context {
bucket_name
}
+
+ /// Build a K2vClient for a given bucket
+ pub fn k2v_client(&self, bucket: &str) -> K2vClient {
+ let config = k2v_client::K2vClientConfig {
+ region: REGION.to_string(),
+ endpoint: self.garage.k2v_uri().to_string(),
+ aws_access_key_id: self.key.id.clone(),
+ aws_secret_access_key: self.key.secret.clone(),
+ bucket: bucket.to_string(),
+ user_agent: None,
+ };
+ K2vClient::new(config).expect("Could not create K2V client")
+ }
}
pub fn context() -> Context {
diff --git a/src/garage/tests/k2v_client/mod.rs b/src/garage/tests/k2v_client/mod.rs
new file mode 100644
index 00000000..b252f36b
--- /dev/null
+++ b/src/garage/tests/k2v_client/mod.rs
@@ -0,0 +1 @@
+pub mod simple;
diff --git a/src/garage/tests/k2v_client/simple.rs b/src/garage/tests/k2v_client/simple.rs
new file mode 100644
index 00000000..1a3118ef
--- /dev/null
+++ b/src/garage/tests/k2v_client/simple.rs
@@ -0,0 +1,60 @@
+use std::time::Duration;
+
+use k2v_client::*;
+
+use crate::common;
+
+#[tokio::test]
+async fn test_simple() {
+ let ctx = common::context();
+ let bucket = ctx.create_bucket("test-k2v-client-simple");
+ let k2v_client = ctx.k2v_client(&bucket);
+
+ k2v_client
+ .insert_item("root", "test1", b"Hello, world!".to_vec(), None)
+ .await
+ .unwrap();
+
+ let res = k2v_client.read_item("root", "test1").await.unwrap();
+
+ assert_eq!(res.value.len(), 1);
+ assert_eq!(res.value[0], K2vValue::Value(b"Hello, world!".to_vec()));
+}
+
+#[tokio::test]
+async fn test_special_chars() {
+ let ctx = common::context();
+ let bucket = ctx.create_bucket("test-k2v-client-simple-special-chars");
+ let k2v_client = ctx.k2v_client(&bucket);
+
+ let (pk, sk) = ("root@plépp", "≤≤««");
+ k2v_client
+ .insert_item(pk, sk, b"Hello, world!".to_vec(), None)
+ .await
+ .unwrap();
+
+ let res = k2v_client.read_item(pk, sk).await.unwrap();
+ assert_eq!(res.value.len(), 1);
+ assert_eq!(res.value[0], K2vValue::Value(b"Hello, world!".to_vec()));
+
+ // sleep a bit before read_index
+ tokio::time::sleep(Duration::from_secs(1)).await;
+ let res = k2v_client.read_index(Default::default()).await.unwrap();
+ assert_eq!(res.items.len(), 1);
+ assert_eq!(res.items.keys().next().unwrap(), pk);
+
+ let res = k2v_client
+ .read_batch(&[BatchReadOp {
+ partition_key: pk,
+ filter: Default::default(),
+ single_item: false,
+ conflicts_only: false,
+ tombstones: false,
+ }])
+ .await
+ .unwrap();
+ assert_eq!(res.len(), 1);
+ let res = &res[0];
+ assert_eq!(res.items.len(), 1);
+ assert_eq!(res.items.keys().next().unwrap(), sk);
+}
diff --git a/src/garage/tests/lib.rs b/src/garage/tests/lib.rs
index 87be1327..e450baac 100644
--- a/src/garage/tests/lib.rs
+++ b/src/garage/tests/lib.rs
@@ -8,3 +8,5 @@ mod s3;
#[cfg(feature = "k2v")]
mod k2v;
+#[cfg(feature = "k2v")]
+mod k2v_client;
diff --git a/src/garage/tests/s3/multipart.rs b/src/garage/tests/s3/multipart.rs
index 895a2993..aeff94b4 100644
--- a/src/garage/tests/s3/multipart.rs
+++ b/src/garage/tests/s3/multipart.rs
@@ -1,6 +1,6 @@
use crate::common;
-use aws_sdk_s3::model::{CompletedMultipartUpload, CompletedPart};
-use aws_sdk_s3::types::ByteStream;
+use aws_sdk_s3::primitives::ByteStream;
+use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart};
const SZ_5MB: usize = 5 * 1024 * 1024;
const SZ_10MB: usize = 10 * 1024 * 1024;
diff --git a/src/garage/tests/s3/objects.rs b/src/garage/tests/s3/objects.rs
index 65f9e867..27697d45 100644
--- a/src/garage/tests/s3/objects.rs
+++ b/src/garage/tests/s3/objects.rs
@@ -1,6 +1,6 @@
use crate::common;
-use aws_sdk_s3::model::{Delete, ObjectIdentifier};
-use aws_sdk_s3::types::ByteStream;
+use aws_sdk_s3::primitives::ByteStream;
+use aws_sdk_s3::types::{Delete, ObjectIdentifier};
const STD_KEY: &str = "hello world";
const CTRL_KEY: &str = "\x00\x01\x02\x00";
diff --git a/src/garage/tests/s3/simple.rs b/src/garage/tests/s3/simple.rs
index f54ae9ac..41ec44c6 100644
--- a/src/garage/tests/s3/simple.rs
+++ b/src/garage/tests/s3/simple.rs
@@ -2,7 +2,7 @@ use crate::common;
#[tokio::test]
async fn test_simple() {
- use aws_sdk_s3::types::ByteStream;
+ use aws_sdk_s3::primitives::ByteStream;
let ctx = common::context();
let bucket = ctx.create_bucket("test-simple");
diff --git a/src/garage/tests/s3/website.rs b/src/garage/tests/s3/website.rs
index f61838e4..7c2b0deb 100644
--- a/src/garage/tests/s3/website.rs
+++ b/src/garage/tests/s3/website.rs
@@ -4,8 +4,8 @@ use crate::k2v::json_body;
use assert_json_diff::assert_json_eq;
use aws_sdk_s3::{
- model::{CorsConfiguration, CorsRule, ErrorDocument, IndexDocument, WebsiteConfiguration},
- types::ByteStream,
+ primitives::ByteStream,
+ types::{CorsConfiguration, CorsRule, ErrorDocument, IndexDocument, WebsiteConfiguration},
};
use http::{Request, StatusCode};
use hyper::{
@@ -72,7 +72,7 @@ async fn test_website() {
res_body,
json!({
"code": "InvalidRequest",
- "message": "Bad request: Bucket 'my-website' is not authorized for website hosting",
+ "message": "Bad request: Domain 'my-website' is not managed by Garage",
"region": "garage-integ-test",
"path": "/check",
})
@@ -91,24 +91,29 @@ async fn test_website() {
BODY.as_ref()
);
- let admin_req = || {
- Request::builder()
- .method("GET")
- .uri(format!(
- "http://127.0.0.1:{0}/check?domain={1}",
- ctx.garage.admin_port,
- BCKT_NAME.to_string()
- ))
- .body(Body::empty())
- .unwrap()
- };
-
- let mut admin_resp = client.request(admin_req()).await.unwrap();
- assert_eq!(admin_resp.status(), StatusCode::OK);
- assert_eq!(
- to_bytes(admin_resp.body_mut()).await.unwrap().as_ref(),
- format!("Bucket '{BCKT_NAME}' is authorized for website hosting").as_bytes()
- );
+ for bname in [
+ BCKT_NAME.to_string(),
+ format!("{BCKT_NAME}.web.garage"),
+ format!("{BCKT_NAME}.s3.garage"),
+ ] {
+ let admin_req = || {
+ Request::builder()
+ .method("GET")
+ .uri(format!(
+ "http://127.0.0.1:{0}/check?domain={1}",
+ ctx.garage.admin_port, bname
+ ))
+ .body(Body::empty())
+ .unwrap()
+ };
+
+ let mut admin_resp = client.request(admin_req()).await.unwrap();
+ assert_eq!(admin_resp.status(), StatusCode::OK);
+ assert_eq!(
+ to_bytes(admin_resp.body_mut()).await.unwrap().as_ref(),
+ format!("Domain '{bname}' is managed by Garage").as_bytes()
+ );
+ }
ctx.garage
.command()
@@ -142,7 +147,7 @@ async fn test_website() {
res_body,
json!({
"code": "InvalidRequest",
- "message": "Bad request: Bucket 'my-website' is not authorized for website hosting",
+ "message": "Bad request: Domain 'my-website' is not managed by Garage",
"region": "garage-integ-test",
"path": "/check",
})
@@ -397,7 +402,7 @@ async fn test_website_s3_api() {
}
#[tokio::test]
-async fn test_website_check_website_enabled() {
+async fn test_website_check_domain() {
let ctx = common::context();
let client = Client::new();