aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2023-06-06 15:39:15 +0200
committerAlex Auvolat <alex@adnab.me>2023-06-13 16:46:28 +0200
commiteb9cecf05c37d41971ff3e4ba28a37af512c4b2b (patch)
tree1eaf1a24047abc6f918f037dadf398e47a230030 /src
parent802ed757217e55cd2886e137bd462956375fd06d (diff)
downloadgarage-eb9cecf05c37d41971ff3e4ba28a37af512c4b2b.tar.gz
garage-eb9cecf05c37d41971ff3e4ba28a37af512c4b2b.zip
Split garage/admin.rs into smaller files
Diffstat (limited to 'src')
-rw-r--r--src/garage/admin/block.rs160
-rw-r--r--src/garage/admin/bucket.rs496
-rw-r--r--src/garage/admin/key.rs149
-rw-r--r--src/garage/admin/mod.rs770
4 files changed, 810 insertions, 765 deletions
diff --git a/src/garage/admin/block.rs b/src/garage/admin/block.rs
new file mode 100644
index 00000000..e9e3ff96
--- /dev/null
+++ b/src/garage/admin/block.rs
@@ -0,0 +1,160 @@
+use garage_util::data::*;
+
+use garage_table::*;
+
+use garage_model::helper::error::{Error, OkOrBadRequest};
+use garage_model::s3::object_table::*;
+use garage_model::s3::version_table::*;
+
+use crate::cli::*;
+
+use super::*;
+
+impl AdminRpcHandler {
+ pub(super) async fn handle_block_cmd(&self, cmd: &BlockOperation) -> Result<AdminRpc, Error> {
+ match cmd {
+ BlockOperation::ListErrors => Ok(AdminRpc::BlockErrorList(
+ self.garage.block_manager.list_resync_errors()?,
+ )),
+ BlockOperation::Info { hash } => self.handle_block_info(hash).await,
+ BlockOperation::RetryNow { all, blocks } => {
+ self.handle_block_retry_now(*all, blocks).await
+ }
+ BlockOperation::Purge { yes, blocks } => self.handle_block_purge(*yes, blocks).await,
+ }
+ }
+
+ async fn handle_block_info(&self, hash: &String) -> Result<AdminRpc, Error> {
+ let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?;
+ let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?;
+ let refcount = self.garage.block_manager.get_block_rc(&hash)?;
+ let block_refs = self
+ .garage
+ .block_ref_table
+ .get_range(&hash, None, None, 10000, Default::default())
+ .await?;
+ let mut versions = vec![];
+ for br in block_refs {
+ if let Some(v) = self
+ .garage
+ .version_table
+ .get(&br.version, &EmptyKey)
+ .await?
+ {
+ versions.push(Ok(v));
+ } else {
+ versions.push(Err(br.version));
+ }
+ }
+ Ok(AdminRpc::BlockInfo {
+ hash,
+ refcount,
+ versions,
+ })
+ }
+
+ async fn handle_block_retry_now(
+ &self,
+ all: bool,
+ blocks: &[String],
+ ) -> Result<AdminRpc, Error> {
+ if all {
+ if !blocks.is_empty() {
+ return Err(Error::BadRequest(
+ "--all was specified, cannot also specify blocks".into(),
+ ));
+ }
+ let blocks = self.garage.block_manager.list_resync_errors()?;
+ for b in blocks.iter() {
+ self.garage.block_manager.resync.clear_backoff(&b.hash)?;
+ }
+ Ok(AdminRpc::Ok(format!(
+ "{} blocks returned in queue for a retry now (check logs to see results)",
+ blocks.len()
+ )))
+ } else {
+ for hash in blocks {
+ let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?;
+ let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?;
+ self.garage.block_manager.resync.clear_backoff(&hash)?;
+ }
+ Ok(AdminRpc::Ok(format!(
+ "{} blocks returned in queue for a retry now (check logs to see results)",
+ blocks.len()
+ )))
+ }
+ }
+
+ async fn handle_block_purge(&self, yes: bool, blocks: &[String]) -> Result<AdminRpc, Error> {
+ if !yes {
+ return Err(Error::BadRequest(
+ "Pass the --yes flag to confirm block purge operation.".into(),
+ ));
+ }
+
+ let mut obj_dels = 0;
+ let mut ver_dels = 0;
+
+ for hash in blocks {
+ let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?;
+ let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?;
+ let block_refs = self
+ .garage
+ .block_ref_table
+ .get_range(&hash, None, None, 10000, Default::default())
+ .await?;
+
+ for br in block_refs {
+ let version = match self
+ .garage
+ .version_table
+ .get(&br.version, &EmptyKey)
+ .await?
+ {
+ Some(v) => v,
+ None => continue,
+ };
+
+ if let Some(object) = self
+ .garage
+ .object_table
+ .get(&version.bucket_id, &version.key)
+ .await?
+ {
+ let ov = object.versions().iter().rev().find(|v| v.is_complete());
+ if let Some(ov) = ov {
+ if ov.uuid == br.version {
+ let del_uuid = gen_uuid();
+ let deleted_object = Object::new(
+ version.bucket_id,
+ version.key.clone(),
+ vec![ObjectVersion {
+ uuid: del_uuid,
+ timestamp: ov.timestamp + 1,
+ state: ObjectVersionState::Complete(
+ ObjectVersionData::DeleteMarker,
+ ),
+ }],
+ );
+ self.garage.object_table.insert(&deleted_object).await?;
+ obj_dels += 1;
+ }
+ }
+ }
+
+ if !version.deleted.get() {
+ let deleted_version =
+ Version::new(version.uuid, version.bucket_id, version.key.clone(), true);
+ self.garage.version_table.insert(&deleted_version).await?;
+ ver_dels += 1;
+ }
+ }
+ }
+ Ok(AdminRpc::Ok(format!(
+ "{} blocks were purged: {} object deletion markers added, {} versions marked deleted",
+ blocks.len(),
+ obj_dels,
+ ver_dels
+ )))
+ }
+}
diff --git a/src/garage/admin/bucket.rs b/src/garage/admin/bucket.rs
new file mode 100644
index 00000000..11bb8730
--- /dev/null
+++ b/src/garage/admin/bucket.rs
@@ -0,0 +1,496 @@
+use std::collections::HashMap;
+use std::fmt::Write;
+
+use garage_util::crdt::*;
+use garage_util::time::*;
+
+use garage_table::*;
+
+use garage_model::bucket_alias_table::*;
+use garage_model::bucket_table::*;
+use garage_model::helper::error::{Error, OkOrBadRequest};
+use garage_model::permission::*;
+
+use crate::cli::*;
+
+use super::*;
+
+impl AdminRpcHandler {
+ pub(super) async fn handle_bucket_cmd(&self, cmd: &BucketOperation) -> Result<AdminRpc, Error> {
+ match cmd {
+ BucketOperation::List => self.handle_list_buckets().await,
+ BucketOperation::Info(query) => self.handle_bucket_info(query).await,
+ BucketOperation::Create(query) => self.handle_create_bucket(&query.name).await,
+ BucketOperation::Delete(query) => self.handle_delete_bucket(query).await,
+ BucketOperation::Alias(query) => self.handle_alias_bucket(query).await,
+ BucketOperation::Unalias(query) => self.handle_unalias_bucket(query).await,
+ BucketOperation::Allow(query) => self.handle_bucket_allow(query).await,
+ BucketOperation::Deny(query) => self.handle_bucket_deny(query).await,
+ BucketOperation::Website(query) => self.handle_bucket_website(query).await,
+ BucketOperation::SetQuotas(query) => self.handle_bucket_set_quotas(query).await,
+ BucketOperation::CleanupIncompleteUploads(query) => {
+ self.handle_bucket_cleanup_incomplete_uploads(query).await
+ }
+ }
+ }
+
+ async fn handle_list_buckets(&self) -> Result<AdminRpc, Error> {
+ let buckets = self
+ .garage
+ .bucket_table
+ .get_range(
+ &EmptyKey,
+ None,
+ Some(DeletedFilter::NotDeleted),
+ 10000,
+ EnumerationOrder::Forward,
+ )
+ .await?;
+
+ Ok(AdminRpc::BucketList(buckets))
+ }
+
+ async fn handle_bucket_info(&self, query: &BucketOpt) -> Result<AdminRpc, Error> {
+ let bucket_id = self
+ .garage
+ .bucket_helper()
+ .resolve_global_bucket_name(&query.name)
+ .await?
+ .ok_or_bad_request("Bucket not found")?;
+
+ let bucket = self
+ .garage
+ .bucket_helper()
+ .get_existing_bucket(bucket_id)
+ .await?;
+
+ let counters = self
+ .garage
+ .object_counter_table
+ .table
+ .get(&bucket_id, &EmptyKey)
+ .await?
+ .map(|x| x.filtered_values(&self.garage.system.ring.borrow()))
+ .unwrap_or_default();
+
+ let mut relevant_keys = HashMap::new();
+ for (k, _) in bucket
+ .state
+ .as_option()
+ .unwrap()
+ .authorized_keys
+ .items()
+ .iter()
+ {
+ if let Some(key) = self
+ .garage
+ .key_table
+ .get(&EmptyKey, k)
+ .await?
+ .filter(|k| !k.is_deleted())
+ {
+ relevant_keys.insert(k.clone(), key);
+ }
+ }
+ for ((k, _), _, _) in bucket
+ .state
+ .as_option()
+ .unwrap()
+ .local_aliases
+ .items()
+ .iter()
+ {
+ if relevant_keys.contains_key(k) {
+ continue;
+ }
+ if let Some(key) = self.garage.key_table.get(&EmptyKey, k).await? {
+ relevant_keys.insert(k.clone(), key);
+ }
+ }
+
+ Ok(AdminRpc::BucketInfo {
+ bucket,
+ relevant_keys,
+ counters,
+ })
+ }
+
+ #[allow(clippy::ptr_arg)]
+ async fn handle_create_bucket(&self, name: &String) -> Result<AdminRpc, Error> {
+ if !is_valid_bucket_name(name) {
+ return Err(Error::BadRequest(format!(
+ "{}: {}",
+ name, INVALID_BUCKET_NAME_MESSAGE
+ )));
+ }
+
+ if let Some(alias) = self.garage.bucket_alias_table.get(&EmptyKey, name).await? {
+ if alias.state.get().is_some() {
+ return Err(Error::BadRequest(format!("Bucket {} already exists", name)));
+ }
+ }
+
+ // ---- done checking, now commit ----
+
+ let bucket = Bucket::new();
+ self.garage.bucket_table.insert(&bucket).await?;
+
+ self.garage
+ .bucket_helper()
+ .set_global_bucket_alias(bucket.id, name)
+ .await?;
+
+ Ok(AdminRpc::Ok(format!("Bucket {} was created.", name)))
+ }
+
+ async fn handle_delete_bucket(&self, query: &DeleteBucketOpt) -> Result<AdminRpc, Error> {
+ let helper = self.garage.bucket_helper();
+
+ let bucket_id = helper
+ .resolve_global_bucket_name(&query.name)
+ .await?
+ .ok_or_bad_request("Bucket not found")?;
+
+ // Get the alias, but keep in minde here the bucket name
+ // given in parameter can also be directly the bucket's ID.
+ // In that case bucket_alias will be None, and
+ // we can still delete the bucket if it has zero aliases
+ // (a condition which we try to prevent but that could still happen somehow).
+ // We just won't try to delete an alias entry because there isn't one.
+ let bucket_alias = self
+ .garage
+ .bucket_alias_table
+ .get(&EmptyKey, &query.name)
+ .await?;
+
+ // Check bucket doesn't have other aliases
+ let mut bucket = helper.get_existing_bucket(bucket_id).await?;
+ let bucket_state = bucket.state.as_option().unwrap();
+ if bucket_state
+ .aliases
+ .items()
+ .iter()
+ .filter(|(_, _, active)| *active)
+ .any(|(name, _, _)| name != &query.name)
+ {
+ return Err(Error::BadRequest(format!("Bucket {} still has other global aliases. Use `bucket unalias` to delete them one by one.", query.name)));
+ }
+ if bucket_state
+ .local_aliases
+ .items()
+ .iter()
+ .any(|(_, _, active)| *active)
+ {
+ return Err(Error::BadRequest(format!("Bucket {} still has other local aliases. Use `bucket unalias` to delete them one by one.", query.name)));
+ }
+
+ // Check bucket is empty
+ if !helper.is_bucket_empty(bucket_id).await? {
+ return Err(Error::BadRequest(format!(
+ "Bucket {} is not empty",
+ query.name
+ )));
+ }
+
+ if !query.yes {
+ return Err(Error::BadRequest(
+ "Add --yes flag to really perform this operation".to_string(),
+ ));
+ }
+
+ // --- done checking, now commit ---
+ // 1. delete authorization from keys that had access
+ for (key_id, _) in bucket.authorized_keys() {
+ helper
+ .set_bucket_key_permissions(bucket.id, key_id, BucketKeyPerm::NO_PERMISSIONS)
+ .await?;
+ }
+
+ // 2. delete bucket alias
+ if bucket_alias.is_some() {
+ helper
+ .purge_global_bucket_alias(bucket_id, &query.name)
+ .await?;
+ }
+
+ // 3. delete bucket
+ bucket.state = Deletable::delete();
+ self.garage.bucket_table.insert(&bucket).await?;
+
+ Ok(AdminRpc::Ok(format!("Bucket {} was deleted.", query.name)))
+ }
+
+ async fn handle_alias_bucket(&self, query: &AliasBucketOpt) -> Result<AdminRpc, Error> {
+ let helper = self.garage.bucket_helper();
+ let key_helper = self.garage.key_helper();
+
+ let bucket_id = helper
+ .resolve_global_bucket_name(&query.existing_bucket)
+ .await?
+ .ok_or_bad_request("Bucket not found")?;
+
+ if let Some(key_pattern) = &query.local {
+ let key = key_helper.get_existing_matching_key(key_pattern).await?;
+
+ helper
+ .set_local_bucket_alias(bucket_id, &key.key_id, &query.new_name)
+ .await?;
+ Ok(AdminRpc::Ok(format!(
+ "Alias {} now points to bucket {:?} in namespace of key {}",
+ query.new_name, bucket_id, key.key_id
+ )))
+ } else {
+ helper
+ .set_global_bucket_alias(bucket_id, &query.new_name)
+ .await?;
+ Ok(AdminRpc::Ok(format!(
+ "Alias {} now points to bucket {:?}",
+ query.new_name, bucket_id
+ )))
+ }
+ }
+
+ async fn handle_unalias_bucket(&self, query: &UnaliasBucketOpt) -> Result<AdminRpc, Error> {
+ let helper = self.garage.bucket_helper();
+ let key_helper = self.garage.key_helper();
+
+ if let Some(key_pattern) = &query.local {
+ let key = key_helper.get_existing_matching_key(key_pattern).await?;
+
+ let bucket_id = key
+ .state
+ .as_option()
+ .unwrap()
+ .local_aliases
+ .get(&query.name)
+ .cloned()
+ .flatten()
+ .ok_or_bad_request("Bucket not found")?;
+
+ helper
+ .unset_local_bucket_alias(bucket_id, &key.key_id, &query.name)
+ .await?;
+
+ Ok(AdminRpc::Ok(format!(
+ "Alias {} no longer points to bucket {:?} in namespace of key {}",
+ &query.name, bucket_id, key.key_id
+ )))
+ } else {
+ let bucket_id = helper
+ .resolve_global_bucket_name(&query.name)
+ .await?
+ .ok_or_bad_request("Bucket not found")?;
+
+ helper
+ .unset_global_bucket_alias(bucket_id, &query.name)
+ .await?;
+
+ Ok(AdminRpc::Ok(format!(
+ "Alias {} no longer points to bucket {:?}",
+ &query.name, bucket_id
+ )))
+ }
+ }
+
+ async fn handle_bucket_allow(&self, query: &PermBucketOpt) -> Result<AdminRpc, Error> {
+ let helper = self.garage.bucket_helper();
+ let key_helper = self.garage.key_helper();
+
+ let bucket_id = helper
+ .resolve_global_bucket_name(&query.bucket)
+ .await?
+ .ok_or_bad_request("Bucket not found")?;
+ let key = key_helper
+ .get_existing_matching_key(&query.key_pattern)
+ .await?;
+
+ let allow_read = query.read || key.allow_read(&bucket_id);
+ let allow_write = query.write || key.allow_write(&bucket_id);
+ let allow_owner = query.owner || key.allow_owner(&bucket_id);
+
+ helper
+ .set_bucket_key_permissions(
+ bucket_id,
+ &key.key_id,
+ BucketKeyPerm {
+ timestamp: now_msec(),
+ allow_read,
+ allow_write,
+ allow_owner,
+ },
+ )
+ .await?;
+
+ Ok(AdminRpc::Ok(format!(
+ "New permissions for {} on {}: read {}, write {}, owner {}.",
+ &key.key_id, &query.bucket, allow_read, allow_write, allow_owner
+ )))
+ }
+
+ async fn handle_bucket_deny(&self, query: &PermBucketOpt) -> Result<AdminRpc, Error> {
+ let helper = self.garage.bucket_helper();
+ let key_helper = self.garage.key_helper();
+
+ let bucket_id = helper
+ .resolve_global_bucket_name(&query.bucket)
+ .await?
+ .ok_or_bad_request("Bucket not found")?;
+ let key = key_helper
+ .get_existing_matching_key(&query.key_pattern)
+ .await?;
+
+ let allow_read = !query.read && key.allow_read(&bucket_id);
+ let allow_write = !query.write && key.allow_write(&bucket_id);
+ let allow_owner = !query.owner && key.allow_owner(&bucket_id);
+
+ helper
+ .set_bucket_key_permissions(
+ bucket_id,
+ &key.key_id,
+ BucketKeyPerm {
+ timestamp: now_msec(),
+ allow_read,
+ allow_write,
+ allow_owner,
+ },
+ )
+ .await?;
+
+ Ok(AdminRpc::Ok(format!(
+ "New permissions for {} on {}: read {}, write {}, owner {}.",
+ &key.key_id, &query.bucket, allow_read, allow_write, allow_owner
+ )))
+ }
+
+ async fn handle_bucket_website(&self, query: &WebsiteOpt) -> Result<AdminRpc, Error> {
+ let bucket_id = self
+ .garage
+ .bucket_helper()
+ .resolve_global_bucket_name(&query.bucket)
+ .await?
+ .ok_or_bad_request("Bucket not found")?;
+
+ let mut bucket = self
+ .garage
+ .bucket_helper()
+ .get_existing_bucket(bucket_id)
+ .await?;
+ let bucket_state = bucket.state.as_option_mut().unwrap();
+
+ if !(query.allow ^ query.deny) {
+ return Err(Error::BadRequest(
+ "You must specify exactly one flag, either --allow or --deny".to_string(),
+ ));
+ }
+
+ let website = if query.allow {
+ Some(WebsiteConfig {
+ index_document: query.index_document.clone(),
+ error_document: query.error_document.clone(),
+ })
+ } else {
+ None
+ };
+
+ bucket_state.website_config.update(website);
+ self.garage.bucket_table.insert(&bucket).await?;
+
+ let msg = if query.allow {
+ format!("Website access allowed for {}", &query.bucket)
+ } else {
+ format!("Website access denied for {}", &query.bucket)
+ };
+
+ Ok(AdminRpc::Ok(msg))
+ }
+
+ async fn handle_bucket_set_quotas(&self, query: &SetQuotasOpt) -> Result<AdminRpc, Error> {
+ let bucket_id = self
+ .garage
+ .bucket_helper()
+ .resolve_global_bucket_name(&query.bucket)
+ .await?
+ .ok_or_bad_request("Bucket not found")?;
+
+ let mut bucket = self
+ .garage
+ .bucket_helper()
+ .get_existing_bucket(bucket_id)
+ .await?;
+ let bucket_state = bucket.state.as_option_mut().unwrap();
+
+ if query.max_size.is_none() && query.max_objects.is_none() {
+ return Err(Error::BadRequest(
+ "You must specify either --max-size or --max-objects (or both) for this command to do something.".to_string(),
+ ));
+ }
+
+ let mut quotas = bucket_state.quotas.get().clone();
+
+ match query.max_size.as_ref().map(String::as_ref) {
+ Some("none") => quotas.max_size = None,
+ Some(v) => {
+ let bs = v
+ .parse::<bytesize::ByteSize>()
+ .ok_or_bad_request(format!("Invalid size specified: {}", v))?;
+ quotas.max_size = Some(bs.as_u64());
+ }
+ _ => (),
+ }
+
+ match query.max_objects.as_ref().map(String::as_ref) {
+ Some("none") => quotas.max_objects = None,
+ Some(v) => {
+ let mo = v
+ .parse::<u64>()
+ .ok_or_bad_request(format!("Invalid number specified: {}", v))?;
+ quotas.max_objects = Some(mo);
+ }
+ _ => (),
+ }
+
+ bucket_state.quotas.update(quotas);
+ self.garage.bucket_table.insert(&bucket).await?;
+
+ Ok(AdminRpc::Ok(format!(
+ "Quotas updated for {}",
+ &query.bucket
+ )))
+ }
+
+ async fn handle_bucket_cleanup_incomplete_uploads(
+ &self,
+ query: &CleanupIncompleteUploadsOpt,
+ ) -> Result<AdminRpc, Error> {
+ let mut bucket_ids = vec![];
+ for b in query.buckets.iter() {
+ bucket_ids.push(
+ self.garage
+ .bucket_helper()
+ .resolve_global_bucket_name(b)
+ .await?
+ .ok_or_bad_request(format!("Bucket not found: {}", b))?,
+ );
+ }
+
+ let duration = parse_duration::parse::parse(&query.older_than)
+ .ok_or_bad_request("Invalid duration passed for --older-than parameter")?;
+
+ let mut ret = String::new();
+ for bucket in bucket_ids {
+ let count = self
+ .garage
+ .bucket_helper()
+ .cleanup_incomplete_uploads(&bucket, duration)
+ .await?;
+ writeln!(
+ &mut ret,
+ "Bucket {:?}: {} incomplete uploads aborted",
+ bucket, count
+ )
+ .unwrap();
+ }
+
+ Ok(AdminRpc::Ok(ret))
+ }
+}
diff --git a/src/garage/admin/key.rs b/src/garage/admin/key.rs
new file mode 100644
index 00000000..cab13bcf
--- /dev/null
+++ b/src/garage/admin/key.rs
@@ -0,0 +1,149 @@
+use std::collections::HashMap;
+
+use garage_table::*;
+
+use garage_model::helper::error::Error;
+use garage_model::key_table::*;
+
+use crate::cli::*;
+
+use super::*;
+
+impl AdminRpcHandler {
+ pub(super) async fn handle_key_cmd(&self, cmd: &KeyOperation) -> Result<AdminRpc, Error> {
+ match cmd {
+ KeyOperation::List => self.handle_list_keys().await,
+ KeyOperation::Info(query) => self.handle_key_info(query).await,
+ KeyOperation::New(query) => self.handle_create_key(query).await,
+ KeyOperation::Rename(query) => self.handle_rename_key(query).await,
+ KeyOperation::Delete(query) => self.handle_delete_key(query).await,
+ KeyOperation::Allow(query) => self.handle_allow_key(query).await,
+ KeyOperation::Deny(query) => self.handle_deny_key(query).await,
+ KeyOperation::Import(query) => self.handle_import_key(query).await,
+ }
+ }
+
+ async fn handle_list_keys(&self) -> Result<AdminRpc, Error> {
+ let key_ids = self
+ .garage
+ .key_table
+ .get_range(
+ &EmptyKey,
+ None,
+ Some(KeyFilter::Deleted(DeletedFilter::NotDeleted)),
+ 10000,
+ EnumerationOrder::Forward,
+ )
+ .await?
+ .iter()
+ .map(|k| (k.key_id.to_string(), k.params().unwrap().name.get().clone()))
+ .collect::<Vec<_>>();
+ Ok(AdminRpc::KeyList(key_ids))
+ }
+
+ async fn handle_key_info(&self, query: &KeyOpt) -> Result<AdminRpc, Error> {
+ let key = self
+ .garage
+ .key_helper()
+ .get_existing_matching_key(&query.key_pattern)
+ .await?;
+ self.key_info_result(key).await
+ }
+
+ async fn handle_create_key(&self, query: &KeyNewOpt) -> Result<AdminRpc, Error> {
+ let key = Key::new(&query.name);
+ self.garage.key_table.insert(&key).await?;
+ self.key_info_result(key).await
+ }
+
+ async fn handle_rename_key(&self, query: &KeyRenameOpt) -> Result<AdminRpc, Error> {
+ let mut key = self
+ .garage
+ .key_helper()
+ .get_existing_matching_key(&query.key_pattern)
+ .await?;
+ key.params_mut()
+ .unwrap()
+ .name
+ .update(query.new_name.clone());
+ self.garage.key_table.insert(&key).await?;
+ self.key_info_result(key).await
+ }
+
+ async fn handle_delete_key(&self, query: &KeyDeleteOpt) -> Result<AdminRpc, Error> {
+ let key_helper = self.garage.key_helper();
+
+ let mut key = key_helper
+ .get_existing_matching_key(&query.key_pattern)
+ .await?;
+
+ if !query.yes {
+ return Err(Error::BadRequest(
+ "Add --yes flag to really perform this operation".to_string(),
+ ));
+ }
+
+ key_helper.delete_key(&mut key).await?;
+
+ Ok(AdminRpc::Ok(format!(
+ "Key {} was deleted successfully.",
+ key.key_id
+ )))
+ }
+
+ async fn handle_allow_key(&self, query: &KeyPermOpt) -> Result<AdminRpc, Error> {
+ let mut key = self
+ .garage
+ .key_helper()
+ .get_existing_matching_key(&query.key_pattern)
+ .await?;
+ if query.create_bucket {
+ key.params_mut().unwrap().allow_create_bucket.update(true);
+ }
+ self.garage.key_table.insert(&key).await?;
+ self.key_info_result(key).await
+ }
+
+ async fn handle_deny_key(&self, query: &KeyPermOpt) -> Result<AdminRpc, Error> {
+ let mut key = self
+ .garage
+ .key_helper()
+ .get_existing_matching_key(&query.key_pattern)
+ .await?;
+ if query.create_bucket {
+ key.params_mut().unwrap().allow_create_bucket.update(false);
+ }
+ self.garage.key_table.insert(&key).await?;
+ self.key_info_result(key).await
+ }
+
+ async fn handle_import_key(&self, query: &KeyImportOpt) -> Result<AdminRpc, Error> {
+ let prev_key = self.garage.key_table.get(&EmptyKey, &query.key_id).await?;
+ if prev_key.is_some() {
+ return Err(Error::BadRequest(format!("Key {} already exists in data store. Even if it is deleted, we can't let you create a new key with the same ID. Sorry.", query.key_id)));
+ }
+ let imported_key = Key::import(&query.key_id, &query.secret_key, &query.name);
+ self.garage.key_table.insert(&imported_key).await?;
+
+ self.key_info_result(imported_key).await
+ }
+
+ async fn key_info_result(&self, key: Key) -> Result<AdminRpc, Error> {
+ let mut relevant_buckets = HashMap::new();
+
+ for (id, _) in key
+ .state
+ .as_option()
+ .unwrap()
+ .authorized_buckets
+ .items()
+ .iter()
+ {
+ if let Some(b) = self.garage.bucket_table.get(&EmptyKey, id).await? {
+ relevant_buckets.insert(*id, b);
+ }
+ }
+
+ Ok(AdminRpc::KeyInfo(key, relevant_buckets))
+ }
+}
diff --git a/src/garage/admin/mod.rs b/src/garage/admin/mod.rs
index 36fdf1d0..2709f08a 100644
--- a/src/garage/admin/mod.rs
+++ b/src/garage/admin/mod.rs
@@ -1,3 +1,7 @@
+mod block;
+mod bucket;
+mod key;
+
use std::collections::HashMap;
use std::fmt::Write;
use std::sync::Arc;
@@ -6,11 +10,10 @@ use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use format_table::format_table_to_string;
+
use garage_util::background::BackgroundRunner;
-use garage_util::crdt::*;
use garage_util::data::*;
use garage_util::error::Error as GarageError;
-use garage_util::time::*;
use garage_table::replication::*;
use garage_table::*;
@@ -20,14 +23,11 @@ use garage_rpc::*;
use garage_block::manager::BlockResyncErrorInfo;
-use garage_model::bucket_alias_table::*;
use garage_model::bucket_table::*;
use garage_model::garage::Garage;
use garage_model::helper::error::{Error, OkOrBadRequest};
use garage_model::key_table::*;
use garage_model::migrate::Migrate;
-use garage_model::permission::*;
-use garage_model::s3::object_table::*;
use garage_model::s3::version_table::Version;
use crate::cli::*;
@@ -92,625 +92,6 @@ impl AdminRpcHandler {
admin
}
- // ================ BUCKET COMMANDS ====================
-
- async fn handle_bucket_cmd(&self, cmd: &BucketOperation) -> Result<AdminRpc, Error> {
- match cmd {
- BucketOperation::List => self.handle_list_buckets().await,
- BucketOperation::Info(query) => self.handle_bucket_info(query).await,
- BucketOperation::Create(query) => self.handle_create_bucket(&query.name).await,
- BucketOperation::Delete(query) => self.handle_delete_bucket(query).await,
- BucketOperation::Alias(query) => self.handle_alias_bucket(query).await,
- BucketOperation::Unalias(query) => self.handle_unalias_bucket(query).await,
- BucketOperation::Allow(query) => self.handle_bucket_allow(query).await,
- BucketOperation::Deny(query) => self.handle_bucket_deny(query).await,
- BucketOperation::Website(query) => self.handle_bucket_website(query).await,
- BucketOperation::SetQuotas(query) => self.handle_bucket_set_quotas(query).await,
- BucketOperation::CleanupIncompleteUploads(query) => {
- self.handle_bucket_cleanup_incomplete_uploads(query).await
- }
- }
- }
-
- async fn handle_list_buckets(&self) -> Result<AdminRpc, Error> {
- let buckets = self
- .garage
- .bucket_table
- .get_range(
- &EmptyKey,
- None,
- Some(DeletedFilter::NotDeleted),
- 10000,
- EnumerationOrder::Forward,
- )
- .await?;
-
- Ok(AdminRpc::BucketList(buckets))
- }
-
- async fn handle_bucket_info(&self, query: &BucketOpt) -> Result<AdminRpc, Error> {
- let bucket_id = self
- .garage
- .bucket_helper()
- .resolve_global_bucket_name(&query.name)
- .await?
- .ok_or_bad_request("Bucket not found")?;
-
- let bucket = self
- .garage
- .bucket_helper()
- .get_existing_bucket(bucket_id)
- .await?;
-
- let counters = self
- .garage
- .object_counter_table
- .table
- .get(&bucket_id, &EmptyKey)
- .await?
- .map(|x| x.filtered_values(&self.garage.system.ring.borrow()))
- .unwrap_or_default();
-
- let mut relevant_keys = HashMap::new();
- for (k, _) in bucket
- .state
- .as_option()
- .unwrap()
- .authorized_keys
- .items()
- .iter()
- {
- if let Some(key) = self
- .garage
- .key_table
- .get(&EmptyKey, k)
- .await?
- .filter(|k| !k.is_deleted())
- {
- relevant_keys.insert(k.clone(), key);
- }
- }
- for ((k, _), _, _) in bucket
- .state
- .as_option()
- .unwrap()
- .local_aliases
- .items()
- .iter()
- {
- if relevant_keys.contains_key(k) {
- continue;
- }
- if let Some(key) = self.garage.key_table.get(&EmptyKey, k).await? {
- relevant_keys.insert(k.clone(), key);
- }
- }
-
- Ok(AdminRpc::BucketInfo {
- bucket,
- relevant_keys,
- counters,
- })
- }
-
- #[allow(clippy::ptr_arg)]
- async fn handle_create_bucket(&self, name: &String) -> Result<AdminRpc, Error> {
- if !is_valid_bucket_name(name) {
- return Err(Error::BadRequest(format!(
- "{}: {}",
- name, INVALID_BUCKET_NAME_MESSAGE
- )));
- }
-
- if let Some(alias) = self.garage.bucket_alias_table.get(&EmptyKey, name).await? {
- if alias.state.get().is_some() {
- return Err(Error::BadRequest(format!("Bucket {} already exists", name)));
- }
- }
-
- // ---- done checking, now commit ----
-
- let bucket = Bucket::new();
- self.garage.bucket_table.insert(&bucket).await?;
-
- self.garage
- .bucket_helper()
- .set_global_bucket_alias(bucket.id, name)
- .await?;
-
- Ok(AdminRpc::Ok(format!("Bucket {} was created.", name)))
- }
-
- async fn handle_delete_bucket(&self, query: &DeleteBucketOpt) -> Result<AdminRpc, Error> {
- let helper = self.garage.bucket_helper();
-
- let bucket_id = helper
- .resolve_global_bucket_name(&query.name)
- .await?
- .ok_or_bad_request("Bucket not found")?;
-
- // Get the alias, but keep in minde here the bucket name
- // given in parameter can also be directly the bucket's ID.
- // In that case bucket_alias will be None, and
- // we can still delete the bucket if it has zero aliases
- // (a condition which we try to prevent but that could still happen somehow).
- // We just won't try to delete an alias entry because there isn't one.
- let bucket_alias = self
- .garage
- .bucket_alias_table
- .get(&EmptyKey, &query.name)
- .await?;
-
- // Check bucket doesn't have other aliases
- let mut bucket = helper.get_existing_bucket(bucket_id).await?;
- let bucket_state = bucket.state.as_option().unwrap();
- if bucket_state
- .aliases
- .items()
- .iter()
- .filter(|(_, _, active)| *active)
- .any(|(name, _, _)| name != &query.name)
- {
- return Err(Error::BadRequest(format!("Bucket {} still has other global aliases. Use `bucket unalias` to delete them one by one.", query.name)));
- }
- if bucket_state
- .local_aliases
- .items()
- .iter()
- .any(|(_, _, active)| *active)
- {
- return Err(Error::BadRequest(format!("Bucket {} still has other local aliases. Use `bucket unalias` to delete them one by one.", query.name)));
- }
-
- // Check bucket is empty
- if !helper.is_bucket_empty(bucket_id).await? {
- return Err(Error::BadRequest(format!(
- "Bucket {} is not empty",
- query.name
- )));
- }
-
- if !query.yes {
- return Err(Error::BadRequest(
- "Add --yes flag to really perform this operation".to_string(),
- ));
- }
-
- // --- done checking, now commit ---
- // 1. delete authorization from keys that had access
- for (key_id, _) in bucket.authorized_keys() {
- helper
- .set_bucket_key_permissions(bucket.id, key_id, BucketKeyPerm::NO_PERMISSIONS)
- .await?;
- }
-
- // 2. delete bucket alias
- if bucket_alias.is_some() {
- helper
- .purge_global_bucket_alias(bucket_id, &query.name)
- .await?;
- }
-
- // 3. delete bucket
- bucket.state = Deletable::delete();
- self.garage.bucket_table.insert(&bucket).await?;
-
- Ok(AdminRpc::Ok(format!("Bucket {} was deleted.", query.name)))
- }
-
- async fn handle_alias_bucket(&self, query: &AliasBucketOpt) -> Result<AdminRpc, Error> {
- let helper = self.garage.bucket_helper();
- let key_helper = self.garage.key_helper();
-
- let bucket_id = helper
- .resolve_global_bucket_name(&query.existing_bucket)
- .await?
- .ok_or_bad_request("Bucket not found")?;
-
- if let Some(key_pattern) = &query.local {
- let key = key_helper.get_existing_matching_key(key_pattern).await?;
-
- helper
- .set_local_bucket_alias(bucket_id, &key.key_id, &query.new_name)
- .await?;
- Ok(AdminRpc::Ok(format!(
- "Alias {} now points to bucket {:?} in namespace of key {}",
- query.new_name, bucket_id, key.key_id
- )))
- } else {
- helper
- .set_global_bucket_alias(bucket_id, &query.new_name)
- .await?;
- Ok(AdminRpc::Ok(format!(
- "Alias {} now points to bucket {:?}",
- query.new_name, bucket_id
- )))
- }
- }
-
- async fn handle_unalias_bucket(&self, query: &UnaliasBucketOpt) -> Result<AdminRpc, Error> {
- let helper = self.garage.bucket_helper();
- let key_helper = self.garage.key_helper();
-
- if let Some(key_pattern) = &query.local {
- let key = key_helper.get_existing_matching_key(key_pattern).await?;
-
- let bucket_id = key
- .state
- .as_option()
- .unwrap()
- .local_aliases
- .get(&query.name)
- .cloned()
- .flatten()
- .ok_or_bad_request("Bucket not found")?;
-
- helper
- .unset_local_bucket_alias(bucket_id, &key.key_id, &query.name)
- .await?;
-
- Ok(AdminRpc::Ok(format!(
- "Alias {} no longer points to bucket {:?} in namespace of key {}",
- &query.name, bucket_id, key.key_id
- )))
- } else {
- let bucket_id = helper
- .resolve_global_bucket_name(&query.name)
- .await?
- .ok_or_bad_request("Bucket not found")?;
-
- helper
- .unset_global_bucket_alias(bucket_id, &query.name)
- .await?;
-
- Ok(AdminRpc::Ok(format!(
- "Alias {} no longer points to bucket {:?}",
- &query.name, bucket_id
- )))
- }
- }
-
- async fn handle_bucket_allow(&self, query: &PermBucketOpt) -> Result<AdminRpc, Error> {
- let helper = self.garage.bucket_helper();
- let key_helper = self.garage.key_helper();
-
- let bucket_id = helper
- .resolve_global_bucket_name(&query.bucket)
- .await?
- .ok_or_bad_request("Bucket not found")?;
- let key = key_helper
- .get_existing_matching_key(&query.key_pattern)
- .await?;
-
- let allow_read = query.read || key.allow_read(&bucket_id);
- let allow_write = query.write || key.allow_write(&bucket_id);
- let allow_owner = query.owner || key.allow_owner(&bucket_id);
-
- helper
- .set_bucket_key_permissions(
- bucket_id,
- &key.key_id,
- BucketKeyPerm {
- timestamp: now_msec(),
- allow_read,
- allow_write,
- allow_owner,
- },
- )
- .await?;
-
- Ok(AdminRpc::Ok(format!(
- "New permissions for {} on {}: read {}, write {}, owner {}.",
- &key.key_id, &query.bucket, allow_read, allow_write, allow_owner
- )))
- }
-
- async fn handle_bucket_deny(&self, query: &PermBucketOpt) -> Result<AdminRpc, Error> {
- let helper = self.garage.bucket_helper();
- let key_helper = self.garage.key_helper();
-
- let bucket_id = helper
- .resolve_global_bucket_name(&query.bucket)
- .await?
- .ok_or_bad_request("Bucket not found")?;
- let key = key_helper
- .get_existing_matching_key(&query.key_pattern)
- .await?;
-
- let allow_read = !query.read && key.allow_read(&bucket_id);
- let allow_write = !query.write && key.allow_write(&bucket_id);
- let allow_owner = !query.owner && key.allow_owner(&bucket_id);
-
- helper
- .set_bucket_key_permissions(
- bucket_id,
- &key.key_id,
- BucketKeyPerm {
- timestamp: now_msec(),
- allow_read,
- allow_write,
- allow_owner,
- },
- )
- .await?;
-
- Ok(AdminRpc::Ok(format!(
- "New permissions for {} on {}: read {}, write {}, owner {}.",
- &key.key_id, &query.bucket, allow_read, allow_write, allow_owner
- )))
- }
-
- async fn handle_bucket_website(&self, query: &WebsiteOpt) -> Result<AdminRpc, Error> {
- let bucket_id = self
- .garage
- .bucket_helper()
- .resolve_global_bucket_name(&query.bucket)
- .await?
- .ok_or_bad_request("Bucket not found")?;
-
- let mut bucket = self
- .garage
- .bucket_helper()
- .get_existing_bucket(bucket_id)
- .await?;
- let bucket_state = bucket.state.as_option_mut().unwrap();
-
- if !(query.allow ^ query.deny) {
- return Err(Error::BadRequest(
- "You must specify exactly one flag, either --allow or --deny".to_string(),
- ));
- }
-
- let website = if query.allow {
- Some(WebsiteConfig {
- index_document: query.index_document.clone(),
- error_document: query.error_document.clone(),
- })
- } else {
- None
- };
-
- bucket_state.website_config.update(website);
- self.garage.bucket_table.insert(&bucket).await?;
-
- let msg = if query.allow {
- format!("Website access allowed for {}", &query.bucket)
- } else {
- format!("Website access denied for {}", &query.bucket)
- };
-
- Ok(AdminRpc::Ok(msg))
- }
-
- async fn handle_bucket_set_quotas(&self, query: &SetQuotasOpt) -> Result<AdminRpc, Error> {
- let bucket_id = self
- .garage
- .bucket_helper()
- .resolve_global_bucket_name(&query.bucket)
- .await?
- .ok_or_bad_request("Bucket not found")?;
-
- let mut bucket = self
- .garage
- .bucket_helper()
- .get_existing_bucket(bucket_id)
- .await?;
- let bucket_state = bucket.state.as_option_mut().unwrap();
-
- if query.max_size.is_none() && query.max_objects.is_none() {
- return Err(Error::BadRequest(
- "You must specify either --max-size or --max-objects (or both) for this command to do something.".to_string(),
- ));
- }
-
- let mut quotas = bucket_state.quotas.get().clone();
-
- match query.max_size.as_ref().map(String::as_ref) {
- Some("none") => quotas.max_size = None,
- Some(v) => {
- let bs = v
- .parse::<bytesize::ByteSize>()
- .ok_or_bad_request(format!("Invalid size specified: {}", v))?;
- quotas.max_size = Some(bs.as_u64());
- }
- _ => (),
- }
-
- match query.max_objects.as_ref().map(String::as_ref) {
- Some("none") => quotas.max_objects = None,
- Some(v) => {
- let mo = v
- .parse::<u64>()
- .ok_or_bad_request(format!("Invalid number specified: {}", v))?;
- quotas.max_objects = Some(mo);
- }
- _ => (),
- }
-
- bucket_state.quotas.update(quotas);
- self.garage.bucket_table.insert(&bucket).await?;
-
- Ok(AdminRpc::Ok(format!(
- "Quotas updated for {}",
- &query.bucket
- )))
- }
-
- async fn handle_bucket_cleanup_incomplete_uploads(
- &self,
- query: &CleanupIncompleteUploadsOpt,
- ) -> Result<AdminRpc, Error> {
- let mut bucket_ids = vec![];
- for b in query.buckets.iter() {
- bucket_ids.push(
- self.garage
- .bucket_helper()
- .resolve_global_bucket_name(b)
- .await?
- .ok_or_bad_request(format!("Bucket not found: {}", b))?,
- );
- }
-
- let duration = parse_duration::parse::parse(&query.older_than)
- .ok_or_bad_request("Invalid duration passed for --older-than parameter")?;
-
- let mut ret = String::new();
- for bucket in bucket_ids {
- let count = self
- .garage
- .bucket_helper()
- .cleanup_incomplete_uploads(&bucket, duration)
- .await?;
- writeln!(
- &mut ret,
- "Bucket {:?}: {} incomplete uploads aborted",
- bucket, count
- )
- .unwrap();
- }
-
- Ok(AdminRpc::Ok(ret))
- }
-
- // ================ KEY COMMANDS ====================
-
- async fn handle_key_cmd(&self, cmd: &KeyOperation) -> Result<AdminRpc, Error> {
- match cmd {
- KeyOperation::List => self.handle_list_keys().await,
- KeyOperation::Info(query) => self.handle_key_info(query).await,
- KeyOperation::New(query) => self.handle_create_key(query).await,
- KeyOperation::Rename(query) => self.handle_rename_key(query).await,
- KeyOperation::Delete(query) => self.handle_delete_key(query).await,
- KeyOperation::Allow(query) => self.handle_allow_key(query).await,
- KeyOperation::Deny(query) => self.handle_deny_key(query).await,
- KeyOperation::Import(query) => self.handle_import_key(query).await,
- }
- }
-
- async fn handle_list_keys(&self) -> Result<AdminRpc, Error> {
- let key_ids = self
- .garage
- .key_table
- .get_range(
- &EmptyKey,
- None,
- Some(KeyFilter::Deleted(DeletedFilter::NotDeleted)),
- 10000,
- EnumerationOrder::Forward,
- )
- .await?
- .iter()
- .map(|k| (k.key_id.to_string(), k.params().unwrap().name.get().clone()))
- .collect::<Vec<_>>();
- Ok(AdminRpc::KeyList(key_ids))
- }
-
- async fn handle_key_info(&self, query: &KeyOpt) -> Result<AdminRpc, Error> {
- let key = self
- .garage
- .key_helper()
- .get_existing_matching_key(&query.key_pattern)
- .await?;
- self.key_info_result(key).await
- }
-
- async fn handle_create_key(&self, query: &KeyNewOpt) -> Result<AdminRpc, Error> {
- let key = Key::new(&query.name);
- self.garage.key_table.insert(&key).await?;
- self.key_info_result(key).await
- }
-
- async fn handle_rename_key(&self, query: &KeyRenameOpt) -> Result<AdminRpc, Error> {
- let mut key = self
- .garage
- .key_helper()
- .get_existing_matching_key(&query.key_pattern)
- .await?;
- key.params_mut()
- .unwrap()
- .name
- .update(query.new_name.clone());
- self.garage.key_table.insert(&key).await?;
- self.key_info_result(key).await
- }
-
- async fn handle_delete_key(&self, query: &KeyDeleteOpt) -> Result<AdminRpc, Error> {
- let key_helper = self.garage.key_helper();
-
- let mut key = key_helper
- .get_existing_matching_key(&query.key_pattern)
- .await?;
-
- if !query.yes {
- return Err(Error::BadRequest(
- "Add --yes flag to really perform this operation".to_string(),
- ));
- }
-
- key_helper.delete_key(&mut key).await?;
-
- Ok(AdminRpc::Ok(format!(
- "Key {} was deleted successfully.",
- key.key_id
- )))
- }
-
- async fn handle_allow_key(&self, query: &KeyPermOpt) -> Result<AdminRpc, Error> {
- let mut key = self
- .garage
- .key_helper()
- .get_existing_matching_key(&query.key_pattern)
- .await?;
- if query.create_bucket {
- key.params_mut().unwrap().allow_create_bucket.update(true);
- }
- self.garage.key_table.insert(&key).await?;
- self.key_info_result(key).await
- }
-
- async fn handle_deny_key(&self, query: &KeyPermOpt) -> Result<AdminRpc, Error> {
- let mut key = self
- .garage
- .key_helper()
- .get_existing_matching_key(&query.key_pattern)
- .await?;
- if query.create_bucket {
- key.params_mut().unwrap().allow_create_bucket.update(false);
- }
- self.garage.key_table.insert(&key).await?;
- self.key_info_result(key).await
- }
-
- async fn handle_import_key(&self, query: &KeyImportOpt) -> Result<AdminRpc, Error> {
- let prev_key = self.garage.key_table.get(&EmptyKey, &query.key_id).await?;
- if prev_key.is_some() {
- return Err(Error::BadRequest(format!("Key {} already exists in data store. Even if it is deleted, we can't let you create a new key with the same ID. Sorry.", query.key_id)));
- }
- let imported_key = Key::import(&query.key_id, &query.secret_key, &query.name);
- self.garage.key_table.insert(&imported_key).await?;
-
- self.key_info_result(imported_key).await
- }
-
- async fn key_info_result(&self, key: Key) -> Result<AdminRpc, Error> {
- let mut relevant_buckets = HashMap::new();
-
- for (id, _) in key
- .state
- .as_option()
- .unwrap()
- .authorized_buckets
- .items()
- .iter()
- {
- if let Some(b) = self.garage.bucket_table.get(&EmptyKey, id).await? {
- relevant_buckets.insert(*id, b);
- }
- }
-
- Ok(AdminRpc::KeyInfo(key, relevant_buckets))
- }
-
// ================ MIGRATION COMMANDS ====================
async fn handle_migrate(self: &Arc<Self>, opt: MigrateOpt) -> Result<AdminRpc, Error> {
@@ -1134,147 +515,6 @@ impl AdminRpcHandler {
)]))
}
}
-
- // ================ BLOCK COMMANDS ====================
-
- async fn handle_block_cmd(&self, cmd: &BlockOperation) -> Result<AdminRpc, Error> {
- match cmd {
- BlockOperation::ListErrors => Ok(AdminRpc::BlockErrorList(
- self.garage.block_manager.list_resync_errors()?,
- )),
- BlockOperation::Info { hash } => {
- let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?;
- let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?;
- let refcount = self.garage.block_manager.get_block_rc(&hash)?;
- let block_refs = self
- .garage
- .block_ref_table
- .get_range(&hash, None, None, 10000, Default::default())
- .await?;
- let mut versions = vec![];
- for br in block_refs {
- if let Some(v) = self
- .garage
- .version_table
- .get(&br.version, &EmptyKey)
- .await?
- {
- versions.push(Ok(v));
- } else {
- versions.push(Err(br.version));
- }
- }
- Ok(AdminRpc::BlockInfo {
- hash,
- refcount,
- versions,
- })
- }
- BlockOperation::RetryNow { all, blocks } => {
- if *all {
- if !blocks.is_empty() {
- return Err(Error::BadRequest(
- "--all was specified, cannot also specify blocks".into(),
- ));
- }
- let blocks = self.garage.block_manager.list_resync_errors()?;
- for b in blocks.iter() {
- self.garage.block_manager.resync.clear_backoff(&b.hash)?;
- }
- Ok(AdminRpc::Ok(format!(
- "{} blocks returned in queue for a retry now (check logs to see results)",
- blocks.len()
- )))
- } else {
- for hash in blocks {
- let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?;
- let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?;
- self.garage.block_manager.resync.clear_backoff(&hash)?;
- }
- Ok(AdminRpc::Ok(format!(
- "{} blocks returned in queue for a retry now (check logs to see results)",
- blocks.len()
- )))
- }
- }
- BlockOperation::Purge { yes, blocks } => {
- if !yes {
- return Err(Error::BadRequest(
- "Pass the --yes flag to confirm block purge operation.".into(),
- ));
- }
-
- let mut obj_dels = 0;
- let mut ver_dels = 0;
-
- for hash in blocks {
- let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?;
- let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?;
- let block_refs = self
- .garage
- .block_ref_table
- .get_range(&hash, None, None, 10000, Default::default())
- .await?;
-
- for br in block_refs {
- let version = match self
- .garage
- .version_table
- .get(&br.version, &EmptyKey)
- .await?
- {
- Some(v) => v,
- None => continue,
- };
-
- if let Some(object) = self
- .garage
- .object_table
- .get(&version.bucket_id, &version.key)
- .await?
- {
- let ov = object.versions().iter().rev().find(|v| v.is_complete());
- if let Some(ov) = ov {
- if ov.uuid == br.version {
- let del_uuid = gen_uuid();
- let deleted_object = Object::new(
- version.bucket_id,
- version.key.clone(),
- vec![ObjectVersion {
- uuid: del_uuid,
- timestamp: ov.timestamp + 1,
- state: ObjectVersionState::Complete(
- ObjectVersionData::DeleteMarker,
- ),
- }],
- );
- self.garage.object_table.insert(&deleted_object).await?;
- obj_dels += 1;
- }
- }
- }
-
- if !version.deleted.get() {
- let deleted_version = Version::new(
- version.uuid,
- version.bucket_id,
- version.key.clone(),
- true,
- );
- self.garage.version_table.insert(&deleted_version).await?;
- ver_dels += 1;
- }
- }
- }
- Ok(AdminRpc::Ok(format!(
- "{} blocks were purged: {} object deletion markers added, {} versions marked deleted",
- blocks.len(),
- obj_dels,
- ver_dels
- )))
- }
- }
- }
}
#[async_trait]