From 5768bf362262f78376af14517c4921941986192e Mon Sep 17 00:00:00 2001
From: Alex
Date: Tue, 10 May 2022 13:16:57 +0200
Subject: First implementation of K2V (#293)

**Specification:**

View spec at [this URL](https://git.deuxfleurs.fr/Deuxfleurs/garage/src/branch/k2v/doc/drafts/k2v-spec.md)

- [x] Specify the structure of K2V triples
- [x] Specify the DVVS format used for causality detection
- [x] Specify the K2V index (just a counter of number of values per partition key)
- [x] Specify single-item endpoints: ReadItem, InsertItem, DeleteItem
- [x] Specify index endpoint: ReadIndex
- [x] Specify multi-item endpoints: InsertBatch, ReadBatch, DeleteBatch
- [x] Move to JSON objects instead of tuples
- [x] Specify endpoints for polling for updates on single values (PollItem)

**Implementation:**

- [x] Table for K2V items, causal contexts
- [x] Indexing mechanism and table for K2V index
- [x] Make API handlers a bit more generic
- [x] K2V API endpoint
- [x] K2V API router
- [x] ReadItem
- [x] InsertItem
- [x] DeleteItem
- [x] PollItem
- [x] ReadIndex
- [x] InsertBatch
- [x] ReadBatch
- [x] DeleteBatch

**Testing:**

- [x] Just a simple Python script that does some requests to check visually that things are going right (does not contain parsing of results or assertions on returned values)
- [x] Actual tests:
  - [x] Adapt testing framework
  - [x] Simple test with InsertItem + ReadItem
  - [x] Test with several Insert/Read/DeleteItem + ReadIndex
  - [x] Test all combinations of return formats for ReadItem
  - [x] Test with ReadBatch, InsertBatch, DeleteBatch
  - [x] Test with PollItem
  - [x] Test error codes
- [ ] Fix most broken stuff
  - [x] test PollItem broken randomly
  - [x] when invalid causality tokens are given, errors should be 4xx not 5xx

**Improvements:**

- [x] Descending range queries
  - [x] Specify
  - [x] Implement
  - [x] Add test
- [x] Batch updates to index counter
- [x] Put K2V behind `k2v` feature flag

Co-authored-by: Alex Auvolat
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/293
Co-authored-by: Alex
Co-committed-by: Alex
---
 src/model/index_counter.rs | 305 +++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 305 insertions(+)
 create mode 100644 src/model/index_counter.rs

(limited to 'src/model/index_counter.rs')

diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs
new file mode 100644
index 00000000..123154d4
--- /dev/null
+++ b/src/model/index_counter.rs
@@ -0,0 +1,305 @@
+use std::collections::{hash_map, BTreeMap, HashMap};
+use std::marker::PhantomData;
+use std::sync::Arc;
+use std::time::Duration;
+
+use serde::{Deserialize, Serialize};
+use tokio::sync::{mpsc, watch};
+
+use garage_rpc::ring::Ring;
+use garage_rpc::system::System;
+use garage_util::data::*;
+use garage_util::error::*;
+
+use garage_table::crdt::*;
+use garage_table::replication::TableShardedReplication;
+use garage_table::*;
+
+pub trait CounterSchema: Clone + PartialEq + Send + Sync + 'static {
+	const NAME: &'static str;
+	type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
+	type S: SortKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
+}
+
+/// A counter entry in the global table
+#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+pub struct CounterEntry<T: CounterSchema> {
+	pub pk: T::P,
+	pub sk: T::S,
+	pub values: BTreeMap<String, CounterValue>,
+}
+
+impl<T: CounterSchema> Entry<T::P, T::S> for CounterEntry<T> {
+	fn partition_key(&self) -> &T::P {
+		&self.pk
+	}
+	fn sort_key(&self) -> &T::S {
+		&self.sk
+	}
+	fn is_tombstone(&self) -> bool {
+		self.values
+			.iter()
+			.all(|(_, v)| v.node_values.iter().all(|(_, (_, v))| *v == 0))
+	}
+}
+
+impl<T: CounterSchema> CounterEntry<T> {
+	pub fn filtered_values(&self, ring: &Ring) -> HashMap<String, i64> {
+		let nodes = &ring.layout.node_id_vec[..];
+		self.filtered_values_with_nodes(nodes)
+	}
+
+	pub fn filtered_values_with_nodes(&self, nodes: &[Uuid]) -> HashMap<String, i64> {
+		let mut ret = HashMap::new();
+		for (name, vals) in self.values.iter() {
+			let new_vals = vals
+				.node_values
+				.iter()
+				.filter(|(n, _)| nodes.contains(n))
+				.map(|(_, (_, v))| *v)
+				.collect::<Vec<_>>();
+			if !new_vals.is_empty() {
+				ret.insert(
+					name.clone(),
+					new_vals.iter().fold(i64::MIN, |a, b| std::cmp::max(a, *b)),
+				);
+			}
+		}
+
+		ret
+	}
+}
+
+/// A counter entry in the global table
+#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+pub struct CounterValue {
+	pub node_values: BTreeMap<Uuid, (u64, i64)>,
+}
+
+impl<T: CounterSchema> Crdt for CounterEntry<T> {
+	fn merge(&mut self, other: &Self) {
+		for (name, e2) in other.values.iter() {
+			if let Some(e) = self.values.get_mut(name) {
+				e.merge(e2);
+			} else {
+				self.values.insert(name.clone(), e2.clone());
+			}
+		}
+	}
+}
+
+impl Crdt for CounterValue {
+	fn merge(&mut self, other: &Self) {
+		for (node, (t2, e2)) in other.node_values.iter() {
+			if let Some((t, e)) = self.node_values.get_mut(node) {
+				if t2 > t {
+					*e = *e2;
+				}
+			} else {
+				self.node_values.insert(*node, (*t2, *e2));
+			}
+		}
+	}
+}
+
+pub struct CounterTable<T: CounterSchema> {
+	_phantom_t: PhantomData<T>,
+}
+
+impl<T: CounterSchema> TableSchema for CounterTable<T> {
+	const TABLE_NAME: &'static str = T::NAME;
+
+	type P = T::P;
+	type S = T::S;
+	type E = CounterEntry<T>;
+	type Filter = (DeletedFilter, Vec<Uuid>);
+
+	fn updated(&self, _old: Option<&Self::E>, _new: Option<&Self::E>) {
+		// nothing for now
+	}
+
+	fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
+		if filter.0 == DeletedFilter::Any {
+			return true;
+		}
+
+		let is_tombstone = entry
+			.filtered_values_with_nodes(&filter.1[..])
+			.iter()
+			.all(|(_, v)| *v == 0);
+		filter.0.apply(is_tombstone)
+	}
+}
+
+// ----
+
+pub struct IndexCounter<T: CounterSchema> {
+	this_node: Uuid,
+	local_counter: sled::Tree,
+	propagate_tx: mpsc::UnboundedSender<(T::P, T::S, LocalCounterEntry)>,
+	pub table: Arc<Table<CounterTable<T>, TableShardedReplication>>,
+}
+
+impl<T: CounterSchema> IndexCounter<T> {
+	pub fn new(
+		system: Arc<System>,
+		replication: TableShardedReplication,
+		db: &sled::Db,
+	) -> Arc<Self> {
+		let background = system.background.clone();
+
+		let (propagate_tx, propagate_rx) = mpsc::unbounded_channel();
+
+		let this = Arc::new(Self {
+			this_node: system.id,
+			local_counter: db
+				.open_tree(format!("local_counter:{}", T::NAME))
+				.expect("Unable to open local counter tree"),
+			propagate_tx,
+			table: Table::new(
+				CounterTable {
+					_phantom_t: Default::default(),
+				},
+				replication,
+				system,
+				db,
+			),
+		});
+
+		let this2 = this.clone();
+		background.spawn_worker(
+			format!("{} index counter propagator", T::NAME),
+			move |must_exit| this2.clone().propagate_loop(propagate_rx, must_exit),
+		);
+		this
+	}
+
+	pub fn count(&self, pk: &T::P, sk: &T::S, counts: &[(&str, i64)]) -> Result<(), Error> {
+		let tree_key = self.table.data.tree_key(pk, sk);
+
+		let new_entry = self.local_counter.transaction(|tx| {
+			let mut entry = match tx.get(&tree_key[..])? {
+				Some(old_bytes) => {
+					rmp_serde::decode::from_read_ref::<_, LocalCounterEntry>(&old_bytes)
+						.map_err(Error::RmpDecode)
+						.map_err(sled::transaction::ConflictableTransactionError::Abort)?
+				}
+				None => LocalCounterEntry {
+					values: BTreeMap::new(),
+				},
+			};
+
+			for (s, inc) in counts.iter() {
+				let mut ent = entry.values.entry(s.to_string()).or_insert((0, 0));
+				ent.0 += 1;
+				ent.1 += *inc;
+			}
+
+			let new_entry_bytes = rmp_to_vec_all_named(&entry)
+				.map_err(Error::RmpEncode)
+				.map_err(sled::transaction::ConflictableTransactionError::Abort)?;
+			tx.insert(&tree_key[..], new_entry_bytes)?;
+
+			Ok(entry)
+		})?;
+
+		if let Err(e) = self.propagate_tx.send((pk.clone(), sk.clone(), new_entry)) {
+			error!(
+				"Could not propagate updated counter values, failed to send to channel: {}",
+				e
+			);
+		}
+
+		Ok(())
+	}
+
+	async fn propagate_loop(
+		self: Arc<Self>,
+		mut propagate_rx: mpsc::UnboundedReceiver<(T::P, T::S, LocalCounterEntry)>,
+		must_exit: watch::Receiver<bool>,
+	) {
+		// This loop batches updates to counters to be sent all at once.
+		// They are sent once the propagate_rx channel has been emptied (or is closed).
+		let mut buf = HashMap::new();
+		let mut errors = 0;
+
+		loop {
+			let (ent, closed) = match propagate_rx.try_recv() {
+				Ok(ent) => (Some(ent), false),
+				Err(mpsc::error::TryRecvError::Empty) if buf.is_empty() => {
+					match propagate_rx.recv().await {
+						Some(ent) => (Some(ent), false),
+						None => (None, true),
+					}
+				}
+				Err(mpsc::error::TryRecvError::Empty) => (None, false),
+				Err(mpsc::error::TryRecvError::Disconnected) => (None, true),
+			};
+
+			if let Some((pk, sk, counters)) = ent {
+				let tree_key = self.table.data.tree_key(&pk, &sk);
+				let dist_entry = counters.into_counter_entry::<T>(self.this_node, pk, sk);
+				match buf.entry(tree_key) {
+					hash_map::Entry::Vacant(e) => {
+						e.insert(dist_entry);
+					}
+					hash_map::Entry::Occupied(mut e) => {
+						e.get_mut().merge(&dist_entry);
+					}
+				}
+				// As long as we can add entries, loop back and add them to batch
+				// before sending batch to other nodes
+				continue;
+			}
+
+			if !buf.is_empty() {
+				let entries = buf.iter().map(|(_k, v)| v);
+				if let Err(e) = self.table.insert_many(entries).await {
+					errors += 1;
+					if errors >= 2 && *must_exit.borrow() {
+						error!("({}) Could not propagate {} counter values: {}, these counters will not be updated correctly.", T::NAME, buf.len(), e);
+						break;
+					}
+					warn!("({}) Could not propagate {} counter values: {}, retrying in 5 seconds (retry #{})", T::NAME, buf.len(), e, errors);
+					tokio::time::sleep(Duration::from_secs(5)).await;
+					continue;
+				}
+
+				buf.clear();
+				errors = 0;
+			}
+
+			if closed || *must_exit.borrow() {
+				break;
+			}
+		}
+	}
+}
+
+#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+struct LocalCounterEntry {
+	values: BTreeMap<String, (u64, i64)>,
+}
+
+impl LocalCounterEntry {
+	fn into_counter_entry<T: CounterSchema>(
+		self,
+		this_node: Uuid,
+		pk: T::P,
+		sk: T::S,
+	) -> CounterEntry<T> {
+		CounterEntry {
+			pk,
+			sk,
+			values: self
+				.values
+				.into_iter()
+				.map(|(name, (ts, v))| {
+					let mut node_values = BTreeMap::new();
+					node_values.insert(this_node, (ts, v));
+					(name, CounterValue { node_values })
+				})
+				.collect(),
+		}
+	}
+}
-- cgit v1.2.3

From b44d3fc796484a50cd6854f20c9b46e5fddedc9d Mon Sep 17 00:00:00 2001
From: Alex
Date: Wed, 8 Jun 2022 10:01:44 +0200
Subject: Abstract database behind generic interface and implement alternative drivers (#322)

- [x] Design interface
- [x] Implement Sled backend
- [x] Re-implement the SledCountedTree hack ~~on Sled backend~~ on all backends (i.e. over the abstraction)
- [x] Convert Garage code to use generic interface
- [x] Proof-read converted Garage code
- [ ] Test everything well
- [x] Implement sqlite backend
- [x] Implement LMDB backend
- [ ] (Implement Persy backend?)
- [ ] (Implement other backends? (like RocksDB, ...))
- [x] Implement backend choice in config file and garage server module
- [x] Add CLI for converting between DB formats
- Exploit the new interface to put more things in transactions
  - [x] `.updated()` trigger on Garage tables

Fix #284

**Bugs**

- [x] When exporting sqlite, trees iterate empty??
- [x] LMDB doesn't work

**Known issues for various back-ends**

- Sled:
  - Eats all my RAM and also all my disk space
  - `.len()` has to traverse the whole table
  - Is actually quite slow on some operations
  - And is actually pretty bad code...
- Sqlite:
  - Requires a lock to be taken on all operations. The lock is also taken when iterating on a table with `.iter()`, and the lock isn't released until the iterator is dropped. This means that we must be VERY careful not to do anything else inside a `.iter()` loop, or else we will have a deadlock! Most such cases have been eliminated from the Garage codebase, but there might still be some that remain. If your Garage-over-Sqlite seems to hang/freeze, this is the reason.
  - (adapter uses a bunch of unsafe code)
- Heed (LMDB):
  - Not suited for 32-bit machines as it has to map the whole DB in memory.
  - (adapter uses a tiny bit of unsafe code)

**My recommendation:** avoid 32-bit machines and use LMDB as much as possible.

**Converting databases** is actually quite easy. For example from Sled to LMDB:

```bash
cd src/db
cargo run --features cli --bin convert -- -i path/to/garage/meta/db -a sled -o path/to/garage/meta/db.lmdb -b lmdb
```

Then, just add this to your `config.toml`:

```toml
db_engine = "lmdb"
```

Co-authored-by: Alex Auvolat
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/322
Co-authored-by: Alex
Co-committed-by: Alex
---
 src/model/index_counter.rs | 62 ++++++++++++++++++++++------------------------
 1 file changed, 30 insertions(+), 32 deletions(-)

(limited to 'src/model/index_counter.rs')

diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs
index 123154d4..2602d5d9 100644
--- a/src/model/index_counter.rs
+++ b/src/model/index_counter.rs
@@ -6,6 +6,8 @@ use std::time::Duration;
 use serde::{Deserialize, Serialize};
 use tokio::sync::{mpsc, watch};
 
+use garage_db as db;
+
 use garage_rpc::ring::Ring;
 use garage_rpc::system::System;
 use garage_util::data::*;
@@ -114,10 +116,6 @@ impl<T: CounterSchema> TableSchema for CounterTable<T> {
 	type E = CounterEntry<T>;
 	type Filter = (DeletedFilter, Vec<Uuid>);
 
-	fn updated(&self, _old: Option<&Self::E>, _new: Option<&Self::E>) {
-		// nothing for now
-	}
-
 	fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
 		if filter.0 == DeletedFilter::Any {
 			return true;
@@ -135,7 +133,7 @@ impl<T: CounterSchema> TableSchema for CounterTable<T> {
 
 pub struct IndexCounter<T: CounterSchema> {
 	this_node: Uuid,
-	local_counter: sled::Tree,
+	local_counter: db::Tree,
 	propagate_tx: mpsc::UnboundedSender<(T::P, T::S, LocalCounterEntry)>,
 	pub table: Arc<Table<CounterTable<T>, TableShardedReplication>>,
 }
@@ -144,7 +142,7 @@ impl<T: CounterSchema> IndexCounter<T> {
 	pub fn new(
 		system: Arc<System>,
 		replication: TableShardedReplication,
-		db: &sled::Db,
+		db: &db::Db,
 	) -> Arc<Self> {
 		let background = system.background.clone();
 
@@ -174,36 +172,36 @@ impl<T: CounterSchema> IndexCounter<T> {
 		this
 	}
 
-	pub fn count(&self, pk: &T::P, sk: &T::S, counts: &[(&str, i64)]) -> Result<(), Error> {
+	pub fn count(
+		&self,
+		tx: &mut db::Transaction,
+		pk: &T::P,
+		sk: &T::S,
+		counts: &[(&str, i64)],
+	) -> db::TxResult<(), Error> {
 		let tree_key = self.table.data.tree_key(pk, sk);
 
-		let new_entry = self.local_counter.transaction(|tx| {
-			let mut entry = match tx.get(&tree_key[..])? {
-				Some(old_bytes) => {
-					rmp_serde::decode::from_read_ref::<_, LocalCounterEntry>(&old_bytes)
-						.map_err(Error::RmpDecode)
-						.map_err(sled::transaction::ConflictableTransactionError::Abort)?
-				}
-				None => LocalCounterEntry {
-					values: BTreeMap::new(),
-				},
-			};
-
-			for (s, inc) in counts.iter() {
-				let mut ent = entry.values.entry(s.to_string()).or_insert((0, 0));
-				ent.0 += 1;
-				ent.1 += *inc;
-			}
-
-			let new_entry_bytes = rmp_to_vec_all_named(&entry)
-				.map_err(Error::RmpEncode)
-				.map_err(sled::transaction::ConflictableTransactionError::Abort)?;
-			tx.insert(&tree_key[..], new_entry_bytes)?;
+		let mut entry = match tx.get(&self.local_counter, &tree_key[..])? {
+			Some(old_bytes) => rmp_serde::decode::from_read_ref::<_, LocalCounterEntry>(&old_bytes)
+				.map_err(Error::RmpDecode)
+				.map_err(db::TxError::Abort)?,
+			None => LocalCounterEntry {
+				values: BTreeMap::new(),
+			},
+		};
+
+		for (s, inc) in counts.iter() {
+			let mut ent = entry.values.entry(s.to_string()).or_insert((0, 0));
+			ent.0 += 1;
+			ent.1 += *inc;
+		}
 
-			Ok(entry)
-		})?;
+		let new_entry_bytes = rmp_to_vec_all_named(&entry)
+			.map_err(Error::RmpEncode)
+			.map_err(db::TxError::Abort)?;
+		tx.insert(&self.local_counter, &tree_key[..], new_entry_bytes)?;
 
-		if let Err(e) = self.propagate_tx.send((pk.clone(), sk.clone(), new_entry)) {
+		if let Err(e) = self.propagate_tx.send((pk.clone(), sk.clone(), entry)) {
 			error!(
 				"Could not propagate updated counter values, failed to send to channel: {}",
 				e
-- cgit v1.2.3

From 77e3fd6db2c9cd3a10889bd071e95ef839cfbefc Mon Sep 17 00:00:00 2001
From: Alex
Date: Wed, 15 Jun 2022 20:20:28 +0200
Subject: improve internal item counter mechanisms and implement bucket quotas (#326)

- [x] Refactoring of internal counting API
- [x] Repair procedure for counters (it's an offline procedure!!!)
- [x] New counter for objects in buckets
- [x] Add quotas to buckets struct
- [x] Add CLI to manage bucket quotas
- [x] Add admin API to manage bucket quotas
- [x] Apply quotas by adding checks on put operations
- [x] Proof-read

Co-authored-by: Alex Auvolat
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/326
Co-authored-by: Alex
Co-committed-by: Alex
---
 src/model/index_counter.rs | 250 +++++++++++++++++++++++++++++++++++++--------
 1 file changed, 205 insertions(+), 45 deletions(-)

(limited to 'src/model/index_counter.rs')

diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs
index 2602d5d9..36e8172b 100644
--- a/src/model/index_counter.rs
+++ b/src/model/index_counter.rs
@@ -1,3 +1,4 @@
+use core::ops::Bound;
 use std::collections::{hash_map, BTreeMap, HashMap};
 use std::marker::PhantomData;
 use std::sync::Arc;
@@ -12,30 +13,36 @@ use garage_rpc::ring::Ring;
 use garage_rpc::system::System;
 use garage_util::data::*;
 use garage_util::error::*;
+use garage_util::time::*;
 
 use garage_table::crdt::*;
-use garage_table::replication::TableShardedReplication;
+use garage_table::replication::*;
 use garage_table::*;
 
-pub trait CounterSchema: Clone + PartialEq + Send + Sync + 'static {
-	const NAME: &'static str;
-	type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
-	type S: SortKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
+pub trait CountedItem: Clone + PartialEq + Send + Sync + 'static {
+	const COUNTER_TABLE_NAME: &'static str;
+
+	type CP: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
+	type CS: SortKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
+
+	fn counter_partition_key(&self) -> &Self::CP;
+	fn counter_sort_key(&self) -> &Self::CS;
+	fn counts(&self) -> Vec<(&'static str, i64)>;
 }
 
 /// A counter entry in the global table
-#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
-pub struct CounterEntry<T: CounterSchema> {
-	pub pk: T::P,
-	pub sk: T::S,
+#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)]
+pub struct CounterEntry<T: CountedItem> {
+	pub pk: T::CP,
+	pub sk: T::CS,
 	pub values: BTreeMap<String, CounterValue>,
 }
 
-impl<T: CounterSchema> Entry<T::P, T::S> for CounterEntry<T> {
-	fn partition_key(&self) -> &T::P {
+impl<T: CountedItem> Entry<T::CP, T::CS> for CounterEntry<T> {
+	fn partition_key(&self) -> &T::CP {
 		&self.pk
 	}
-	fn sort_key(&self) -> &T::S {
+	fn sort_key(&self) -> &T::CS {
 		&self.sk
 	}
 	fn is_tombstone(&self) -> bool {
@@ -45,7 +52,7 @@ impl<T: CounterSchema> Entry<T::P, T::S> for CounterEntry<T> {
 	}
 }
 
-impl<T: CounterSchema> CounterEntry<T> {
+impl<T: CountedItem> CounterEntry<T> {
 	pub fn filtered_values(&self, ring: &Ring) -> HashMap<String, i64> {
 		let nodes = &ring.layout.node_id_vec[..];
 		self.filtered_values_with_nodes(nodes)
@@ -78,7 +85,7 @@ pub struct CounterValue {
 	pub node_values: BTreeMap<Uuid, (u64, i64)>,
 }
 
-impl<T: CounterSchema> Crdt for CounterEntry<T> {
+impl<T: CountedItem> Crdt for CounterEntry<T> {
 	fn merge(&mut self, other: &Self) {
 		for (name, e2) in other.values.iter() {
 			if let Some(e) = self.values.get_mut(name) {
@@ -104,15 +111,15 @@ impl Crdt for CounterValue {
 	}
 }
 
-pub struct CounterTable<T: CounterSchema> {
+pub struct CounterTable<T: CountedItem> {
 	_phantom_t: PhantomData<T>,
 }
 
-impl<T: CounterSchema> TableSchema for CounterTable<T> {
-	const TABLE_NAME: &'static str = T::NAME;
+impl<T: CountedItem> TableSchema for CounterTable<T> {
+	const TABLE_NAME: &'static str = T::COUNTER_TABLE_NAME;
 
-	type P = T::P;
-	type S = T::S;
+	type P = T::CP;
+	type S = T::CS;
 	type E = CounterEntry<T>;
 	type Filter = (DeletedFilter, Vec<Uuid>);
 
@@ -131,14 +138,14 @@ impl<T: CounterSchema> TableSchema for CounterTable<T> {
 
 // ----
 
-pub struct IndexCounter<T: CounterSchema> {
+pub struct IndexCounter<T: CountedItem> {
 	this_node: Uuid,
 	local_counter: db::Tree,
-	propagate_tx: mpsc::UnboundedSender<(T::P, T::S, LocalCounterEntry)>,
+	propagate_tx: mpsc::UnboundedSender<(T::CP, T::CS, LocalCounterEntry<T>)>,
 	pub table: Arc<Table<CounterTable<T>, TableShardedReplication>>,
 }
 
-impl<T: CounterSchema> IndexCounter<T> {
+impl<T: CountedItem> IndexCounter<T> {
 	pub fn new(
 		system: Arc<System>,
 		replication: TableShardedReplication,
@@ -151,7 +158,7 @@ impl<T: CounterSchema> IndexCounter<T> {
 		let this = Arc::new(Self {
 			this_node: system.id,
 			local_counter: db
-				.open_tree(format!("local_counter:{}", T::NAME))
+				.open_tree(format!("local_counter_v2:{}", T::COUNTER_TABLE_NAME))
 				.expect("Unable to open local counter tree"),
 			propagate_tx,
 			table: Table::new(
@@ -166,7 +173,7 @@ impl<T: CounterSchema> IndexCounter<T> {
 
 		let this2 = this.clone();
 		background.spawn_worker(
-			format!("{} index counter propagator", T::NAME),
+			format!("{} index counter propagator", T::COUNTER_TABLE_NAME),
 			move |must_exit| this2.clone().propagate_loop(propagate_rx, must_exit),
 		);
 		this
@@ -175,24 +182,45 @@ impl<T: CounterSchema> IndexCounter<T> {
 	pub fn count(
 		&self,
 		tx: &mut db::Transaction,
-		pk: &T::P,
-		sk: &T::S,
-		counts: &[(&str, i64)],
+		old: Option<&T>,
+		new: Option<&T>,
 	) -> db::TxResult<(), Error> {
+		let pk = old
+			.map(|e| e.counter_partition_key())
+			.unwrap_or_else(|| new.unwrap().counter_partition_key());
+		let sk = old
+			.map(|e| e.counter_sort_key())
+			.unwrap_or_else(|| new.unwrap().counter_sort_key());
+
+		// calculate counter differences
+		let mut counts = HashMap::new();
+		for (k, v) in old.map(|x| x.counts()).unwrap_or_default() {
+			*counts.entry(k).or_insert(0) -= v;
+		}
+		for (k, v) in new.map(|x| x.counts()).unwrap_or_default() {
+			*counts.entry(k).or_insert(0) += v;
+		}
+
+		// update local counter table
 		let tree_key = self.table.data.tree_key(pk, sk);
 
 		let mut entry = match tx.get(&self.local_counter, &tree_key[..])? {
-			Some(old_bytes) => rmp_serde::decode::from_read_ref::<_, LocalCounterEntry>(&old_bytes)
-				.map_err(Error::RmpDecode)
-				.map_err(db::TxError::Abort)?,
+			Some(old_bytes) => {
+				rmp_serde::decode::from_read_ref::<_, LocalCounterEntry<T>>(&old_bytes)
+					.map_err(Error::RmpDecode)
+					.map_err(db::TxError::Abort)?
+			}
 			None => LocalCounterEntry {
+				pk: pk.clone(),
+				sk: sk.clone(),
 				values: BTreeMap::new(),
 			},
 		};
 
+		let now = now_msec();
 		for (s, inc) in counts.iter() {
 			let mut ent = entry.values.entry(s.to_string()).or_insert((0, 0));
-			ent.0 += 1;
+			ent.0 = std::cmp::max(ent.0 + 1, now);
 			ent.1 += *inc;
 		}
 
@@ -213,7 +241,7 @@ impl<T: CounterSchema> IndexCounter<T> {
 
 	async fn propagate_loop(
 		self: Arc<Self>,
-		mut propagate_rx: mpsc::UnboundedReceiver<(T::P, T::S, LocalCounterEntry)>,
+		mut propagate_rx: mpsc::UnboundedReceiver<(T::CP, T::CS, LocalCounterEntry<T>)>,
 		must_exit: watch::Receiver<bool>,
 	) {
 		// This loop batches updates to counters to be sent all at once.
@@ -236,7 +264,7 @@ impl<T: CounterSchema> IndexCounter<T> {
 
 			if let Some((pk, sk, counters)) = ent {
 				let tree_key = self.table.data.tree_key(&pk, &sk);
-				let dist_entry = counters.into_counter_entry::<T>(self.this_node, pk, sk);
+				let dist_entry = counters.into_counter_entry(self.this_node);
 				match buf.entry(tree_key) {
 					hash_map::Entry::Vacant(e) => {
 						e.insert(dist_entry);
@@ -255,10 +283,10 @@ impl<T: CounterSchema> IndexCounter<T> {
 				if let Err(e) = self.table.insert_many(entries).await {
 					errors += 1;
 					if errors >= 2 && *must_exit.borrow() {
-						error!("({}) Could not propagate {} counter values: {}, these counters will not be updated correctly.", T::NAME, buf.len(), e);
+						error!("({}) Could not propagate {} counter values: {}, these counters will not be updated correctly.", T::COUNTER_TABLE_NAME, buf.len(), e);
 						break;
 					}
-					warn!("({}) Could not propagate {} counter values: {}, retrying in 5 seconds (retry #{})", T::NAME, buf.len(), e, errors);
+					warn!("({}) Could not propagate {} counter values: {}, retrying in 5 seconds (retry #{})", T::COUNTER_TABLE_NAME, buf.len(), e, errors);
 					tokio::time::sleep(Duration::from_secs(5)).await;
 					continue;
 				}
@@ -272,23 +300,155 @@ impl<T: CounterSchema> IndexCounter<T> {
 		}
 	}
+
+	pub fn offline_recount_all<TS, TR>(
+		&self,
+		counted_table: &Arc<Table<TS, TR>>,
+	) -> Result<(), Error>
+	where
+		TS: TableSchema<E = T>,
+		TR: TableReplication,
+	{
+		let save_counter_entry = |entry: CounterEntry<T>| -> Result<(), Error> {
+			let entry_k = self
+				.table
+				.data
+				.tree_key(entry.partition_key(), entry.sort_key());
+			self.table
+				.data
+				.update_entry_with(&entry_k, |ent| match ent {
+					Some(mut ent) => {
+						ent.merge(&entry);
+						ent
+					}
+					None => entry.clone(),
+				})?;
+			Ok(())
+		};
+
+		// 1. Set all old local counters to zero
+		let now = now_msec();
+		let mut next_start: Option<Vec<u8>> = None;
+		loop {
+			let low_bound = match next_start.take() {
+				Some(v) => Bound::Excluded(v),
+				None => Bound::Unbounded,
+			};
+			let mut batch = vec![];
+			for item in self.local_counter.range((low_bound, Bound::Unbounded))? {
+				batch.push(item?);
+				if batch.len() > 1000 {
+					break;
+				}
+			}
+
+			if batch.is_empty() {
+				break;
+			}
+
+			info!("zeroing old counters... ({})", hex::encode(&batch[0].0));
+			for (local_counter_k, local_counter) in batch {
+				let mut local_counter =
+					rmp_serde::decode::from_read_ref::<_, LocalCounterEntry<T>>(&local_counter)?;
+
+				for (_, tv) in local_counter.values.iter_mut() {
+					tv.0 = std::cmp::max(tv.0 + 1, now);
+					tv.1 = 0;
+				}
+
+				let local_counter_bytes = rmp_to_vec_all_named(&local_counter)?;
+				self.local_counter
+					.insert(&local_counter_k, &local_counter_bytes)?;
+
+				let counter_entry = local_counter.into_counter_entry(self.this_node);
+				save_counter_entry(counter_entry)?;
+
+				next_start = Some(local_counter_k);
+			}
+		}
+
+		// 2. Recount all table entries
+		let now = now_msec();
+		let mut next_start: Option<Vec<u8>> = None;
+		loop {
+			let low_bound = match next_start.take() {
+				Some(v) => Bound::Excluded(v),
+				None => Bound::Unbounded,
+			};
+			let mut batch = vec![];
+			for item in counted_table
+				.data
+				.store
+				.range((low_bound, Bound::Unbounded))?
+			{
+				batch.push(item?);
+				if batch.len() > 1000 {
+					break;
+				}
+			}
+
+			if batch.is_empty() {
+				break;
+			}
+
+			info!("counting entries... ({})", hex::encode(&batch[0].0));
({})", hex::encode(&batch[0].0)); + for (counted_entry_k, counted_entry) in batch { + let counted_entry = counted_table.data.decode_entry(&counted_entry)?; + + let pk = counted_entry.counter_partition_key(); + let sk = counted_entry.counter_sort_key(); + let counts = counted_entry.counts(); + + let local_counter_key = self.table.data.tree_key(pk, sk); + let mut local_counter = match self.local_counter.get(&local_counter_key)? { + Some(old_bytes) => { + let ent = rmp_serde::decode::from_read_ref::<_, LocalCounterEntry>( + &old_bytes, + )?; + assert!(ent.pk == *pk); + assert!(ent.sk == *sk); + ent + } + None => LocalCounterEntry { + pk: pk.clone(), + sk: sk.clone(), + values: BTreeMap::new(), + }, + }; + for (s, v) in counts.iter() { + let mut tv = local_counter.values.entry(s.to_string()).or_insert((0, 0)); + tv.0 = std::cmp::max(tv.0 + 1, now); + tv.1 += v; + } + + let local_counter_bytes = rmp_to_vec_all_named(&local_counter)?; + self.local_counter + .insert(&local_counter_key, local_counter_bytes)?; + + let counter_entry = local_counter.into_counter_entry(self.this_node); + save_counter_entry(counter_entry)?; + + next_start = Some(counted_entry_k); + } + } + + // Done + Ok(()) + } } #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] -struct LocalCounterEntry { +struct LocalCounterEntry { + pk: T::CP, + sk: T::CS, values: BTreeMap, } -impl LocalCounterEntry { - fn into_counter_entry( - self, - this_node: Uuid, - pk: T::P, - sk: T::S, - ) -> CounterEntry { +impl LocalCounterEntry { + fn into_counter_entry(self, this_node: Uuid) -> CounterEntry { CounterEntry { - pk, - sk, + pk: self.pk, + sk: self.sk, values: self .values .into_iter() -- cgit v1.2.3 From 4f38cadf6e2963a652ed28327d1c2ccfa2ebb2b7 Mon Sep 17 00:00:00 2001 From: Alex Date: Fri, 8 Jul 2022 13:30:26 +0200 Subject: Background task manager (#332) - [x] New background worker trait - [x] Adapt all current workers to use new API - [x] Command to list currently running workers, and whether they are active, idle, or dead - [x] Error reporting - Optimizations - [x] Merkle updater: several items per iteration - [ ] Use `tokio::task::spawn_blocking` where appropriate so that CPU-intensive tasks don't block other things going on - scrub: - [x] have only one worker with a channel to start/pause/cancel - [x] automatic scrub - [x] ability to view and change tranquility from CLI - [x] persistence of a few info - [ ] Testing Co-authored-by: Alex Auvolat Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/332 Co-authored-by: Alex Co-committed-by: Alex --- src/model/index_counter.rs | 169 +++++++++++++++++++++++++++------------------ 1 file changed, 101 insertions(+), 68 deletions(-) (limited to 'src/model/index_counter.rs') diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs index 36e8172b..26833390 100644 --- a/src/model/index_counter.rs +++ b/src/model/index_counter.rs @@ -2,8 +2,8 @@ use core::ops::Bound; use std::collections::{hash_map, BTreeMap, HashMap}; use std::marker::PhantomData; use std::sync::Arc; -use std::time::Duration; +use async_trait::async_trait; use serde::{Deserialize, Serialize}; use tokio::sync::{mpsc, watch}; @@ -11,6 +11,7 @@ use garage_db as db; use garage_rpc::ring::Ring; use garage_rpc::system::System; +use garage_util::background::*; use garage_util::data::*; use garage_util::error::*; use garage_util::time::*; @@ -171,11 +172,13 @@ impl IndexCounter { ), }); - let this2 = this.clone(); - background.spawn_worker( - format!("{} index counter propagator", T::COUNTER_TABLE_NAME), - 
-			move |must_exit| this2.clone().propagate_loop(propagate_rx, must_exit),
-		);
+		background.spawn_worker(IndexPropagatorWorker {
+			index_counter: this.clone(),
+			propagate_rx,
+			buf: HashMap::new(),
+			errors: 0,
+		});
+
 		this
 	}
 
@@ -239,68 +242,6 @@ impl<T: CountedItem> IndexCounter<T> {
 		Ok(())
 	}
 
-	async fn propagate_loop(
-		self: Arc<Self>,
-		mut propagate_rx: mpsc::UnboundedReceiver<(T::CP, T::CS, LocalCounterEntry<T>)>,
-		must_exit: watch::Receiver<bool>,
-	) {
-		// This loop batches updates to counters to be sent all at once.
-		// They are sent once the propagate_rx channel has been emptied (or is closed).
-		let mut buf = HashMap::new();
-		let mut errors = 0;
-
-		loop {
-			let (ent, closed) = match propagate_rx.try_recv() {
-				Ok(ent) => (Some(ent), false),
-				Err(mpsc::error::TryRecvError::Empty) if buf.is_empty() => {
-					match propagate_rx.recv().await {
-						Some(ent) => (Some(ent), false),
-						None => (None, true),
-					}
-				}
-				Err(mpsc::error::TryRecvError::Empty) => (None, false),
-				Err(mpsc::error::TryRecvError::Disconnected) => (None, true),
-			};
-
-			if let Some((pk, sk, counters)) = ent {
-				let tree_key = self.table.data.tree_key(&pk, &sk);
-				let dist_entry = counters.into_counter_entry(self.this_node);
-				match buf.entry(tree_key) {
-					hash_map::Entry::Vacant(e) => {
-						e.insert(dist_entry);
-					}
-					hash_map::Entry::Occupied(mut e) => {
-						e.get_mut().merge(&dist_entry);
-					}
-				}
-				// As long as we can add entries, loop back and add them to batch
-				// before sending batch to other nodes
-				continue;
-			}
-
-			if !buf.is_empty() {
-				let entries = buf.iter().map(|(_k, v)| v);
-				if let Err(e) = self.table.insert_many(entries).await {
-					errors += 1;
-					if errors >= 2 && *must_exit.borrow() {
-						error!("({}) Could not propagate {} counter values: {}, these counters will not be updated correctly.", T::COUNTER_TABLE_NAME, buf.len(), e);
-						break;
-					}
-					warn!("({}) Could not propagate {} counter values: {}, retrying in 5 seconds (retry #{})", T::COUNTER_TABLE_NAME, buf.len(), e, errors);
-					tokio::time::sleep(Duration::from_secs(5)).await;
-					continue;
-				}
-
-				buf.clear();
-				errors = 0;
-			}
-
-			if closed || *must_exit.borrow() {
-				break;
-			}
-		}
-	}
-
 	pub fn offline_recount_all<TS, TR>(
 		&self,
 		counted_table: &Arc<Table<TS, TR>>,
@@ -437,6 +378,98 @@ impl<T: CountedItem> IndexCounter<T> {
 	}
 }
 
+struct IndexPropagatorWorker<T: CountedItem> {
+	index_counter: Arc<IndexCounter<T>>,
+	propagate_rx: mpsc::UnboundedReceiver<(T::CP, T::CS, LocalCounterEntry<T>)>,
+
+	buf: HashMap<Vec<u8>, CounterEntry<T>>,
+	errors: usize,
+}
+
+impl<T: CountedItem> IndexPropagatorWorker<T> {
+	fn add_ent(&mut self, pk: T::CP, sk: T::CS, counters: LocalCounterEntry<T>) {
+		let tree_key = self.index_counter.table.data.tree_key(&pk, &sk);
+		let dist_entry = counters.into_counter_entry(self.index_counter.this_node);
+		match self.buf.entry(tree_key) {
+			hash_map::Entry::Vacant(e) => {
+				e.insert(dist_entry);
+			}
+			hash_map::Entry::Occupied(mut e) => {
+				e.get_mut().merge(&dist_entry);
+			}
+		}
+	}
+}
+
+#[async_trait]
+impl<T: CountedItem> Worker for IndexPropagatorWorker<T> {
+	fn name(&self) -> String {
+		format!("{} index counter propagator", T::COUNTER_TABLE_NAME)
+	}
+
+	fn info(&self) -> Option<String> {
+		if !self.buf.is_empty() {
+			Some(format!("{} items in queue", self.buf.len()))
+		} else {
+			None
+		}
+	}
+
+	async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
+		// This loop batches updates to counters to be sent all at once.
+		// They are sent once the propagate_rx channel has been emptied (or is closed).
+		let closed = loop {
+			match self.propagate_rx.try_recv() {
+				Ok((pk, sk, counters)) => {
+					self.add_ent(pk, sk, counters);
+				}
+				Err(mpsc::error::TryRecvError::Empty) => break false,
+				Err(mpsc::error::TryRecvError::Disconnected) => break true,
+			}
+		};
+
+		if !self.buf.is_empty() {
+			let entries_k = self.buf.keys().take(100).cloned().collect::<Vec<_>>();
+			let entries = entries_k.iter().map(|k| self.buf.get(k).unwrap());
+			if let Err(e) = self.index_counter.table.insert_many(entries).await {
+				self.errors += 1;
+				if self.errors >= 2 && *must_exit.borrow() {
+					error!("({}) Could not propagate {} counter values: {}, these counters will not be updated correctly.", T::COUNTER_TABLE_NAME, self.buf.len(), e);
+					return Ok(WorkerState::Done);
+				}
+				// Propagate error up to worker manager, it will log it, increment a counter,
+				// and sleep for a certain delay (with exponential backoff), waiting for
+				// things to go back to normal
+				return Err(e);
+			} else {
+				for k in entries_k {
+					self.buf.remove(&k);
+				}
+				self.errors = 0;
+			}
+
+			return Ok(WorkerState::Busy);
+		} else if closed {
+			return Ok(WorkerState::Done);
+		} else {
+			return Ok(WorkerState::Idle);
+		}
+	}
+
+	async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerState {
+		match self.propagate_rx.recv().await {
+			Some((pk, sk, counters)) => {
+				self.add_ent(pk, sk, counters);
+				WorkerState::Busy
+			}
+			None => match self.buf.is_empty() {
+				false => WorkerState::Busy,
+				true => WorkerState::Done,
+			},
+		}
+	}
+}
+
 #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
 struct LocalCounterEntry<T: CountedItem> {
 	pk: T::CP,
-- cgit v1.2.3

From 38be811b1cd20d9223b481c0ea91cc7e3ee795dc Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Tue, 13 Sep 2022 16:08:00 +0200
Subject: Fix clippy lint that says we should implement Eq
---
 src/model/index_counter.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

(limited to 'src/model/index_counter.rs')

diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs
index 26833390..e6394f0c 100644
--- a/src/model/index_counter.rs
+++ b/src/model/index_counter.rs
@@ -81,7 +81,7 @@ impl<T: CountedItem> CounterEntry<T> {
 }
 
 /// A counter entry in the global table
-#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
 pub struct CounterValue {
 	pub node_values: BTreeMap<Uuid, (u64, i64)>,
 }
-- cgit v1.2.3
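Editor's note: the `count()` refactor in #326 above stops taking explicit increments and instead diffs the old and new versions of an entry, subtracting the old `counts()` and adding the new ones. Below is a minimal, self-contained sketch of that delta computation; the counter names `"objects"` and `"bytes"` and the `counter_deltas` helper are hypothetical illustrations, not part of these patches.

```rust
use std::collections::HashMap;

// Sketch of the delta computation done inside IndexCounter::count after #326:
// subtract the counts of the old entry version, add those of the new one.
// A deleted entry is old=Some/new=None; a created one is old=None/new=Some.
fn counter_deltas(
    old: Option<&[(&'static str, i64)]>,
    new: Option<&[(&'static str, i64)]>,
) -> HashMap<&'static str, i64> {
    let mut counts = HashMap::new();
    for (k, v) in old.unwrap_or_default() {
        *counts.entry(*k).or_insert(0) -= v;
    }
    for (k, v) in new.unwrap_or_default() {
        *counts.entry(*k).or_insert(0) += v;
    }
    counts
}

fn main() {
    // Hypothetical example: an object overwrite that frees 512 bytes.
    let old = [("objects", 1), ("bytes", 2048)];
    let new = [("objects", 1), ("bytes", 1536)];
    let deltas = counter_deltas(Some(&old[..]), Some(&new[..]));
    assert_eq!(deltas["objects"], 0); // object count unchanged
    assert_eq!(deltas["bytes"], -512); // 512 bytes freed
    println!("{:?}", deltas);
}
```

Only the deltas are applied to the node-local counter and propagated; the global `CounterEntry` then merges per-node `(timestamp, value)` pairs as a CRDT, as shown in the patches above.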
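Editor's note: both the original `propagate_loop` and the `IndexPropagatorWorker` that replaces it in #332 share one drain-then-flush shape: empty the channel without blocking, merge updates per key into a batch, then push the whole batch in a single `insert_many` call. The following is a standalone sketch of that pattern, not Garage's actual worker; it assumes only the `tokio` crate (1.9+ for `try_recv`, with the `rt` and `macros` features) and uses `println!` as a stand-in for the table write.

```rust
use std::collections::HashMap;
use tokio::sync::mpsc;

// Drain everything currently queued, merging updates per key, then flush
// the batch in one operation -- the shape of the counter propagator.
async fn propagate(mut rx: mpsc::UnboundedReceiver<(String, i64)>) {
    let mut buf: HashMap<String, i64> = HashMap::new();
    loop {
        // 1. Empty the channel without blocking.
        let closed = loop {
            match rx.try_recv() {
                Ok((k, v)) => *buf.entry(k).or_insert(0) += v,
                Err(mpsc::error::TryRecvError::Empty) => break false,
                Err(mpsc::error::TryRecvError::Disconnected) => break true,
            }
        };
        // 2. Flush the whole batch at once (stand-in for table.insert_many).
        if !buf.is_empty() {
            println!("flushing {} merged counter entries", buf.len());
            buf.clear();
        }
        if closed {
            break;
        }
        // 3. Sleep until more work arrives (or the channel closes).
        match rx.recv().await {
            Some((k, v)) => *buf.entry(k).or_insert(0) += v,
            None => break,
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::unbounded_channel();
    for i in 0..10 {
        tx.send((format!("bucket{}", i % 3), 1)).unwrap();
    }
    drop(tx); // close the channel so the loop can terminate
    propagate(rx).await;
}
```

The point of the design is that bursts of updates to the same counter collapse into one table write, while the #332 version additionally reports batch errors up to the worker manager instead of sleeping inline.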