aboutsummaryrefslogtreecommitdiff
path: root/src/garage/admin/block.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/garage/admin/block.rs')
-rw-r--r--src/garage/admin/block.rs160
1 files changed, 160 insertions, 0 deletions
diff --git a/src/garage/admin/block.rs b/src/garage/admin/block.rs
new file mode 100644
index 00000000..e9e3ff96
--- /dev/null
+++ b/src/garage/admin/block.rs
@@ -0,0 +1,160 @@
+use garage_util::data::*;
+
+use garage_table::*;
+
+use garage_model::helper::error::{Error, OkOrBadRequest};
+use garage_model::s3::object_table::*;
+use garage_model::s3::version_table::*;
+
+use crate::cli::*;
+
+use super::*;
+
+impl AdminRpcHandler {
+ pub(super) async fn handle_block_cmd(&self, cmd: &BlockOperation) -> Result<AdminRpc, Error> {
+ match cmd {
+ BlockOperation::ListErrors => Ok(AdminRpc::BlockErrorList(
+ self.garage.block_manager.list_resync_errors()?,
+ )),
+ BlockOperation::Info { hash } => self.handle_block_info(hash).await,
+ BlockOperation::RetryNow { all, blocks } => {
+ self.handle_block_retry_now(*all, blocks).await
+ }
+ BlockOperation::Purge { yes, blocks } => self.handle_block_purge(*yes, blocks).await,
+ }
+ }
+
+ async fn handle_block_info(&self, hash: &String) -> Result<AdminRpc, Error> {
+ let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?;
+ let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?;
+ let refcount = self.garage.block_manager.get_block_rc(&hash)?;
+ let block_refs = self
+ .garage
+ .block_ref_table
+ .get_range(&hash, None, None, 10000, Default::default())
+ .await?;
+ let mut versions = vec![];
+ for br in block_refs {
+ if let Some(v) = self
+ .garage
+ .version_table
+ .get(&br.version, &EmptyKey)
+ .await?
+ {
+ versions.push(Ok(v));
+ } else {
+ versions.push(Err(br.version));
+ }
+ }
+ Ok(AdminRpc::BlockInfo {
+ hash,
+ refcount,
+ versions,
+ })
+ }
+
+ async fn handle_block_retry_now(
+ &self,
+ all: bool,
+ blocks: &[String],
+ ) -> Result<AdminRpc, Error> {
+ if all {
+ if !blocks.is_empty() {
+ return Err(Error::BadRequest(
+ "--all was specified, cannot also specify blocks".into(),
+ ));
+ }
+ let blocks = self.garage.block_manager.list_resync_errors()?;
+ for b in blocks.iter() {
+ self.garage.block_manager.resync.clear_backoff(&b.hash)?;
+ }
+ Ok(AdminRpc::Ok(format!(
+ "{} blocks returned in queue for a retry now (check logs to see results)",
+ blocks.len()
+ )))
+ } else {
+ for hash in blocks {
+ let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?;
+ let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?;
+ self.garage.block_manager.resync.clear_backoff(&hash)?;
+ }
+ Ok(AdminRpc::Ok(format!(
+ "{} blocks returned in queue for a retry now (check logs to see results)",
+ blocks.len()
+ )))
+ }
+ }
+
+ async fn handle_block_purge(&self, yes: bool, blocks: &[String]) -> Result<AdminRpc, Error> {
+ if !yes {
+ return Err(Error::BadRequest(
+ "Pass the --yes flag to confirm block purge operation.".into(),
+ ));
+ }
+
+ let mut obj_dels = 0;
+ let mut ver_dels = 0;
+
+ for hash in blocks {
+ let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?;
+ let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?;
+ let block_refs = self
+ .garage
+ .block_ref_table
+ .get_range(&hash, None, None, 10000, Default::default())
+ .await?;
+
+ for br in block_refs {
+ let version = match self
+ .garage
+ .version_table
+ .get(&br.version, &EmptyKey)
+ .await?
+ {
+ Some(v) => v,
+ None => continue,
+ };
+
+ if let Some(object) = self
+ .garage
+ .object_table
+ .get(&version.bucket_id, &version.key)
+ .await?
+ {
+ let ov = object.versions().iter().rev().find(|v| v.is_complete());
+ if let Some(ov) = ov {
+ if ov.uuid == br.version {
+ let del_uuid = gen_uuid();
+ let deleted_object = Object::new(
+ version.bucket_id,
+ version.key.clone(),
+ vec![ObjectVersion {
+ uuid: del_uuid,
+ timestamp: ov.timestamp + 1,
+ state: ObjectVersionState::Complete(
+ ObjectVersionData::DeleteMarker,
+ ),
+ }],
+ );
+ self.garage.object_table.insert(&deleted_object).await?;
+ obj_dels += 1;
+ }
+ }
+ }
+
+ if !version.deleted.get() {
+ let deleted_version =
+ Version::new(version.uuid, version.bucket_id, version.key.clone(), true);
+ self.garage.version_table.insert(&deleted_version).await?;
+ ver_dels += 1;
+ }
+ }
+ }
+ Ok(AdminRpc::Ok(format!(
+ "{} blocks were purged: {} object deletion markers added, {} versions marked deleted",
+ blocks.len(),
+ obj_dels,
+ ver_dels
+ )))
+ }
+}