aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/model/block.rs94
-rw-r--r--src/model/block_metrics.rs9
-rw-r--r--src/util/sled_counter.rs8
3 files changed, 106 insertions, 5 deletions
diff --git a/src/model/block.rs b/src/model/block.rs
index 97e06f0e..ec1890bf 100644
--- a/src/model/block.rs
+++ b/src/model/block.rs
@@ -39,7 +39,7 @@ use crate::garage::Garage;
pub const INLINE_THRESHOLD: usize = 3072;
pub const BACKGROUND_WORKERS: u64 = 1;
-pub const BACKGROUND_TRANQUILITY: u32 = 3;
+pub const BACKGROUND_TRANQUILITY: u32 = 2;
// Timeout for RPCs that read and write blocks to remote nodes
const BLOCK_RW_TIMEOUT: Duration = Duration::from_secs(30);
@@ -48,7 +48,8 @@ const BLOCK_RW_TIMEOUT: Duration = Duration::from_secs(30);
const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(5);
// The delay between the time where a resync operation fails
-// and the time when it is retried.
+// and the time when it is retried, with exponential backoff
+// (multiplied by 2, 4, 8, 16, etc. for every consecutive failure).
const RESYNC_RETRY_DELAY: Duration = Duration::from_secs(60);
// The delay between the moment when the reference counter
@@ -158,6 +159,7 @@ pub struct BlockManager {
resync_queue: SledCountedTree,
resync_notify: Notify,
+ resync_errors: SledCountedTree,
system: Arc<System>,
endpoint: Arc<Endpoint<BlockRpc, Self>>,
@@ -187,13 +189,18 @@ impl BlockManager {
.expect("Unable to open block_local_resync_queue tree");
let resync_queue = SledCountedTree::new(resync_queue);
+ let resync_errors = db
+ .open_tree("block_local_resync_errors")
+ .expect("Unable to open block_local_resync_errors tree");
+ let resync_errors = SledCountedTree::new(resync_errors);
+
let endpoint = system
.netapp
.endpoint("garage_model/block.rs/Rpc".to_string());
let manager_locked = BlockManagerLocked();
- let metrics = BlockManagerMetrics::new(resync_queue.clone());
+ let metrics = BlockManagerMetrics::new(resync_queue.clone(), resync_errors.clone());
let block_manager = Arc::new(Self {
replication,
@@ -202,6 +209,7 @@ impl BlockManager {
rc,
resync_queue,
resync_notify: Notify::new(),
+ resync_errors,
system,
endpoint,
garage: ArcSwapOption::from(None),
@@ -519,6 +527,10 @@ impl BlockManager {
fn put_to_resync(&self, hash: &Hash, delay: Duration) -> Result<(), Error> {
let when = now_msec() + delay.as_millis() as u64;
+ self.put_to_resync_at(hash, when)
+ }
+
+ fn put_to_resync_at(&self, hash: &Hash, when: u64) -> Result<(), Error> {
trace!("Put resync_queue: {} {:?}", when, hash);
let mut key = u64::to_be_bytes(when).to_vec();
key.extend(hash.as_ref());
@@ -560,6 +572,17 @@ impl BlockManager {
if now >= time_msec {
let hash = Hash::try_from(&hash_bytes[..]).unwrap();
+ if let Some(ec) = self.resync_errors.get(hash.as_slice())? {
+ let ec = ErrorCounter::decode(ec);
+ if now < ec.next_try() {
+ // if next retry after an error is not yet,
+ // don't do resync and return early, but still
+ // make sure the item is still in queue at expected time
+ self.put_to_resync_at(&hash, ec.next_try())?;
+ return Ok(false);
+ }
+ }
+
let tracer = opentelemetry::global::tracer("garage");
let trace_id = gen_uuid();
let span = tracer
@@ -584,8 +607,19 @@ impl BlockManager {
if let Err(e) = &res {
self.metrics.resync_error_counter.add(1);
warn!("Error when resyncing {:?}: {}", hash, e);
- self.put_to_resync(&hash, RESYNC_RETRY_DELAY)?;
+
+ let err_counter = match self.resync_errors.get(hash.as_slice())? {
+ Some(ec) => ErrorCounter::decode(ec).add1(),
+ None => ErrorCounter::new(),
+ };
+
+ self.put_to_resync_at(&hash, err_counter.next_try())?;
+ self.resync_errors
+ .insert(hash.as_slice(), err_counter.encode())?;
+ } else {
+ self.resync_errors.remove(hash.as_slice())?;
}
+
Ok(true)
} else {
self.resync_queue.insert(time_bytes, hash_bytes)?;
@@ -994,6 +1028,58 @@ impl RcEntry {
}
}
+/// Counts the number of errors when resyncing a block,
+/// and the time of the last try.
+/// Used to implement exponential backoff.
+#[derive(Clone, Copy, Debug)]
+struct ErrorCounter {
+ errors: u64,
+ last_try: u64,
+}
+
+impl Default for ErrorCounter {
+ fn default() -> Self {
+ Self {
+ errors: 1,
+ last_try: now_msec(),
+ }
+ }
+}
+
+impl ErrorCounter {
+ fn new() -> Self {
+ Self::default()
+ }
+
+ fn decode(data: sled::IVec) -> Self {
+ Self {
+ errors: u64::from_be_bytes(data[0..8].try_into().unwrap()),
+ last_try: u64::from_be_bytes(data[8..16].try_into().unwrap()),
+ }
+ }
+ fn encode(&self) -> Vec<u8> {
+ [
+ u64::to_be_bytes(self.errors),
+ u64::to_be_bytes(self.last_try),
+ ]
+ .concat()
+ }
+
+ fn add1(self) -> Self {
+ Self {
+ errors: self.errors + 1,
+ last_try: now_msec(),
+ }
+ }
+
+ fn delay_msec(&self) -> u64 {
+ (RESYNC_RETRY_DELAY.as_millis() as u64) << std::cmp::min(self.errors - 1, 10)
+ }
+ fn next_try(&self) -> u64 {
+ self.last_try + self.delay_msec()
+ }
+}
+
fn zstd_encode<R: std::io::Read>(mut source: R, level: i32) -> std::io::Result<Vec<u8>> {
let mut result = Vec::<u8>::new();
let mut encoder = Encoder::new(&mut result, level)?;
diff --git a/src/model/block_metrics.rs b/src/model/block_metrics.rs
index 819af241..f0f541a3 100644
--- a/src/model/block_metrics.rs
+++ b/src/model/block_metrics.rs
@@ -5,6 +5,7 @@ use garage_util::sled_counter::SledCountedTree;
/// TableMetrics reference all counter used for metrics
pub struct BlockManagerMetrics {
pub(crate) _resync_queue_len: ValueObserver<u64>,
+ pub(crate) _resync_errored_blocks: ValueObserver<u64>,
pub(crate) resync_counter: BoundCounter<u64>,
pub(crate) resync_error_counter: BoundCounter<u64>,
@@ -22,7 +23,7 @@ pub struct BlockManagerMetrics {
}
impl BlockManagerMetrics {
- pub fn new(resync_queue: SledCountedTree) -> Self {
+ pub fn new(resync_queue: SledCountedTree, resync_errors: SledCountedTree) -> Self {
let meter = global::meter("garage_model/block");
Self {
_resync_queue_len: meter
@@ -33,6 +34,12 @@ impl BlockManagerMetrics {
"Number of block hashes queued for local check and possible resync",
)
.init(),
+ _resync_errored_blocks: meter
+ .u64_value_observer("block.resync_errored_blocks", move |observer| {
+ observer.observe(resync_errors.len() as u64, &[])
+ })
+ .with_description("Number of block hashes whose last resync resulted in an error")
+ .init(),
resync_counter: meter
.u64_counter("block.resync_counter")
diff --git a/src/util/sled_counter.rs b/src/util/sled_counter.rs
index 8af04f50..bc54cea0 100644
--- a/src/util/sled_counter.rs
+++ b/src/util/sled_counter.rs
@@ -52,6 +52,14 @@ impl SledCountedTree {
res
}
+ pub fn remove<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<IVec>> {
+ let res = self.0.tree.remove(key);
+ if matches!(res, Ok(Some(_))) {
+ self.0.len.fetch_sub(1, Ordering::Relaxed);
+ }
+ res
+ }
+
pub fn pop_min(&self) -> Result<Option<(IVec, IVec)>> {
let res = self.0.tree.pop_min();
if let Ok(Some(_)) = &res {