diff options
Diffstat (limited to 'src/model/block.rs')
-rw-r--r-- | src/model/block.rs | 94 |
1 files changed, 90 insertions, 4 deletions
diff --git a/src/model/block.rs b/src/model/block.rs index 97e06f0e..ec1890bf 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -39,7 +39,7 @@ use crate::garage::Garage; pub const INLINE_THRESHOLD: usize = 3072; pub const BACKGROUND_WORKERS: u64 = 1; -pub const BACKGROUND_TRANQUILITY: u32 = 3; +pub const BACKGROUND_TRANQUILITY: u32 = 2; // Timeout for RPCs that read and write blocks to remote nodes const BLOCK_RW_TIMEOUT: Duration = Duration::from_secs(30); @@ -48,7 +48,8 @@ const BLOCK_RW_TIMEOUT: Duration = Duration::from_secs(30); const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(5); // The delay between the time where a resync operation fails -// and the time when it is retried. +// and the time when it is retried, with exponential backoff +// (multiplied by 2, 4, 8, 16, etc. for every consecutive failure). const RESYNC_RETRY_DELAY: Duration = Duration::from_secs(60); // The delay between the moment when the reference counter @@ -158,6 +159,7 @@ pub struct BlockManager { resync_queue: SledCountedTree, resync_notify: Notify, + resync_errors: SledCountedTree, system: Arc<System>, endpoint: Arc<Endpoint<BlockRpc, Self>>, @@ -187,13 +189,18 @@ impl BlockManager { .expect("Unable to open block_local_resync_queue tree"); let resync_queue = SledCountedTree::new(resync_queue); + let resync_errors = db + .open_tree("block_local_resync_errors") + .expect("Unable to open block_local_resync_errors tree"); + let resync_errors = SledCountedTree::new(resync_errors); + let endpoint = system .netapp .endpoint("garage_model/block.rs/Rpc".to_string()); let manager_locked = BlockManagerLocked(); - let metrics = BlockManagerMetrics::new(resync_queue.clone()); + let metrics = BlockManagerMetrics::new(resync_queue.clone(), resync_errors.clone()); let block_manager = Arc::new(Self { replication, @@ -202,6 +209,7 @@ impl BlockManager { rc, resync_queue, resync_notify: Notify::new(), + resync_errors, system, endpoint, garage: ArcSwapOption::from(None), @@ -519,6 +527,10 @@ impl BlockManager { fn put_to_resync(&self, hash: &Hash, delay: Duration) -> Result<(), Error> { let when = now_msec() + delay.as_millis() as u64; + self.put_to_resync_at(hash, when) + } + + fn put_to_resync_at(&self, hash: &Hash, when: u64) -> Result<(), Error> { trace!("Put resync_queue: {} {:?}", when, hash); let mut key = u64::to_be_bytes(when).to_vec(); key.extend(hash.as_ref()); @@ -560,6 +572,17 @@ impl BlockManager { if now >= time_msec { let hash = Hash::try_from(&hash_bytes[..]).unwrap(); + if let Some(ec) = self.resync_errors.get(hash.as_slice())? { + let ec = ErrorCounter::decode(ec); + if now < ec.next_try() { + // if next retry after an error is not yet, + // don't do resync and return early, but still + // make sure the item is still in queue at expected time + self.put_to_resync_at(&hash, ec.next_try())?; + return Ok(false); + } + } + let tracer = opentelemetry::global::tracer("garage"); let trace_id = gen_uuid(); let span = tracer @@ -584,8 +607,19 @@ impl BlockManager { if let Err(e) = &res { self.metrics.resync_error_counter.add(1); warn!("Error when resyncing {:?}: {}", hash, e); - self.put_to_resync(&hash, RESYNC_RETRY_DELAY)?; + + let err_counter = match self.resync_errors.get(hash.as_slice())? { + Some(ec) => ErrorCounter::decode(ec).add1(), + None => ErrorCounter::new(), + }; + + self.put_to_resync_at(&hash, err_counter.next_try())?; + self.resync_errors + .insert(hash.as_slice(), err_counter.encode())?; + } else { + self.resync_errors.remove(hash.as_slice())?; } + Ok(true) } else { self.resync_queue.insert(time_bytes, hash_bytes)?; @@ -994,6 +1028,58 @@ impl RcEntry { } } +/// Counts the number of errors when resyncing a block, +/// and the time of the last try. +/// Used to implement exponential backoff. +#[derive(Clone, Copy, Debug)] +struct ErrorCounter { + errors: u64, + last_try: u64, +} + +impl Default for ErrorCounter { + fn default() -> Self { + Self { + errors: 1, + last_try: now_msec(), + } + } +} + +impl ErrorCounter { + fn new() -> Self { + Self::default() + } + + fn decode(data: sled::IVec) -> Self { + Self { + errors: u64::from_be_bytes(data[0..8].try_into().unwrap()), + last_try: u64::from_be_bytes(data[8..16].try_into().unwrap()), + } + } + fn encode(&self) -> Vec<u8> { + [ + u64::to_be_bytes(self.errors), + u64::to_be_bytes(self.last_try), + ] + .concat() + } + + fn add1(self) -> Self { + Self { + errors: self.errors + 1, + last_try: now_msec(), + } + } + + fn delay_msec(&self) -> u64 { + (RESYNC_RETRY_DELAY.as_millis() as u64) << std::cmp::min(self.errors - 1, 10) + } + fn next_try(&self) -> u64 { + self.last_try + self.delay_msec() + } +} + fn zstd_encode<R: std::io::Read>(mut source: R, level: i32) -> std::io::Result<Vec<u8>> { let mut result = Vec::<u8>::new(); let mut encoder = Encoder::new(&mut result, level)?; |