diff options
-rw-r--r-- | src/model/block.rs | 94 | ||||
-rw-r--r-- | src/model/block_metrics.rs | 9 | ||||
-rw-r--r-- | src/util/sled_counter.rs | 8 |
3 files changed, 106 insertions, 5 deletions
diff --git a/src/model/block.rs b/src/model/block.rs index 97e06f0e..ec1890bf 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -39,7 +39,7 @@ use crate::garage::Garage; pub const INLINE_THRESHOLD: usize = 3072; pub const BACKGROUND_WORKERS: u64 = 1; -pub const BACKGROUND_TRANQUILITY: u32 = 3; +pub const BACKGROUND_TRANQUILITY: u32 = 2; // Timeout for RPCs that read and write blocks to remote nodes const BLOCK_RW_TIMEOUT: Duration = Duration::from_secs(30); @@ -48,7 +48,8 @@ const BLOCK_RW_TIMEOUT: Duration = Duration::from_secs(30); const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(5); // The delay between the time where a resync operation fails -// and the time when it is retried. +// and the time when it is retried, with exponential backoff +// (multiplied by 2, 4, 8, 16, etc. for every consecutive failure). const RESYNC_RETRY_DELAY: Duration = Duration::from_secs(60); // The delay between the moment when the reference counter @@ -158,6 +159,7 @@ pub struct BlockManager { resync_queue: SledCountedTree, resync_notify: Notify, + resync_errors: SledCountedTree, system: Arc<System>, endpoint: Arc<Endpoint<BlockRpc, Self>>, @@ -187,13 +189,18 @@ impl BlockManager { .expect("Unable to open block_local_resync_queue tree"); let resync_queue = SledCountedTree::new(resync_queue); + let resync_errors = db + .open_tree("block_local_resync_errors") + .expect("Unable to open block_local_resync_errors tree"); + let resync_errors = SledCountedTree::new(resync_errors); + let endpoint = system .netapp .endpoint("garage_model/block.rs/Rpc".to_string()); let manager_locked = BlockManagerLocked(); - let metrics = BlockManagerMetrics::new(resync_queue.clone()); + let metrics = BlockManagerMetrics::new(resync_queue.clone(), resync_errors.clone()); let block_manager = Arc::new(Self { replication, @@ -202,6 +209,7 @@ impl BlockManager { rc, resync_queue, resync_notify: Notify::new(), + resync_errors, system, endpoint, garage: ArcSwapOption::from(None), @@ -519,6 +527,10 @@ impl BlockManager { fn put_to_resync(&self, hash: &Hash, delay: Duration) -> Result<(), Error> { let when = now_msec() + delay.as_millis() as u64; + self.put_to_resync_at(hash, when) + } + + fn put_to_resync_at(&self, hash: &Hash, when: u64) -> Result<(), Error> { trace!("Put resync_queue: {} {:?}", when, hash); let mut key = u64::to_be_bytes(when).to_vec(); key.extend(hash.as_ref()); @@ -560,6 +572,17 @@ impl BlockManager { if now >= time_msec { let hash = Hash::try_from(&hash_bytes[..]).unwrap(); + if let Some(ec) = self.resync_errors.get(hash.as_slice())? { + let ec = ErrorCounter::decode(ec); + if now < ec.next_try() { + // if next retry after an error is not yet, + // don't do resync and return early, but still + // make sure the item is still in queue at expected time + self.put_to_resync_at(&hash, ec.next_try())?; + return Ok(false); + } + } + let tracer = opentelemetry::global::tracer("garage"); let trace_id = gen_uuid(); let span = tracer @@ -584,8 +607,19 @@ impl BlockManager { if let Err(e) = &res { self.metrics.resync_error_counter.add(1); warn!("Error when resyncing {:?}: {}", hash, e); - self.put_to_resync(&hash, RESYNC_RETRY_DELAY)?; + + let err_counter = match self.resync_errors.get(hash.as_slice())? { + Some(ec) => ErrorCounter::decode(ec).add1(), + None => ErrorCounter::new(), + }; + + self.put_to_resync_at(&hash, err_counter.next_try())?; + self.resync_errors + .insert(hash.as_slice(), err_counter.encode())?; + } else { + self.resync_errors.remove(hash.as_slice())?; } + Ok(true) } else { self.resync_queue.insert(time_bytes, hash_bytes)?; @@ -994,6 +1028,58 @@ impl RcEntry { } } +/// Counts the number of errors when resyncing a block, +/// and the time of the last try. +/// Used to implement exponential backoff. +#[derive(Clone, Copy, Debug)] +struct ErrorCounter { + errors: u64, + last_try: u64, +} + +impl Default for ErrorCounter { + fn default() -> Self { + Self { + errors: 1, + last_try: now_msec(), + } + } +} + +impl ErrorCounter { + fn new() -> Self { + Self::default() + } + + fn decode(data: sled::IVec) -> Self { + Self { + errors: u64::from_be_bytes(data[0..8].try_into().unwrap()), + last_try: u64::from_be_bytes(data[8..16].try_into().unwrap()), + } + } + fn encode(&self) -> Vec<u8> { + [ + u64::to_be_bytes(self.errors), + u64::to_be_bytes(self.last_try), + ] + .concat() + } + + fn add1(self) -> Self { + Self { + errors: self.errors + 1, + last_try: now_msec(), + } + } + + fn delay_msec(&self) -> u64 { + (RESYNC_RETRY_DELAY.as_millis() as u64) << std::cmp::min(self.errors - 1, 10) + } + fn next_try(&self) -> u64 { + self.last_try + self.delay_msec() + } +} + fn zstd_encode<R: std::io::Read>(mut source: R, level: i32) -> std::io::Result<Vec<u8>> { let mut result = Vec::<u8>::new(); let mut encoder = Encoder::new(&mut result, level)?; diff --git a/src/model/block_metrics.rs b/src/model/block_metrics.rs index 819af241..f0f541a3 100644 --- a/src/model/block_metrics.rs +++ b/src/model/block_metrics.rs @@ -5,6 +5,7 @@ use garage_util::sled_counter::SledCountedTree; /// TableMetrics reference all counter used for metrics pub struct BlockManagerMetrics { pub(crate) _resync_queue_len: ValueObserver<u64>, + pub(crate) _resync_errored_blocks: ValueObserver<u64>, pub(crate) resync_counter: BoundCounter<u64>, pub(crate) resync_error_counter: BoundCounter<u64>, @@ -22,7 +23,7 @@ pub struct BlockManagerMetrics { } impl BlockManagerMetrics { - pub fn new(resync_queue: SledCountedTree) -> Self { + pub fn new(resync_queue: SledCountedTree, resync_errors: SledCountedTree) -> Self { let meter = global::meter("garage_model/block"); Self { _resync_queue_len: meter @@ -33,6 +34,12 @@ impl BlockManagerMetrics { "Number of block hashes queued for local check and possible resync", ) .init(), + _resync_errored_blocks: meter + .u64_value_observer("block.resync_errored_blocks", move |observer| { + observer.observe(resync_errors.len() as u64, &[]) + }) + .with_description("Number of block hashes whose last resync resulted in an error") + .init(), resync_counter: meter .u64_counter("block.resync_counter") diff --git a/src/util/sled_counter.rs b/src/util/sled_counter.rs index 8af04f50..bc54cea0 100644 --- a/src/util/sled_counter.rs +++ b/src/util/sled_counter.rs @@ -52,6 +52,14 @@ impl SledCountedTree { res } + pub fn remove<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<IVec>> { + let res = self.0.tree.remove(key); + if matches!(res, Ok(Some(_))) { + self.0.len.fetch_sub(1, Ordering::Relaxed); + } + res + } + pub fn pop_min(&self) -> Result<Option<(IVec, IVec)>> { let res = self.0.tree.pop_min(); if let Ok(Some(_)) = &res { |