aboutsummaryrefslogtreecommitdiff
path: root/src/garage/admin
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2023-06-13 16:42:14 +0200
committerAlex Auvolat <alex@adnab.me>2023-06-13 16:42:14 +0200
commit802ed757217e55cd2886e137bd462956375fd06d (patch)
tree4809bf835798722e4d86bb1b745c8542c1549354 /src/garage/admin
parentfc2954893384d68b6ef0711f08ee1e64feb103ee (diff)
downloadgarage-802ed757217e55cd2886e137bd462956375fd06d.tar.gz
garage-802ed757217e55cd2886e137bd462956375fd06d.zip
move admin.rs to admin/mod.rs, before splitting
Diffstat (limited to 'src/garage/admin')
-rw-r--r--src/garage/admin/mod.rs1298
1 files changed, 1298 insertions, 0 deletions
diff --git a/src/garage/admin/mod.rs b/src/garage/admin/mod.rs
new file mode 100644
index 00000000..36fdf1d0
--- /dev/null
+++ b/src/garage/admin/mod.rs
@@ -0,0 +1,1298 @@
+use std::collections::HashMap;
+use std::fmt::Write;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use serde::{Deserialize, Serialize};
+
+use format_table::format_table_to_string;
+use garage_util::background::BackgroundRunner;
+use garage_util::crdt::*;
+use garage_util::data::*;
+use garage_util::error::Error as GarageError;
+use garage_util::time::*;
+
+use garage_table::replication::*;
+use garage_table::*;
+
+use garage_rpc::ring::PARTITION_BITS;
+use garage_rpc::*;
+
+use garage_block::manager::BlockResyncErrorInfo;
+
+use garage_model::bucket_alias_table::*;
+use garage_model::bucket_table::*;
+use garage_model::garage::Garage;
+use garage_model::helper::error::{Error, OkOrBadRequest};
+use garage_model::key_table::*;
+use garage_model::migrate::Migrate;
+use garage_model::permission::*;
+use garage_model::s3::object_table::*;
+use garage_model::s3::version_table::Version;
+
+use crate::cli::*;
+use crate::repair::online::launch_online_repair;
+
+pub const ADMIN_RPC_PATH: &str = "garage/admin_rpc.rs/Rpc";
+
+#[derive(Debug, Serialize, Deserialize)]
+#[allow(clippy::large_enum_variant)]
+pub enum AdminRpc {
+ BucketOperation(BucketOperation),
+ KeyOperation(KeyOperation),
+ LaunchRepair(RepairOpt),
+ Migrate(MigrateOpt),
+ Stats(StatsOpt),
+ Worker(WorkerOperation),
+ BlockOperation(BlockOperation),
+
+ // Replies
+ Ok(String),
+ BucketList(Vec<Bucket>),
+ BucketInfo {
+ bucket: Bucket,
+ relevant_keys: HashMap<String, Key>,
+ counters: HashMap<String, i64>,
+ },
+ KeyList(Vec<(String, String)>),
+ KeyInfo(Key, HashMap<Uuid, Bucket>),
+ WorkerList(
+ HashMap<usize, garage_util::background::WorkerInfo>,
+ WorkerListOpt,
+ ),
+ WorkerVars(Vec<(Uuid, String, String)>),
+ WorkerInfo(usize, garage_util::background::WorkerInfo),
+ BlockErrorList(Vec<BlockResyncErrorInfo>),
+ BlockInfo {
+ hash: Hash,
+ refcount: u64,
+ versions: Vec<Result<Version, Uuid>>,
+ },
+}
+
+impl Rpc for AdminRpc {
+ type Response = Result<AdminRpc, Error>;
+}
+
+pub struct AdminRpcHandler {
+ garage: Arc<Garage>,
+ background: Arc<BackgroundRunner>,
+ endpoint: Arc<Endpoint<AdminRpc, Self>>,
+}
+
+impl AdminRpcHandler {
+ pub fn new(garage: Arc<Garage>, background: Arc<BackgroundRunner>) -> Arc<Self> {
+ let endpoint = garage.system.netapp.endpoint(ADMIN_RPC_PATH.into());
+ let admin = Arc::new(Self {
+ garage,
+ background,
+ endpoint,
+ });
+ admin.endpoint.set_handler(admin.clone());
+ admin
+ }
+
+ // ================ BUCKET COMMANDS ====================
+
+ async fn handle_bucket_cmd(&self, cmd: &BucketOperation) -> Result<AdminRpc, Error> {
+ match cmd {
+ BucketOperation::List => self.handle_list_buckets().await,
+ BucketOperation::Info(query) => self.handle_bucket_info(query).await,
+ BucketOperation::Create(query) => self.handle_create_bucket(&query.name).await,
+ BucketOperation::Delete(query) => self.handle_delete_bucket(query).await,
+ BucketOperation::Alias(query) => self.handle_alias_bucket(query).await,
+ BucketOperation::Unalias(query) => self.handle_unalias_bucket(query).await,
+ BucketOperation::Allow(query) => self.handle_bucket_allow(query).await,
+ BucketOperation::Deny(query) => self.handle_bucket_deny(query).await,
+ BucketOperation::Website(query) => self.handle_bucket_website(query).await,
+ BucketOperation::SetQuotas(query) => self.handle_bucket_set_quotas(query).await,
+ BucketOperation::CleanupIncompleteUploads(query) => {
+ self.handle_bucket_cleanup_incomplete_uploads(query).await
+ }
+ }
+ }
+
+ async fn handle_list_buckets(&self) -> Result<AdminRpc, Error> {
+ let buckets = self
+ .garage
+ .bucket_table
+ .get_range(
+ &EmptyKey,
+ None,
+ Some(DeletedFilter::NotDeleted),
+ 10000,
+ EnumerationOrder::Forward,
+ )
+ .await?;
+
+ Ok(AdminRpc::BucketList(buckets))
+ }
+
+ async fn handle_bucket_info(&self, query: &BucketOpt) -> Result<AdminRpc, Error> {
+ let bucket_id = self
+ .garage
+ .bucket_helper()
+ .resolve_global_bucket_name(&query.name)
+ .await?
+ .ok_or_bad_request("Bucket not found")?;
+
+ let bucket = self
+ .garage
+ .bucket_helper()
+ .get_existing_bucket(bucket_id)
+ .await?;
+
+ let counters = self
+ .garage
+ .object_counter_table
+ .table
+ .get(&bucket_id, &EmptyKey)
+ .await?
+ .map(|x| x.filtered_values(&self.garage.system.ring.borrow()))
+ .unwrap_or_default();
+
+ let mut relevant_keys = HashMap::new();
+ for (k, _) in bucket
+ .state
+ .as_option()
+ .unwrap()
+ .authorized_keys
+ .items()
+ .iter()
+ {
+ if let Some(key) = self
+ .garage
+ .key_table
+ .get(&EmptyKey, k)
+ .await?
+ .filter(|k| !k.is_deleted())
+ {
+ relevant_keys.insert(k.clone(), key);
+ }
+ }
+ for ((k, _), _, _) in bucket
+ .state
+ .as_option()
+ .unwrap()
+ .local_aliases
+ .items()
+ .iter()
+ {
+ if relevant_keys.contains_key(k) {
+ continue;
+ }
+ if let Some(key) = self.garage.key_table.get(&EmptyKey, k).await? {
+ relevant_keys.insert(k.clone(), key);
+ }
+ }
+
+ Ok(AdminRpc::BucketInfo {
+ bucket,
+ relevant_keys,
+ counters,
+ })
+ }
+
+ #[allow(clippy::ptr_arg)]
+ async fn handle_create_bucket(&self, name: &String) -> Result<AdminRpc, Error> {
+ if !is_valid_bucket_name(name) {
+ return Err(Error::BadRequest(format!(
+ "{}: {}",
+ name, INVALID_BUCKET_NAME_MESSAGE
+ )));
+ }
+
+ if let Some(alias) = self.garage.bucket_alias_table.get(&EmptyKey, name).await? {
+ if alias.state.get().is_some() {
+ return Err(Error::BadRequest(format!("Bucket {} already exists", name)));
+ }
+ }
+
+ // ---- done checking, now commit ----
+
+ let bucket = Bucket::new();
+ self.garage.bucket_table.insert(&bucket).await?;
+
+ self.garage
+ .bucket_helper()
+ .set_global_bucket_alias(bucket.id, name)
+ .await?;
+
+ Ok(AdminRpc::Ok(format!("Bucket {} was created.", name)))
+ }
+
+ async fn handle_delete_bucket(&self, query: &DeleteBucketOpt) -> Result<AdminRpc, Error> {
+ let helper = self.garage.bucket_helper();
+
+ let bucket_id = helper
+ .resolve_global_bucket_name(&query.name)
+ .await?
+ .ok_or_bad_request("Bucket not found")?;
+
+ // Get the alias, but keep in minde here the bucket name
+ // given in parameter can also be directly the bucket's ID.
+ // In that case bucket_alias will be None, and
+ // we can still delete the bucket if it has zero aliases
+ // (a condition which we try to prevent but that could still happen somehow).
+ // We just won't try to delete an alias entry because there isn't one.
+ let bucket_alias = self
+ .garage
+ .bucket_alias_table
+ .get(&EmptyKey, &query.name)
+ .await?;
+
+ // Check bucket doesn't have other aliases
+ let mut bucket = helper.get_existing_bucket(bucket_id).await?;
+ let bucket_state = bucket.state.as_option().unwrap();
+ if bucket_state
+ .aliases
+ .items()
+ .iter()
+ .filter(|(_, _, active)| *active)
+ .any(|(name, _, _)| name != &query.name)
+ {
+ return Err(Error::BadRequest(format!("Bucket {} still has other global aliases. Use `bucket unalias` to delete them one by one.", query.name)));
+ }
+ if bucket_state
+ .local_aliases
+ .items()
+ .iter()
+ .any(|(_, _, active)| *active)
+ {
+ return Err(Error::BadRequest(format!("Bucket {} still has other local aliases. Use `bucket unalias` to delete them one by one.", query.name)));
+ }
+
+ // Check bucket is empty
+ if !helper.is_bucket_empty(bucket_id).await? {
+ return Err(Error::BadRequest(format!(
+ "Bucket {} is not empty",
+ query.name
+ )));
+ }
+
+ if !query.yes {
+ return Err(Error::BadRequest(
+ "Add --yes flag to really perform this operation".to_string(),
+ ));
+ }
+
+ // --- done checking, now commit ---
+ // 1. delete authorization from keys that had access
+ for (key_id, _) in bucket.authorized_keys() {
+ helper
+ .set_bucket_key_permissions(bucket.id, key_id, BucketKeyPerm::NO_PERMISSIONS)
+ .await?;
+ }
+
+ // 2. delete bucket alias
+ if bucket_alias.is_some() {
+ helper
+ .purge_global_bucket_alias(bucket_id, &query.name)
+ .await?;
+ }
+
+ // 3. delete bucket
+ bucket.state = Deletable::delete();
+ self.garage.bucket_table.insert(&bucket).await?;
+
+ Ok(AdminRpc::Ok(format!("Bucket {} was deleted.", query.name)))
+ }
+
+ async fn handle_alias_bucket(&self, query: &AliasBucketOpt) -> Result<AdminRpc, Error> {
+ let helper = self.garage.bucket_helper();
+ let key_helper = self.garage.key_helper();
+
+ let bucket_id = helper
+ .resolve_global_bucket_name(&query.existing_bucket)
+ .await?
+ .ok_or_bad_request("Bucket not found")?;
+
+ if let Some(key_pattern) = &query.local {
+ let key = key_helper.get_existing_matching_key(key_pattern).await?;
+
+ helper
+ .set_local_bucket_alias(bucket_id, &key.key_id, &query.new_name)
+ .await?;
+ Ok(AdminRpc::Ok(format!(
+ "Alias {} now points to bucket {:?} in namespace of key {}",
+ query.new_name, bucket_id, key.key_id
+ )))
+ } else {
+ helper
+ .set_global_bucket_alias(bucket_id, &query.new_name)
+ .await?;
+ Ok(AdminRpc::Ok(format!(
+ "Alias {} now points to bucket {:?}",
+ query.new_name, bucket_id
+ )))
+ }
+ }
+
+ async fn handle_unalias_bucket(&self, query: &UnaliasBucketOpt) -> Result<AdminRpc, Error> {
+ let helper = self.garage.bucket_helper();
+ let key_helper = self.garage.key_helper();
+
+ if let Some(key_pattern) = &query.local {
+ let key = key_helper.get_existing_matching_key(key_pattern).await?;
+
+ let bucket_id = key
+ .state
+ .as_option()
+ .unwrap()
+ .local_aliases
+ .get(&query.name)
+ .cloned()
+ .flatten()
+ .ok_or_bad_request("Bucket not found")?;
+
+ helper
+ .unset_local_bucket_alias(bucket_id, &key.key_id, &query.name)
+ .await?;
+
+ Ok(AdminRpc::Ok(format!(
+ "Alias {} no longer points to bucket {:?} in namespace of key {}",
+ &query.name, bucket_id, key.key_id
+ )))
+ } else {
+ let bucket_id = helper
+ .resolve_global_bucket_name(&query.name)
+ .await?
+ .ok_or_bad_request("Bucket not found")?;
+
+ helper
+ .unset_global_bucket_alias(bucket_id, &query.name)
+ .await?;
+
+ Ok(AdminRpc::Ok(format!(
+ "Alias {} no longer points to bucket {:?}",
+ &query.name, bucket_id
+ )))
+ }
+ }
+
+ async fn handle_bucket_allow(&self, query: &PermBucketOpt) -> Result<AdminRpc, Error> {
+ let helper = self.garage.bucket_helper();
+ let key_helper = self.garage.key_helper();
+
+ let bucket_id = helper
+ .resolve_global_bucket_name(&query.bucket)
+ .await?
+ .ok_or_bad_request("Bucket not found")?;
+ let key = key_helper
+ .get_existing_matching_key(&query.key_pattern)
+ .await?;
+
+ let allow_read = query.read || key.allow_read(&bucket_id);
+ let allow_write = query.write || key.allow_write(&bucket_id);
+ let allow_owner = query.owner || key.allow_owner(&bucket_id);
+
+ helper
+ .set_bucket_key_permissions(
+ bucket_id,
+ &key.key_id,
+ BucketKeyPerm {
+ timestamp: now_msec(),
+ allow_read,
+ allow_write,
+ allow_owner,
+ },
+ )
+ .await?;
+
+ Ok(AdminRpc::Ok(format!(
+ "New permissions for {} on {}: read {}, write {}, owner {}.",
+ &key.key_id, &query.bucket, allow_read, allow_write, allow_owner
+ )))
+ }
+
+ async fn handle_bucket_deny(&self, query: &PermBucketOpt) -> Result<AdminRpc, Error> {
+ let helper = self.garage.bucket_helper();
+ let key_helper = self.garage.key_helper();
+
+ let bucket_id = helper
+ .resolve_global_bucket_name(&query.bucket)
+ .await?
+ .ok_or_bad_request("Bucket not found")?;
+ let key = key_helper
+ .get_existing_matching_key(&query.key_pattern)
+ .await?;
+
+ let allow_read = !query.read && key.allow_read(&bucket_id);
+ let allow_write = !query.write && key.allow_write(&bucket_id);
+ let allow_owner = !query.owner && key.allow_owner(&bucket_id);
+
+ helper
+ .set_bucket_key_permissions(
+ bucket_id,
+ &key.key_id,
+ BucketKeyPerm {
+ timestamp: now_msec(),
+ allow_read,
+ allow_write,
+ allow_owner,
+ },
+ )
+ .await?;
+
+ Ok(AdminRpc::Ok(format!(
+ "New permissions for {} on {}: read {}, write {}, owner {}.",
+ &key.key_id, &query.bucket, allow_read, allow_write, allow_owner
+ )))
+ }
+
+ async fn handle_bucket_website(&self, query: &WebsiteOpt) -> Result<AdminRpc, Error> {
+ let bucket_id = self
+ .garage
+ .bucket_helper()
+ .resolve_global_bucket_name(&query.bucket)
+ .await?
+ .ok_or_bad_request("Bucket not found")?;
+
+ let mut bucket = self
+ .garage
+ .bucket_helper()
+ .get_existing_bucket(bucket_id)
+ .await?;
+ let bucket_state = bucket.state.as_option_mut().unwrap();
+
+ if !(query.allow ^ query.deny) {
+ return Err(Error::BadRequest(
+ "You must specify exactly one flag, either --allow or --deny".to_string(),
+ ));
+ }
+
+ let website = if query.allow {
+ Some(WebsiteConfig {
+ index_document: query.index_document.clone(),
+ error_document: query.error_document.clone(),
+ })
+ } else {
+ None
+ };
+
+ bucket_state.website_config.update(website);
+ self.garage.bucket_table.insert(&bucket).await?;
+
+ let msg = if query.allow {
+ format!("Website access allowed for {}", &query.bucket)
+ } else {
+ format!("Website access denied for {}", &query.bucket)
+ };
+
+ Ok(AdminRpc::Ok(msg))
+ }
+
+ async fn handle_bucket_set_quotas(&self, query: &SetQuotasOpt) -> Result<AdminRpc, Error> {
+ let bucket_id = self
+ .garage
+ .bucket_helper()
+ .resolve_global_bucket_name(&query.bucket)
+ .await?
+ .ok_or_bad_request("Bucket not found")?;
+
+ let mut bucket = self
+ .garage
+ .bucket_helper()
+ .get_existing_bucket(bucket_id)
+ .await?;
+ let bucket_state = bucket.state.as_option_mut().unwrap();
+
+ if query.max_size.is_none() && query.max_objects.is_none() {
+ return Err(Error::BadRequest(
+ "You must specify either --max-size or --max-objects (or both) for this command to do something.".to_string(),
+ ));
+ }
+
+ let mut quotas = bucket_state.quotas.get().clone();
+
+ match query.max_size.as_ref().map(String::as_ref) {
+ Some("none") => quotas.max_size = None,
+ Some(v) => {
+ let bs = v
+ .parse::<bytesize::ByteSize>()
+ .ok_or_bad_request(format!("Invalid size specified: {}", v))?;
+ quotas.max_size = Some(bs.as_u64());
+ }
+ _ => (),
+ }
+
+ match query.max_objects.as_ref().map(String::as_ref) {
+ Some("none") => quotas.max_objects = None,
+ Some(v) => {
+ let mo = v
+ .parse::<u64>()
+ .ok_or_bad_request(format!("Invalid number specified: {}", v))?;
+ quotas.max_objects = Some(mo);
+ }
+ _ => (),
+ }
+
+ bucket_state.quotas.update(quotas);
+ self.garage.bucket_table.insert(&bucket).await?;
+
+ Ok(AdminRpc::Ok(format!(
+ "Quotas updated for {}",
+ &query.bucket
+ )))
+ }
+
+ async fn handle_bucket_cleanup_incomplete_uploads(
+ &self,
+ query: &CleanupIncompleteUploadsOpt,
+ ) -> Result<AdminRpc, Error> {
+ let mut bucket_ids = vec![];
+ for b in query.buckets.iter() {
+ bucket_ids.push(
+ self.garage
+ .bucket_helper()
+ .resolve_global_bucket_name(b)
+ .await?
+ .ok_or_bad_request(format!("Bucket not found: {}", b))?,
+ );
+ }
+
+ let duration = parse_duration::parse::parse(&query.older_than)
+ .ok_or_bad_request("Invalid duration passed for --older-than parameter")?;
+
+ let mut ret = String::new();
+ for bucket in bucket_ids {
+ let count = self
+ .garage
+ .bucket_helper()
+ .cleanup_incomplete_uploads(&bucket, duration)
+ .await?;
+ writeln!(
+ &mut ret,
+ "Bucket {:?}: {} incomplete uploads aborted",
+ bucket, count
+ )
+ .unwrap();
+ }
+
+ Ok(AdminRpc::Ok(ret))
+ }
+
+ // ================ KEY COMMANDS ====================
+
+ async fn handle_key_cmd(&self, cmd: &KeyOperation) -> Result<AdminRpc, Error> {
+ match cmd {
+ KeyOperation::List => self.handle_list_keys().await,
+ KeyOperation::Info(query) => self.handle_key_info(query).await,
+ KeyOperation::New(query) => self.handle_create_key(query).await,
+ KeyOperation::Rename(query) => self.handle_rename_key(query).await,
+ KeyOperation::Delete(query) => self.handle_delete_key(query).await,
+ KeyOperation::Allow(query) => self.handle_allow_key(query).await,
+ KeyOperation::Deny(query) => self.handle_deny_key(query).await,
+ KeyOperation::Import(query) => self.handle_import_key(query).await,
+ }
+ }
+
+ async fn handle_list_keys(&self) -> Result<AdminRpc, Error> {
+ let key_ids = self
+ .garage
+ .key_table
+ .get_range(
+ &EmptyKey,
+ None,
+ Some(KeyFilter::Deleted(DeletedFilter::NotDeleted)),
+ 10000,
+ EnumerationOrder::Forward,
+ )
+ .await?
+ .iter()
+ .map(|k| (k.key_id.to_string(), k.params().unwrap().name.get().clone()))
+ .collect::<Vec<_>>();
+ Ok(AdminRpc::KeyList(key_ids))
+ }
+
+ async fn handle_key_info(&self, query: &KeyOpt) -> Result<AdminRpc, Error> {
+ let key = self
+ .garage
+ .key_helper()
+ .get_existing_matching_key(&query.key_pattern)
+ .await?;
+ self.key_info_result(key).await
+ }
+
+ async fn handle_create_key(&self, query: &KeyNewOpt) -> Result<AdminRpc, Error> {
+ let key = Key::new(&query.name);
+ self.garage.key_table.insert(&key).await?;
+ self.key_info_result(key).await
+ }
+
+ async fn handle_rename_key(&self, query: &KeyRenameOpt) -> Result<AdminRpc, Error> {
+ let mut key = self
+ .garage
+ .key_helper()
+ .get_existing_matching_key(&query.key_pattern)
+ .await?;
+ key.params_mut()
+ .unwrap()
+ .name
+ .update(query.new_name.clone());
+ self.garage.key_table.insert(&key).await?;
+ self.key_info_result(key).await
+ }
+
+ async fn handle_delete_key(&self, query: &KeyDeleteOpt) -> Result<AdminRpc, Error> {
+ let key_helper = self.garage.key_helper();
+
+ let mut key = key_helper
+ .get_existing_matching_key(&query.key_pattern)
+ .await?;
+
+ if !query.yes {
+ return Err(Error::BadRequest(
+ "Add --yes flag to really perform this operation".to_string(),
+ ));
+ }
+
+ key_helper.delete_key(&mut key).await?;
+
+ Ok(AdminRpc::Ok(format!(
+ "Key {} was deleted successfully.",
+ key.key_id
+ )))
+ }
+
+ async fn handle_allow_key(&self, query: &KeyPermOpt) -> Result<AdminRpc, Error> {
+ let mut key = self
+ .garage
+ .key_helper()
+ .get_existing_matching_key(&query.key_pattern)
+ .await?;
+ if query.create_bucket {
+ key.params_mut().unwrap().allow_create_bucket.update(true);
+ }
+ self.garage.key_table.insert(&key).await?;
+ self.key_info_result(key).await
+ }
+
+ async fn handle_deny_key(&self, query: &KeyPermOpt) -> Result<AdminRpc, Error> {
+ let mut key = self
+ .garage
+ .key_helper()
+ .get_existing_matching_key(&query.key_pattern)
+ .await?;
+ if query.create_bucket {
+ key.params_mut().unwrap().allow_create_bucket.update(false);
+ }
+ self.garage.key_table.insert(&key).await?;
+ self.key_info_result(key).await
+ }
+
+ async fn handle_import_key(&self, query: &KeyImportOpt) -> Result<AdminRpc, Error> {
+ let prev_key = self.garage.key_table.get(&EmptyKey, &query.key_id).await?;
+ if prev_key.is_some() {
+ return Err(Error::BadRequest(format!("Key {} already exists in data store. Even if it is deleted, we can't let you create a new key with the same ID. Sorry.", query.key_id)));
+ }
+ let imported_key = Key::import(&query.key_id, &query.secret_key, &query.name);
+ self.garage.key_table.insert(&imported_key).await?;
+
+ self.key_info_result(imported_key).await
+ }
+
+ async fn key_info_result(&self, key: Key) -> Result<AdminRpc, Error> {
+ let mut relevant_buckets = HashMap::new();
+
+ for (id, _) in key
+ .state
+ .as_option()
+ .unwrap()
+ .authorized_buckets
+ .items()
+ .iter()
+ {
+ if let Some(b) = self.garage.bucket_table.get(&EmptyKey, id).await? {
+ relevant_buckets.insert(*id, b);
+ }
+ }
+
+ Ok(AdminRpc::KeyInfo(key, relevant_buckets))
+ }
+
+ // ================ MIGRATION COMMANDS ====================
+
+ async fn handle_migrate(self: &Arc<Self>, opt: MigrateOpt) -> Result<AdminRpc, Error> {
+ if !opt.yes {
+ return Err(Error::BadRequest(
+ "Please provide the --yes flag to initiate migration operation.".to_string(),
+ ));
+ }
+
+ let m = Migrate {
+ garage: self.garage.clone(),
+ };
+ match opt.what {
+ MigrateWhat::Buckets050 => m.migrate_buckets050().await,
+ }?;
+ Ok(AdminRpc::Ok("Migration successfull.".into()))
+ }
+
+ // ================ REPAIR COMMANDS ====================
+
+ async fn handle_launch_repair(self: &Arc<Self>, opt: RepairOpt) -> Result<AdminRpc, Error> {
+ if !opt.yes {
+ return Err(Error::BadRequest(
+ "Please provide the --yes flag to initiate repair operations.".to_string(),
+ ));
+ }
+ if opt.all_nodes {
+ let mut opt_to_send = opt.clone();
+ opt_to_send.all_nodes = false;
+
+ let mut failures = vec![];
+ let ring = self.garage.system.ring.borrow().clone();
+ for node in ring.layout.node_ids().iter() {
+ let node = (*node).into();
+ let resp = self
+ .endpoint
+ .call(
+ &node,
+ AdminRpc::LaunchRepair(opt_to_send.clone()),
+ PRIO_NORMAL,
+ )
+ .await;
+ if !matches!(resp, Ok(Ok(_))) {
+ failures.push(node);
+ }
+ }
+ if failures.is_empty() {
+ Ok(AdminRpc::Ok("Repair launched on all nodes".to_string()))
+ } else {
+ Err(Error::BadRequest(format!(
+ "Could not launch repair on nodes: {:?} (launched successfully on other nodes)",
+ failures
+ )))
+ }
+ } else {
+ launch_online_repair(&self.garage, &self.background, opt).await?;
+ Ok(AdminRpc::Ok(format!(
+ "Repair launched on {:?}",
+ self.garage.system.id
+ )))
+ }
+ }
+
+ // ================ STATS COMMANDS ====================
+
+ async fn handle_stats(&self, opt: StatsOpt) -> Result<AdminRpc, Error> {
+ if opt.all_nodes {
+ let mut ret = String::new();
+ let ring = self.garage.system.ring.borrow().clone();
+
+ for node in ring.layout.node_ids().iter() {
+ let mut opt = opt.clone();
+ opt.all_nodes = false;
+ opt.skip_global = true;
+
+ writeln!(&mut ret, "\n======================").unwrap();
+ writeln!(&mut ret, "Stats for node {:?}:", node).unwrap();
+
+ let node_id = (*node).into();
+ match self
+ .endpoint
+ .call(&node_id, AdminRpc::Stats(opt), PRIO_NORMAL)
+ .await
+ {
+ Ok(Ok(AdminRpc::Ok(s))) => writeln!(&mut ret, "{}", s).unwrap(),
+ Ok(Ok(x)) => writeln!(&mut ret, "Bad answer: {:?}", x).unwrap(),
+ Ok(Err(e)) => writeln!(&mut ret, "Remote error: {}", e).unwrap(),
+ Err(e) => writeln!(&mut ret, "Network error: {}", e).unwrap(),
+ }
+ }
+
+ writeln!(&mut ret, "\n======================").unwrap();
+ write!(
+ &mut ret,
+ "Cluster statistics:\n\n{}",
+ self.gather_cluster_stats()
+ )
+ .unwrap();
+
+ Ok(AdminRpc::Ok(ret))
+ } else {
+ Ok(AdminRpc::Ok(self.gather_stats_local(opt)?))
+ }
+ }
+
+ fn gather_stats_local(&self, opt: StatsOpt) -> Result<String, Error> {
+ let mut ret = String::new();
+ writeln!(
+ &mut ret,
+ "\nGarage version: {} [features: {}]\nRust compiler version: {}",
+ garage_util::version::garage_version(),
+ garage_util::version::garage_features()
+ .map(|list| list.join(", "))
+ .unwrap_or_else(|| "(unknown)".into()),
+ garage_util::version::rust_version(),
+ )
+ .unwrap();
+
+ writeln!(&mut ret, "\nDatabase engine: {}", self.garage.db.engine()).unwrap();
+
+ // Gather table statistics
+ let mut table = vec![" Table\tItems\tMklItems\tMklTodo\tGcTodo".into()];
+ table.push(self.gather_table_stats(&self.garage.bucket_table, opt.detailed)?);
+ table.push(self.gather_table_stats(&self.garage.key_table, opt.detailed)?);
+ table.push(self.gather_table_stats(&self.garage.object_table, opt.detailed)?);
+ table.push(self.gather_table_stats(&self.garage.version_table, opt.detailed)?);
+ table.push(self.gather_table_stats(&self.garage.block_ref_table, opt.detailed)?);
+ write!(
+ &mut ret,
+ "\nTable stats:\n{}",
+ format_table_to_string(table)
+ )
+ .unwrap();
+
+ // Gather block manager statistics
+ writeln!(&mut ret, "\nBlock manager stats:").unwrap();
+ let rc_len = if opt.detailed {
+ self.garage.block_manager.rc_len()?.to_string()
+ } else {
+ self.garage
+ .block_manager
+ .rc_fast_len()?
+ .map(|x| x.to_string())
+ .unwrap_or_else(|| "NC".into())
+ };
+
+ writeln!(
+ &mut ret,
+ " number of RC entries (~= number of blocks): {}",
+ rc_len
+ )
+ .unwrap();
+ writeln!(
+ &mut ret,
+ " resync queue length: {}",
+ self.garage.block_manager.resync.queue_len()?
+ )
+ .unwrap();
+ writeln!(
+ &mut ret,
+ " blocks with resync errors: {}",
+ self.garage.block_manager.resync.errors_len()?
+ )
+ .unwrap();
+
+ if !opt.detailed {
+ writeln!(&mut ret, "\nIf values are missing above (marked as NC), consider adding the --detailed flag (this will be slow).").unwrap();
+ }
+
+ if !opt.skip_global {
+ write!(&mut ret, "\n{}", self.gather_cluster_stats()).unwrap();
+ }
+
+ Ok(ret)
+ }
+
+ fn gather_cluster_stats(&self) -> String {
+ let mut ret = String::new();
+
+ // Gather storage node and free space statistics
+ let layout = &self.garage.system.ring.borrow().layout;
+ let mut node_partition_count = HashMap::<Uuid, u64>::new();
+ for short_id in layout.ring_assignation_data.iter() {
+ let id = layout.node_id_vec[*short_id as usize];
+ *node_partition_count.entry(id).or_default() += 1;
+ }
+ let node_info = self
+ .garage
+ .system
+ .get_known_nodes()
+ .into_iter()
+ .map(|n| (n.id, n))
+ .collect::<HashMap<_, _>>();
+
+ let mut table = vec![" ID\tHostname\tZone\tCapacity\tPart.\tDataAvail\tMetaAvail".into()];
+ for (id, parts) in node_partition_count.iter() {
+ let info = node_info.get(id);
+ let status = info.map(|x| &x.status);
+ let role = layout.roles.get(id).and_then(|x| x.0.as_ref());
+ let hostname = status.map(|x| x.hostname.as_str()).unwrap_or("?");
+ let zone = role.map(|x| x.zone.as_str()).unwrap_or("?");
+ let capacity = role
+ .map(|x| x.capacity_string())
+ .unwrap_or_else(|| "?".into());
+ let avail_str = |x| match x {
+ Some((avail, total)) => {
+ let pct = (avail as f64) / (total as f64) * 100.;
+ let avail = bytesize::ByteSize::b(avail);
+ let total = bytesize::ByteSize::b(total);
+ format!("{}/{} ({:.1}%)", avail, total, pct)
+ }
+ None => "?".into(),
+ };
+ let data_avail = avail_str(status.and_then(|x| x.data_disk_avail));
+ let meta_avail = avail_str(status.and_then(|x| x.meta_disk_avail));
+ table.push(format!(
+ " {:?}\t{}\t{}\t{}\t{}\t{}\t{}",
+ id, hostname, zone, capacity, parts, data_avail, meta_avail
+ ));
+ }
+ write!(
+ &mut ret,
+ "Storage nodes:\n{}",
+ format_table_to_string(table)
+ )
+ .unwrap();
+
+ let meta_part_avail = node_partition_count
+ .iter()
+ .filter_map(|(id, parts)| {
+ node_info
+ .get(id)
+ .and_then(|x| x.status.meta_disk_avail)
+ .map(|c| c.0 / *parts)
+ })
+ .collect::<Vec<_>>();
+ let data_part_avail = node_partition_count
+ .iter()
+ .filter_map(|(id, parts)| {
+ node_info
+ .get(id)
+ .and_then(|x| x.status.data_disk_avail)
+ .map(|c| c.0 / *parts)
+ })
+ .collect::<Vec<_>>();
+ if !meta_part_avail.is_empty() && !data_part_avail.is_empty() {
+ let meta_avail =
+ bytesize::ByteSize(meta_part_avail.iter().min().unwrap() * (1 << PARTITION_BITS));
+ let data_avail =
+ bytesize::ByteSize(data_part_avail.iter().min().unwrap() * (1 << PARTITION_BITS));
+ writeln!(
+ &mut ret,
+ "\nEstimated available storage space cluster-wide (might be lower in practice):"
+ )
+ .unwrap();
+ if meta_part_avail.len() < node_partition_count.len()
+ || data_part_avail.len() < node_partition_count.len()
+ {
+ writeln!(&mut ret, " data: < {}", data_avail).unwrap();
+ writeln!(&mut ret, " metadata: < {}", meta_avail).unwrap();
+ writeln!(&mut ret, "A precise estimate could not be given as information is missing for some storage nodes.").unwrap();
+ } else {
+ writeln!(&mut ret, " data: {}", data_avail).unwrap();
+ writeln!(&mut ret, " metadata: {}", meta_avail).unwrap();
+ }
+ }
+
+ ret
+ }
+
+ fn gather_table_stats<F, R>(
+ &self,
+ t: &Arc<Table<F, R>>,
+ detailed: bool,
+ ) -> Result<String, Error>
+ where
+ F: TableSchema + 'static,
+ R: TableReplication + 'static,
+ {
+ let (data_len, mkl_len) = if detailed {
+ (
+ t.data.store.len().map_err(GarageError::from)?.to_string(),
+ t.merkle_updater.merkle_tree_len()?.to_string(),
+ )
+ } else {
+ (
+ t.data
+ .store
+ .fast_len()
+ .map_err(GarageError::from)?
+ .map(|x| x.to_string())
+ .unwrap_or_else(|| "NC".into()),
+ t.merkle_updater
+ .merkle_tree_fast_len()?
+ .map(|x| x.to_string())
+ .unwrap_or_else(|| "NC".into()),
+ )
+ };
+
+ Ok(format!(
+ " {}\t{}\t{}\t{}\t{}",
+ F::TABLE_NAME,
+ data_len,
+ mkl_len,
+ t.merkle_updater.todo_len()?,
+ t.data.gc_todo_len()?
+ ))
+ }
+
+ // ================ WORKER COMMANDS ====================
+
+ async fn handle_worker_cmd(&self, cmd: &WorkerOperation) -> Result<AdminRpc, Error> {
+ match cmd {
+ WorkerOperation::List { opt } => {
+ let workers = self.background.get_worker_info();
+ Ok(AdminRpc::WorkerList(workers, *opt))
+ }
+ WorkerOperation::Info { tid } => {
+ let info = self
+ .background
+ .get_worker_info()
+ .get(tid)
+ .ok_or_bad_request(format!("No worker with TID {}", tid))?
+ .clone();
+ Ok(AdminRpc::WorkerInfo(*tid, info))
+ }
+ WorkerOperation::Get {
+ all_nodes,
+ variable,
+ } => self.handle_get_var(*all_nodes, variable).await,
+ WorkerOperation::Set {
+ all_nodes,
+ variable,
+ value,
+ } => self.handle_set_var(*all_nodes, variable, value).await,
+ }
+ }
+
+ async fn handle_get_var(
+ &self,
+ all_nodes: bool,
+ variable: &Option<String>,
+ ) -> Result<AdminRpc, Error> {
+ if all_nodes {
+ let mut ret = vec![];
+ let ring = self.garage.system.ring.borrow().clone();
+ for node in ring.layout.node_ids().iter() {
+ let node = (*node).into();
+ match self
+ .endpoint
+ .call(
+ &node,
+ AdminRpc::Worker(WorkerOperation::Get {
+ all_nodes: false,
+ variable: variable.clone(),
+ }),
+ PRIO_NORMAL,
+ )
+ .await??
+ {
+ AdminRpc::WorkerVars(v) => ret.extend(v),
+ m => return Err(GarageError::unexpected_rpc_message(m).into()),
+ }
+ }
+ Ok(AdminRpc::WorkerVars(ret))
+ } else {
+ #[allow(clippy::collapsible_else_if)]
+ if let Some(v) = variable {
+ Ok(AdminRpc::WorkerVars(vec![(
+ self.garage.system.id,
+ v.clone(),
+ self.garage.bg_vars.get(v)?,
+ )]))
+ } else {
+ let mut vars = self.garage.bg_vars.get_all();
+ vars.sort();
+ Ok(AdminRpc::WorkerVars(
+ vars.into_iter()
+ .map(|(k, v)| (self.garage.system.id, k.to_string(), v))
+ .collect(),
+ ))
+ }
+ }
+ }
+
+ async fn handle_set_var(
+ &self,
+ all_nodes: bool,
+ variable: &str,
+ value: &str,
+ ) -> Result<AdminRpc, Error> {
+ if all_nodes {
+ let mut ret = vec![];
+ let ring = self.garage.system.ring.borrow().clone();
+ for node in ring.layout.node_ids().iter() {
+ let node = (*node).into();
+ match self
+ .endpoint
+ .call(
+ &node,
+ AdminRpc::Worker(WorkerOperation::Set {
+ all_nodes: false,
+ variable: variable.to_string(),
+ value: value.to_string(),
+ }),
+ PRIO_NORMAL,
+ )
+ .await??
+ {
+ AdminRpc::WorkerVars(v) => ret.extend(v),
+ m => return Err(GarageError::unexpected_rpc_message(m).into()),
+ }
+ }
+ Ok(AdminRpc::WorkerVars(ret))
+ } else {
+ self.garage.bg_vars.set(variable, value)?;
+ Ok(AdminRpc::WorkerVars(vec![(
+ self.garage.system.id,
+ variable.to_string(),
+ value.to_string(),
+ )]))
+ }
+ }
+
+ // ================ BLOCK COMMANDS ====================
+
+ async fn handle_block_cmd(&self, cmd: &BlockOperation) -> Result<AdminRpc, Error> {
+ match cmd {
+ BlockOperation::ListErrors => Ok(AdminRpc::BlockErrorList(
+ self.garage.block_manager.list_resync_errors()?,
+ )),
+ BlockOperation::Info { hash } => {
+ let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?;
+ let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?;
+ let refcount = self.garage.block_manager.get_block_rc(&hash)?;
+ let block_refs = self
+ .garage
+ .block_ref_table
+ .get_range(&hash, None, None, 10000, Default::default())
+ .await?;
+ let mut versions = vec![];
+ for br in block_refs {
+ if let Some(v) = self
+ .garage
+ .version_table
+ .get(&br.version, &EmptyKey)
+ .await?
+ {
+ versions.push(Ok(v));
+ } else {
+ versions.push(Err(br.version));
+ }
+ }
+ Ok(AdminRpc::BlockInfo {
+ hash,
+ refcount,
+ versions,
+ })
+ }
+ BlockOperation::RetryNow { all, blocks } => {
+ if *all {
+ if !blocks.is_empty() {
+ return Err(Error::BadRequest(
+ "--all was specified, cannot also specify blocks".into(),
+ ));
+ }
+ let blocks = self.garage.block_manager.list_resync_errors()?;
+ for b in blocks.iter() {
+ self.garage.block_manager.resync.clear_backoff(&b.hash)?;
+ }
+ Ok(AdminRpc::Ok(format!(
+ "{} blocks returned in queue for a retry now (check logs to see results)",
+ blocks.len()
+ )))
+ } else {
+ for hash in blocks {
+ let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?;
+ let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?;
+ self.garage.block_manager.resync.clear_backoff(&hash)?;
+ }
+ Ok(AdminRpc::Ok(format!(
+ "{} blocks returned in queue for a retry now (check logs to see results)",
+ blocks.len()
+ )))
+ }
+ }
+ BlockOperation::Purge { yes, blocks } => {
+ if !yes {
+ return Err(Error::BadRequest(
+ "Pass the --yes flag to confirm block purge operation.".into(),
+ ));
+ }
+
+ let mut obj_dels = 0;
+ let mut ver_dels = 0;
+
+ for hash in blocks {
+ let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?;
+ let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?;
+ let block_refs = self
+ .garage
+ .block_ref_table
+ .get_range(&hash, None, None, 10000, Default::default())
+ .await?;
+
+ for br in block_refs {
+ let version = match self
+ .garage
+ .version_table
+ .get(&br.version, &EmptyKey)
+ .await?
+ {
+ Some(v) => v,
+ None => continue,
+ };
+
+ if let Some(object) = self
+ .garage
+ .object_table
+ .get(&version.bucket_id, &version.key)
+ .await?
+ {
+ let ov = object.versions().iter().rev().find(|v| v.is_complete());
+ if let Some(ov) = ov {
+ if ov.uuid == br.version {
+ let del_uuid = gen_uuid();
+ let deleted_object = Object::new(
+ version.bucket_id,
+ version.key.clone(),
+ vec![ObjectVersion {
+ uuid: del_uuid,
+ timestamp: ov.timestamp + 1,
+ state: ObjectVersionState::Complete(
+ ObjectVersionData::DeleteMarker,
+ ),
+ }],
+ );
+ self.garage.object_table.insert(&deleted_object).await?;
+ obj_dels += 1;
+ }
+ }
+ }
+
+ if !version.deleted.get() {
+ let deleted_version = Version::new(
+ version.uuid,
+ version.bucket_id,
+ version.key.clone(),
+ true,
+ );
+ self.garage.version_table.insert(&deleted_version).await?;
+ ver_dels += 1;
+ }
+ }
+ }
+ Ok(AdminRpc::Ok(format!(
+ "{} blocks were purged: {} object deletion markers added, {} versions marked deleted",
+ blocks.len(),
+ obj_dels,
+ ver_dels
+ )))
+ }
+ }
+ }
+}
+
+#[async_trait]
+impl EndpointHandler<AdminRpc> for AdminRpcHandler {
+ async fn handle(
+ self: &Arc<Self>,
+ message: &AdminRpc,
+ _from: NodeID,
+ ) -> Result<AdminRpc, Error> {
+ match message {
+ AdminRpc::BucketOperation(bo) => self.handle_bucket_cmd(bo).await,
+ AdminRpc::KeyOperation(ko) => self.handle_key_cmd(ko).await,
+ AdminRpc::Migrate(opt) => self.handle_migrate(opt.clone()).await,
+ AdminRpc::LaunchRepair(opt) => self.handle_launch_repair(opt.clone()).await,
+ AdminRpc::Stats(opt) => self.handle_stats(opt.clone()).await,
+ AdminRpc::Worker(wo) => self.handle_worker_cmd(wo).await,
+ AdminRpc::BlockOperation(bo) => self.handle_block_cmd(bo).await,
+ m => Err(GarageError::unexpected_rpc_message(m).into()),
+ }
+ }
+}