author | Alex <alex@adnab.me> | 2023-01-02 12:42:24 +0000
committer | Alex <alex@adnab.me> | 2023-01-02 12:42:24 +0000
commit | 7f7d53cfa991054afcd2940cc43a4d7f1a6668e7 (patch)
tree | 15c60c9ecf583532eaf62d492beb63bd8ae7e3b2 /src/block
parent | 1af4a5ed569e42f77dd4ecc9364a27f7ed43df63 (diff)
parent | d1279e04f3550eae2eb5e0f25efbdf69b42fbeb9 (diff)
download | garage-7f7d53cfa991054afcd2940cc43a4d7f1a6668e7.tar.gz, garage-7f7d53cfa991054afcd2940cc43a4d7f1a6668e7.zip
Merge pull request 'improvements to CLI and new debug features' (#448) from cli-improvements into main
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/448
Diffstat (limited to 'src/block')
-rw-r--r-- | src/block/manager.rs | 42
-rw-r--r-- | src/block/metrics.rs | 12
-rw-r--r-- | src/block/rc.rs | 7
-rw-r--r-- | src/block/repair.rs | 58
-rw-r--r-- | src/block/resync.rs | 57
5 files changed, 129 insertions, 47 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs
index 7f439b96..28523a93 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -90,6 +90,15 @@ pub struct BlockManager {
 	tx_scrub_command: mpsc::Sender<ScrubWorkerCommand>,
 }
 
+#[derive(Serialize, Deserialize, Clone, Debug)]
+pub struct BlockResyncErrorInfo {
+	pub hash: Hash,
+	pub refcount: u64,
+	pub error_count: u64,
+	pub last_try: u64,
+	pub next_try: u64,
+}
+
 // This custom struct contains functions that must only be ran
 // when the lock is held. We ensure that it is the case by storing
 // it INSIDE a Mutex.
@@ -114,7 +123,8 @@ impl BlockManager {
 			.netapp
 			.endpoint("garage_block/manager.rs/Rpc".to_string());
 
-		let metrics = BlockManagerMetrics::new(resync.queue.clone(), resync.errors.clone());
+		let metrics =
+			BlockManagerMetrics::new(rc.rc.clone(), resync.queue.clone(), resync.errors.clone());
 
 		let (scrub_tx, scrub_rx) = mpsc::channel(1);
 
@@ -309,11 +319,41 @@ impl BlockManager {
 		Ok(self.rc.rc.len()?)
 	}
 
+	/// Get number of items in the refcount table
+	pub fn rc_fast_len(&self) -> Result<Option<usize>, Error> {
+		Ok(self.rc.rc.fast_len()?)
+	}
+
 	/// Send command to start/stop/manager scrub worker
 	pub async fn send_scrub_command(&self, cmd: ScrubWorkerCommand) {
 		let _ = self.tx_scrub_command.send(cmd).await;
 	}
 
+	/// Get the reference count of a block
+	pub fn get_block_rc(&self, hash: &Hash) -> Result<u64, Error> {
+		Ok(self.rc.get_block_rc(hash)?.as_u64())
+	}
+
+	/// List all resync errors
+	pub fn list_resync_errors(&self) -> Result<Vec<BlockResyncErrorInfo>, Error> {
+		let mut blocks = Vec::with_capacity(self.resync.errors.len());
+		for ent in self.resync.errors.iter()? {
+			let (hash, cnt) = ent?;
+			let cnt = ErrorCounter::decode(&cnt);
+			blocks.push(BlockResyncErrorInfo {
+				hash: Hash::try_from(&hash).unwrap(),
+				refcount: 0,
+				error_count: cnt.errors,
+				last_try: cnt.last_try,
+				next_try: cnt.next_try(),
+			});
+		}
+		for block in blocks.iter_mut() {
+			block.refcount = self.get_block_rc(&block.hash)?;
+		}
+		Ok(blocks)
+	}
+
 	//// ----- Managing the reference counter ----
 
 	/// Increment the number of time a block is used, putting it to resynchronization if it is
diff --git a/src/block/metrics.rs b/src/block/metrics.rs
index 477add66..fbef95af 100644
--- a/src/block/metrics.rs
+++ b/src/block/metrics.rs
@@ -1,9 +1,11 @@
 use opentelemetry::{global, metrics::*};
 
+use garage_db as db;
 use garage_db::counted_tree_hack::CountedTree;
 
 /// TableMetrics reference all counter used for metrics
 pub struct BlockManagerMetrics {
+	pub(crate) _rc_size: ValueObserver<u64>,
 	pub(crate) _resync_queue_len: ValueObserver<u64>,
 	pub(crate) _resync_errored_blocks: ValueObserver<u64>,
 
@@ -23,9 +25,17 @@ pub struct BlockManagerMetrics {
 }
 
 impl BlockManagerMetrics {
-	pub fn new(resync_queue: CountedTree, resync_errors: CountedTree) -> Self {
+	pub fn new(rc_tree: db::Tree, resync_queue: CountedTree, resync_errors: CountedTree) -> Self {
 		let meter = global::meter("garage_model/block");
 		Self {
+			_rc_size: meter
+				.u64_value_observer("block.rc_size", move |observer| {
+					if let Ok(Some(v)) = rc_tree.fast_len() {
+						observer.observe(v as u64, &[])
+					}
+				})
+				.with_description("Number of blocks known to the reference counter")
+				.init(),
 			_resync_queue_len: meter
 				.u64_value_observer("block.resync_queue_length", move |observer| {
 					observer.observe(resync_queue.len() as u64, &[])
diff --git a/src/block/rc.rs b/src/block/rc.rs
index ce6defad..8dae3960 100644
--- a/src/block/rc.rs
+++ b/src/block/rc.rs
@@ -169,4 +169,11 @@ impl RcEntry {
 	pub(crate) fn is_needed(&self) -> bool {
 		!self.is_deletable()
 	}
+
+	pub(crate) fn as_u64(&self) -> u64 {
+		match self {
+			RcEntry::Present { count } => *count,
+			_ => 0,
+		}
+	}
 }
diff --git a/src/block/repair.rs b/src/block/repair.rs
index e2884b69..1878027e 100644
--- a/src/block/repair.rs
+++ b/src/block/repair.rs
@@ -53,7 +53,7 @@ impl Worker for RepairWorker {
 		"Block repair worker".into()
 	}
 
-	fn info(&self) -> Option<String> {
+	fn status(&self) -> WorkerStatus {
 		match self.block_iter.as_ref() {
 			None => {
 				let idx_bytes = self
@@ -66,9 +66,20 @@ impl Worker for RepairWorker {
 				} else {
 					idx_bytes
 				};
-				Some(format!("Phase 1: {}", hex::encode(idx_bytes)))
+				WorkerStatus {
+					progress: Some("0.00%".into()),
+					freeform: vec![format!(
+						"Currently in phase 1, iterator position: {}",
+						hex::encode(idx_bytes)
+					)],
+					..Default::default()
+				}
 			}
-			Some(bi) => Some(format!("Phase 2: {:.2}% done", bi.progress() * 100.)),
+			Some(bi) => WorkerStatus {
+				progress: Some(format!("{:.2}%", bi.progress() * 100.)),
+				freeform: vec!["Currently in phase 2".into()],
+				..Default::default()
+			},
 		}
 	}
 
@@ -271,29 +282,28 @@ impl Worker for ScrubWorker {
 		"Block scrub worker".into()
 	}
 
-	fn info(&self) -> Option<String> {
-		let s = match &self.work {
-			ScrubWorkerState::Running(bsi) => format!(
-				"{:.2}% done (tranquility = {})",
-				bsi.progress() * 100.,
-				self.persisted.tranquility
-			),
+	fn status(&self) -> WorkerStatus {
+		let mut s = WorkerStatus {
+			persistent_errors: Some(self.persisted.corruptions_detected),
+			tranquility: Some(self.persisted.tranquility),
+			..Default::default()
+		};
+		match &self.work {
+			ScrubWorkerState::Running(bsi) => {
+				s.progress = Some(format!("{:.2}%", bsi.progress() * 100.));
+			}
 			ScrubWorkerState::Paused(bsi, rt) => {
-				format!(
-					"Paused, {:.2}% done, resumes at {}",
-					bsi.progress() * 100.,
-					msec_to_rfc3339(*rt)
-				)
+				s.progress = Some(format!("{:.2}%", bsi.progress() * 100.));
+				s.freeform = vec![format!("Scrub paused, resumes at {}", msec_to_rfc3339(*rt))];
 			}
-			ScrubWorkerState::Finished => format!(
-				"Last completed scrub: {}",
-				msec_to_rfc3339(self.persisted.time_last_complete_scrub)
-			),
-		};
-		Some(format!(
-			"{} ; corruptions detected: {}",
-			s, self.persisted.corruptions_detected
-		))
+			ScrubWorkerState::Finished => {
+				s.freeform = vec![format!(
+					"Last scrub completed at {}",
+					msec_to_rfc3339(self.persisted.time_last_complete_scrub)
+				)];
+			}
+		}
+		s
 	}
 
 	async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
diff --git a/src/block/resync.rs b/src/block/resync.rs
index ada3ac54..8231b55d 100644
--- a/src/block/resync.rs
+++ b/src/block/resync.rs
@@ -123,6 +123,24 @@ impl BlockResyncManager {
 		Ok(self.errors.len())
 	}
 
+	/// Clear the error counter for a block and put it in queue immediately
+	pub fn clear_backoff(&self, hash: &Hash) -> Result<(), Error> {
+		let now = now_msec();
+		if let Some(ec) = self.errors.get(hash)? {
+			let mut ec = ErrorCounter::decode(&ec);
+			if ec.errors > 0 {
+				ec.last_try = now - ec.delay_msec();
+				self.errors.insert(hash, ec.encode())?;
+				self.put_to_resync_at(hash, now)?;
+				return Ok(());
+			}
+		}
+		Err(Error::Message(format!(
+			"Block {:?} was not in an errored state",
+			hash
+		)))
+	}
+
 	// ---- Resync loop ----
 
 	// This part manages a queue of blocks that need to be
@@ -257,7 +275,7 @@ impl BlockResyncManager {
 
 		if let Err(e) = &res {
 			manager.metrics.resync_error_counter.add(1);
-			warn!("Error when resyncing {:?}: {}", hash, e);
+			error!("Error when resyncing {:?}: {}", hash, e);
 
 			let err_counter = match self.errors.get(hash.as_slice())? {
 				Some(ec) => ErrorCounter::decode(&ec).add1(now + 1),
@@ -477,27 +495,22 @@ impl Worker for ResyncWorker {
 		format!("Block resync worker #{}", self.index + 1)
 	}
 
-	fn info(&self) -> Option<String> {
+	fn status(&self) -> WorkerStatus {
 		let persisted = self.manager.resync.persisted.load();
 
 		if self.index >= persisted.n_workers {
-			return Some("(unused)".into());
-		}
-
-		let mut ret = vec![];
-		ret.push(format!("tranquility = {}", persisted.tranquility));
-
-		let qlen = self.manager.resync.queue_len().unwrap_or(0);
-		if qlen > 0 {
-			ret.push(format!("{} blocks in queue", qlen));
+			return WorkerStatus {
+				freeform: vec!["This worker is currently disabled".into()],
+				..Default::default()
+			};
 		}
 
-		let elen = self.manager.resync.errors_len().unwrap_or(0);
-		if elen > 0 {
-			ret.push(format!("{} blocks in error state", elen));
+		WorkerStatus {
+			queue_length: Some(self.manager.resync.queue_len().unwrap_or(0) as u64),
+			tranquility: Some(persisted.tranquility),
+			persistent_errors: Some(self.manager.resync.errors_len().unwrap_or(0) as u64),
+			..Default::default()
 		}
-
-		Some(ret.join(", "))
 	}
 
 	async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
@@ -545,9 +558,9 @@
 /// and the time of the last try.
 /// Used to implement exponential backoff.
 #[derive(Clone, Copy, Debug)]
-struct ErrorCounter {
-	errors: u64,
-	last_try: u64,
+pub(crate) struct ErrorCounter {
+	pub(crate) errors: u64,
+	pub(crate) last_try: u64,
 }
 
 impl ErrorCounter {
@@ -558,12 +571,13 @@ impl ErrorCounter {
 		}
 	}
 
-	fn decode(data: &[u8]) -> Self {
+	pub(crate) fn decode(data: &[u8]) -> Self {
 		Self {
 			errors: u64::from_be_bytes(data[0..8].try_into().unwrap()),
 			last_try: u64::from_be_bytes(data[8..16].try_into().unwrap()),
 		}
 	}
+
 	fn encode(&self) -> Vec<u8> {
 		[
 			u64::to_be_bytes(self.errors),
@@ -583,7 +597,8 @@ impl ErrorCounter {
 		(RESYNC_RETRY_DELAY.as_millis() as u64)
 			<< std::cmp::min(self.errors - 1, RESYNC_RETRY_DELAY_MAX_BACKOFF_POWER)
 	}
-	fn next_try(&self) -> u64 {
+
+	pub(crate) fn next_try(&self) -> u64 {
 		self.last_try + self.delay_msec()
 	}
 }
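
Note: the most subtle part of this change is how the new clear_backoff method in src/block/resync.rs makes an errored block eligible for resync again right away. Instead of deleting the error counter, it rewinds last_try by one full backoff delay so that next_try() evaluates to the current time. The following stand-alone sketch reproduces that arithmetic outside of Garage; the concrete values chosen here for RESYNC_RETRY_DELAY (60 seconds) and RESYNC_RETRY_DELAY_MAX_BACKOFF_POWER (6) are assumptions for illustration only, since those constants are defined elsewhere in src/block/resync.rs and are not part of this diff.

use std::time::Duration;

// Assumed values, for illustration only: the real constants live elsewhere
// in src/block/resync.rs and are not shown in this diff.
const RESYNC_RETRY_DELAY: Duration = Duration::from_secs(60);
const RESYNC_RETRY_DELAY_MAX_BACKOFF_POWER: u64 = 6;

// Simplified stand-alone copy of the ErrorCounter backoff logic shown above.
#[derive(Clone, Copy, Debug)]
struct ErrorCounter {
	errors: u64,   // number of consecutive resync failures for this block
	last_try: u64, // timestamp of the last attempt, in milliseconds
}

impl ErrorCounter {
	// The retry delay doubles with every failure, capped after
	// RESYNC_RETRY_DELAY_MAX_BACKOFF_POWER doublings.
	fn delay_msec(&self) -> u64 {
		(RESYNC_RETRY_DELAY.as_millis() as u64)
			<< std::cmp::min(self.errors - 1, RESYNC_RETRY_DELAY_MAX_BACKOFF_POWER)
	}

	fn next_try(&self) -> u64 {
		self.last_try + self.delay_msec()
	}
}

fn main() {
	// A block that failed 4 times waits 60 s << 3 = 480 s between attempts.
	let ec = ErrorCounter { errors: 4, last_try: 1_000_000 };
	assert_eq!(ec.delay_msec(), 480_000);
	assert_eq!(ec.next_try(), 1_480_000);

	// What clear_backoff() does: rewind last_try by one full delay, so that
	// next_try() equals "now" and the block becomes retryable immediately.
	let now: u64 = 2_000_000;
	let cleared = ErrorCounter { last_try: now - ec.delay_msec(), ..ec };
	assert_eq!(cleared.next_try(), now);
}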