diff options
Diffstat (limited to 'src/block/resync.rs')
-rw-r--r-- | src/block/resync.rs | 166 |
1 files changed, 95 insertions, 71 deletions
diff --git a/src/block/resync.rs b/src/block/resync.rs index ada3ac54..ea280ad4 100644 --- a/src/block/resync.rs +++ b/src/block/resync.rs @@ -3,7 +3,6 @@ use std::convert::TryInto; use std::sync::{Arc, Mutex}; use std::time::Duration; -use arc_swap::ArcSwap; use async_trait::async_trait; use serde::{Deserialize, Serialize}; @@ -22,7 +21,7 @@ use garage_util::background::*; use garage_util::data::*; use garage_util::error::*; use garage_util::metrics::RecordDuration; -use garage_util::persister::Persister; +use garage_util::persister::PersisterShared; use garage_util::time::*; use garage_util::tranquilizer::Tranquilizer; @@ -49,13 +48,12 @@ const INITIAL_RESYNC_TRANQUILITY: u32 = 2; pub struct BlockResyncManager { pub(crate) queue: CountedTree, - pub(crate) notify: Notify, + pub(crate) notify: Arc<Notify>, pub(crate) errors: CountedTree, busy_set: BusySet, - persister: Persister<ResyncPersistedConfig>, - persisted: ArcSwap<ResyncPersistedConfig>, + persister: PersisterShared<ResyncPersistedConfig>, } #[derive(Serialize, Deserialize, Clone, Copy)] @@ -63,6 +61,15 @@ struct ResyncPersistedConfig { n_workers: usize, tranquility: u32, } +impl garage_util::migrate::InitialFormat for ResyncPersistedConfig {} +impl Default for ResyncPersistedConfig { + fn default() -> Self { + ResyncPersistedConfig { + n_workers: 1, + tranquility: INITIAL_RESYNC_TRANQUILITY, + } + } +} enum ResyncIterResult { BusyDidSomething, @@ -90,22 +97,14 @@ impl BlockResyncManager { .expect("Unable to open block_local_resync_errors tree"); let errors = CountedTree::new(errors).expect("Could not count block_local_resync_errors"); - let persister = Persister::new(&system.metadata_dir, "resync_cfg"); - let persisted = match persister.load() { - Ok(v) => v, - Err(_) => ResyncPersistedConfig { - n_workers: 1, - tranquility: INITIAL_RESYNC_TRANQUILITY, - }, - }; + let persister = PersisterShared::new(&system.metadata_dir, "resync_cfg"); Self { queue, - notify: Notify::new(), + notify: Arc::new(Notify::new()), errors, busy_set: Arc::new(Mutex::new(HashSet::new())), persister, - persisted: ArcSwap::new(Arc::new(persisted)), } } @@ -123,6 +122,56 @@ impl BlockResyncManager { Ok(self.errors.len()) } + /// Clear the error counter for a block and put it in queue immediately + pub fn clear_backoff(&self, hash: &Hash) -> Result<(), Error> { + let now = now_msec(); + if let Some(ec) = self.errors.get(hash)? { + let mut ec = ErrorCounter::decode(&ec); + if ec.errors > 0 { + ec.last_try = now - ec.delay_msec(); + self.errors.insert(hash, ec.encode())?; + self.put_to_resync_at(hash, now)?; + return Ok(()); + } + } + Err(Error::Message(format!( + "Block {:?} was not in an errored state", + hash + ))) + } + + pub fn register_bg_vars(&self, vars: &mut vars::BgVars) { + let notify = self.notify.clone(); + vars.register_rw( + &self.persister, + "resync-worker-count", + |p| p.get_with(|x| x.n_workers), + move |p, n_workers| { + if !(1..=MAX_RESYNC_WORKERS).contains(&n_workers) { + return Err(Error::Message(format!( + "Invalid number of resync workers, must be between 1 and {}", + MAX_RESYNC_WORKERS + ))); + } + p.set_with(|x| x.n_workers = n_workers)?; + notify.notify_waiters(); + Ok(()) + }, + ); + + let notify = self.notify.clone(); + vars.register_rw( + &self.persister, + "resync-tranquility", + |p| p.get_with(|x| x.tranquility), + move |p, tranquility| { + p.set_with(|x| x.tranquility = tranquility)?; + notify.notify_waiters(); + Ok(()) + }, + ); + } + // ---- Resync loop ---- // This part manages a queue of blocks that need to be @@ -257,7 +306,7 @@ impl BlockResyncManager { if let Err(e) = &res { manager.metrics.resync_error_counter.add(1); - warn!("Error when resyncing {:?}: {}", hash, e); + error!("Error when resyncing {:?}: {}", hash, e); let err_counter = match self.errors.get(hash.as_slice())? { Some(ec) => ErrorCounter::decode(&ec).add1(now + 1), @@ -417,33 +466,6 @@ impl BlockResyncManager { Ok(()) } - - async fn update_persisted( - &self, - update: impl Fn(&mut ResyncPersistedConfig), - ) -> Result<(), Error> { - let mut cfg: ResyncPersistedConfig = *self.persisted.load().as_ref(); - update(&mut cfg); - self.persister.save_async(&cfg).await?; - self.persisted.store(Arc::new(cfg)); - self.notify.notify_waiters(); - Ok(()) - } - - pub async fn set_n_workers(&self, n_workers: usize) -> Result<(), Error> { - if !(1..=MAX_RESYNC_WORKERS).contains(&n_workers) { - return Err(Error::Message(format!( - "Invalid number of resync workers, must be between 1 and {}", - MAX_RESYNC_WORKERS - ))); - } - self.update_persisted(|cfg| cfg.n_workers = n_workers).await - } - - pub async fn set_tranquility(&self, tranquility: u32) -> Result<(), Error> { - self.update_persisted(|cfg| cfg.tranquility = tranquility) - .await - } } impl Drop for BusyBlock { @@ -458,15 +480,18 @@ pub(crate) struct ResyncWorker { manager: Arc<BlockManager>, tranquilizer: Tranquilizer, next_delay: Duration, + persister: PersisterShared<ResyncPersistedConfig>, } impl ResyncWorker { pub(crate) fn new(index: usize, manager: Arc<BlockManager>) -> Self { + let persister = manager.resync.persister.clone(); Self { index, manager, tranquilizer: Tranquilizer::new(30), next_delay: Duration::from_secs(10), + persister, } } } @@ -477,39 +502,36 @@ impl Worker for ResyncWorker { format!("Block resync worker #{}", self.index + 1) } - fn info(&self) -> Option<String> { - let persisted = self.manager.resync.persisted.load(); + fn status(&self) -> WorkerStatus { + let (n_workers, tranquility) = self.persister.get_with(|x| (x.n_workers, x.tranquility)); - if self.index >= persisted.n_workers { - return Some("(unused)".into()); + if self.index >= n_workers { + return WorkerStatus { + freeform: vec!["This worker is currently disabled".into()], + ..Default::default() + }; } - let mut ret = vec![]; - ret.push(format!("tranquility = {}", persisted.tranquility)); - - let qlen = self.manager.resync.queue_len().unwrap_or(0); - if qlen > 0 { - ret.push(format!("{} blocks in queue", qlen)); - } - - let elen = self.manager.resync.errors_len().unwrap_or(0); - if elen > 0 { - ret.push(format!("{} blocks in error state", elen)); + WorkerStatus { + queue_length: Some(self.manager.resync.queue_len().unwrap_or(0) as u64), + tranquility: Some(tranquility), + persistent_errors: Some(self.manager.resync.errors_len().unwrap_or(0) as u64), + ..Default::default() } - - Some(ret.join(", ")) } async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> { - if self.index >= self.manager.resync.persisted.load().n_workers { + let (n_workers, tranquility) = self.persister.get_with(|x| (x.n_workers, x.tranquility)); + + if self.index >= n_workers { return Ok(WorkerState::Idle); } self.tranquilizer.reset(); match self.manager.resync.resync_iter(&self.manager).await { - Ok(ResyncIterResult::BusyDidSomething) => Ok(self - .tranquilizer - .tranquilize_worker(self.manager.resync.persisted.load().tranquility)), + Ok(ResyncIterResult::BusyDidSomething) => { + Ok(self.tranquilizer.tranquilize_worker(tranquility)) + } Ok(ResyncIterResult::BusyDidNothing) => Ok(WorkerState::Busy), Ok(ResyncIterResult::IdleFor(delay)) => { self.next_delay = delay; @@ -527,8 +549,8 @@ impl Worker for ResyncWorker { } } - async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerState { - while self.index >= self.manager.resync.persisted.load().n_workers { + async fn wait_for_work(&mut self) -> WorkerState { + while self.index >= self.persister.get_with(|x| x.n_workers) { self.manager.resync.notify.notified().await } @@ -545,9 +567,9 @@ impl Worker for ResyncWorker { /// and the time of the last try. /// Used to implement exponential backoff. #[derive(Clone, Copy, Debug)] -struct ErrorCounter { - errors: u64, - last_try: u64, +pub(crate) struct ErrorCounter { + pub(crate) errors: u64, + pub(crate) last_try: u64, } impl ErrorCounter { @@ -558,12 +580,13 @@ impl ErrorCounter { } } - fn decode(data: &[u8]) -> Self { + pub(crate) fn decode(data: &[u8]) -> Self { Self { errors: u64::from_be_bytes(data[0..8].try_into().unwrap()), last_try: u64::from_be_bytes(data[8..16].try_into().unwrap()), } } + fn encode(&self) -> Vec<u8> { [ u64::to_be_bytes(self.errors), @@ -583,7 +606,8 @@ impl ErrorCounter { (RESYNC_RETRY_DELAY.as_millis() as u64) << std::cmp::min(self.errors - 1, RESYNC_RETRY_DELAY_MAX_BACKOFF_POWER) } - fn next_try(&self) -> u64 { + + pub(crate) fn next_try(&self) -> u64 { self.last_try + self.delay_msec() } } |