From 4f38cadf6e2963a652ed28327d1c2ccfa2ebb2b7 Mon Sep 17 00:00:00 2001 From: Alex Date: Fri, 8 Jul 2022 13:30:26 +0200 Subject: Background task manager (#332) - [x] New background worker trait - [x] Adapt all current workers to use new API - [x] Command to list currently running workers, and whether they are active, idle, or dead - [x] Error reporting - Optimizations - [x] Merkle updater: several items per iteration - [ ] Use `tokio::task::spawn_blocking` where appropriate so that CPU-intensive tasks don't block other things going on - scrub: - [x] have only one worker with a channel to start/pause/cancel - [x] automatic scrub - [x] ability to view and change tranquility from CLI - [x] persistence of a few info - [ ] Testing Co-authored-by: Alex Auvolat Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/332 Co-authored-by: Alex Co-committed-by: Alex --- src/model/index_counter.rs | 169 +++++++++++++++++++++++++++------------------ 1 file changed, 101 insertions(+), 68 deletions(-) (limited to 'src/model/index_counter.rs') diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs index 36e8172b..26833390 100644 --- a/src/model/index_counter.rs +++ b/src/model/index_counter.rs @@ -2,8 +2,8 @@ use core::ops::Bound; use std::collections::{hash_map, BTreeMap, HashMap}; use std::marker::PhantomData; use std::sync::Arc; -use std::time::Duration; +use async_trait::async_trait; use serde::{Deserialize, Serialize}; use tokio::sync::{mpsc, watch}; @@ -11,6 +11,7 @@ use garage_db as db; use garage_rpc::ring::Ring; use garage_rpc::system::System; +use garage_util::background::*; use garage_util::data::*; use garage_util::error::*; use garage_util::time::*; @@ -171,11 +172,13 @@ impl IndexCounter { ), }); - let this2 = this.clone(); - background.spawn_worker( - format!("{} index counter propagator", T::COUNTER_TABLE_NAME), - move |must_exit| this2.clone().propagate_loop(propagate_rx, must_exit), - ); + background.spawn_worker(IndexPropagatorWorker { + index_counter: this.clone(), + propagate_rx, + buf: HashMap::new(), + errors: 0, + }); + this } @@ -239,68 +242,6 @@ impl IndexCounter { Ok(()) } - async fn propagate_loop( - self: Arc, - mut propagate_rx: mpsc::UnboundedReceiver<(T::CP, T::CS, LocalCounterEntry)>, - must_exit: watch::Receiver, - ) { - // This loop batches updates to counters to be sent all at once. - // They are sent once the propagate_rx channel has been emptied (or is closed). - let mut buf = HashMap::new(); - let mut errors = 0; - - loop { - let (ent, closed) = match propagate_rx.try_recv() { - Ok(ent) => (Some(ent), false), - Err(mpsc::error::TryRecvError::Empty) if buf.is_empty() => { - match propagate_rx.recv().await { - Some(ent) => (Some(ent), false), - None => (None, true), - } - } - Err(mpsc::error::TryRecvError::Empty) => (None, false), - Err(mpsc::error::TryRecvError::Disconnected) => (None, true), - }; - - if let Some((pk, sk, counters)) = ent { - let tree_key = self.table.data.tree_key(&pk, &sk); - let dist_entry = counters.into_counter_entry(self.this_node); - match buf.entry(tree_key) { - hash_map::Entry::Vacant(e) => { - e.insert(dist_entry); - } - hash_map::Entry::Occupied(mut e) => { - e.get_mut().merge(&dist_entry); - } - } - // As long as we can add entries, loop back and add them to batch - // before sending batch to other nodes - continue; - } - - if !buf.is_empty() { - let entries = buf.iter().map(|(_k, v)| v); - if let Err(e) = self.table.insert_many(entries).await { - errors += 1; - if errors >= 2 && *must_exit.borrow() { - error!("({}) Could not propagate {} counter values: {}, these counters will not be updated correctly.", T::COUNTER_TABLE_NAME, buf.len(), e); - break; - } - warn!("({}) Could not propagate {} counter values: {}, retrying in 5 seconds (retry #{})", T::COUNTER_TABLE_NAME, buf.len(), e, errors); - tokio::time::sleep(Duration::from_secs(5)).await; - continue; - } - - buf.clear(); - errors = 0; - } - - if closed || *must_exit.borrow() { - break; - } - } - } - pub fn offline_recount_all( &self, counted_table: &Arc>, @@ -437,6 +378,98 @@ impl IndexCounter { } } +struct IndexPropagatorWorker { + index_counter: Arc>, + propagate_rx: mpsc::UnboundedReceiver<(T::CP, T::CS, LocalCounterEntry)>, + + buf: HashMap, CounterEntry>, + errors: usize, +} + +impl IndexPropagatorWorker { + fn add_ent(&mut self, pk: T::CP, sk: T::CS, counters: LocalCounterEntry) { + let tree_key = self.index_counter.table.data.tree_key(&pk, &sk); + let dist_entry = counters.into_counter_entry(self.index_counter.this_node); + match self.buf.entry(tree_key) { + hash_map::Entry::Vacant(e) => { + e.insert(dist_entry); + } + hash_map::Entry::Occupied(mut e) => { + e.get_mut().merge(&dist_entry); + } + } + } +} + +#[async_trait] +impl Worker for IndexPropagatorWorker { + fn name(&self) -> String { + format!("{} index counter propagator", T::COUNTER_TABLE_NAME) + } + + fn info(&self) -> Option { + if !self.buf.is_empty() { + Some(format!("{} items in queue", self.buf.len())) + } else { + None + } + } + + async fn work(&mut self, must_exit: &mut watch::Receiver) -> Result { + // This loop batches updates to counters to be sent all at once. + // They are sent once the propagate_rx channel has been emptied (or is closed). + let closed = loop { + match self.propagate_rx.try_recv() { + Ok((pk, sk, counters)) => { + self.add_ent(pk, sk, counters); + } + Err(mpsc::error::TryRecvError::Empty) => break false, + Err(mpsc::error::TryRecvError::Disconnected) => break true, + } + }; + + if !self.buf.is_empty() { + let entries_k = self.buf.keys().take(100).cloned().collect::>(); + let entries = entries_k.iter().map(|k| self.buf.get(k).unwrap()); + if let Err(e) = self.index_counter.table.insert_many(entries).await { + self.errors += 1; + if self.errors >= 2 && *must_exit.borrow() { + error!("({}) Could not propagate {} counter values: {}, these counters will not be updated correctly.", T::COUNTER_TABLE_NAME, self.buf.len(), e); + return Ok(WorkerState::Done); + } + // Propagate error up to worker manager, it will log it, increment a counter, + // and sleep for a certain delay (with exponential backoff), waiting for + // things to go back to normal + return Err(e); + } else { + for k in entries_k { + self.buf.remove(&k); + } + self.errors = 0; + } + + return Ok(WorkerState::Busy); + } else if closed { + return Ok(WorkerState::Done); + } else { + return Ok(WorkerState::Idle); + } + } + + async fn wait_for_work(&mut self, _must_exit: &watch::Receiver) -> WorkerState { + match self.propagate_rx.recv().await { + Some((pk, sk, counters)) => { + self.add_ent(pk, sk, counters); + WorkerState::Busy + } + None => match self.buf.is_empty() { + false => WorkerState::Busy, + true => WorkerState::Done, + }, + } + } +} + #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] struct LocalCounterEntry { pk: T::CP, -- cgit v1.2.3