diff options
Diffstat (limited to 'src/model/index_counter.rs')
-rw-r--r-- | src/model/index_counter.rs | 232 |
1 files changed, 64 insertions, 168 deletions
diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs index e6394f0c..35d6596d 100644 --- a/src/model/index_counter.rs +++ b/src/model/index_counter.rs @@ -1,19 +1,18 @@ use core::ops::Bound; -use std::collections::{hash_map, BTreeMap, HashMap}; +use std::collections::{BTreeMap, HashMap}; use std::marker::PhantomData; use std::sync::Arc; -use async_trait::async_trait; use serde::{Deserialize, Serialize}; -use tokio::sync::{mpsc, watch}; use garage_db as db; use garage_rpc::ring::Ring; use garage_rpc::system::System; -use garage_util::background::*; +use garage_util::background::BackgroundRunner; use garage_util::data::*; use garage_util::error::*; +use garage_util::migrate::Migrate; use garage_util::time::*; use garage_table::crdt::*; @@ -31,14 +30,44 @@ pub trait CountedItem: Clone + PartialEq + Send + Sync + 'static { fn counts(&self) -> Vec<(&'static str, i64)>; } -/// A counter entry in the global table -#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)] -pub struct CounterEntry<T: CountedItem> { - pub pk: T::CP, - pub sk: T::CS, - pub values: BTreeMap<String, CounterValue>, +mod v08 { + use super::CountedItem; + use garage_util::data::Uuid; + use serde::{Deserialize, Serialize}; + use std::collections::BTreeMap; + + // ---- Global part (the table everyone queries) ---- + + /// A counter entry in the global table + #[derive(Clone, PartialEq, Debug, Serialize, Deserialize)] + pub struct CounterEntry<T: CountedItem> { + pub pk: T::CP, + pub sk: T::CS, + pub values: BTreeMap<String, CounterValue>, + } + + /// A counter entry in the global table + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct CounterValue { + pub node_values: BTreeMap<Uuid, (u64, i64)>, + } + + impl<T: CountedItem> garage_util::migrate::InitialFormat for CounterEntry<T> {} + + // ---- Local part (the counter we maintain transactionnaly on each node) ---- + + #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] + pub(super) struct LocalCounterEntry<T: CountedItem> { + pub(super) pk: T::CP, + pub(super) sk: T::CS, + pub(super) values: BTreeMap<String, (u64, i64)>, + } + + impl<T: CountedItem> garage_util::migrate::InitialFormat for LocalCounterEntry<T> {} } +pub use v08::*; + impl<T: CountedItem> Entry<T::CP, T::CS> for CounterEntry<T> { fn partition_key(&self) -> &T::CP { &self.pk @@ -80,12 +109,6 @@ impl<T: CountedItem> CounterEntry<T> { } } -/// A counter entry in the global table -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -pub struct CounterValue { - pub node_values: BTreeMap<Uuid, (u64, i64)>, -} - impl<T: CountedItem> Crdt for CounterEntry<T> { fn merge(&mut self, other: &Self) { for (name, e2) in other.values.iter() { @@ -142,7 +165,6 @@ impl<T: CountedItem> TableSchema for CounterTable<T> { pub struct IndexCounter<T: CountedItem> { this_node: Uuid, local_counter: db::Tree, - propagate_tx: mpsc::UnboundedSender<(T::CP, T::CS, LocalCounterEntry<T>)>, pub table: Arc<Table<CounterTable<T>, TableShardedReplication>>, } @@ -152,16 +174,11 @@ impl<T: CountedItem> IndexCounter<T> { replication: TableShardedReplication, db: &db::Db, ) -> Arc<Self> { - let background = system.background.clone(); - - let (propagate_tx, propagate_rx) = mpsc::unbounded_channel(); - - let this = Arc::new(Self { + Arc::new(Self { this_node: system.id, local_counter: db .open_tree(format!("local_counter_v2:{}", T::COUNTER_TABLE_NAME)) .expect("Unable to open local counter tree"), - propagate_tx, table: Table::new( CounterTable { _phantom_t: Default::default(), @@ -170,16 +187,11 @@ impl<T: CountedItem> IndexCounter<T> { system, db, ), - }); - - background.spawn_worker(IndexPropagatorWorker { - index_counter: this.clone(), - propagate_rx, - buf: HashMap::new(), - errors: 0, - }); + }) + } - this + pub fn spawn_workers(&self, bg: &BackgroundRunner) { + self.table.spawn_workers(bg); } pub fn count( @@ -208,11 +220,9 @@ impl<T: CountedItem> IndexCounter<T> { let tree_key = self.table.data.tree_key(pk, sk); let mut entry = match tx.get(&self.local_counter, &tree_key[..])? { - Some(old_bytes) => { - rmp_serde::decode::from_read_ref::<_, LocalCounterEntry<T>>(&old_bytes) - .map_err(Error::RmpDecode) - .map_err(db::TxError::Abort)? - } + Some(old_bytes) => LocalCounterEntry::<T>::decode(&old_bytes) + .ok_or_message("Cannot decode local counter entry") + .map_err(db::TxError::Abort)?, None => LocalCounterEntry { pk: pk.clone(), sk: sk.clone(), @@ -227,17 +237,14 @@ impl<T: CountedItem> IndexCounter<T> { ent.1 += *inc; } - let new_entry_bytes = rmp_to_vec_all_named(&entry) + let new_entry_bytes = entry + .encode() .map_err(Error::RmpEncode) .map_err(db::TxError::Abort)?; tx.insert(&self.local_counter, &tree_key[..], new_entry_bytes)?; - if let Err(e) = self.propagate_tx.send((pk.clone(), sk.clone(), entry)) { - error!( - "Could not propagate updated counter values, failed to send to channel: {}", - e - ); - } + let dist_entry = entry.into_counter_entry(self.this_node); + self.table.queue_insert(tx, &dist_entry)?; Ok(()) } @@ -250,23 +257,6 @@ impl<T: CountedItem> IndexCounter<T> { TS: TableSchema<E = T>, TR: TableReplication, { - let save_counter_entry = |entry: CounterEntry<T>| -> Result<(), Error> { - let entry_k = self - .table - .data - .tree_key(entry.partition_key(), entry.sort_key()); - self.table - .data - .update_entry_with(&entry_k, |ent| match ent { - Some(mut ent) => { - ent.merge(&entry); - ent - } - None => entry.clone(), - })?; - Ok(()) - }; - // 1. Set all old local counters to zero let now = now_msec(); let mut next_start: Option<Vec<u8>> = None; @@ -289,20 +279,22 @@ impl<T: CountedItem> IndexCounter<T> { info!("zeroing old counters... ({})", hex::encode(&batch[0].0)); for (local_counter_k, local_counter) in batch { - let mut local_counter = - rmp_serde::decode::from_read_ref::<_, LocalCounterEntry<T>>(&local_counter)?; + let mut local_counter = LocalCounterEntry::<T>::decode(&local_counter) + .ok_or_message("Cannot decode local counter entry")?; for (_, tv) in local_counter.values.iter_mut() { tv.0 = std::cmp::max(tv.0 + 1, now); tv.1 = 0; } - let local_counter_bytes = rmp_to_vec_all_named(&local_counter)?; + let local_counter_bytes = local_counter.encode()?; self.local_counter .insert(&local_counter_k, &local_counter_bytes)?; let counter_entry = local_counter.into_counter_entry(self.this_node); - save_counter_entry(counter_entry)?; + self.local_counter + .db() + .transaction(|mut tx| self.table.queue_insert(&mut tx, &counter_entry))?; next_start = Some(local_counter_k); } @@ -343,9 +335,8 @@ impl<T: CountedItem> IndexCounter<T> { let local_counter_key = self.table.data.tree_key(pk, sk); let mut local_counter = match self.local_counter.get(&local_counter_key)? { Some(old_bytes) => { - let ent = rmp_serde::decode::from_read_ref::<_, LocalCounterEntry<T>>( - &old_bytes, - )?; + let ent = LocalCounterEntry::<T>::decode(&old_bytes) + .ok_or_message("Cannot decode local counter entry")?; assert!(ent.pk == *pk); assert!(ent.sk == *sk); ent @@ -362,12 +353,14 @@ impl<T: CountedItem> IndexCounter<T> { tv.1 += v; } - let local_counter_bytes = rmp_to_vec_all_named(&local_counter)?; + let local_counter_bytes = local_counter.encode()?; self.local_counter .insert(&local_counter_key, local_counter_bytes)?; let counter_entry = local_counter.into_counter_entry(self.this_node); - save_counter_entry(counter_entry)?; + self.local_counter + .db() + .transaction(|mut tx| self.table.queue_insert(&mut tx, &counter_entry))?; next_start = Some(counted_entry_k); } @@ -378,104 +371,7 @@ impl<T: CountedItem> IndexCounter<T> { } } -struct IndexPropagatorWorker<T: CountedItem> { - index_counter: Arc<IndexCounter<T>>, - propagate_rx: mpsc::UnboundedReceiver<(T::CP, T::CS, LocalCounterEntry<T>)>, - - buf: HashMap<Vec<u8>, CounterEntry<T>>, - errors: usize, -} - -impl<T: CountedItem> IndexPropagatorWorker<T> { - fn add_ent(&mut self, pk: T::CP, sk: T::CS, counters: LocalCounterEntry<T>) { - let tree_key = self.index_counter.table.data.tree_key(&pk, &sk); - let dist_entry = counters.into_counter_entry(self.index_counter.this_node); - match self.buf.entry(tree_key) { - hash_map::Entry::Vacant(e) => { - e.insert(dist_entry); - } - hash_map::Entry::Occupied(mut e) => { - e.get_mut().merge(&dist_entry); - } - } - } -} - -#[async_trait] -impl<T: CountedItem> Worker for IndexPropagatorWorker<T> { - fn name(&self) -> String { - format!("{} index counter propagator", T::COUNTER_TABLE_NAME) - } - - fn info(&self) -> Option<String> { - if !self.buf.is_empty() { - Some(format!("{} items in queue", self.buf.len())) - } else { - None - } - } - - async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> { - // This loop batches updates to counters to be sent all at once. - // They are sent once the propagate_rx channel has been emptied (or is closed). - let closed = loop { - match self.propagate_rx.try_recv() { - Ok((pk, sk, counters)) => { - self.add_ent(pk, sk, counters); - } - Err(mpsc::error::TryRecvError::Empty) => break false, - Err(mpsc::error::TryRecvError::Disconnected) => break true, - } - }; - - if !self.buf.is_empty() { - let entries_k = self.buf.keys().take(100).cloned().collect::<Vec<_>>(); - let entries = entries_k.iter().map(|k| self.buf.get(k).unwrap()); - if let Err(e) = self.index_counter.table.insert_many(entries).await { - self.errors += 1; - if self.errors >= 2 && *must_exit.borrow() { - error!("({}) Could not propagate {} counter values: {}, these counters will not be updated correctly.", T::COUNTER_TABLE_NAME, self.buf.len(), e); - return Ok(WorkerState::Done); - } - // Propagate error up to worker manager, it will log it, increment a counter, - // and sleep for a certain delay (with exponential backoff), waiting for - // things to go back to normal - return Err(e); - } else { - for k in entries_k { - self.buf.remove(&k); - } - self.errors = 0; - } - - return Ok(WorkerState::Busy); - } else if closed { - return Ok(WorkerState::Done); - } else { - return Ok(WorkerState::Idle); - } - } - - async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerState { - match self.propagate_rx.recv().await { - Some((pk, sk, counters)) => { - self.add_ent(pk, sk, counters); - WorkerState::Busy - } - None => match self.buf.is_empty() { - false => WorkerState::Busy, - true => WorkerState::Done, - }, - } - } -} - -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] -struct LocalCounterEntry<T: CountedItem> { - pk: T::CP, - sk: T::CS, - values: BTreeMap<String, (u64, i64)>, -} +// ---- impl<T: CountedItem> LocalCounterEntry<T> { fn into_counter_entry(self, this_node: Uuid) -> CounterEntry<T> { |