aboutsummaryrefslogtreecommitdiff
path: root/src/model
diff options
context:
space:
mode:
Diffstat (limited to 'src/model')
-rw-r--r--src/model/index_counter.rs169
1 files changed, 101 insertions, 68 deletions
diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs
index 36e8172b..26833390 100644
--- a/src/model/index_counter.rs
+++ b/src/model/index_counter.rs
@@ -2,8 +2,8 @@ use core::ops::Bound;
use std::collections::{hash_map, BTreeMap, HashMap};
use std::marker::PhantomData;
use std::sync::Arc;
-use std::time::Duration;
+use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use tokio::sync::{mpsc, watch};
@@ -11,6 +11,7 @@ use garage_db as db;
use garage_rpc::ring::Ring;
use garage_rpc::system::System;
+use garage_util::background::*;
use garage_util::data::*;
use garage_util::error::*;
use garage_util::time::*;
@@ -171,11 +172,13 @@ impl<T: CountedItem> IndexCounter<T> {
),
});
- let this2 = this.clone();
- background.spawn_worker(
- format!("{} index counter propagator", T::COUNTER_TABLE_NAME),
- move |must_exit| this2.clone().propagate_loop(propagate_rx, must_exit),
- );
+ background.spawn_worker(IndexPropagatorWorker {
+ index_counter: this.clone(),
+ propagate_rx,
+ buf: HashMap::new(),
+ errors: 0,
+ });
+
this
}
@@ -239,68 +242,6 @@ impl<T: CountedItem> IndexCounter<T> {
Ok(())
}
- async fn propagate_loop(
- self: Arc<Self>,
- mut propagate_rx: mpsc::UnboundedReceiver<(T::CP, T::CS, LocalCounterEntry<T>)>,
- must_exit: watch::Receiver<bool>,
- ) {
- // This loop batches updates to counters to be sent all at once.
- // They are sent once the propagate_rx channel has been emptied (or is closed).
- let mut buf = HashMap::new();
- let mut errors = 0;
-
- loop {
- let (ent, closed) = match propagate_rx.try_recv() {
- Ok(ent) => (Some(ent), false),
- Err(mpsc::error::TryRecvError::Empty) if buf.is_empty() => {
- match propagate_rx.recv().await {
- Some(ent) => (Some(ent), false),
- None => (None, true),
- }
- }
- Err(mpsc::error::TryRecvError::Empty) => (None, false),
- Err(mpsc::error::TryRecvError::Disconnected) => (None, true),
- };
-
- if let Some((pk, sk, counters)) = ent {
- let tree_key = self.table.data.tree_key(&pk, &sk);
- let dist_entry = counters.into_counter_entry(self.this_node);
- match buf.entry(tree_key) {
- hash_map::Entry::Vacant(e) => {
- e.insert(dist_entry);
- }
- hash_map::Entry::Occupied(mut e) => {
- e.get_mut().merge(&dist_entry);
- }
- }
- // As long as we can add entries, loop back and add them to batch
- // before sending batch to other nodes
- continue;
- }
-
- if !buf.is_empty() {
- let entries = buf.iter().map(|(_k, v)| v);
- if let Err(e) = self.table.insert_many(entries).await {
- errors += 1;
- if errors >= 2 && *must_exit.borrow() {
- error!("({}) Could not propagate {} counter values: {}, these counters will not be updated correctly.", T::COUNTER_TABLE_NAME, buf.len(), e);
- break;
- }
- warn!("({}) Could not propagate {} counter values: {}, retrying in 5 seconds (retry #{})", T::COUNTER_TABLE_NAME, buf.len(), e, errors);
- tokio::time::sleep(Duration::from_secs(5)).await;
- continue;
- }
-
- buf.clear();
- errors = 0;
- }
-
- if closed || *must_exit.borrow() {
- break;
- }
- }
- }
-
pub fn offline_recount_all<TS, TR>(
&self,
counted_table: &Arc<Table<TS, TR>>,
@@ -437,6 +378,98 @@ impl<T: CountedItem> IndexCounter<T> {
}
}
+struct IndexPropagatorWorker<T: CountedItem> {
+ index_counter: Arc<IndexCounter<T>>,
+ propagate_rx: mpsc::UnboundedReceiver<(T::CP, T::CS, LocalCounterEntry<T>)>,
+
+ buf: HashMap<Vec<u8>, CounterEntry<T>>,
+ errors: usize,
+}
+
+impl<T: CountedItem> IndexPropagatorWorker<T> {
+ fn add_ent(&mut self, pk: T::CP, sk: T::CS, counters: LocalCounterEntry<T>) {
+ let tree_key = self.index_counter.table.data.tree_key(&pk, &sk);
+ let dist_entry = counters.into_counter_entry(self.index_counter.this_node);
+ match self.buf.entry(tree_key) {
+ hash_map::Entry::Vacant(e) => {
+ e.insert(dist_entry);
+ }
+ hash_map::Entry::Occupied(mut e) => {
+ e.get_mut().merge(&dist_entry);
+ }
+ }
+ }
+}
+
+#[async_trait]
+impl<T: CountedItem> Worker for IndexPropagatorWorker<T> {
+ fn name(&self) -> String {
+ format!("{} index counter propagator", T::COUNTER_TABLE_NAME)
+ }
+
+ fn info(&self) -> Option<String> {
+ if !self.buf.is_empty() {
+ Some(format!("{} items in queue", self.buf.len()))
+ } else {
+ None
+ }
+ }
+
+ async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
+ // This loop batches updates to counters to be sent all at once.
+ // They are sent once the propagate_rx channel has been emptied (or is closed).
+ let closed = loop {
+ match self.propagate_rx.try_recv() {
+ Ok((pk, sk, counters)) => {
+ self.add_ent(pk, sk, counters);
+ }
+ Err(mpsc::error::TryRecvError::Empty) => break false,
+ Err(mpsc::error::TryRecvError::Disconnected) => break true,
+ }
+ };
+
+ if !self.buf.is_empty() {
+ let entries_k = self.buf.keys().take(100).cloned().collect::<Vec<_>>();
+ let entries = entries_k.iter().map(|k| self.buf.get(k).unwrap());
+ if let Err(e) = self.index_counter.table.insert_many(entries).await {
+ self.errors += 1;
+ if self.errors >= 2 && *must_exit.borrow() {
+ error!("({}) Could not propagate {} counter values: {}, these counters will not be updated correctly.", T::COUNTER_TABLE_NAME, self.buf.len(), e);
+ return Ok(WorkerState::Done);
+ }
+ // Propagate error up to worker manager, it will log it, increment a counter,
+ // and sleep for a certain delay (with exponential backoff), waiting for
+ // things to go back to normal
+ return Err(e);
+ } else {
+ for k in entries_k {
+ self.buf.remove(&k);
+ }
+ self.errors = 0;
+ }
+
+ return Ok(WorkerState::Busy);
+ } else if closed {
+ return Ok(WorkerState::Done);
+ } else {
+ return Ok(WorkerState::Idle);
+ }
+ }
+
+ async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerState {
+ match self.propagate_rx.recv().await {
+ Some((pk, sk, counters)) => {
+ self.add_ent(pk, sk, counters);
+ WorkerState::Busy
+ }
+ None => match self.buf.is_empty() {
+ false => WorkerState::Busy,
+ true => WorkerState::Done,
+ },
+ }
+ }
+}
+
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
struct LocalCounterEntry<T: CountedItem> {
pk: T::CP,