diff options
Diffstat (limited to 'src/model/index_counter.rs')
-rw-r--r-- | src/model/index_counter.rs | 85 |
1 files changed, 53 insertions, 32 deletions
diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs index 2602d5d9..109b9828 100644 --- a/src/model/index_counter.rs +++ b/src/model/index_counter.rs @@ -17,25 +17,30 @@ use garage_table::crdt::*; use garage_table::replication::TableShardedReplication; use garage_table::*; -pub trait CounterSchema: Clone + PartialEq + Send + Sync + 'static { - const NAME: &'static str; - type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync; - type S: SortKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync; +pub trait CountedItem: Clone + PartialEq + Send + Sync + 'static { + const COUNTER_TABLE_NAME: &'static str; + + type CP: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync; + type CS: SortKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync; + + fn counter_partition_key(&self) -> &Self::CP; + fn counter_sort_key(&self) -> &Self::CS; + fn counts(&self) -> Vec<(&'static str, i64)>; } /// A counter entry in the global table -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] -pub struct CounterEntry<T: CounterSchema> { - pub pk: T::P, - pub sk: T::S, +#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)] +pub struct CounterEntry<T: CountedItem> { + pub pk: T::CP, + pub sk: T::CS, pub values: BTreeMap<String, CounterValue>, } -impl<T: CounterSchema> Entry<T::P, T::S> for CounterEntry<T> { - fn partition_key(&self) -> &T::P { +impl<T: CountedItem> Entry<T::CP, T::CS> for CounterEntry<T> { + fn partition_key(&self) -> &T::CP { &self.pk } - fn sort_key(&self) -> &T::S { + fn sort_key(&self) -> &T::CS { &self.sk } fn is_tombstone(&self) -> bool { @@ -45,7 +50,7 @@ impl<T: CounterSchema> Entry<T::P, T::S> for CounterEntry<T> { } } -impl<T: CounterSchema> CounterEntry<T> { +impl<T: CountedItem> CounterEntry<T> { pub fn filtered_values(&self, ring: &Ring) -> HashMap<String, i64> { let nodes = &ring.layout.node_id_vec[..]; self.filtered_values_with_nodes(nodes) @@ -78,7 +83,7 @@ pub struct CounterValue { pub node_values: BTreeMap<Uuid, (u64, i64)>, } -impl<T: CounterSchema> Crdt for CounterEntry<T> { +impl<T: CountedItem> Crdt for CounterEntry<T> { fn merge(&mut self, other: &Self) { for (name, e2) in other.values.iter() { if let Some(e) = self.values.get_mut(name) { @@ -104,15 +109,15 @@ impl Crdt for CounterValue { } } -pub struct CounterTable<T: CounterSchema> { +pub struct CounterTable<T: CountedItem> { _phantom_t: PhantomData<T>, } -impl<T: CounterSchema> TableSchema for CounterTable<T> { - const TABLE_NAME: &'static str = T::NAME; +impl<T: CountedItem> TableSchema for CounterTable<T> { + const TABLE_NAME: &'static str = T::COUNTER_TABLE_NAME; - type P = T::P; - type S = T::S; + type P = T::CP; + type S = T::CS; type E = CounterEntry<T>; type Filter = (DeletedFilter, Vec<Uuid>); @@ -131,14 +136,14 @@ impl<T: CounterSchema> TableSchema for CounterTable<T> { // ---- -pub struct IndexCounter<T: CounterSchema> { +pub struct IndexCounter<T: CountedItem> { this_node: Uuid, local_counter: db::Tree, - propagate_tx: mpsc::UnboundedSender<(T::P, T::S, LocalCounterEntry)>, + propagate_tx: mpsc::UnboundedSender<(T::CP, T::CS, LocalCounterEntry)>, pub table: Arc<Table<CounterTable<T>, TableShardedReplication>>, } -impl<T: CounterSchema> IndexCounter<T> { +impl<T: CountedItem> IndexCounter<T> { pub fn new( system: Arc<System>, replication: TableShardedReplication, @@ -151,7 +156,7 @@ impl<T: CounterSchema> IndexCounter<T> { let this = Arc::new(Self { this_node: system.id, local_counter: db - .open_tree(format!("local_counter:{}", T::NAME)) + .open_tree(format!("local_counter:{}", T::COUNTER_TABLE_NAME)) .expect("Unable to open local counter tree"), propagate_tx, table: Table::new( @@ -166,7 +171,7 @@ impl<T: CounterSchema> IndexCounter<T> { let this2 = this.clone(); background.spawn_worker( - format!("{} index counter propagator", T::NAME), + format!("{} index counter propagator", T::COUNTER_TABLE_NAME), move |must_exit| this2.clone().propagate_loop(propagate_rx, must_exit), ); this @@ -175,10 +180,26 @@ impl<T: CounterSchema> IndexCounter<T> { pub fn count( &self, tx: &mut db::Transaction, - pk: &T::P, - sk: &T::S, - counts: &[(&str, i64)], + old: Option<&T>, + new: Option<&T>, ) -> db::TxResult<(), Error> { + let pk = old + .map(|e| e.counter_partition_key()) + .unwrap_or_else(|| new.unwrap().counter_partition_key()); + let sk = old + .map(|e| e.counter_sort_key()) + .unwrap_or_else(|| new.unwrap().counter_sort_key()); + + // calculate counter differences + let mut counts = HashMap::new(); + for (k, v) in old.map(|x| x.counts()).unwrap_or_default() { + *counts.entry(k).or_insert(0) -= v; + } + for (k, v) in new.map(|x| x.counts()).unwrap_or_default() { + *counts.entry(k).or_insert(0) += v; + } + + // update local counter table let tree_key = self.table.data.tree_key(pk, sk); let mut entry = match tx.get(&self.local_counter, &tree_key[..])? { @@ -213,7 +234,7 @@ impl<T: CounterSchema> IndexCounter<T> { async fn propagate_loop( self: Arc<Self>, - mut propagate_rx: mpsc::UnboundedReceiver<(T::P, T::S, LocalCounterEntry)>, + mut propagate_rx: mpsc::UnboundedReceiver<(T::CP, T::CS, LocalCounterEntry)>, must_exit: watch::Receiver<bool>, ) { // This loop batches updates to counters to be sent all at once. @@ -255,10 +276,10 @@ impl<T: CounterSchema> IndexCounter<T> { if let Err(e) = self.table.insert_many(entries).await { errors += 1; if errors >= 2 && *must_exit.borrow() { - error!("({}) Could not propagate {} counter values: {}, these counters will not be updated correctly.", T::NAME, buf.len(), e); + error!("({}) Could not propagate {} counter values: {}, these counters will not be updated correctly.", T::COUNTER_TABLE_NAME, buf.len(), e); break; } - warn!("({}) Could not propagate {} counter values: {}, retrying in 5 seconds (retry #{})", T::NAME, buf.len(), e, errors); + warn!("({}) Could not propagate {} counter values: {}, retrying in 5 seconds (retry #{})", T::COUNTER_TABLE_NAME, buf.len(), e, errors); tokio::time::sleep(Duration::from_secs(5)).await; continue; } @@ -280,11 +301,11 @@ struct LocalCounterEntry { } impl LocalCounterEntry { - fn into_counter_entry<T: CounterSchema>( + fn into_counter_entry<T: CountedItem>( self, this_node: Uuid, - pk: T::P, - sk: T::S, + pk: T::CP, + sk: T::CS, ) -> CounterEntry<T> { CounterEntry { pk, |