diff options
author | Alex Auvolat <alex@adnab.me> | 2022-12-13 15:02:42 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2022-12-13 15:02:42 +0100 |
commit | d7f90cabb0517a50a6c3dd702852770240566bfc (patch) | |
tree | 5c6f99f1dfb6c3187a1dd28a507b88c65a8dc039 | |
parent | 687660b27f904422c689e09d2457293e5313d325 (diff) | |
download | garage-d7f90cabb0517a50a6c3dd702852770240566bfc.tar.gz garage-d7f90cabb0517a50a6c3dd702852770240566bfc.zip |
Implement `block retry-now` and `block purge`
-rw-r--r-- | src/block/resync.rs | 18 | ||||
-rw-r--r-- | src/garage/admin.rs | 108 | ||||
-rw-r--r-- | src/garage/cli/structs.rs | 1 | ||||
-rw-r--r-- | src/garage/cli/util.rs | 2 |
4 files changed, 124 insertions, 5 deletions
diff --git a/src/block/resync.rs b/src/block/resync.rs index 53b44774..8231b55d 100644 --- a/src/block/resync.rs +++ b/src/block/resync.rs @@ -123,6 +123,24 @@ impl BlockResyncManager { Ok(self.errors.len()) } + /// Clear the error counter for a block and put it in queue immediately + pub fn clear_backoff(&self, hash: &Hash) -> Result<(), Error> { + let now = now_msec(); + if let Some(ec) = self.errors.get(hash)? { + let mut ec = ErrorCounter::decode(&ec); + if ec.errors > 0 { + ec.last_try = now - ec.delay_msec(); + self.errors.insert(hash, ec.encode())?; + self.put_to_resync_at(hash, now)?; + return Ok(()); + } + } + Err(Error::Message(format!( + "Block {:?} was not in an errored state", + hash + ))) + } + // ---- Resync loop ---- // This part manages a queue of blocks that need to be diff --git a/src/garage/admin.rs b/src/garage/admin.rs index c0b0b3c9..4828bebd 100644 --- a/src/garage/admin.rs +++ b/src/garage/admin.rs @@ -25,6 +25,7 @@ use garage_model::helper::error::{Error, OkOrBadRequest}; use garage_model::key_table::*; use garage_model::migrate::Migrate; use garage_model::permission::*; +use garage_model::s3::object_table::*; use garage_model::s3::version_table::Version; use crate::cli::*; @@ -974,11 +975,110 @@ impl AdminRpcHandler { versions, }) } - BlockOperation::RetryNow { .. } => { - Err(GarageError::Message("not implemented".into()).into()) + BlockOperation::RetryNow { all, blocks } => { + if *all { + if !blocks.is_empty() { + return Err(GarageError::Message( + "--all was specified, cannot also specify blocks".into(), + ) + .into()); + } + let blocks = self.garage.block_manager.list_resync_errors()?; + for b in blocks.iter() { + self.garage.block_manager.resync.clear_backoff(&b.hash)?; + } + Ok(AdminRpc::Ok(format!( + "{} blocks returned in queue for a retry now (check logs to see results)", + blocks.len() + ))) + } else { + for hash in blocks { + let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?; + let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?; + self.garage.block_manager.resync.clear_backoff(&hash)?; + } + Ok(AdminRpc::Ok(format!( + "{} blocks returned in queue for a retry now (check logs to see results)", + blocks.len() + ))) + } } - BlockOperation::Purge { .. } => { - Err(GarageError::Message("not implemented".into()).into()) + BlockOperation::Purge { yes, blocks } => { + if !yes { + return Err(GarageError::Message( + "Pass the --yes flag to confirm block purge operation.".into(), + ) + .into()); + } + + let mut obj_dels = 0; + let mut ver_dels = 0; + + for hash in blocks { + let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?; + let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?; + let block_refs = self + .garage + .block_ref_table + .get_range(&hash, None, None, 10000, Default::default()) + .await?; + + for br in block_refs { + let version = match self + .garage + .version_table + .get(&br.version, &EmptyKey) + .await? + { + Some(v) => v, + None => continue, + }; + + if let Some(object) = self + .garage + .object_table + .get(&version.bucket_id, &version.key) + .await? + { + let ov = object.versions().iter().rev().find(|v| v.is_complete()); + if let Some(ov) = ov { + if ov.uuid == br.version { + let del_uuid = gen_uuid(); + let deleted_object = Object::new( + version.bucket_id, + version.key.clone(), + vec![ObjectVersion { + uuid: del_uuid, + timestamp: ov.timestamp + 1, + state: ObjectVersionState::Complete( + ObjectVersionData::DeleteMarker, + ), + }], + ); + self.garage.object_table.insert(&deleted_object).await?; + obj_dels += 1; + } + } + } + + if !version.deleted.get() { + let deleted_version = Version::new( + version.uuid, + version.bucket_id, + version.key.clone(), + true, + ); + self.garage.version_table.insert(&deleted_version).await?; + ver_dels += 1; + } + } + } + Ok(AdminRpc::Ok(format!( + "{} blocks were purged: {} object deletion markers added, {} versions marked deleted", + blocks.len(), + obj_dels, + ver_dels + ))) } } } diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs index 6d74b1a4..e2f632f3 100644 --- a/src/garage/cli/structs.rs +++ b/src/garage/cli/structs.rs @@ -575,6 +575,7 @@ pub enum BlockOperation { #[structopt(long = "yes")] yes: bool, /// Hashes of the block to purge + #[structopt(required = true)] blocks: Vec<String>, }, } diff --git a/src/garage/cli/util.rs b/src/garage/cli/util.rs index 737b54b2..63fd9eba 100644 --- a/src/garage/cli/util.rs +++ b/src/garage/cli/util.rs @@ -382,7 +382,7 @@ pub fn print_block_info(hash: Hash, refcount: u64, versions: Vec<Result<Version, println!("Refcount: {}", refcount); println!(); - let mut table = vec!["Version\tBucket\tPath\tDeleted".into()]; + let mut table = vec!["Version\tBucket\tKey\tDeleted".into()]; let mut nondeleted_count = 0; for v in versions.iter() { match v { |