aboutsummaryrefslogtreecommitdiff
path: root/src/model/block.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/model/block.rs')
-rw-r--r--src/model/block.rs94
1 files changed, 90 insertions, 4 deletions
diff --git a/src/model/block.rs b/src/model/block.rs
index 97e06f0e..ec1890bf 100644
--- a/src/model/block.rs
+++ b/src/model/block.rs
@@ -39,7 +39,7 @@ use crate::garage::Garage;
pub const INLINE_THRESHOLD: usize = 3072;
pub const BACKGROUND_WORKERS: u64 = 1;
-pub const BACKGROUND_TRANQUILITY: u32 = 3;
+pub const BACKGROUND_TRANQUILITY: u32 = 2;
// Timeout for RPCs that read and write blocks to remote nodes
const BLOCK_RW_TIMEOUT: Duration = Duration::from_secs(30);
@@ -48,7 +48,8 @@ const BLOCK_RW_TIMEOUT: Duration = Duration::from_secs(30);
const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(5);
// The delay between the time where a resync operation fails
-// and the time when it is retried.
+// and the time when it is retried. Exponential backoff is applied: the
+// delay doubles on every consecutive failure (x1, x2, x4, ...), capped at x1024.
const RESYNC_RETRY_DELAY: Duration = Duration::from_secs(60);
// The delay between the moment when the reference counter
@@ -158,6 +159,7 @@ pub struct BlockManager {
resync_queue: SledCountedTree,
resync_notify: Notify,
+ resync_errors: SledCountedTree,
system: Arc<System>,
endpoint: Arc<Endpoint<BlockRpc, Self>>,
@@ -187,13 +189,18 @@ impl BlockManager {
.expect("Unable to open block_local_resync_queue tree");
let resync_queue = SledCountedTree::new(resync_queue);
+ let resync_errors = db
+ .open_tree("block_local_resync_errors")
+ .expect("Unable to open block_local_resync_errors tree");
+ let resync_errors = SledCountedTree::new(resync_errors);
+
let endpoint = system
.netapp
.endpoint("garage_model/block.rs/Rpc".to_string());
let manager_locked = BlockManagerLocked();
- let metrics = BlockManagerMetrics::new(resync_queue.clone());
+ let metrics = BlockManagerMetrics::new(resync_queue.clone(), resync_errors.clone());
let block_manager = Arc::new(Self {
replication,
@@ -202,6 +209,7 @@ impl BlockManager {
rc,
resync_queue,
resync_notify: Notify::new(),
+ resync_errors,
system,
endpoint,
garage: ArcSwapOption::from(None),
@@ -519,6 +527,10 @@ impl BlockManager {
+/// Requests a resync of block `hash` after `delay`, counted from now.
+///
+/// The relative `delay` is converted to an absolute timestamp in
+/// milliseconds (`now_msec()` plus the delay) and the call is then
+/// delegated to `put_to_resync_at`, whose result is returned unchanged.
fn put_to_resync(&self, hash: &Hash, delay: Duration) -> Result<(), Error> {
let when = now_msec() + delay.as_millis() as u64;
+ self.put_to_resync_at(hash, when)
+ }
+
+ fn put_to_resync_at(&self, hash: &Hash, when: u64) -> Result<(), Error> {
trace!("Put resync_queue: {} {:?}", when, hash);
let mut key = u64::to_be_bytes(when).to_vec();
key.extend(hash.as_ref());
@@ -560,6 +572,17 @@ impl BlockManager {
if now >= time_msec {
let hash = Hash::try_from(&hash_bytes[..]).unwrap();
+ if let Some(ec) = self.resync_errors.get(hash.as_slice())? {
+ let ec = ErrorCounter::decode(ec);
+ if now < ec.next_try() {
+ // The next retry after a previous error is not due yet:
+ // skip the resync and return early, but make sure the
+ // item is queued at its expected retry time.
+ self.put_to_resync_at(&hash, ec.next_try())?;
+ return Ok(false);
+ }
+ }
+
let tracer = opentelemetry::global::tracer("garage");
let trace_id = gen_uuid();
let span = tracer
@@ -584,8 +607,19 @@ impl BlockManager {
if let Err(e) = &res {
self.metrics.resync_error_counter.add(1);
warn!("Error when resyncing {:?}: {}", hash, e);
- self.put_to_resync(&hash, RESYNC_RETRY_DELAY)?;
+
+ let err_counter = match self.resync_errors.get(hash.as_slice())? {
+ Some(ec) => ErrorCounter::decode(ec).add1(),
+ None => ErrorCounter::new(),
+ };
+
+ self.put_to_resync_at(&hash, err_counter.next_try())?;
+ self.resync_errors
+ .insert(hash.as_slice(), err_counter.encode())?;
+ } else {
+ self.resync_errors.remove(hash.as_slice())?;
}
+
Ok(true)
} else {
self.resync_queue.insert(time_bytes, hash_bytes)?;
@@ -994,6 +1028,58 @@ impl RcEntry {
}
}
+/// Per-block bookkeeping for failed resync attempts.
+///
+/// Stores how many errors were recorded for a block together with the
+/// time of the most recent try; both values feed the exponential
+/// backoff computation.
+#[derive(Debug, Clone, Copy)]
+struct ErrorCounter {
+    /// Number of errors recorded so far.
+    errors: u64,
+    /// Time of the last try (a `now_msec()` timestamp).
+    last_try: u64,
+}
+
+impl Default for ErrorCounter {
+    /// Builds a counter with one recorded error whose last try is "now".
+    fn default() -> Self {
+        let last_try = now_msec();
+        Self {
+            errors: 1,
+            last_try,
+        }
+    }
+}
+
+impl ErrorCounter {
+    /// Creates a counter for a block that has just failed for the first time.
+    fn new() -> Self {
+        Self::default()
+    }
+
+    /// Decodes a counter from its stored form: two big-endian `u64`
+    /// values, `errors` in bytes 0..8 and `last_try` in bytes 8..16.
+    ///
+    /// # Panics
+    ///
+    /// Panics if `data` is shorter than 16 bytes.
+    fn decode(data: sled::IVec) -> Self {
+        Self {
+            errors: u64::from_be_bytes(data[0..8].try_into().unwrap()),
+            last_try: u64::from_be_bytes(data[8..16].try_into().unwrap()),
+        }
+    }
+
+    /// Encodes the counter as 16 bytes, the inverse of [`ErrorCounter::decode`].
+    fn encode(&self) -> Vec<u8> {
+        [
+            u64::to_be_bytes(self.errors),
+            u64::to_be_bytes(self.last_try),
+        ]
+        .concat()
+    }
+
+    /// Returns a counter with one more error and `last_try` set to now.
+    fn add1(self) -> Self {
+        Self {
+            // Saturate instead of overflowing on a value read back from storage.
+            errors: self.errors.saturating_add(1),
+            last_try: now_msec(),
+        }
+    }
+
+    /// Delay before the next try, in milliseconds: `RESYNC_RETRY_DELAY`
+    /// doubled for every error after the first, capped at a factor of 2^10.
+    fn delay_msec(&self) -> u64 {
+        // A decoded counter may hold `errors == 0`; `saturating_sub` keeps
+        // the exponent at 0 instead of underflowing.
+        let exponent = std::cmp::min(self.errors.saturating_sub(1), 10);
+        (RESYNC_RETRY_DELAY.as_millis() as u64) << exponent
+    }
+
+    /// Timestamp (same unit as `now_msec()`) before which no retry should happen.
+    fn next_try(&self) -> u64 {
+        // Saturate so a bogus stored `last_try` cannot overflow the sum.
+        self.last_try.saturating_add(self.delay_msec())
+    }
+}
+
fn zstd_encode<R: std::io::Read>(mut source: R, level: i32) -> std::io::Result<Vec<u8>> {
let mut result = Vec::<u8>::new();
let mut encoder = Encoder::new(&mut result, level)?;