From f8e528c15de0c9d31c16e5cd8e58f99f4132f103 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 14 Dec 2022 10:48:49 +0100 Subject: Small refactor of tables internals --- src/model/index_counter.rs | 6 +----- src/model/k2v/rpc.rs | 7 +------ src/table/data.rs | 40 ++++++++++++++++++++++------------------ 3 files changed, 24 insertions(+), 29 deletions(-) (limited to 'src') diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs index b9594406..bcf55942 100644 --- a/src/model/index_counter.rs +++ b/src/model/index_counter.rs @@ -251,13 +251,9 @@ impl IndexCounter { TR: TableReplication, { let save_counter_entry = |entry: CounterEntry| -> Result<(), Error> { - let entry_k = self - .table - .data - .tree_key(entry.partition_key(), entry.sort_key()); self.table .data - .update_entry_with(&entry_k, |ent| match ent { + .update_entry_with(&entry.partition_key(), &entry.sort_key(), |ent| match ent { Some(mut ent) => { ent.merge(&entry); ent diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs index a74df277..f64a7984 100644 --- a/src/model/k2v/rpc.rs +++ b/src/model/k2v/rpc.rs @@ -273,14 +273,9 @@ impl K2VRpcHandler { } fn local_insert(&self, item: &InsertedItem) -> Result, Error> { - let tree_key = self - .item_table - .data - .tree_key(&item.partition, &item.sort_key); - self.item_table .data - .update_entry_with(&tree_key[..], |ent| { + .update_entry_with(&item.partition, &item.sort_key, |ent| { let mut ent = ent.unwrap_or_else(|| { K2VItem::new( item.partition.bucket_id, diff --git a/src/table/data.rs b/src/table/data.rs index 93da2110..cae18999 100644 --- a/src/table/data.rs +++ b/src/table/data.rs @@ -173,9 +173,8 @@ where pub(crate) fn update_entry(&self, update_bytes: &[u8]) -> Result<(), Error> { let update = self.decode_entry(update_bytes)?; - let tree_key = self.tree_key(update.partition_key(), update.sort_key()); - self.update_entry_with(&tree_key[..], |ent| match ent { + self.update_entry_with(update.partition_key(), update.sort_key(), |ent| match ent { Some(mut ent) => { ent.merge(&update); ent @@ -187,11 +186,14 @@ where pub fn update_entry_with( &self, - tree_key: &[u8], + partition_key: &F::P, + sort_key: &F::S, f: impl Fn(Option) -> F::E, ) -> Result, Error> { + let tree_key = self.tree_key(partition_key, sort_key); + let changed = self.store.db().transaction(|mut tx| { - let (old_entry, old_bytes, new_entry) = match tx.get(&self.store, tree_key)? { + let (old_entry, old_bytes, new_entry) = match tx.get(&self.store, &tree_key)? { Some(old_bytes) => { let old_entry = self.decode_entry(&old_bytes).map_err(db::TxError::Abort)?; let new_entry = f(Some(old_entry.clone())); @@ -200,23 +202,23 @@ where None => (None, None, f(None)), }; - // Scenario 1: the value changed, so of course there is a change - let value_changed = Some(&new_entry) != old_entry.as_ref(); - + // Changed can be true in two scenarios + // Scenario 1: the actual represented value changed, + // so of course the messagepack encoding changed as well // Scenario 2: the value didn't change but due to a migration in the - // data format, the messagepack encoding changed. In this case - // we have to write the migrated value in the table and update - // the associated Merkle tree entry. + // data format, the messagepack encoding changed. In this case, + // we also have to write the migrated value in the table and update + // the associated Merkle tree entry. let new_bytes = rmp_to_vec_all_named(&new_entry) .map_err(Error::RmpEncode) .map_err(db::TxError::Abort)?; - let encoding_changed = Some(&new_bytes[..]) != old_bytes.as_ref().map(|x| &x[..]); + let changed = Some(&new_bytes[..]) != old_bytes.as_deref(); drop(old_bytes); - if value_changed || encoding_changed { - let new_bytes_hash = blake2sum(&new_bytes[..]); - tx.insert(&self.merkle_todo, tree_key, new_bytes_hash.as_slice())?; - tx.insert(&self.store, tree_key, new_bytes)?; + if changed { + let new_bytes_hash = blake2sum(&new_bytes); + tx.insert(&self.merkle_todo, &tree_key, new_bytes_hash.as_slice())?; + tx.insert(&self.store, &tree_key, new_bytes)?; self.instance .updated(&mut tx, old_entry.as_ref(), Some(&new_entry))?; @@ -242,7 +244,7 @@ where let pk_hash = Hash::try_from(&tree_key[..32]).unwrap(); let nodes = self.replication.write_nodes(&pk_hash); if nodes.first() == Some(&self.system.id) { - GcTodoEntry::new(tree_key.to_vec(), new_bytes_hash).save(&self.gc_todo)?; + GcTodoEntry::new(tree_key, new_bytes_hash).save(&self.gc_todo)?; } } @@ -258,10 +260,11 @@ where .db() .transaction(|mut tx| match tx.get(&self.store, k)? { Some(cur_v) if cur_v == v => { + let old_entry = self.decode_entry(v).map_err(db::TxError::Abort)?; + tx.remove(&self.store, k)?; tx.insert(&self.merkle_todo, k, vec![])?; - let old_entry = self.decode_entry(v).map_err(db::TxError::Abort)?; self.instance.updated(&mut tx, Some(&old_entry), None)?; Ok(true) } @@ -285,10 +288,11 @@ where .db() .transaction(|mut tx| match tx.get(&self.store, k)? { Some(cur_v) if blake2sum(&cur_v[..]) == vhash => { + let old_entry = self.decode_entry(&cur_v[..]).map_err(db::TxError::Abort)?; + tx.remove(&self.store, k)?; tx.insert(&self.merkle_todo, k, vec![])?; - let old_entry = self.decode_entry(&cur_v[..]).map_err(db::TxError::Abort)?; self.instance.updated(&mut tx, Some(&old_entry), None)?; Ok(true) } -- cgit v1.2.3 From 83c8467e23c1f531ae233766d5dc7244afe57f08 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 14 Dec 2022 11:58:06 +0100 Subject: Proper queueing for delayed inserts, now backed to disk --- src/model/index_counter.rs | 144 ++++-------------------------------------- src/model/s3/object_table.rs | 50 +++++++-------- src/model/s3/version_table.rs | 37 +++++------ src/table/data.rs | 38 ++++++++++- src/table/lib.rs | 8 ++- src/table/merkle.rs | 6 +- src/table/queue.rs | 84 ++++++++++++++++++++++++ src/table/table.rs | 11 ++++ 8 files changed, 194 insertions(+), 184 deletions(-) create mode 100644 src/table/queue.rs (limited to 'src') diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs index bcf55942..9c8e00c2 100644 --- a/src/model/index_counter.rs +++ b/src/model/index_counter.rs @@ -1,17 +1,14 @@ use core::ops::Bound; -use std::collections::{hash_map, BTreeMap, HashMap}; +use std::collections::{BTreeMap, HashMap}; use std::marker::PhantomData; use std::sync::Arc; -use async_trait::async_trait; use serde::{Deserialize, Serialize}; -use tokio::sync::{mpsc, watch}; use garage_db as db; use garage_rpc::ring::Ring; use garage_rpc::system::System; -use garage_util::background::*; use garage_util::data::*; use garage_util::error::*; use garage_util::time::*; @@ -142,7 +139,6 @@ impl TableSchema for CounterTable { pub struct IndexCounter { this_node: Uuid, local_counter: db::Tree, - propagate_tx: mpsc::UnboundedSender<(T::CP, T::CS, LocalCounterEntry)>, pub table: Arc, TableShardedReplication>>, } @@ -152,16 +148,11 @@ impl IndexCounter { replication: TableShardedReplication, db: &db::Db, ) -> Arc { - let background = system.background.clone(); - - let (propagate_tx, propagate_rx) = mpsc::unbounded_channel(); - - let this = Arc::new(Self { + Arc::new(Self { this_node: system.id, local_counter: db .open_tree(format!("local_counter_v2:{}", T::COUNTER_TABLE_NAME)) .expect("Unable to open local counter tree"), - propagate_tx, table: Table::new( CounterTable { _phantom_t: Default::default(), @@ -170,16 +161,7 @@ impl IndexCounter { system, db, ), - }); - - background.spawn_worker(IndexPropagatorWorker { - index_counter: this.clone(), - propagate_rx, - buf: HashMap::new(), - errors: 0, - }); - - this + }) } pub fn count( @@ -232,12 +214,8 @@ impl IndexCounter { .map_err(db::TxError::Abort)?; tx.insert(&self.local_counter, &tree_key[..], new_entry_bytes)?; - if let Err(e) = self.propagate_tx.send((pk.clone(), sk.clone(), entry)) { - error!( - "Could not propagate updated counter values, failed to send to channel: {}", - e - ); - } + let dist_entry = entry.into_counter_entry(self.this_node); + self.table.queue_insert(tx, &dist_entry)?; Ok(()) } @@ -250,19 +228,6 @@ impl IndexCounter { TS: TableSchema, TR: TableReplication, { - let save_counter_entry = |entry: CounterEntry| -> Result<(), Error> { - self.table - .data - .update_entry_with(&entry.partition_key(), &entry.sort_key(), |ent| match ent { - Some(mut ent) => { - ent.merge(&entry); - ent - } - None => entry.clone(), - })?; - Ok(()) - }; - // 1. Set all old local counters to zero let now = now_msec(); let mut next_start: Option> = None; @@ -298,7 +263,9 @@ impl IndexCounter { .insert(&local_counter_k, &local_counter_bytes)?; let counter_entry = local_counter.into_counter_entry(self.this_node); - save_counter_entry(counter_entry)?; + self.local_counter + .db() + .transaction(|mut tx| self.table.queue_insert(&mut tx, &counter_entry))?; next_start = Some(local_counter_k); } @@ -363,7 +330,9 @@ impl IndexCounter { .insert(&local_counter_key, local_counter_bytes)?; let counter_entry = local_counter.into_counter_entry(self.this_node); - save_counter_entry(counter_entry)?; + self.local_counter + .db() + .transaction(|mut tx| self.table.queue_insert(&mut tx, &counter_entry))?; next_start = Some(counted_entry_k); } @@ -374,96 +343,7 @@ impl IndexCounter { } } -struct IndexPropagatorWorker { - index_counter: Arc>, - propagate_rx: mpsc::UnboundedReceiver<(T::CP, T::CS, LocalCounterEntry)>, - - buf: HashMap, CounterEntry>, - errors: usize, -} - -impl IndexPropagatorWorker { - fn add_ent(&mut self, pk: T::CP, sk: T::CS, counters: LocalCounterEntry) { - let tree_key = self.index_counter.table.data.tree_key(&pk, &sk); - let dist_entry = counters.into_counter_entry(self.index_counter.this_node); - match self.buf.entry(tree_key) { - hash_map::Entry::Vacant(e) => { - e.insert(dist_entry); - } - hash_map::Entry::Occupied(mut e) => { - e.get_mut().merge(&dist_entry); - } - } - } -} - -#[async_trait] -impl Worker for IndexPropagatorWorker { - fn name(&self) -> String { - format!("{} counter", T::COUNTER_TABLE_NAME) - } - - fn status(&self) -> WorkerStatus { - WorkerStatus { - queue_length: Some(self.buf.len() as u64), - ..Default::default() - } - } - - async fn work(&mut self, must_exit: &mut watch::Receiver) -> Result { - // This loop batches updates to counters to be sent all at once. - // They are sent once the propagate_rx channel has been emptied (or is closed). - let closed = loop { - match self.propagate_rx.try_recv() { - Ok((pk, sk, counters)) => { - self.add_ent(pk, sk, counters); - } - Err(mpsc::error::TryRecvError::Empty) => break false, - Err(mpsc::error::TryRecvError::Disconnected) => break true, - } - }; - - if !self.buf.is_empty() { - let entries_k = self.buf.keys().take(100).cloned().collect::>(); - let entries = entries_k.iter().map(|k| self.buf.get(k).unwrap()); - if let Err(e) = self.index_counter.table.insert_many(entries).await { - self.errors += 1; - if self.errors >= 2 && *must_exit.borrow() { - error!("({}) Could not propagate {} counter values: {}, these counters will not be updated correctly.", T::COUNTER_TABLE_NAME, self.buf.len(), e); - return Ok(WorkerState::Done); - } - // Propagate error up to worker manager, it will log it, increment a counter, - // and sleep for a certain delay (with exponential backoff), waiting for - // things to go back to normal - return Err(e); - } else { - for k in entries_k { - self.buf.remove(&k); - } - self.errors = 0; - } - - return Ok(WorkerState::Busy); - } else if closed { - return Ok(WorkerState::Done); - } else { - return Ok(WorkerState::Idle); - } - } - - async fn wait_for_work(&mut self, _must_exit: &watch::Receiver) -> WorkerState { - match self.propagate_rx.recv().await { - Some((pk, sk, counters)) => { - self.add_ent(pk, sk, counters); - WorkerState::Busy - } - None => match self.buf.is_empty() { - false => WorkerState::Busy, - true => WorkerState::Done, - }, - } - } -} +// ---- #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] struct LocalCounterEntry { diff --git a/src/model/s3/object_table.rs b/src/model/s3/object_table.rs index 26ff57f6..05b27fb4 100644 --- a/src/model/s3/object_table.rs +++ b/src/model/s3/object_table.rs @@ -255,34 +255,34 @@ impl TableSchema for ObjectTable { ); } - // 2. Spawn threads that propagates deletions to version table - let version_table = self.version_table.clone(); - let old = old.cloned(); - let new = new.cloned(); - - self.background.spawn(async move { - if let (Some(old_v), Some(new_v)) = (old, new) { - // Propagate deletion of old versions - for v in old_v.versions.iter() { - let newly_deleted = match new_v - .versions - .binary_search_by(|nv| nv.cmp_key().cmp(&v.cmp_key())) - { - Err(_) => true, - Ok(i) => { - new_v.versions[i].state == ObjectVersionState::Aborted - && v.state != ObjectVersionState::Aborted - } - }; - if newly_deleted { - let deleted_version = - Version::new(v.uuid, old_v.bucket_id, old_v.key.clone(), true); - version_table.insert(&deleted_version).await?; + // 2. Enqueue propagation deletions to version table + if let (Some(old_v), Some(new_v)) = (old, new) { + // Propagate deletion of old versions + for v in old_v.versions.iter() { + let newly_deleted = match new_v + .versions + .binary_search_by(|nv| nv.cmp_key().cmp(&v.cmp_key())) + { + Err(_) => true, + Ok(i) => { + new_v.versions[i].state == ObjectVersionState::Aborted + && v.state != ObjectVersionState::Aborted + } + }; + if newly_deleted { + let deleted_version = + Version::new(v.uuid, old_v.bucket_id, old_v.key.clone(), true); + let res = self.version_table.queue_insert(tx, &deleted_version); + if let Err(e) = db::unabort(res)? { + error!( + "Unable to enqueue version deletion propagation: {}. A repair will be needed.", + e + ); } } } - Ok(()) - }); + } + Ok(()) } diff --git a/src/model/s3/version_table.rs b/src/model/s3/version_table.rs index 6bc2ecd1..0cfaa954 100644 --- a/src/model/s3/version_table.rs +++ b/src/model/s3/version_table.rs @@ -141,33 +141,26 @@ impl TableSchema for VersionTable { fn updated( &self, - _tx: &mut db::Transaction, + tx: &mut db::Transaction, old: Option<&Self::E>, new: Option<&Self::E>, ) -> db::TxOpResult<()> { - let block_ref_table = self.block_ref_table.clone(); - let old = old.cloned(); - let new = new.cloned(); - - self.background.spawn(async move { - if let (Some(old_v), Some(new_v)) = (old, new) { - // Propagate deletion of version blocks - if new_v.deleted.get() && !old_v.deleted.get() { - let deleted_block_refs = old_v - .blocks - .items() - .iter() - .map(|(_k, vb)| BlockRef { - block: vb.hash, - version: old_v.uuid, - deleted: true.into(), - }) - .collect::>(); - block_ref_table.insert_many(&deleted_block_refs[..]).await?; + if let (Some(old_v), Some(new_v)) = (old, new) { + // Propagate deletion of version blocks + if new_v.deleted.get() && !old_v.deleted.get() { + let deleted_block_refs = old_v.blocks.items().iter().map(|(_k, vb)| BlockRef { + block: vb.hash, + version: old_v.uuid, + deleted: true.into(), + }); + for block_ref in deleted_block_refs { + let res = self.block_ref_table.queue_insert(tx, &block_ref); + if let Err(e) = db::unabort(res)? { + error!("Unable to enqueue block ref deletion propagation: {}. A repair will be needed.", e); + } } } - Ok(()) - }); + } Ok(()) } diff --git a/src/table/data.rs b/src/table/data.rs index cae18999..e3b8c93f 100644 --- a/src/table/data.rs +++ b/src/table/data.rs @@ -31,6 +31,10 @@ pub struct TableData { pub(crate) merkle_tree: db::Tree, pub(crate) merkle_todo: db::Tree, pub(crate) merkle_todo_notify: Notify, + + pub(crate) insert_queue: db::Tree, + pub(crate) insert_queue_notify: Notify, + pub(crate) gc_todo: CountedTree, pub(crate) metrics: TableMetrics, @@ -53,9 +57,13 @@ where .open_tree(&format!("{}:merkle_todo", F::TABLE_NAME)) .expect("Unable to open DB Merkle TODO tree"); + let insert_queue = db + .open_tree(&format!("{}:insert_queue", F::TABLE_NAME)) + .expect("Unable to open insert queue DB tree"); + let gc_todo = db .open_tree(&format!("{}:gc_todo_v2", F::TABLE_NAME)) - .expect("Unable to open DB tree"); + .expect("Unable to open GC DB tree"); let gc_todo = CountedTree::new(gc_todo).expect("Cannot count gc_todo_v2"); let metrics = TableMetrics::new( @@ -74,6 +82,8 @@ where merkle_tree, merkle_todo, merkle_todo_notify: Notify::new(), + insert_queue, + insert_queue_notify: Notify::new(), gc_todo, metrics, }) @@ -306,6 +316,32 @@ where Ok(removed) } + // ---- Insert queue functions ---- + + pub(crate) fn queue_insert( + &self, + tx: &mut db::Transaction, + ins: &F::E, + ) -> db::TxResult<(), Error> { + let tree_key = self.tree_key(ins.partition_key(), ins.sort_key()); + + let new_entry = match tx.get(&self.insert_queue, &tree_key)? { + Some(old_v) => { + let mut entry = self.decode_entry(&old_v).map_err(db::TxError::Abort)?; + entry.merge(ins); + rmp_to_vec_all_named(&entry) + .map_err(Error::RmpEncode) + .map_err(db::TxError::Abort)? + } + None => rmp_to_vec_all_named(ins) + .map_err(Error::RmpEncode) + .map_err(db::TxError::Abort)?, + }; + tx.insert(&self.insert_queue, &tree_key, new_entry)?; + + Ok(()) + } + // ---- Utility functions ---- pub fn tree_key(&self, p: &F::P, s: &F::S) -> Vec { diff --git a/src/table/lib.rs b/src/table/lib.rs index b0153e9a..fdf114a6 100644 --- a/src/table/lib.rs +++ b/src/table/lib.rs @@ -4,16 +4,18 @@ #[macro_use] extern crate tracing; -mod metrics; pub mod schema; pub mod util; pub mod data; +pub mod replication; +pub mod table; + mod gc; mod merkle; -pub mod replication; +mod metrics; +mod queue; mod sync; -pub mod table; pub use schema::*; pub use table::*; diff --git a/src/table/merkle.rs b/src/table/merkle.rs index e977bfb5..bcf9f9d7 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -3,6 +3,7 @@ use std::time::Duration; use async_trait::async_trait; use serde::{Deserialize, Serialize}; +use tokio::select; use tokio::sync::watch; use garage_db as db; @@ -343,7 +344,10 @@ where if *must_exit.borrow() { return WorkerState::Done; } - tokio::time::sleep(Duration::from_secs(10)).await; + select! { + _ = tokio::time::sleep(Duration::from_secs(60)) => (), + _ = self.0.data.merkle_todo_notify.notified() => (), + } WorkerState::Busy } } diff --git a/src/table/queue.rs b/src/table/queue.rs new file mode 100644 index 00000000..3671ea7d --- /dev/null +++ b/src/table/queue.rs @@ -0,0 +1,84 @@ +use std::sync::Arc; +use std::time::Duration; + +use async_trait::async_trait; +use tokio::select; +use tokio::sync::watch; + +use garage_util::background::*; +use garage_util::error::Error; + +use crate::replication::*; +use crate::schema::*; +use crate::table::*; + +const BATCH_SIZE: usize = 100; + +pub(crate) struct InsertQueueWorker(pub(crate) Arc>) +where + F: TableSchema + 'static, + R: TableReplication + 'static; + +#[async_trait] +impl Worker for InsertQueueWorker +where + F: TableSchema + 'static, + R: TableReplication + 'static, +{ + fn name(&self) -> String { + format!("{} queue", F::TABLE_NAME) + } + + fn status(&self) -> WorkerStatus { + WorkerStatus { + queue_length: Some(self.0.data.insert_queue.len().unwrap_or(0) as u64), + ..Default::default() + } + } + + async fn work(&mut self, _must_exit: &mut watch::Receiver) -> Result { + let mut kv_pairs = vec![]; + let mut values = vec![]; + + for entry_kv in self.0.data.insert_queue.iter()? { + let (k, v) = entry_kv?; + + values.push(self.0.data.decode_entry(&v)?); + kv_pairs.push((k, v)); + + if kv_pairs.len() > BATCH_SIZE { + break; + } + } + + if kv_pairs.is_empty() { + return Ok(WorkerState::Idle); + } + + self.0.insert_many(values).await?; + + self.0.data.insert_queue.db().transaction(|mut tx| { + for (k, v) in kv_pairs.iter() { + if let Some(v2) = tx.get(&self.0.data.insert_queue, k)? { + if &v2 == v { + tx.remove(&self.0.data.insert_queue, k)?; + } + } + } + Ok(()) + })?; + + Ok(WorkerState::Busy) + } + + async fn wait_for_work(&mut self, must_exit: &watch::Receiver) -> WorkerState { + if *must_exit.borrow() { + return WorkerState::Done; + } + select! { + _ = tokio::time::sleep(Duration::from_secs(600)) => (), + _ = self.0.data.insert_queue_notify.notified() => (), + } + WorkerState::Busy + } +} diff --git a/src/table/table.rs b/src/table/table.rs index 8a66c420..c8e0576e 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -25,6 +25,7 @@ use crate::crdt::Crdt; use crate::data::*; use crate::gc::*; use crate::merkle::*; +use crate::queue::InsertQueueWorker; use crate::replication::*; use crate::schema::*; use crate::sync::*; @@ -88,6 +89,11 @@ where endpoint, }); + table + .system + .background + .spawn_worker(InsertQueueWorker(table.clone())); + table.endpoint.set_handler(table.clone()); table @@ -128,6 +134,11 @@ where Ok(()) } + /// Insert item locally + pub fn queue_insert(&self, tx: &mut db::Transaction, e: &F::E) -> db::TxResult<(), Error> { + self.data.queue_insert(tx, e) + } + pub async fn insert_many(&self, entries: I) -> Result<(), Error> where I: IntoIterator + Send + Sync, -- cgit v1.2.3 From 2183518edccadef47cdeaf6476033b52d8832d6e Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 14 Dec 2022 12:28:07 +0100 Subject: Spawn all background workers in a separate step --- src/block/manager.rs | 31 +++++++++++++++++++------------ src/garage/admin.rs | 4 ++-- src/garage/repair/online.rs | 15 ++++++++------- src/garage/server.rs | 3 +++ src/model/garage.rs | 21 +++++++++++++++++++++ src/model/index_counter.rs | 4 ++++ src/table/Cargo.toml | 1 + src/table/gc.rs | 13 ++++++++----- src/table/merkle.rs | 12 ++++++------ src/table/sync.rs | 43 +++++++++++++++++++++++++------------------ src/table/table.rs | 22 ++++++++++++++-------- 11 files changed, 111 insertions(+), 58 deletions(-) (limited to 'src') diff --git a/src/block/manager.rs b/src/block/manager.rs index 28523a93..ffb9de9a 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -3,6 +3,7 @@ use std::pin::Pin; use std::sync::Arc; use std::time::Duration; +use arc_swap::ArcSwapOption; use async_trait::async_trait; use bytes::Bytes; use serde::{Deserialize, Serialize}; @@ -87,7 +88,7 @@ pub struct BlockManager { pub(crate) metrics: BlockManagerMetrics, - tx_scrub_command: mpsc::Sender, + tx_scrub_command: ArcSwapOption>, } #[derive(Serialize, Deserialize, Clone, Debug)] @@ -126,8 +127,6 @@ impl BlockManager { let metrics = BlockManagerMetrics::new(rc.rc.clone(), resync.queue.clone(), resync.errors.clone()); - let (scrub_tx, scrub_rx) = mpsc::channel(1); - let block_manager = Arc::new(Self { replication, data_dir, @@ -138,21 +137,26 @@ impl BlockManager { system, endpoint, metrics, - tx_scrub_command: scrub_tx, + tx_scrub_command: ArcSwapOption::new(None), }); block_manager.endpoint.set_handler(block_manager.clone()); + block_manager + } + + pub fn spawn_workers(self: &Arc) { // Spawn a bunch of resync workers for index in 0..MAX_RESYNC_WORKERS { - let worker = ResyncWorker::new(index, block_manager.clone()); - block_manager.system.background.spawn_worker(worker); + let worker = ResyncWorker::new(index, self.clone()); + self.system.background.spawn_worker(worker); } // Spawn scrub worker - let scrub_worker = ScrubWorker::new(block_manager.clone(), scrub_rx); - block_manager.system.background.spawn_worker(scrub_worker); - - block_manager + let (scrub_tx, scrub_rx) = mpsc::channel(1); + self.tx_scrub_command.store(Some(Arc::new(scrub_tx))); + self.system + .background + .spawn_worker(ScrubWorker::new(self.clone(), scrub_rx)); } /// Ask nodes that might have a (possibly compressed) block for it @@ -325,8 +329,11 @@ impl BlockManager { } /// Send command to start/stop/manager scrub worker - pub async fn send_scrub_command(&self, cmd: ScrubWorkerCommand) { - let _ = self.tx_scrub_command.send(cmd).await; + pub async fn send_scrub_command(&self, cmd: ScrubWorkerCommand) -> Result<(), Error> { + let tx = self.tx_scrub_command.load(); + let tx = tx.as_ref().ok_or_message("scrub worker is not running")?; + tx.send(cmd).await.ok_or_message("send error")?; + Ok(()) } /// Get the reference count of a block diff --git a/src/garage/admin.rs b/src/garage/admin.rs index 1ca3698a..96d838d5 100644 --- a/src/garage/admin.rs +++ b/src/garage/admin.rs @@ -759,7 +759,7 @@ impl AdminRpcHandler { ))) } } else { - launch_online_repair(self.garage.clone(), opt).await; + launch_online_repair(self.garage.clone(), opt).await?; Ok(AdminRpc::Ok(format!( "Repair launched on {:?}", self.garage.system.id @@ -944,7 +944,7 @@ impl AdminRpcHandler { self.garage .block_manager .send_scrub_command(scrub_command) - .await; + .await?; Ok(AdminRpc::Ok("Scrub tranquility updated".into())) } WorkerSetCmd::ResyncWorkerCount { worker_count } => { diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs index 42221c2a..2a8e6298 100644 --- a/src/garage/repair/online.rs +++ b/src/garage/repair/online.rs @@ -15,15 +15,15 @@ use garage_util::error::Error; use crate::*; -pub async fn launch_online_repair(garage: Arc, opt: RepairOpt) { +pub async fn launch_online_repair(garage: Arc, opt: RepairOpt) -> Result<(), Error> { match opt.what { RepairWhat::Tables => { info!("Launching a full sync of tables"); - garage.bucket_table.syncer.add_full_sync(); - garage.object_table.syncer.add_full_sync(); - garage.version_table.syncer.add_full_sync(); - garage.block_ref_table.syncer.add_full_sync(); - garage.key_table.syncer.add_full_sync(); + garage.bucket_table.syncer.add_full_sync()?; + garage.object_table.syncer.add_full_sync()?; + garage.version_table.syncer.add_full_sync()?; + garage.block_ref_table.syncer.add_full_sync()?; + garage.key_table.syncer.add_full_sync()?; } RepairWhat::Versions => { info!("Repairing the versions table"); @@ -56,9 +56,10 @@ pub async fn launch_online_repair(garage: Arc, opt: RepairOpt) { } }; info!("Sending command to scrub worker: {:?}", cmd); - garage.block_manager.send_scrub_command(cmd).await; + garage.block_manager.send_scrub_command(cmd).await?; } } + Ok(()) } // ---- diff --git a/src/garage/server.rs b/src/garage/server.rs index d4099a97..8e29f6ec 100644 --- a/src/garage/server.rs +++ b/src/garage/server.rs @@ -42,6 +42,9 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { info!("Initializing Garage main data store..."); let garage = Garage::new(config.clone(), background)?; + info!("Spawning Garage workers..."); + garage.spawn_workers(); + if config.admin.trace_sink.is_some() { info!("Initialize tracing..."); diff --git a/src/model/garage.rs b/src/model/garage.rs index e34d034f..9ae6af82 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -273,6 +273,22 @@ impl Garage { })) } + pub fn spawn_workers(&self) { + self.block_manager.spawn_workers(); + + self.bucket_table.spawn_workers(); + self.bucket_alias_table.spawn_workers(); + self.key_table.spawn_workers(); + + self.object_table.spawn_workers(); + self.object_counter_table.spawn_workers(); + self.version_table.spawn_workers(); + self.block_ref_table.spawn_workers(); + + #[cfg(feature = "k2v")] + self.k2v.spawn_workers(); + } + pub fn bucket_helper(&self) -> helper::bucket::BucketHelper { helper::bucket::BucketHelper(self) } @@ -307,4 +323,9 @@ impl GarageK2V { rpc, } } + + pub fn spawn_workers(&self) { + self.item_table.spawn_workers(); + self.counter_table.spawn_workers(); + } } diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs index 9c8e00c2..d907e947 100644 --- a/src/model/index_counter.rs +++ b/src/model/index_counter.rs @@ -164,6 +164,10 @@ impl IndexCounter { }) } + pub fn spawn_workers(&self) { + self.table.spawn_workers(); + } + pub fn count( &self, tx: &mut db::Transaction, diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml index 38c6b41c..a8d9b5e6 100644 --- a/src/table/Cargo.toml +++ b/src/table/Cargo.toml @@ -21,6 +21,7 @@ garage_util = { version = "0.8.0", path = "../util" } opentelemetry = "0.17" async-trait = "0.1.7" +arc-swap = "1.0" bytes = "1.0" hex = "0.4" hexdump = "0.1" diff --git a/src/table/gc.rs b/src/table/gc.rs index cfdc9d2d..c83c2050 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -54,24 +54,27 @@ where F: TableSchema + 'static, R: TableReplication + 'static, { - pub(crate) fn launch(system: Arc, data: Arc>) -> Arc { + pub(crate) fn new(system: Arc, data: Arc>) -> Arc { let endpoint = system .netapp .endpoint(format!("garage_table/gc.rs/Rpc:{}", F::TABLE_NAME)); let gc = Arc::new(Self { - system: system.clone(), + system, data, endpoint, }); - gc.endpoint.set_handler(gc.clone()); - system.background.spawn_worker(GcWorker::new(gc.clone())); - gc } + pub(crate) fn spawn_workers(self: &Arc) { + self.system + .background + .spawn_worker(GcWorker::new(self.clone())); + } + async fn gc_loop_iter(&self) -> Result, Error> { let now = now_msec(); diff --git a/src/table/merkle.rs b/src/table/merkle.rs index bcf9f9d7..0fe7d2cb 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -70,17 +70,17 @@ where F: TableSchema + 'static, R: TableReplication + 'static, { - pub(crate) fn launch(background: &BackgroundRunner, data: Arc>) -> Arc { + pub(crate) fn new(data: Arc>) -> Arc { let empty_node_hash = blake2sum(&rmp_to_vec_all_named(&MerkleNode::Empty).unwrap()[..]); - let ret = Arc::new(Self { + Arc::new(Self { data, empty_node_hash, - }); - - background.spawn_worker(MerkleWorker(ret.clone())); + }) + } - ret + pub(crate) fn spawn_workers(self: &Arc, background: &BackgroundRunner) { + background.spawn_worker(MerkleWorker(self.clone())); } fn updater_loop_iter(&self) -> Result { diff --git a/src/table/sync.rs b/src/table/sync.rs index af7aa640..7008a383 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -2,6 +2,7 @@ use std::collections::VecDeque; use std::sync::Arc; use std::time::{Duration, Instant}; +use arc_swap::ArcSwapOption; use async_trait::async_trait; use futures_util::stream::*; use opentelemetry::KeyValue; @@ -13,7 +14,7 @@ use tokio::sync::{mpsc, watch}; use garage_util::background::*; use garage_util::data::*; -use garage_util::error::Error; +use garage_util::error::{Error, OkOrMessage}; use garage_rpc::ring::*; use garage_rpc::system::System; @@ -32,7 +33,7 @@ pub struct TableSyncer data: Arc>, merkle: Arc>, - add_full_sync_tx: mpsc::UnboundedSender<()>, + add_full_sync_tx: ArcSwapOption>, endpoint: Arc>, } @@ -65,7 +66,7 @@ where F: TableSchema + 'static, R: TableReplication + 'static, { - pub(crate) fn launch( + pub(crate) fn new( system: Arc, data: Arc>, merkle: Arc>, @@ -74,34 +75,40 @@ where .netapp .endpoint(format!("garage_table/sync.rs/Rpc:{}", F::TABLE_NAME)); - let (add_full_sync_tx, add_full_sync_rx) = mpsc::unbounded_channel(); - let syncer = Arc::new(Self { - system: system.clone(), + system, data, merkle, - add_full_sync_tx, + add_full_sync_tx: ArcSwapOption::new(None), endpoint, }); - syncer.endpoint.set_handler(syncer.clone()); - system.background.spawn_worker(SyncWorker { - syncer: syncer.clone(), - ring_recv: system.ring.clone(), - ring: system.ring.borrow().clone(), + syncer + } + + pub(crate) fn spawn_workers(self: &Arc) { + let (add_full_sync_tx, add_full_sync_rx) = mpsc::unbounded_channel(); + self.add_full_sync_tx + .store(Some(Arc::new(add_full_sync_tx))); + + self.system.background.spawn_worker(SyncWorker { + syncer: self.clone(), + ring_recv: self.system.ring.clone(), + ring: self.system.ring.borrow().clone(), add_full_sync_rx, todo: vec![], next_full_sync: Instant::now() + Duration::from_secs(20), }); - - syncer } - pub fn add_full_sync(&self) { - if self.add_full_sync_tx.send(()).is_err() { - error!("({}) Could not add full sync", F::TABLE_NAME); - } + pub fn add_full_sync(&self) -> Result<(), Error> { + let tx = self.add_full_sync_tx.load(); + let tx = tx + .as_ref() + .ok_or_message("table sync worker is not running")?; + tx.send(()).ok_or_message("send error")?; + Ok(()) } // ---- diff --git a/src/table/table.rs b/src/table/table.rs index c8e0576e..cb200ef2 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -36,6 +36,7 @@ pub struct Table { pub data: Arc>, pub merkle_updater: Arc>, pub syncer: Arc>, + gc: Arc>, endpoint: Arc, Self>>, } @@ -76,29 +77,34 @@ where let data = TableData::new(system.clone(), instance, replication, db); - let merkle_updater = MerkleUpdater::launch(&system.background, data.clone()); + let merkle_updater = MerkleUpdater::new(data.clone()); - let syncer = TableSyncer::launch(system.clone(), data.clone(), merkle_updater.clone()); - TableGc::launch(system.clone(), data.clone()); + let syncer = TableSyncer::new(system.clone(), data.clone(), merkle_updater.clone()); + let gc = TableGc::new(system.clone(), data.clone()); let table = Arc::new(Self { system, data, merkle_updater, + gc, syncer, endpoint, }); - table - .system - .background - .spawn_worker(InsertQueueWorker(table.clone())); - table.endpoint.set_handler(table.clone()); table } + pub fn spawn_workers(self: &Arc) { + self.merkle_updater.spawn_workers(&self.system.background); + self.syncer.spawn_workers(); + self.gc.spawn_workers(); + self.system + .background + .spawn_worker(InsertQueueWorker(self.clone())); + } + pub async fn insert(&self, e: &F::E) -> Result<(), Error> { let tracer = opentelemetry::global::tracer("garage_table"); let span = tracer.start(format!("{} insert", F::TABLE_NAME)); -- cgit v1.2.3 From d56c472712df7c064387429a5af73d3bc0eb438d Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 14 Dec 2022 12:51:16 +0100 Subject: Refactor background runner and get rid of job worker --- src/block/manager.rs | 9 +++--- src/garage/admin.rs | 15 ++++++--- src/garage/repair/offline.rs | 17 ++--------- src/garage/repair/online.rs | 22 ++++++-------- src/garage/server.rs | 12 ++++---- src/model/garage.rs | 35 +++++++++------------ src/model/index_counter.rs | 5 +-- src/model/s3/object_table.rs | 2 -- src/model/s3/version_table.rs | 2 -- src/rpc/rpc_helper.rs | 18 +++-------- src/rpc/system.rs | 24 ++++++--------- src/table/gc.rs | 6 ++-- src/table/sync.rs | 4 +-- src/table/table.rs | 19 +++++------- src/util/background/job_worker.rs | 48 ----------------------------- src/util/background/mod.rs | 64 ++++++++------------------------------- 16 files changed, 89 insertions(+), 213 deletions(-) delete mode 100644 src/util/background/job_worker.rs (limited to 'src') diff --git a/src/block/manager.rs b/src/block/manager.rs index ffb9de9a..1b5a5df0 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -23,6 +23,7 @@ use garage_rpc::rpc_helper::netapp::stream::{stream_asyncread, ByteStream}; use garage_db as db; +use garage_util::background::BackgroundRunner; use garage_util::data::*; use garage_util::error::*; use garage_util::metrics::RecordDuration; @@ -144,19 +145,17 @@ impl BlockManager { block_manager } - pub fn spawn_workers(self: &Arc) { + pub fn spawn_workers(self: &Arc, bg: &BackgroundRunner) { // Spawn a bunch of resync workers for index in 0..MAX_RESYNC_WORKERS { let worker = ResyncWorker::new(index, self.clone()); - self.system.background.spawn_worker(worker); + bg.spawn_worker(worker); } // Spawn scrub worker let (scrub_tx, scrub_rx) = mpsc::channel(1); self.tx_scrub_command.store(Some(Arc::new(scrub_tx))); - self.system - .background - .spawn_worker(ScrubWorker::new(self.clone(), scrub_rx)); + bg.spawn_worker(ScrubWorker::new(self.clone(), scrub_rx)); } /// Ask nodes that might have a (possibly compressed) block for it diff --git a/src/garage/admin.rs b/src/garage/admin.rs index 96d838d5..c669b5e6 100644 --- a/src/garage/admin.rs +++ b/src/garage/admin.rs @@ -5,6 +5,7 @@ use std::sync::Arc; use async_trait::async_trait; use serde::{Deserialize, Serialize}; +use garage_util::background::BackgroundRunner; use garage_util::crdt::*; use garage_util::data::*; use garage_util::error::Error as GarageError; @@ -74,13 +75,18 @@ impl Rpc for AdminRpc { pub struct AdminRpcHandler { garage: Arc, + background: Arc, endpoint: Arc>, } impl AdminRpcHandler { - pub fn new(garage: Arc) -> Arc { + pub fn new(garage: Arc, background: Arc) -> Arc { let endpoint = garage.system.netapp.endpoint(ADMIN_RPC_PATH.into()); - let admin = Arc::new(Self { garage, endpoint }); + let admin = Arc::new(Self { + garage, + background, + endpoint, + }); admin.endpoint.set_handler(admin.clone()); admin } @@ -759,7 +765,7 @@ impl AdminRpcHandler { ))) } } else { - launch_online_repair(self.garage.clone(), opt).await?; + launch_online_repair(&self.garage, &self.background, opt).await?; Ok(AdminRpc::Ok(format!( "Repair launched on {:?}", self.garage.system.id @@ -925,12 +931,11 @@ impl AdminRpcHandler { async fn handle_worker_cmd(&self, cmd: &WorkerOperation) -> Result { match cmd { WorkerOperation::List { opt } => { - let workers = self.garage.background.get_worker_info(); + let workers = self.background.get_worker_info(); Ok(AdminRpc::WorkerList(workers, *opt)) } WorkerOperation::Info { tid } => { let info = self - .garage .background .get_worker_info() .get(tid) diff --git a/src/garage/repair/offline.rs b/src/garage/repair/offline.rs index 7760a8bd..25193e4a 100644 --- a/src/garage/repair/offline.rs +++ b/src/garage/repair/offline.rs @@ -1,8 +1,5 @@ use std::path::PathBuf; -use tokio::sync::watch; - -use garage_util::background::*; use garage_util::config::*; use garage_util::error::*; @@ -20,12 +17,8 @@ pub async fn offline_repair(config_file: PathBuf, opt: OfflineRepairOpt) -> Resu info!("Loading configuration..."); let config = read_config(config_file)?; - info!("Initializing background runner..."); - let (done_tx, done_rx) = watch::channel(false); - let (background, await_background_done) = BackgroundRunner::new(16, done_rx); - info!("Initializing Garage main data store..."); - let garage = Garage::new(config.clone(), background)?; + let garage = Garage::new(config)?; info!("Launching repair operation..."); match opt.what { @@ -43,13 +36,7 @@ pub async fn offline_repair(config_file: PathBuf, opt: OfflineRepairOpt) -> Resu } } - info!("Repair operation finished, shutting down Garage internals..."); - done_tx.send(true).unwrap(); - drop(garage); - - await_background_done.await?; - - info!("Cleaning up..."); + info!("Repair operation finished, shutting down..."); Ok(()) } diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs index 2a8e6298..4b4118a8 100644 --- a/src/garage/repair/online.rs +++ b/src/garage/repair/online.rs @@ -15,7 +15,11 @@ use garage_util::error::Error; use crate::*; -pub async fn launch_online_repair(garage: Arc, opt: RepairOpt) -> Result<(), Error> { +pub async fn launch_online_repair( + garage: &Arc, + bg: &BackgroundRunner, + opt: RepairOpt, +) -> Result<(), Error> { match opt.what { RepairWhat::Tables => { info!("Launching a full sync of tables"); @@ -27,23 +31,17 @@ pub async fn launch_online_repair(garage: Arc, opt: RepairOpt) -> Result } RepairWhat::Versions => { info!("Repairing the versions table"); - garage - .background - .spawn_worker(RepairVersionsWorker::new(garage.clone())); + bg.spawn_worker(RepairVersionsWorker::new(garage.clone())); } RepairWhat::BlockRefs => { info!("Repairing the block refs table"); - garage - .background - .spawn_worker(RepairBlockrefsWorker::new(garage.clone())); + bg.spawn_worker(RepairBlockrefsWorker::new(garage.clone())); } RepairWhat::Blocks => { info!("Repairing the stored blocks"); - garage - .background - .spawn_worker(garage_block::repair::RepairWorker::new( - garage.block_manager.clone(), - )); + bg.spawn_worker(garage_block::repair::RepairWorker::new( + garage.block_manager.clone(), + )); } RepairWhat::Scrub { cmd } => { let cmd = match cmd { diff --git a/src/garage/server.rs b/src/garage/server.rs index 8e29f6ec..16f1b625 100644 --- a/src/garage/server.rs +++ b/src/garage/server.rs @@ -35,15 +35,15 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { #[cfg(feature = "metrics")] let metrics_exporter = opentelemetry_prometheus::exporter().init(); + info!("Initializing Garage main data store..."); + let garage = Garage::new(config.clone())?; + info!("Initializing background runner..."); let watch_cancel = watch_shutdown_signal(); - let (background, await_background_done) = BackgroundRunner::new(16, watch_cancel.clone()); - - info!("Initializing Garage main data store..."); - let garage = Garage::new(config.clone(), background)?; + let (background, await_background_done) = BackgroundRunner::new(watch_cancel.clone()); info!("Spawning Garage workers..."); - garage.spawn_workers(); + garage.spawn_workers(&background); if config.admin.trace_sink.is_some() { info!("Initialize tracing..."); @@ -66,7 +66,7 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { let run_system = tokio::spawn(garage.system.clone().run(watch_cancel.clone())); info!("Create admin RPC handler..."); - AdminRpcHandler::new(garage.clone()); + AdminRpcHandler::new(garage.clone(), background.clone()); // ---- Launch public-facing API servers ---- diff --git a/src/model/garage.rs b/src/model/garage.rs index 9ae6af82..5bea6b4f 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -39,8 +39,6 @@ pub struct Garage { /// The local database pub db: db::Db, - /// A background job runner - pub background: Arc, /// The membership manager pub system: Arc, /// The block manager @@ -78,7 +76,7 @@ pub struct GarageK2V { impl Garage { /// Create and run garage - pub fn new(config: Config, background: Arc) -> Result, Error> { + pub fn new(config: Config) -> Result, Error> { // Create meta dir and data dir if they don't exist already std::fs::create_dir_all(&config.metadata_dir) .ok_or_message("Unable to create Garage metadata directory")?; @@ -167,7 +165,7 @@ impl Garage { .expect("Invalid replication_mode in config file."); info!("Initialize membership management system..."); - let system = System::new(network_key, background.clone(), replication_mode, &config)?; + let system = System::new(network_key, replication_mode, &config)?; let data_rep_param = TableShardedReplication { system: system.clone(), @@ -225,7 +223,6 @@ impl Garage { info!("Initialize version_table..."); let version_table = Table::new( VersionTable { - background: background.clone(), block_ref_table: block_ref_table.clone(), }, meta_rep_param.clone(), @@ -240,7 +237,6 @@ impl Garage { #[allow(clippy::redundant_clone)] let object_table = Table::new( ObjectTable { - background: background.clone(), version_table: version_table.clone(), object_counter_table: object_counter_table.clone(), }, @@ -258,7 +254,6 @@ impl Garage { config, replication_mode, db, - background, system, block_manager, bucket_table, @@ -273,20 +268,20 @@ impl Garage { })) } - pub fn spawn_workers(&self) { - self.block_manager.spawn_workers(); + pub fn spawn_workers(&self, bg: &BackgroundRunner) { + self.block_manager.spawn_workers(bg); - self.bucket_table.spawn_workers(); - self.bucket_alias_table.spawn_workers(); - self.key_table.spawn_workers(); + self.bucket_table.spawn_workers(bg); + self.bucket_alias_table.spawn_workers(bg); + self.key_table.spawn_workers(bg); - self.object_table.spawn_workers(); - self.object_counter_table.spawn_workers(); - self.version_table.spawn_workers(); - self.block_ref_table.spawn_workers(); + self.object_table.spawn_workers(bg); + self.object_counter_table.spawn_workers(bg); + self.version_table.spawn_workers(bg); + self.block_ref_table.spawn_workers(bg); #[cfg(feature = "k2v")] - self.k2v.spawn_workers(); + self.k2v.spawn_workers(bg); } pub fn bucket_helper(&self) -> helper::bucket::BucketHelper { @@ -324,8 +319,8 @@ impl GarageK2V { } } - pub fn spawn_workers(&self) { - self.item_table.spawn_workers(); - self.counter_table.spawn_workers(); + pub fn spawn_workers(&self, bg: &BackgroundRunner) { + self.item_table.spawn_workers(bg); + self.counter_table.spawn_workers(bg); } } diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs index d907e947..6303ea3e 100644 --- a/src/model/index_counter.rs +++ b/src/model/index_counter.rs @@ -9,6 +9,7 @@ use garage_db as db; use garage_rpc::ring::Ring; use garage_rpc::system::System; +use garage_util::background::BackgroundRunner; use garage_util::data::*; use garage_util::error::*; use garage_util::time::*; @@ -164,8 +165,8 @@ impl IndexCounter { }) } - pub fn spawn_workers(&self) { - self.table.spawn_workers(); + pub fn spawn_workers(&self, bg: &BackgroundRunner) { + self.table.spawn_workers(bg); } pub fn count( diff --git a/src/model/s3/object_table.rs b/src/model/s3/object_table.rs index 05b27fb4..1b2f0014 100644 --- a/src/model/s3/object_table.rs +++ b/src/model/s3/object_table.rs @@ -4,7 +4,6 @@ use std::sync::Arc; use garage_db as db; -use garage_util::background::BackgroundRunner; use garage_util::data::*; use garage_table::crdt::*; @@ -221,7 +220,6 @@ impl Crdt for Object { } pub struct ObjectTable { - pub background: Arc, pub version_table: Arc>, pub object_counter_table: Arc>, } diff --git a/src/model/s3/version_table.rs b/src/model/s3/version_table.rs index 0cfaa954..0486512b 100644 --- a/src/model/s3/version_table.rs +++ b/src/model/s3/version_table.rs @@ -3,7 +3,6 @@ use std::sync::Arc; use garage_db as db; -use garage_util::background::BackgroundRunner; use garage_util::data::*; use garage_table::crdt::*; @@ -127,7 +126,6 @@ impl Crdt for Version { } pub struct VersionTable { - pub background: Arc, pub block_ref_table: Arc>, } diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index 949aced6..1ec250c3 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -5,7 +5,6 @@ use std::time::Duration; use futures::future::join_all; use futures::stream::futures_unordered::FuturesUnordered; use futures::stream::StreamExt; -use futures_util::future::FutureExt; use tokio::select; use tokio::sync::watch; @@ -24,7 +23,6 @@ pub use netapp::message::{ use netapp::peering::fullmesh::FullMeshPeeringStrategy; pub use netapp::{self, NetApp, NodeID}; -use garage_util::background::BackgroundRunner; use garage_util::data::*; use garage_util::error::Error; use garage_util::metrics::RecordDuration; @@ -94,7 +92,6 @@ pub struct RpcHelper(Arc); struct RpcHelperInner { our_node_id: Uuid, fullmesh: Arc, - background: Arc, ring: watch::Receiver>, metrics: RpcMetrics, rpc_timeout: Duration, @@ -104,7 +101,6 @@ impl RpcHelper { pub(crate) fn new( our_node_id: Uuid, fullmesh: Arc, - background: Arc, ring: watch::Receiver>, rpc_timeout: Option, ) -> Self { @@ -113,7 +109,6 @@ impl RpcHelper { Self(Arc::new(RpcHelperInner { our_node_id, fullmesh, - background, ring, metrics, rpc_timeout: rpc_timeout.unwrap_or(DEFAULT_TIMEOUT), @@ -377,16 +372,13 @@ impl RpcHelper { if !resp_stream.is_empty() { // Continue remaining requests in background. - // Continue the remaining requests immediately using tokio::spawn - // but enqueue a task in the background runner - // to ensure that the process won't exit until the requests are done - // (if we had just enqueued the resp_stream.collect directly in the background runner, - // the requests might have been put on hold in the background runner's queue, - // in which case they might timeout or otherwise fail) - let wait_finished_fut = tokio::spawn(async move { + // Note: these requests can get interrupted on process shutdown, + // we must not count on them being executed for certain. + // For all background things that have to happen with certainty, + // they have to be put in a proper queue that is persisted to disk. + tokio::spawn(async move { resp_stream.collect::>>().await; }); - self.0.background.spawn(wait_finished_fut.map(|_| Ok(()))); } } diff --git a/src/rpc/system.rs b/src/rpc/system.rs index 2c6f14fd..e14adf2a 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -21,7 +21,7 @@ use netapp::peering::fullmesh::FullMeshPeeringStrategy; use netapp::util::parse_and_resolve_peer_addr_async; use netapp::{NetApp, NetworkKey, NodeID, NodeKey}; -use garage_util::background::BackgroundRunner; +use garage_util::background::{self}; use garage_util::config::Config; #[cfg(feature = "kubernetes-discovery")] use garage_util::config::KubernetesDiscoveryConfig; @@ -110,9 +110,6 @@ pub struct System { pub ring: watch::Receiver>, update_ring: Mutex>>, - /// The job runner of this node - pub background: Arc, - /// Path to metadata directory pub metadata_dir: PathBuf, } @@ -232,7 +229,6 @@ impl System { /// Create this node's membership manager pub fn new( network_key: NetworkKey, - background: Arc, replication_mode: ReplicationMode, config: &Config, ) -> Result, Error> { @@ -354,7 +350,6 @@ impl System { rpc: RpcHelper::new( netapp.id.into(), fullmesh, - background.clone(), ring.clone(), config.rpc_timeout_msec.map(Duration::from_millis), ), @@ -372,7 +367,6 @@ impl System { ring, update_ring: Mutex::new(update_ring), - background, metadata_dir: config.metadata_dir.clone(), }); sys.system_endpoint.set_handler(sys.clone()); @@ -578,7 +572,7 @@ impl System { } /// Save network configuration to disc - async fn save_cluster_layout(self: Arc) -> Result<(), Error> { + async fn save_cluster_layout(&self) -> Result<(), Error> { let ring: Arc = self.ring.borrow().clone(); self.persist_cluster_layout .save_async(&ring.layout) @@ -631,7 +625,7 @@ impl System { || info.cluster_layout_staging_hash != local_info.cluster_layout_staging_hash { let self2 = self.clone(); - self.background.spawn_cancellable(async move { + background::spawn(async move { self2.pull_cluster_layout(from).await; Ok(()) }); @@ -676,7 +670,7 @@ impl System { drop(update_ring); let self2 = self.clone(); - self.background.spawn_cancellable(async move { + background::spawn(async move { self2 .rpc .broadcast( @@ -687,7 +681,8 @@ impl System { .await?; Ok(()) }); - self.background.spawn(self.clone().save_cluster_layout()); + + self.save_cluster_layout().await?; } Ok(SystemRpc::Ok) @@ -773,7 +768,7 @@ impl System { } for (node_id, node_addr) in ping_list { - tokio::spawn( + background::spawn( self.netapp .clone() .try_connect(node_addr, node_id) @@ -787,11 +782,10 @@ impl System { } #[cfg(feature = "consul-discovery")] - self.background.spawn(self.clone().advertise_to_consul()); + background::spawn(self.clone().advertise_to_consul()); #[cfg(feature = "kubernetes-discovery")] - self.background - .spawn(self.clone().advertise_to_kubernetes()); + background::spawn(self.clone().advertise_to_kubernetes()); let restart_at = tokio::time::sleep(DISCOVERY_INTERVAL); select! { diff --git a/src/table/gc.rs b/src/table/gc.rs index c83c2050..1fc16364 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -69,10 +69,8 @@ where gc } - pub(crate) fn spawn_workers(self: &Arc) { - self.system - .background - .spawn_worker(GcWorker::new(self.clone())); + pub(crate) fn spawn_workers(self: &Arc, bg: &BackgroundRunner) { + bg.spawn_worker(GcWorker::new(self.clone())); } async fn gc_loop_iter(&self) -> Result, Error> { diff --git a/src/table/sync.rs b/src/table/sync.rs index 7008a383..1e7618ca 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -87,12 +87,12 @@ where syncer } - pub(crate) fn spawn_workers(self: &Arc) { + pub(crate) fn spawn_workers(self: &Arc, bg: &BackgroundRunner) { let (add_full_sync_tx, add_full_sync_rx) = mpsc::unbounded_channel(); self.add_full_sync_tx .store(Some(Arc::new(add_full_sync_tx))); - self.system.background.spawn_worker(SyncWorker { + bg.spawn_worker(SyncWorker { syncer: self.clone(), ring_recv: self.system.ring.clone(), ring: self.system.ring.borrow().clone(), diff --git a/src/table/table.rs b/src/table/table.rs index cb200ef2..4d93102e 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -14,6 +14,7 @@ use opentelemetry::{ use garage_db as db; +use garage_util::background::{self, BackgroundRunner}; use garage_util::data::*; use garage_util::error::Error; use garage_util::metrics::RecordDuration; @@ -96,13 +97,11 @@ where table } - pub fn spawn_workers(self: &Arc) { - self.merkle_updater.spawn_workers(&self.system.background); - self.syncer.spawn_workers(); - self.gc.spawn_workers(); - self.system - .background - .spawn_worker(InsertQueueWorker(self.clone())); + pub fn spawn_workers(self: &Arc, bg: &BackgroundRunner) { + self.merkle_updater.spawn_workers(bg); + self.syncer.spawn_workers(bg); + self.gc.spawn_workers(bg); + bg.spawn_worker(InsertQueueWorker(self.clone())); } pub async fn insert(&self, e: &F::E) -> Result<(), Error> { @@ -276,9 +275,7 @@ where if not_all_same { let self2 = self.clone(); let ent2 = ret_entry.clone(); - self.system - .background - .spawn_cancellable(async move { self2.repair_on_read(&who[..], ent2).await }); + background::spawn(async move { self2.repair_on_read(&who[..], ent2).await }); } } @@ -375,7 +372,7 @@ where .into_iter() .map(|k| ret.get(&k).unwrap().clone()) .collect::>(); - self.system.background.spawn_cancellable(async move { + background::spawn(async move { for v in to_repair { self2.repair_on_read(&who[..], v).await?; } diff --git a/src/util/background/job_worker.rs b/src/util/background/job_worker.rs deleted file mode 100644 index 2568ea11..00000000 --- a/src/util/background/job_worker.rs +++ /dev/null @@ -1,48 +0,0 @@ -//! Job worker: a generic worker that just processes incoming -//! jobs one by one - -use std::sync::Arc; - -use async_trait::async_trait; -use tokio::sync::{mpsc, Mutex}; - -use crate::background::worker::*; -use crate::background::*; - -pub(crate) struct JobWorker { - pub(crate) index: usize, - pub(crate) job_chan: Arc>>, - pub(crate) next_job: Option, -} - -#[async_trait] -impl Worker for JobWorker { - fn name(&self) -> String { - format!("Job worker #{}", self.index) - } - - async fn work(&mut self, _must_exit: &mut watch::Receiver) -> Result { - match self.next_job.take() { - None => return Ok(WorkerState::Idle), - Some(job) => { - job.await?; - Ok(WorkerState::Busy) - } - } - } - - async fn wait_for_work(&mut self, must_exit: &watch::Receiver) -> WorkerState { - loop { - match self.job_chan.lock().await.recv().await { - Some((job, cancellable)) => { - if cancellable && *must_exit.borrow() { - continue; - } - self.next_job = Some(job); - return WorkerState::Busy; - } - None => return WorkerState::Done, - } - } - } -} diff --git a/src/util/background/mod.rs b/src/util/background/mod.rs index fd9258b8..0bb4fb67 100644 --- a/src/util/background/mod.rs +++ b/src/util/background/mod.rs @@ -1,27 +1,23 @@ //! Job runner for futures and async functions -pub mod job_worker; pub mod worker; use core::future::Future; use std::collections::HashMap; -use std::pin::Pin; use std::sync::Arc; use serde::{Deserialize, Serialize}; -use tokio::sync::{mpsc, watch, Mutex}; +use tokio::sync::{mpsc, watch}; use crate::error::Error; use worker::WorkerProcessor; pub use worker::{Worker, WorkerState}; pub(crate) type JobOutput = Result<(), Error>; -pub(crate) type Job = Pin + Send>>; /// Job runner for futures and async functions pub struct BackgroundRunner { - send_job: mpsc::UnboundedSender<(Job, bool)>, send_worker: mpsc::UnboundedSender>, worker_info: Arc>>, } @@ -49,10 +45,7 @@ pub struct WorkerStatus { impl BackgroundRunner { /// Create a new BackgroundRunner - pub fn new( - n_runners: usize, - stop_signal: watch::Receiver, - ) -> (Arc, tokio::task::JoinHandle<()>) { + pub fn new(stop_signal: watch::Receiver) -> (Arc, tokio::task::JoinHandle<()>) { let (send_worker, worker_out) = mpsc::unbounded_channel::>(); let worker_info = Arc::new(std::sync::Mutex::new(HashMap::new())); @@ -63,24 +56,7 @@ impl BackgroundRunner { worker_processor.run().await; }); - let (send_job, queue_out) = mpsc::unbounded_channel(); - let queue_out = Arc::new(Mutex::new(queue_out)); - - for i in 0..n_runners { - let queue_out = queue_out.clone(); - - send_worker - .send(Box::new(job_worker::JobWorker { - index: i, - job_chan: queue_out.clone(), - next_job: None, - })) - .ok() - .unwrap(); - } - let bgrunner = Arc::new(Self { - send_job, send_worker, worker_info, }); @@ -91,31 +67,6 @@ impl BackgroundRunner { self.worker_info.lock().unwrap().clone() } - /// Spawn a task to be run in background - pub fn spawn(&self, job: T) - where - T: Future + Send + 'static, - { - let boxed: Job = Box::pin(job); - self.send_job - .send((boxed, false)) - .ok() - .expect("Could not put job in queue"); - } - - /// Spawn a task to be run in background. It may get discarded before running if spawned while - /// the runner is stopping - pub fn spawn_cancellable(&self, job: T) - where - T: Future + Send + 'static, - { - let boxed: Job = Box::pin(job); - self.send_job - .send((boxed, true)) - .ok() - .expect("Could not put job in queue"); - } - pub fn spawn_worker(&self, worker: W) where W: Worker + 'static, @@ -126,3 +77,14 @@ impl BackgroundRunner { .expect("Could not put worker in queue"); } } + +pub fn spawn(job: T) +where + T: Future + Send + 'static, +{ + tokio::spawn(async move { + if let Err(e) = job.await { + error!("{}", e); + } + }); +} -- cgit v1.2.3 From a19bfef508bdcd18773666ab77f85f5a2a9f1388 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 14 Dec 2022 12:57:33 +0100 Subject: Improve error message on rpc connection failure --- src/rpc/system.rs | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) (limited to 'src') diff --git a/src/rpc/system.rs b/src/rpc/system.rs index e14adf2a..3b321a7d 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -50,8 +50,6 @@ pub const GARAGE_VERSION_TAG: u64 = 0x6761726167650008; // garage 0x0008 /// RPC endpoint used for calls related to membership pub const SYSTEM_RPC_PATH: &str = "garage_rpc/membership.rs/SystemRpc"; -pub const CONNECT_ERROR_MESSAGE: &str = "Error establishing RPC connection to remote node. This can happen if the remote node is not reachable on the network, but also if the two nodes are not configured with the same rpc_secret"; - /// RPC messages related to membership #[derive(Debug, Serialize, Deserialize, Clone)] pub enum SystemRpc { @@ -438,17 +436,17 @@ impl System { )) })?; let mut errors = vec![]; - for ip in addrs.iter() { + for addr in addrs.iter() { match self .netapp .clone() - .try_connect(*ip, pubkey) + .try_connect(*addr, pubkey) .await - .err_context(CONNECT_ERROR_MESSAGE) + .err_context(connect_error_message(*addr, pubkey)) { Ok(()) => return Ok(()), Err(e) => { - errors.push((*ip, e)); + errors.push((*addr, e)); } } } @@ -772,7 +770,7 @@ impl System { self.netapp .clone() .try_connect(node_addr, node_id) - .map(|r| r.err_context(CONNECT_ERROR_MESSAGE)), + .map(move |r| r.err_context(connect_error_message(node_addr, node_id))), ); } } @@ -875,3 +873,7 @@ async fn resolve_peers(peers: &[String]) -> Vec<(NodeID, SocketAddr)> { ret } + +fn connect_error_message(addr: SocketAddr, pubkey: ed25519::PublicKey) -> String { + format!("Error establishing RPC connection to remote node: {}@{}.\nThis can happen if the remote node is not reachable on the network, but also if the two nodes are not configured with the same rpc_secret", hex::encode(pubkey), addr) +} -- cgit v1.2.3 From d4af27f920ce48a60f2073e98b17bdf963241686 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 14 Dec 2022 13:54:21 +0100 Subject: Add missing notify --- src/table/data.rs | 1 + 1 file changed, 1 insertion(+) (limited to 'src') diff --git a/src/table/data.rs b/src/table/data.rs index e3b8c93f..bf2bf88b 100644 --- a/src/table/data.rs +++ b/src/table/data.rs @@ -338,6 +338,7 @@ where .map_err(db::TxError::Abort)?, }; tx.insert(&self.insert_queue, &tree_key, new_entry)?; + self.insert_queue_notify.notify_one(); Ok(()) } -- cgit v1.2.3 From dfc131850a09e7ceacfa98315adbef156e07e9ca Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 14 Dec 2022 15:25:29 +0100 Subject: Simplified and more aggressive worker exit logic --- src/block/repair.rs | 4 +-- src/block/resync.rs | 2 +- src/garage/repair/online.rs | 4 +-- src/table/gc.rs | 5 +-- src/table/merkle.rs | 5 +-- src/table/queue.rs | 5 +-- src/table/sync.rs | 5 +-- src/util/background/worker.rs | 73 ++++++++++++++----------------------------- 8 files changed, 32 insertions(+), 71 deletions(-) (limited to 'src') diff --git a/src/block/repair.rs b/src/block/repair.rs index 1878027e..f5515d4e 100644 --- a/src/block/repair.rs +++ b/src/block/repair.rs @@ -148,7 +148,7 @@ impl Worker for RepairWorker { } } - async fn wait_for_work(&mut self, _must_exit: &watch::Receiver) -> WorkerState { + async fn wait_for_work(&mut self) -> WorkerState { unreachable!() } } @@ -341,7 +341,7 @@ impl Worker for ScrubWorker { } } - async fn wait_for_work(&mut self, _must_exit: &watch::Receiver) -> WorkerState { + async fn wait_for_work(&mut self) -> WorkerState { let (wait_until, command) = match &self.work { ScrubWorkerState::Running(_) => return WorkerState::Busy, ScrubWorkerState::Paused(_, resume_time) => (*resume_time, ScrubWorkerCommand::Resume), diff --git a/src/block/resync.rs b/src/block/resync.rs index 8231b55d..51bb9846 100644 --- a/src/block/resync.rs +++ b/src/block/resync.rs @@ -540,7 +540,7 @@ impl Worker for ResyncWorker { } } - async fn wait_for_work(&mut self, _must_exit: &watch::Receiver) -> WorkerState { + async fn wait_for_work(&mut self) -> WorkerState { while self.index >= self.manager.resync.persisted.load().n_workers { self.manager.resync.notify.notified().await } diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs index 4b4118a8..cd7de49b 100644 --- a/src/garage/repair/online.rs +++ b/src/garage/repair/online.rs @@ -136,7 +136,7 @@ impl Worker for RepairVersionsWorker { Ok(WorkerState::Busy) } - async fn wait_for_work(&mut self, _must_exit: &watch::Receiver) -> WorkerState { + async fn wait_for_work(&mut self) -> WorkerState { unreachable!() } } @@ -214,7 +214,7 @@ impl Worker for RepairBlockrefsWorker { Ok(WorkerState::Busy) } - async fn wait_for_work(&mut self, _must_exit: &watch::Receiver) -> WorkerState { + async fn wait_for_work(&mut self) -> WorkerState { unreachable!() } } diff --git a/src/table/gc.rs b/src/table/gc.rs index 1fc16364..90594fba 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -348,10 +348,7 @@ where } } - async fn wait_for_work(&mut self, must_exit: &watch::Receiver) -> WorkerState { - if *must_exit.borrow() { - return WorkerState::Done; - } + async fn wait_for_work(&mut self) -> WorkerState { tokio::time::sleep(self.wait_delay).await; WorkerState::Busy } diff --git a/src/table/merkle.rs b/src/table/merkle.rs index 0fe7d2cb..736354fa 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -340,10 +340,7 @@ where .unwrap() } - async fn wait_for_work(&mut self, must_exit: &watch::Receiver) -> WorkerState { - if *must_exit.borrow() { - return WorkerState::Done; - } + async fn wait_for_work(&mut self) -> WorkerState { select! { _ = tokio::time::sleep(Duration::from_secs(60)) => (), _ = self.0.data.merkle_todo_notify.notified() => (), diff --git a/src/table/queue.rs b/src/table/queue.rs index 3671ea7d..860f20d3 100644 --- a/src/table/queue.rs +++ b/src/table/queue.rs @@ -71,10 +71,7 @@ where Ok(WorkerState::Busy) } - async fn wait_for_work(&mut self, must_exit: &watch::Receiver) -> WorkerState { - if *must_exit.borrow() { - return WorkerState::Done; - } + async fn wait_for_work(&mut self) -> WorkerState { select! { _ = tokio::time::sleep(Duration::from_secs(600)) => (), _ = self.0.data.insert_queue_notify.notified() => (), diff --git a/src/table/sync.rs b/src/table/sync.rs index 1e7618ca..d6d272ab 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -593,10 +593,7 @@ impl Worker for SyncWor } } - async fn wait_for_work(&mut self, must_exit: &watch::Receiver) -> WorkerState { - if *must_exit.borrow() { - return WorkerState::Done; - } + async fn wait_for_work(&mut self) -> WorkerState { select! { s = self.add_full_sync_rx.recv() => { if let Some(()) = s { diff --git a/src/util/background/worker.rs b/src/util/background/worker.rs index 7e9da7f8..8165e2cb 100644 --- a/src/util/background/worker.rs +++ b/src/util/background/worker.rs @@ -1,6 +1,6 @@ use std::collections::HashMap; use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::time::Duration; use async_trait::async_trait; use futures::future::*; @@ -14,6 +14,10 @@ use crate::background::{WorkerInfo, WorkerStatus}; use crate::error::Error; use crate::time::now_msec; +// All workers that haven't exited for this time after an exit signal was recieved +// will be interrupted in the middle of whatever they are doing. +const EXIT_DEADLINE: Duration = Duration::from_secs(8); + #[derive(PartialEq, Copy, Clone, Serialize, Deserialize, Debug)] pub enum WorkerState { Busy, @@ -50,10 +54,8 @@ pub trait Worker: Send { async fn work(&mut self, must_exit: &mut watch::Receiver) -> Result; /// Wait for work: await for some task to become available. This future can be interrupted in - /// the middle for any reason. This future doesn't have to await on must_exit.changed(), we - /// are doing it for you. Therefore it only receives a read refernce to must_exit which allows - /// it to check if we are exiting. - async fn wait_for_work(&mut self, must_exit: &watch::Receiver) -> WorkerState; + /// the middle for any reason, for example if an interrupt signal was recieved. + async fn wait_for_work(&mut self) -> WorkerState; } pub(crate) struct WorkerProcessor { @@ -93,11 +95,9 @@ impl WorkerProcessor { let task_id = next_task_id; next_task_id += 1; let stop_signal = self.stop_signal.clone(); - let stop_signal_worker = self.stop_signal.clone(); let mut worker = WorkerHandler { task_id, stop_signal, - stop_signal_worker, worker: new_worker, state: WorkerState::Busy, errors: 0, @@ -153,26 +153,14 @@ impl WorkerProcessor { } // We are exiting, drain everything - let drain_half_time = Instant::now() + Duration::from_secs(5); let drain_everything = async move { - while let Some(mut worker) = workers.next().await { - if worker.state == WorkerState::Done { - info!( - "Worker {} (TID {}) exited", - worker.worker.name(), - worker.task_id - ); - } else if Instant::now() > drain_half_time { - warn!("Worker {} (TID {}) interrupted between two iterations in state {:?} (this should be fine)", worker.worker.name(), worker.task_id, worker.state); - } else { - workers.push( - async move { - worker.step().await; - worker - } - .boxed(), - ); - } + while let Some(worker) = workers.next().await { + info!( + "Worker {} (TID {}) exited (last state: {:?})", + worker.worker.name(), + worker.task_id, + worker.state + ); } }; @@ -180,7 +168,7 @@ impl WorkerProcessor { _ = drain_everything => { info!("All workers exited peacefully \\o/"); } - _ = tokio::time::sleep(Duration::from_secs(9)) => { + _ = tokio::time::sleep(EXIT_DEADLINE) => { error!("Some workers could not exit in time, we are cancelling some things in the middle"); } } @@ -190,7 +178,6 @@ impl WorkerProcessor { struct WorkerHandler { task_id: usize, stop_signal: watch::Receiver, - stop_signal_worker: watch::Receiver, worker: Box, state: WorkerState, errors: usize, @@ -225,33 +212,19 @@ impl WorkerHandler { }, WorkerState::Throttled(delay) => { // Sleep for given delay and go back to busy state - if !*self.stop_signal.borrow() { - select! { - _ = tokio::time::sleep(Duration::from_secs_f32(delay)) => (), - _ = self.stop_signal.changed() => (), + select! { + _ = tokio::time::sleep(Duration::from_secs_f32(delay)) => { + self.state = WorkerState::Busy; } + _ = self.stop_signal.changed() => (), } - self.state = WorkerState::Busy; } WorkerState::Idle => { - if *self.stop_signal.borrow() { - select! { - new_st = self.worker.wait_for_work(&self.stop_signal_worker) => { - self.state = new_st; - } - _ = tokio::time::sleep(Duration::from_secs(1)) => { - // stay in Idle state - } - } - } else { - select! { - new_st = self.worker.wait_for_work(&self.stop_signal_worker) => { - self.state = new_st; - } - _ = self.stop_signal.changed() => { - // stay in Idle state - } + select! { + new_st = self.worker.wait_for_work() => { + self.state = new_st; } + _ = self.stop_signal.changed() => (), } } WorkerState::Done => unreachable!(), -- cgit v1.2.3 From 510b62010871e9133a98f625b85f07a7e50f6f23 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 14 Dec 2022 16:08:05 +0100 Subject: Get rid of background::spawn --- src/rpc/system.rs | 29 +++++++++++++---------------- src/table/table.rs | 15 ++++++++++----- src/util/background/mod.rs | 16 ---------------- 3 files changed, 23 insertions(+), 37 deletions(-) (limited to 'src') diff --git a/src/rpc/system.rs b/src/rpc/system.rs index 3b321a7d..f03df509 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -21,7 +21,6 @@ use netapp::peering::fullmesh::FullMeshPeeringStrategy; use netapp::util::parse_and_resolve_peer_addr_async; use netapp::{NetApp, NetworkKey, NodeID, NodeKey}; -use garage_util::background::{self}; use garage_util::config::Config; #[cfg(feature = "kubernetes-discovery")] use garage_util::config::KubernetesDiscoveryConfig; @@ -622,11 +621,7 @@ impl System { if info.cluster_layout_version > local_info.cluster_layout_version || info.cluster_layout_staging_hash != local_info.cluster_layout_staging_hash { - let self2 = self.clone(); - background::spawn(async move { - self2.pull_cluster_layout(from).await; - Ok(()) - }); + tokio::spawn(self.clone().pull_cluster_layout(from)); } self.node_status @@ -668,16 +663,18 @@ impl System { drop(update_ring); let self2 = self.clone(); - background::spawn(async move { - self2 + tokio::spawn(async move { + if let Err(e) = self2 .rpc .broadcast( &self2.system_endpoint, SystemRpc::AdvertiseClusterLayout(layout), RequestStrategy::with_priority(PRIO_HIGH), ) - .await?; - Ok(()) + .await + { + warn!("Error while broadcasting new cluster layout: {}", e); + } }); self.save_cluster_layout().await?; @@ -766,12 +763,12 @@ impl System { } for (node_id, node_addr) in ping_list { - background::spawn( - self.netapp - .clone() - .try_connect(node_addr, node_id) - .map(move |r| r.err_context(connect_error_message(node_addr, node_id))), - ); + let self2 = self.clone(); + tokio::spawn(async move { + if let Err(e) = self2.netapp.clone().try_connect(node_addr, node_id).await { + error!("{}\n{}", connect_error_message(node_addr, node_id), e); + } + }); } } diff --git a/src/table/table.rs b/src/table/table.rs index 4d93102e..bbcd5971 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -14,7 +14,7 @@ use opentelemetry::{ use garage_db as db; -use garage_util::background::{self, BackgroundRunner}; +use garage_util::background::BackgroundRunner; use garage_util::data::*; use garage_util::error::Error; use garage_util::metrics::RecordDuration; @@ -275,7 +275,11 @@ where if not_all_same { let self2 = self.clone(); let ent2 = ret_entry.clone(); - background::spawn(async move { self2.repair_on_read(&who[..], ent2).await }); + tokio::spawn(async move { + if let Err(e) = self2.repair_on_read(&who[..], ent2).await { + warn!("Error doing repair on read: {}", e); + } + }); } } @@ -372,11 +376,12 @@ where .into_iter() .map(|k| ret.get(&k).unwrap().clone()) .collect::>(); - background::spawn(async move { + tokio::spawn(async move { for v in to_repair { - self2.repair_on_read(&who[..], v).await?; + if let Err(e) = self2.repair_on_read(&who[..], v).await { + warn!("Error doing repair on read: {}", e); + } } - Ok(()) }); } diff --git a/src/util/background/mod.rs b/src/util/background/mod.rs index 0bb4fb67..41b48e93 100644 --- a/src/util/background/mod.rs +++ b/src/util/background/mod.rs @@ -2,20 +2,15 @@ pub mod worker; -use core::future::Future; - use std::collections::HashMap; use std::sync::Arc; use serde::{Deserialize, Serialize}; use tokio::sync::{mpsc, watch}; -use crate::error::Error; use worker::WorkerProcessor; pub use worker::{Worker, WorkerState}; -pub(crate) type JobOutput = Result<(), Error>; - /// Job runner for futures and async functions pub struct BackgroundRunner { send_worker: mpsc::UnboundedSender>, @@ -77,14 +72,3 @@ impl BackgroundRunner { .expect("Could not put worker in queue"); } } - -pub fn spawn(job: T) -where - T: Future + Send + 'static, -{ - tokio::spawn(async move { - if let Err(e) = job.await { - error!("{}", e); - } - }); -} -- cgit v1.2.3 From e6f14ab5cfe985106092afa228258eeb7d5d8905 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 14 Dec 2022 16:11:19 +0100 Subject: better error message handling --- src/rpc/system.rs | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) (limited to 'src') diff --git a/src/rpc/system.rs b/src/rpc/system.rs index f03df509..8f753b7f 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -436,16 +436,13 @@ impl System { })?; let mut errors = vec![]; for addr in addrs.iter() { - match self - .netapp - .clone() - .try_connect(*addr, pubkey) - .await - .err_context(connect_error_message(*addr, pubkey)) - { + match self.netapp.clone().try_connect(*addr, pubkey).await { Ok(()) => return Ok(()), Err(e) => { - errors.push((*addr, e)); + errors.push(( + *addr, + Error::Message(connect_error_message(*addr, pubkey, e)), + )); } } } @@ -766,7 +763,7 @@ impl System { let self2 = self.clone(); tokio::spawn(async move { if let Err(e) = self2.netapp.clone().try_connect(node_addr, node_id).await { - error!("{}\n{}", connect_error_message(node_addr, node_id), e); + error!("{}", connect_error_message(node_addr, node_id, e)); } }); } @@ -871,6 +868,10 @@ async fn resolve_peers(peers: &[String]) -> Vec<(NodeID, SocketAddr)> { ret } -fn connect_error_message(addr: SocketAddr, pubkey: ed25519::PublicKey) -> String { - format!("Error establishing RPC connection to remote node: {}@{}.\nThis can happen if the remote node is not reachable on the network, but also if the two nodes are not configured with the same rpc_secret", hex::encode(pubkey), addr) +fn connect_error_message( + addr: SocketAddr, + pubkey: ed25519::PublicKey, + e: netapp::error::Error, +) -> String { + format!("Error establishing RPC connection to remote node: {}@{}.\nThis can happen if the remote node is not reachable on the network, but also if the two nodes are not configured with the same rpc_secret.\n{}", hex::encode(pubkey), addr, e) } -- cgit v1.2.3 From 13c86621267272af5bc89ec037d097739dae9aaf Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 14 Dec 2022 16:16:55 +0100 Subject: factorize --- src/table/data.rs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) (limited to 'src') diff --git a/src/table/data.rs b/src/table/data.rs index bf2bf88b..40856b02 100644 --- a/src/table/data.rs +++ b/src/table/data.rs @@ -330,13 +330,12 @@ where let mut entry = self.decode_entry(&old_v).map_err(db::TxError::Abort)?; entry.merge(ins); rmp_to_vec_all_named(&entry) - .map_err(Error::RmpEncode) - .map_err(db::TxError::Abort)? } - None => rmp_to_vec_all_named(ins) - .map_err(Error::RmpEncode) - .map_err(db::TxError::Abort)?, + None => rmp_to_vec_all_named(ins), }; + let new_entry = new_entry + .map_err(Error::RmpEncode) + .map_err(db::TxError::Abort)?; tx.insert(&self.insert_queue, &tree_key, new_entry)?; self.insert_queue_notify.notify_one(); -- cgit v1.2.3 From 1fcd0b371ba1fa94cd6efad5aa9a236b2e58c922 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 14 Dec 2022 16:31:31 +0100 Subject: online repair workers: retry on error --- src/garage/repair/online.rs | 24 ++++++++++-------------- 1 file changed, 10 insertions(+), 14 deletions(-) (limited to 'src') diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs index cd7de49b..6e8ec2d3 100644 --- a/src/garage/repair/online.rs +++ b/src/garage/repair/online.rs @@ -92,19 +92,14 @@ impl Worker for RepairVersionsWorker { } async fn work(&mut self, _must_exit: &mut watch::Receiver) -> Result { - let item_bytes = match self.garage.version_table.data.store.get_gt(&self.pos)? { - Some((k, v)) => { - self.pos = k; - v - } + let (item_bytes, next_pos) = match self.garage.version_table.data.store.get_gt(&self.pos)? { + Some((k, v)) => (v, k), None => { info!("repair_versions: finished, done {}", self.counter); return Ok(WorkerState::Done); } }; - self.counter += 1; - let version = rmp_serde::decode::from_read_ref::<_, Version>(&item_bytes)?; if !version.deleted.get() { let object = self @@ -133,6 +128,9 @@ impl Worker for RepairVersionsWorker { } } + self.counter += 1; + self.pos = next_pos; + Ok(WorkerState::Busy) } @@ -173,19 +171,14 @@ impl Worker for RepairBlockrefsWorker { } async fn work(&mut self, _must_exit: &mut watch::Receiver) -> Result { - let item_bytes = match self.garage.block_ref_table.data.store.get_gt(&self.pos)? { - Some((k, v)) => { - self.pos = k; - v - } + let (item_bytes, next_pos) = match self.garage.block_ref_table.data.store.get_gt(&self.pos)? { + Some((k, v)) => (v, k), None => { info!("repair_block_ref: finished, done {}", self.counter); return Ok(WorkerState::Done); } }; - self.counter += 1; - let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(&item_bytes)?; if !block_ref.deleted.get() { let version = self @@ -211,6 +204,9 @@ impl Worker for RepairBlockrefsWorker { } } + self.counter += 1; + self.pos = next_pos; + Ok(WorkerState::Busy) } -- cgit v1.2.3 From 6b857a9b8cd454721a2a1e69e108794be07d36a2 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 2 Jan 2023 13:50:42 +0100 Subject: cargo fmt --- src/garage/repair/online.rs | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) (limited to 'src') diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs index 6e8ec2d3..7120972c 100644 --- a/src/garage/repair/online.rs +++ b/src/garage/repair/online.rs @@ -171,13 +171,14 @@ impl Worker for RepairBlockrefsWorker { } async fn work(&mut self, _must_exit: &mut watch::Receiver) -> Result { - let (item_bytes, next_pos) = match self.garage.block_ref_table.data.store.get_gt(&self.pos)? { - Some((k, v)) => (v, k), - None => { - info!("repair_block_ref: finished, done {}", self.counter); - return Ok(WorkerState::Done); - } - }; + let (item_bytes, next_pos) = + match self.garage.block_ref_table.data.store.get_gt(&self.pos)? { + Some((k, v)) => (v, k), + None => { + info!("repair_block_ref: finished, done {}", self.counter); + return Ok(WorkerState::Done); + } + }; let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(&item_bytes)?; if !block_ref.deleted.get() { -- cgit v1.2.3