use core::ops::Bound; use std::collections::{hash_map, BTreeMap, HashMap}; use std::marker::PhantomData; use std::sync::Arc; use async_trait::async_trait; use serde::{Deserialize, Serialize}; use tokio::sync::{mpsc, watch}; use garage_db as db; use garage_rpc::ring::Ring; use garage_rpc::system::System; use garage_util::background::*; use garage_util::data::*; use garage_util::error::*; use garage_util::time::*; use garage_table::crdt::*; use garage_table::replication::*; use garage_table::*; pub trait CountedItem: Clone + PartialEq + Send + Sync + 'static { const COUNTER_TABLE_NAME: &'static str; type CP: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync; type CS: SortKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync; fn counter_partition_key(&self) -> &Self::CP; fn counter_sort_key(&self) -> &Self::CS; fn counts(&self) -> Vec<(&'static str, i64)>; } /// A counter entry in the global table #[derive(Clone, PartialEq, Debug, Serialize, Deserialize)] pub struct CounterEntry<T: CountedItem> { pub pk: T::CP, pub sk: T::CS, pub values: BTreeMap<String, CounterValue>, } impl<T: CountedItem> Entry<T::CP, T::CS> for CounterEntry<T> { fn partition_key(&self) -> &T::CP { &self.pk } fn sort_key(&self) -> &T::CS { &self.sk } fn is_tombstone(&self) -> bool { self.values .iter() .all(|(_, v)| v.node_values.iter().all(|(_, (_, v))| *v == 0)) } } impl<T: CountedItem> CounterEntry<T> { pub fn filtered_values(&self, ring: &Ring) -> HashMap<String, i64> { let nodes = &ring.layout.node_id_vec[..]; self.filtered_values_with_nodes(nodes) } pub fn filtered_values_with_nodes(&self, nodes: &[Uuid]) -> HashMap<String, i64> { let mut ret = HashMap::new(); for (name, vals) in self.values.iter() { let new_vals = vals .node_values .iter() .filter(|(n, _)| nodes.contains(n)) .map(|(_, (_, v))| *v) .collect::<Vec<_>>(); if !new_vals.is_empty() { ret.insert( name.clone(), new_vals.iter().fold(i64::MIN, |a, b| std::cmp::max(a, *b)), ); } } ret } } /// A counter entry in the global table #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] pub struct CounterValue { pub node_values: BTreeMap<Uuid, (u64, i64)>, } impl<T: CountedItem> Crdt for CounterEntry<T> { fn merge(&mut self, other: &Self) { for (name, e2) in other.values.iter() { if let Some(e) = self.values.get_mut(name) { e.merge(e2); } else { self.values.insert(name.clone(), e2.clone()); } } } } impl Crdt for CounterValue { fn merge(&mut self, other: &Self) { for (node, (t2, e2)) in other.node_values.iter() { if let Some((t, e)) = self.node_values.get_mut(node) { if t2 > t { *e = *e2; } } else { self.node_values.insert(*node, (*t2, *e2)); } } } } pub struct CounterTable<T: CountedItem> { _phantom_t: PhantomData<T>, } impl<T: CountedItem> TableSchema for CounterTable<T> { const TABLE_NAME: &'static str = T::COUNTER_TABLE_NAME; type P = T::CP; type S = T::CS; type E = CounterEntry<T>; type Filter = (DeletedFilter, Vec<Uuid>); fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { if filter.0 == DeletedFilter::Any { return true; } let is_tombstone = entry .filtered_values_with_nodes(&filter.1[..]) .iter() .all(|(_, v)| *v == 0); filter.0.apply(is_tombstone) } } // ---- pub struct IndexCounter<T: CountedItem> { this_node: Uuid, local_counter: db::Tree, propagate_tx: mpsc::UnboundedSender<(T::CP, T::CS, LocalCounterEntry<T>)>, pub table: Arc<Table<CounterTable<T>, TableShardedReplication>>, } impl<T: CountedItem> IndexCounter<T> { pub fn new( system: Arc<System>, replication: TableShardedReplication, db: &db::Db, ) -> Arc<Self> { let background = system.background.clone(); let (propagate_tx, propagate_rx) = mpsc::unbounded_channel(); let this = Arc::new(Self { this_node: system.id, local_counter: db .open_tree(format!("local_counter_v2:{}", T::COUNTER_TABLE_NAME)) .expect("Unable to open local counter tree"), propagate_tx, table: Table::new( CounterTable { _phantom_t: Default::default(), }, replication, system, db, ), }); background.spawn_worker(IndexPropagatorWorker { index_counter: this.clone(), propagate_rx, buf: HashMap::new(), errors: 0, }); this } pub fn count( &self, tx: &mut db::Transaction, old: Option<&T>, new: Option<&T>, ) -> db::TxResult<(), Error> { let pk = old .map(|e| e.counter_partition_key()) .unwrap_or_else(|| new.unwrap().counter_partition_key()); let sk = old .map(|e| e.counter_sort_key()) .unwrap_or_else(|| new.unwrap().counter_sort_key()); // calculate counter differences let mut counts = HashMap::new(); for (k, v) in old.map(|x| x.counts()).unwrap_or_default() { *counts.entry(k).or_insert(0) -= v; } for (k, v) in new.map(|x| x.counts()).unwrap_or_default() { *counts.entry(k).or_insert(0) += v; } // update local counter table let tree_key = self.table.data.tree_key(pk, sk); let mut entry = match tx.get(&self.local_counter, &tree_key[..])? { Some(old_bytes) => { rmp_serde::decode::from_read_ref::<_, LocalCounterEntry<T>>(&old_bytes) .map_err(Error::RmpDecode) .map_err(db::TxError::Abort)? } None => LocalCounterEntry { pk: pk.clone(), sk: sk.clone(), values: BTreeMap::new(), }, }; let now = now_msec(); for (s, inc) in counts.iter() { let mut ent = entry.values.entry(s.to_string()).or_insert((0, 0)); ent.0 = std::cmp::max(ent.0 + 1, now); ent.1 += *inc; } let new_entry_bytes = rmp_to_vec_all_named(&entry) .map_err(Error::RmpEncode) .map_err(db::TxError::Abort)?; tx.insert(&self.local_counter, &tree_key[..], new_entry_bytes)?; if let Err(e) = self.propagate_tx.send((pk.clone(), sk.clone(), entry)) { error!( "Could not propagate updated counter values, failed to send to channel: {}", e ); } Ok(()) } pub fn offline_recount_all<TS, TR>( &self, counted_table: &Arc<Table<TS, TR>>, ) -> Result<(), Error> where TS: TableSchema<E = T>, TR: TableReplication, { let save_counter_entry = |entry: CounterEntry<T>| -> Result<(), Error> { let entry_k = self .table .data .tree_key(entry.partition_key(), entry.sort_key()); self.table .data .update_entry_with(&entry_k, |ent| match ent { Some(mut ent) => { ent.merge(&entry); ent } None => entry.clone(), })?; Ok(()) }; // 1. Set all old local counters to zero let now = now_msec(); let mut next_start: Option<Vec<u8>> = None; loop { let low_bound = match next_start.take() { Some(v) => Bound::Excluded(v), None => Bound::Unbounded, }; let mut batch = vec![]; for item in self.local_counter.range((low_bound, Bound::Unbounded))? { batch.push(item?); if batch.len() > 1000 { break; } } if batch.is_empty() { break; } info!("zeroing old counters... ({})", hex::encode(&batch[0].0)); for (local_counter_k, local_counter) in batch { let mut local_counter = rmp_serde::decode::from_read_ref::<_, LocalCounterEntry<T>>(&local_counter)?; for (_, tv) in local_counter.values.iter_mut() { tv.0 = std::cmp::max(tv.0 + 1, now); tv.1 = 0; } let local_counter_bytes = rmp_to_vec_all_named(&local_counter)?; self.local_counter .insert(&local_counter_k, &local_counter_bytes)?; let counter_entry = local_counter.into_counter_entry(self.this_node); save_counter_entry(counter_entry)?; next_start = Some(local_counter_k); } } // 2. Recount all table entries let now = now_msec(); let mut next_start: Option<Vec<u8>> = None; loop { let low_bound = match next_start.take() { Some(v) => Bound::Excluded(v), None => Bound::Unbounded, }; let mut batch = vec![]; for item in counted_table .data .store .range((low_bound, Bound::Unbounded))? { batch.push(item?); if batch.len() > 1000 { break; } } if batch.is_empty() { break; } info!("counting entries... ({})", hex::encode(&batch[0].0)); for (counted_entry_k, counted_entry) in batch { let counted_entry = counted_table.data.decode_entry(&counted_entry)?; let pk = counted_entry.counter_partition_key(); let sk = counted_entry.counter_sort_key(); let counts = counted_entry.counts(); let local_counter_key = self.table.data.tree_key(pk, sk); let mut local_counter = match self.local_counter.get(&local_counter_key)? { Some(old_bytes) => { let ent = rmp_serde::decode::from_read_ref::<_, LocalCounterEntry<T>>( &old_bytes, )?; assert!(ent.pk == *pk); assert!(ent.sk == *sk); ent } None => LocalCounterEntry { pk: pk.clone(), sk: sk.clone(), values: BTreeMap::new(), }, }; for (s, v) in counts.iter() { let mut tv = local_counter.values.entry(s.to_string()).or_insert((0, 0)); tv.0 = std::cmp::max(tv.0 + 1, now); tv.1 += v; } let local_counter_bytes = rmp_to_vec_all_named(&local_counter)?; self.local_counter .insert(&local_counter_key, local_counter_bytes)?; let counter_entry = local_counter.into_counter_entry(self.this_node); save_counter_entry(counter_entry)?; next_start = Some(counted_entry_k); } } // Done Ok(()) } } struct IndexPropagatorWorker<T: CountedItem> { index_counter: Arc<IndexCounter<T>>, propagate_rx: mpsc::UnboundedReceiver<(T::CP, T::CS, LocalCounterEntry<T>)>, buf: HashMap<Vec<u8>, CounterEntry<T>>, errors: usize, } impl<T: CountedItem> IndexPropagatorWorker<T> { fn add_ent(&mut self, pk: T::CP, sk: T::CS, counters: LocalCounterEntry<T>) { let tree_key = self.index_counter.table.data.tree_key(&pk, &sk); let dist_entry = counters.into_counter_entry(self.index_counter.this_node); match self.buf.entry(tree_key) { hash_map::Entry::Vacant(e) => { e.insert(dist_entry); } hash_map::Entry::Occupied(mut e) => { e.get_mut().merge(&dist_entry); } } } } #[async_trait] impl<T: CountedItem> Worker for IndexPropagatorWorker<T> { fn name(&self) -> String { format!("{} counter", T::COUNTER_TABLE_NAME) } fn status(&self) -> WorkerStatus { WorkerStatus { queue_length: Some(self.buf.len() as u64), ..Default::default() } } async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> { // This loop batches updates to counters to be sent all at once. // They are sent once the propagate_rx channel has been emptied (or is closed). let closed = loop { match self.propagate_rx.try_recv() { Ok((pk, sk, counters)) => { self.add_ent(pk, sk, counters); } Err(mpsc::error::TryRecvError::Empty) => break false, Err(mpsc::error::TryRecvError::Disconnected) => break true, } }; if !self.buf.is_empty() { let entries_k = self.buf.keys().take(100).cloned().collect::<Vec<_>>(); let entries = entries_k.iter().map(|k| self.buf.get(k).unwrap()); if let Err(e) = self.index_counter.table.insert_many(entries).await { self.errors += 1; if self.errors >= 2 && *must_exit.borrow() { error!("({}) Could not propagate {} counter values: {}, these counters will not be updated correctly.", T::COUNTER_TABLE_NAME, self.buf.len(), e); return Ok(WorkerState::Done); } // Propagate error up to worker manager, it will log it, increment a counter, // and sleep for a certain delay (with exponential backoff), waiting for // things to go back to normal return Err(e); } else { for k in entries_k { self.buf.remove(&k); } self.errors = 0; } return Ok(WorkerState::Busy); } else if closed { return Ok(WorkerState::Done); } else { return Ok(WorkerState::Idle); } } async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerState { match self.propagate_rx.recv().await { Some((pk, sk, counters)) => { self.add_ent(pk, sk, counters); WorkerState::Busy } None => match self.buf.is_empty() { false => WorkerState::Busy, true => WorkerState::Done, }, } } } #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] struct LocalCounterEntry<T: CountedItem> { pk: T::CP, sk: T::CS, values: BTreeMap<String, (u64, i64)>, } impl<T: CountedItem> LocalCounterEntry<T> { fn into_counter_entry(self, this_node: Uuid) -> CounterEntry<T> { CounterEntry { pk: self.pk, sk: self.sk, values: self .values .into_iter() .map(|(name, (ts, v))| { let mut node_values = BTreeMap::new(); node_values.insert(this_node, (ts, v)); (name, CounterValue { node_values }) }) .collect(), } } }