diff options
Diffstat (limited to 'src/model/index_counter.rs')
-rw-r--r-- | src/model/index_counter.rs | 250 |
1 files changed, 205 insertions, 45 deletions
diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs index 2602d5d9..36e8172b 100644 --- a/src/model/index_counter.rs +++ b/src/model/index_counter.rs @@ -1,3 +1,4 @@ +use core::ops::Bound; use std::collections::{hash_map, BTreeMap, HashMap}; use std::marker::PhantomData; use std::sync::Arc; @@ -12,30 +13,36 @@ use garage_rpc::ring::Ring; use garage_rpc::system::System; use garage_util::data::*; use garage_util::error::*; +use garage_util::time::*; use garage_table::crdt::*; -use garage_table::replication::TableShardedReplication; +use garage_table::replication::*; use garage_table::*; -pub trait CounterSchema: Clone + PartialEq + Send + Sync + 'static { - const NAME: &'static str; - type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync; - type S: SortKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync; +pub trait CountedItem: Clone + PartialEq + Send + Sync + 'static { + const COUNTER_TABLE_NAME: &'static str; + + type CP: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync; + type CS: SortKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync; + + fn counter_partition_key(&self) -> &Self::CP; + fn counter_sort_key(&self) -> &Self::CS; + fn counts(&self) -> Vec<(&'static str, i64)>; } /// A counter entry in the global table -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] -pub struct CounterEntry<T: CounterSchema> { - pub pk: T::P, - pub sk: T::S, +#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)] +pub struct CounterEntry<T: CountedItem> { + pub pk: T::CP, + pub sk: T::CS, pub values: BTreeMap<String, CounterValue>, } -impl<T: CounterSchema> Entry<T::P, T::S> for CounterEntry<T> { - fn partition_key(&self) -> &T::P { +impl<T: CountedItem> Entry<T::CP, T::CS> for CounterEntry<T> { + fn partition_key(&self) -> &T::CP { &self.pk } - fn sort_key(&self) -> &T::S { + fn sort_key(&self) -> &T::CS { &self.sk } fn is_tombstone(&self) -> bool { @@ -45,7 +52,7 @@ impl<T: CounterSchema> Entry<T::P, T::S> for CounterEntry<T> { } } -impl<T: CounterSchema> CounterEntry<T> { +impl<T: CountedItem> CounterEntry<T> { pub fn filtered_values(&self, ring: &Ring) -> HashMap<String, i64> { let nodes = &ring.layout.node_id_vec[..]; self.filtered_values_with_nodes(nodes) @@ -78,7 +85,7 @@ pub struct CounterValue { pub node_values: BTreeMap<Uuid, (u64, i64)>, } -impl<T: CounterSchema> Crdt for CounterEntry<T> { +impl<T: CountedItem> Crdt for CounterEntry<T> { fn merge(&mut self, other: &Self) { for (name, e2) in other.values.iter() { if let Some(e) = self.values.get_mut(name) { @@ -104,15 +111,15 @@ impl Crdt for CounterValue { } } -pub struct CounterTable<T: CounterSchema> { +pub struct CounterTable<T: CountedItem> { _phantom_t: PhantomData<T>, } -impl<T: CounterSchema> TableSchema for CounterTable<T> { - const TABLE_NAME: &'static str = T::NAME; +impl<T: CountedItem> TableSchema for CounterTable<T> { + const TABLE_NAME: &'static str = T::COUNTER_TABLE_NAME; - type P = T::P; - type S = T::S; + type P = T::CP; + type S = T::CS; type E = CounterEntry<T>; type Filter = (DeletedFilter, Vec<Uuid>); @@ -131,14 +138,14 @@ impl<T: CounterSchema> TableSchema for CounterTable<T> { // ---- -pub struct IndexCounter<T: CounterSchema> { +pub struct IndexCounter<T: CountedItem> { this_node: Uuid, local_counter: db::Tree, - propagate_tx: mpsc::UnboundedSender<(T::P, T::S, LocalCounterEntry)>, + propagate_tx: mpsc::UnboundedSender<(T::CP, T::CS, LocalCounterEntry<T>)>, pub table: Arc<Table<CounterTable<T>, TableShardedReplication>>, } -impl<T: CounterSchema> IndexCounter<T> { +impl<T: CountedItem> IndexCounter<T> { pub fn new( system: Arc<System>, replication: TableShardedReplication, @@ -151,7 +158,7 @@ impl<T: CounterSchema> IndexCounter<T> { let this = Arc::new(Self { this_node: system.id, local_counter: db - .open_tree(format!("local_counter:{}", T::NAME)) + .open_tree(format!("local_counter_v2:{}", T::COUNTER_TABLE_NAME)) .expect("Unable to open local counter tree"), propagate_tx, table: Table::new( @@ -166,7 +173,7 @@ impl<T: CounterSchema> IndexCounter<T> { let this2 = this.clone(); background.spawn_worker( - format!("{} index counter propagator", T::NAME), + format!("{} index counter propagator", T::COUNTER_TABLE_NAME), move |must_exit| this2.clone().propagate_loop(propagate_rx, must_exit), ); this @@ -175,24 +182,45 @@ impl<T: CounterSchema> IndexCounter<T> { pub fn count( &self, tx: &mut db::Transaction, - pk: &T::P, - sk: &T::S, - counts: &[(&str, i64)], + old: Option<&T>, + new: Option<&T>, ) -> db::TxResult<(), Error> { + let pk = old + .map(|e| e.counter_partition_key()) + .unwrap_or_else(|| new.unwrap().counter_partition_key()); + let sk = old + .map(|e| e.counter_sort_key()) + .unwrap_or_else(|| new.unwrap().counter_sort_key()); + + // calculate counter differences + let mut counts = HashMap::new(); + for (k, v) in old.map(|x| x.counts()).unwrap_or_default() { + *counts.entry(k).or_insert(0) -= v; + } + for (k, v) in new.map(|x| x.counts()).unwrap_or_default() { + *counts.entry(k).or_insert(0) += v; + } + + // update local counter table let tree_key = self.table.data.tree_key(pk, sk); let mut entry = match tx.get(&self.local_counter, &tree_key[..])? { - Some(old_bytes) => rmp_serde::decode::from_read_ref::<_, LocalCounterEntry>(&old_bytes) - .map_err(Error::RmpDecode) - .map_err(db::TxError::Abort)?, + Some(old_bytes) => { + rmp_serde::decode::from_read_ref::<_, LocalCounterEntry<T>>(&old_bytes) + .map_err(Error::RmpDecode) + .map_err(db::TxError::Abort)? + } None => LocalCounterEntry { + pk: pk.clone(), + sk: sk.clone(), values: BTreeMap::new(), }, }; + let now = now_msec(); for (s, inc) in counts.iter() { let mut ent = entry.values.entry(s.to_string()).or_insert((0, 0)); - ent.0 += 1; + ent.0 = std::cmp::max(ent.0 + 1, now); ent.1 += *inc; } @@ -213,7 +241,7 @@ impl<T: CounterSchema> IndexCounter<T> { async fn propagate_loop( self: Arc<Self>, - mut propagate_rx: mpsc::UnboundedReceiver<(T::P, T::S, LocalCounterEntry)>, + mut propagate_rx: mpsc::UnboundedReceiver<(T::CP, T::CS, LocalCounterEntry<T>)>, must_exit: watch::Receiver<bool>, ) { // This loop batches updates to counters to be sent all at once. @@ -236,7 +264,7 @@ impl<T: CounterSchema> IndexCounter<T> { if let Some((pk, sk, counters)) = ent { let tree_key = self.table.data.tree_key(&pk, &sk); - let dist_entry = counters.into_counter_entry::<T>(self.this_node, pk, sk); + let dist_entry = counters.into_counter_entry(self.this_node); match buf.entry(tree_key) { hash_map::Entry::Vacant(e) => { e.insert(dist_entry); @@ -255,10 +283,10 @@ impl<T: CounterSchema> IndexCounter<T> { if let Err(e) = self.table.insert_many(entries).await { errors += 1; if errors >= 2 && *must_exit.borrow() { - error!("({}) Could not propagate {} counter values: {}, these counters will not be updated correctly.", T::NAME, buf.len(), e); + error!("({}) Could not propagate {} counter values: {}, these counters will not be updated correctly.", T::COUNTER_TABLE_NAME, buf.len(), e); break; } - warn!("({}) Could not propagate {} counter values: {}, retrying in 5 seconds (retry #{})", T::NAME, buf.len(), e, errors); + warn!("({}) Could not propagate {} counter values: {}, retrying in 5 seconds (retry #{})", T::COUNTER_TABLE_NAME, buf.len(), e, errors); tokio::time::sleep(Duration::from_secs(5)).await; continue; } @@ -272,23 +300,155 @@ impl<T: CounterSchema> IndexCounter<T> { } } } + + pub fn offline_recount_all<TS, TR>( + &self, + counted_table: &Arc<Table<TS, TR>>, + ) -> Result<(), Error> + where + TS: TableSchema<E = T>, + TR: TableReplication, + { + let save_counter_entry = |entry: CounterEntry<T>| -> Result<(), Error> { + let entry_k = self + .table + .data + .tree_key(entry.partition_key(), entry.sort_key()); + self.table + .data + .update_entry_with(&entry_k, |ent| match ent { + Some(mut ent) => { + ent.merge(&entry); + ent + } + None => entry.clone(), + })?; + Ok(()) + }; + + // 1. Set all old local counters to zero + let now = now_msec(); + let mut next_start: Option<Vec<u8>> = None; + loop { + let low_bound = match next_start.take() { + Some(v) => Bound::Excluded(v), + None => Bound::Unbounded, + }; + let mut batch = vec![]; + for item in self.local_counter.range((low_bound, Bound::Unbounded))? { + batch.push(item?); + if batch.len() > 1000 { + break; + } + } + + if batch.is_empty() { + break; + } + + info!("zeroing old counters... ({})", hex::encode(&batch[0].0)); + for (local_counter_k, local_counter) in batch { + let mut local_counter = + rmp_serde::decode::from_read_ref::<_, LocalCounterEntry<T>>(&local_counter)?; + + for (_, tv) in local_counter.values.iter_mut() { + tv.0 = std::cmp::max(tv.0 + 1, now); + tv.1 = 0; + } + + let local_counter_bytes = rmp_to_vec_all_named(&local_counter)?; + self.local_counter + .insert(&local_counter_k, &local_counter_bytes)?; + + let counter_entry = local_counter.into_counter_entry(self.this_node); + save_counter_entry(counter_entry)?; + + next_start = Some(local_counter_k); + } + } + + // 2. Recount all table entries + let now = now_msec(); + let mut next_start: Option<Vec<u8>> = None; + loop { + let low_bound = match next_start.take() { + Some(v) => Bound::Excluded(v), + None => Bound::Unbounded, + }; + let mut batch = vec![]; + for item in counted_table + .data + .store + .range((low_bound, Bound::Unbounded))? + { + batch.push(item?); + if batch.len() > 1000 { + break; + } + } + + if batch.is_empty() { + break; + } + + info!("counting entries... ({})", hex::encode(&batch[0].0)); + for (counted_entry_k, counted_entry) in batch { + let counted_entry = counted_table.data.decode_entry(&counted_entry)?; + + let pk = counted_entry.counter_partition_key(); + let sk = counted_entry.counter_sort_key(); + let counts = counted_entry.counts(); + + let local_counter_key = self.table.data.tree_key(pk, sk); + let mut local_counter = match self.local_counter.get(&local_counter_key)? { + Some(old_bytes) => { + let ent = rmp_serde::decode::from_read_ref::<_, LocalCounterEntry<T>>( + &old_bytes, + )?; + assert!(ent.pk == *pk); + assert!(ent.sk == *sk); + ent + } + None => LocalCounterEntry { + pk: pk.clone(), + sk: sk.clone(), + values: BTreeMap::new(), + }, + }; + for (s, v) in counts.iter() { + let mut tv = local_counter.values.entry(s.to_string()).or_insert((0, 0)); + tv.0 = std::cmp::max(tv.0 + 1, now); + tv.1 += v; + } + + let local_counter_bytes = rmp_to_vec_all_named(&local_counter)?; + self.local_counter + .insert(&local_counter_key, local_counter_bytes)?; + + let counter_entry = local_counter.into_counter_entry(self.this_node); + save_counter_entry(counter_entry)?; + + next_start = Some(counted_entry_k); + } + } + + // Done + Ok(()) + } } #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] -struct LocalCounterEntry { +struct LocalCounterEntry<T: CountedItem> { + pk: T::CP, + sk: T::CS, values: BTreeMap<String, (u64, i64)>, } -impl LocalCounterEntry { - fn into_counter_entry<T: CounterSchema>( - self, - this_node: Uuid, - pk: T::P, - sk: T::S, - ) -> CounterEntry<T> { +impl<T: CountedItem> LocalCounterEntry<T> { + fn into_counter_entry(self, this_node: Uuid) -> CounterEntry<T> { CounterEntry { - pk, - sk, + pk: self.pk, + sk: self.sk, values: self .values .into_iter() |