aboutsummaryrefslogtreecommitdiff
path: root/src/model/index_counter.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/model/index_counter.rs')
-rw-r--r--src/model/index_counter.rs250
1 files changed, 205 insertions, 45 deletions
diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs
index 2602d5d9..36e8172b 100644
--- a/src/model/index_counter.rs
+++ b/src/model/index_counter.rs
@@ -1,3 +1,4 @@
+use core::ops::Bound;
use std::collections::{hash_map, BTreeMap, HashMap};
use std::marker::PhantomData;
use std::sync::Arc;
@@ -12,30 +13,36 @@ use garage_rpc::ring::Ring;
use garage_rpc::system::System;
use garage_util::data::*;
use garage_util::error::*;
+use garage_util::time::*;
use garage_table::crdt::*;
-use garage_table::replication::TableShardedReplication;
+use garage_table::replication::*;
use garage_table::*;
-pub trait CounterSchema: Clone + PartialEq + Send + Sync + 'static {
- const NAME: &'static str;
- type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
- type S: SortKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
+pub trait CountedItem: Clone + PartialEq + Send + Sync + 'static {
+ const COUNTER_TABLE_NAME: &'static str;
+
+ type CP: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
+ type CS: SortKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
+
+ fn counter_partition_key(&self) -> &Self::CP;
+ fn counter_sort_key(&self) -> &Self::CS;
+ fn counts(&self) -> Vec<(&'static str, i64)>;
}
/// A counter entry in the global table
-#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
-pub struct CounterEntry<T: CounterSchema> {
- pub pk: T::P,
- pub sk: T::S,
+#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)]
+pub struct CounterEntry<T: CountedItem> {
+ pub pk: T::CP,
+ pub sk: T::CS,
pub values: BTreeMap<String, CounterValue>,
}
-impl<T: CounterSchema> Entry<T::P, T::S> for CounterEntry<T> {
- fn partition_key(&self) -> &T::P {
+impl<T: CountedItem> Entry<T::CP, T::CS> for CounterEntry<T> {
+ fn partition_key(&self) -> &T::CP {
&self.pk
}
- fn sort_key(&self) -> &T::S {
+ fn sort_key(&self) -> &T::CS {
&self.sk
}
fn is_tombstone(&self) -> bool {
@@ -45,7 +52,7 @@ impl<T: CounterSchema> Entry<T::P, T::S> for CounterEntry<T> {
}
}
-impl<T: CounterSchema> CounterEntry<T> {
+impl<T: CountedItem> CounterEntry<T> {
pub fn filtered_values(&self, ring: &Ring) -> HashMap<String, i64> {
let nodes = &ring.layout.node_id_vec[..];
self.filtered_values_with_nodes(nodes)
@@ -78,7 +85,7 @@ pub struct CounterValue {
pub node_values: BTreeMap<Uuid, (u64, i64)>,
}
-impl<T: CounterSchema> Crdt for CounterEntry<T> {
+impl<T: CountedItem> Crdt for CounterEntry<T> {
fn merge(&mut self, other: &Self) {
for (name, e2) in other.values.iter() {
if let Some(e) = self.values.get_mut(name) {
@@ -104,15 +111,15 @@ impl Crdt for CounterValue {
}
}
-pub struct CounterTable<T: CounterSchema> {
+pub struct CounterTable<T: CountedItem> {
_phantom_t: PhantomData<T>,
}
-impl<T: CounterSchema> TableSchema for CounterTable<T> {
- const TABLE_NAME: &'static str = T::NAME;
+impl<T: CountedItem> TableSchema for CounterTable<T> {
+ const TABLE_NAME: &'static str = T::COUNTER_TABLE_NAME;
- type P = T::P;
- type S = T::S;
+ type P = T::CP;
+ type S = T::CS;
type E = CounterEntry<T>;
type Filter = (DeletedFilter, Vec<Uuid>);
@@ -131,14 +138,14 @@ impl<T: CounterSchema> TableSchema for CounterTable<T> {
// ----
-pub struct IndexCounter<T: CounterSchema> {
+pub struct IndexCounter<T: CountedItem> {
this_node: Uuid,
local_counter: db::Tree,
- propagate_tx: mpsc::UnboundedSender<(T::P, T::S, LocalCounterEntry)>,
+ propagate_tx: mpsc::UnboundedSender<(T::CP, T::CS, LocalCounterEntry<T>)>,
pub table: Arc<Table<CounterTable<T>, TableShardedReplication>>,
}
-impl<T: CounterSchema> IndexCounter<T> {
+impl<T: CountedItem> IndexCounter<T> {
pub fn new(
system: Arc<System>,
replication: TableShardedReplication,
@@ -151,7 +158,7 @@ impl<T: CounterSchema> IndexCounter<T> {
let this = Arc::new(Self {
this_node: system.id,
local_counter: db
- .open_tree(format!("local_counter:{}", T::NAME))
+ .open_tree(format!("local_counter_v2:{}", T::COUNTER_TABLE_NAME))
.expect("Unable to open local counter tree"),
propagate_tx,
table: Table::new(
@@ -166,7 +173,7 @@ impl<T: CounterSchema> IndexCounter<T> {
let this2 = this.clone();
background.spawn_worker(
- format!("{} index counter propagator", T::NAME),
+ format!("{} index counter propagator", T::COUNTER_TABLE_NAME),
move |must_exit| this2.clone().propagate_loop(propagate_rx, must_exit),
);
this
@@ -175,24 +182,45 @@ impl<T: CounterSchema> IndexCounter<T> {
pub fn count(
&self,
tx: &mut db::Transaction,
- pk: &T::P,
- sk: &T::S,
- counts: &[(&str, i64)],
+ old: Option<&T>,
+ new: Option<&T>,
) -> db::TxResult<(), Error> {
+ let pk = old
+ .map(|e| e.counter_partition_key())
+ .unwrap_or_else(|| new.unwrap().counter_partition_key());
+ let sk = old
+ .map(|e| e.counter_sort_key())
+ .unwrap_or_else(|| new.unwrap().counter_sort_key());
+
+ // calculate counter differences
+ let mut counts = HashMap::new();
+ for (k, v) in old.map(|x| x.counts()).unwrap_or_default() {
+ *counts.entry(k).or_insert(0) -= v;
+ }
+ for (k, v) in new.map(|x| x.counts()).unwrap_or_default() {
+ *counts.entry(k).or_insert(0) += v;
+ }
+
+ // update local counter table
let tree_key = self.table.data.tree_key(pk, sk);
let mut entry = match tx.get(&self.local_counter, &tree_key[..])? {
- Some(old_bytes) => rmp_serde::decode::from_read_ref::<_, LocalCounterEntry>(&old_bytes)
- .map_err(Error::RmpDecode)
- .map_err(db::TxError::Abort)?,
+ Some(old_bytes) => {
+ rmp_serde::decode::from_read_ref::<_, LocalCounterEntry<T>>(&old_bytes)
+ .map_err(Error::RmpDecode)
+ .map_err(db::TxError::Abort)?
+ }
None => LocalCounterEntry {
+ pk: pk.clone(),
+ sk: sk.clone(),
values: BTreeMap::new(),
},
};
+ let now = now_msec();
for (s, inc) in counts.iter() {
let mut ent = entry.values.entry(s.to_string()).or_insert((0, 0));
- ent.0 += 1;
+ ent.0 = std::cmp::max(ent.0 + 1, now);
ent.1 += *inc;
}
@@ -213,7 +241,7 @@ impl<T: CounterSchema> IndexCounter<T> {
async fn propagate_loop(
self: Arc<Self>,
- mut propagate_rx: mpsc::UnboundedReceiver<(T::P, T::S, LocalCounterEntry)>,
+ mut propagate_rx: mpsc::UnboundedReceiver<(T::CP, T::CS, LocalCounterEntry<T>)>,
must_exit: watch::Receiver<bool>,
) {
// This loop batches updates to counters to be sent all at once.
@@ -236,7 +264,7 @@ impl<T: CounterSchema> IndexCounter<T> {
if let Some((pk, sk, counters)) = ent {
let tree_key = self.table.data.tree_key(&pk, &sk);
- let dist_entry = counters.into_counter_entry::<T>(self.this_node, pk, sk);
+ let dist_entry = counters.into_counter_entry(self.this_node);
match buf.entry(tree_key) {
hash_map::Entry::Vacant(e) => {
e.insert(dist_entry);
@@ -255,10 +283,10 @@ impl<T: CounterSchema> IndexCounter<T> {
if let Err(e) = self.table.insert_many(entries).await {
errors += 1;
if errors >= 2 && *must_exit.borrow() {
- error!("({}) Could not propagate {} counter values: {}, these counters will not be updated correctly.", T::NAME, buf.len(), e);
+ error!("({}) Could not propagate {} counter values: {}, these counters will not be updated correctly.", T::COUNTER_TABLE_NAME, buf.len(), e);
break;
}
- warn!("({}) Could not propagate {} counter values: {}, retrying in 5 seconds (retry #{})", T::NAME, buf.len(), e, errors);
+ warn!("({}) Could not propagate {} counter values: {}, retrying in 5 seconds (retry #{})", T::COUNTER_TABLE_NAME, buf.len(), e, errors);
tokio::time::sleep(Duration::from_secs(5)).await;
continue;
}
@@ -272,23 +300,155 @@ impl<T: CounterSchema> IndexCounter<T> {
}
}
}
+
+ pub fn offline_recount_all<TS, TR>(
+ &self,
+ counted_table: &Arc<Table<TS, TR>>,
+ ) -> Result<(), Error>
+ where
+ TS: TableSchema<E = T>,
+ TR: TableReplication,
+ {
+ let save_counter_entry = |entry: CounterEntry<T>| -> Result<(), Error> {
+ let entry_k = self
+ .table
+ .data
+ .tree_key(entry.partition_key(), entry.sort_key());
+ self.table
+ .data
+ .update_entry_with(&entry_k, |ent| match ent {
+ Some(mut ent) => {
+ ent.merge(&entry);
+ ent
+ }
+ None => entry.clone(),
+ })?;
+ Ok(())
+ };
+
+ // 1. Set all old local counters to zero
+ let now = now_msec();
+ let mut next_start: Option<Vec<u8>> = None;
+ loop {
+ let low_bound = match next_start.take() {
+ Some(v) => Bound::Excluded(v),
+ None => Bound::Unbounded,
+ };
+ let mut batch = vec![];
+ for item in self.local_counter.range((low_bound, Bound::Unbounded))? {
+ batch.push(item?);
+ if batch.len() > 1000 {
+ break;
+ }
+ }
+
+ if batch.is_empty() {
+ break;
+ }
+
+ info!("zeroing old counters... ({})", hex::encode(&batch[0].0));
+ for (local_counter_k, local_counter) in batch {
+ let mut local_counter =
+ rmp_serde::decode::from_read_ref::<_, LocalCounterEntry<T>>(&local_counter)?;
+
+ for (_, tv) in local_counter.values.iter_mut() {
+ tv.0 = std::cmp::max(tv.0 + 1, now);
+ tv.1 = 0;
+ }
+
+ let local_counter_bytes = rmp_to_vec_all_named(&local_counter)?;
+ self.local_counter
+ .insert(&local_counter_k, &local_counter_bytes)?;
+
+ let counter_entry = local_counter.into_counter_entry(self.this_node);
+ save_counter_entry(counter_entry)?;
+
+ next_start = Some(local_counter_k);
+ }
+ }
+
+ // 2. Recount all table entries
+ let now = now_msec();
+ let mut next_start: Option<Vec<u8>> = None;
+ loop {
+ let low_bound = match next_start.take() {
+ Some(v) => Bound::Excluded(v),
+ None => Bound::Unbounded,
+ };
+ let mut batch = vec![];
+ for item in counted_table
+ .data
+ .store
+ .range((low_bound, Bound::Unbounded))?
+ {
+ batch.push(item?);
+ if batch.len() > 1000 {
+ break;
+ }
+ }
+
+ if batch.is_empty() {
+ break;
+ }
+
+ info!("counting entries... ({})", hex::encode(&batch[0].0));
+ for (counted_entry_k, counted_entry) in batch {
+ let counted_entry = counted_table.data.decode_entry(&counted_entry)?;
+
+ let pk = counted_entry.counter_partition_key();
+ let sk = counted_entry.counter_sort_key();
+ let counts = counted_entry.counts();
+
+ let local_counter_key = self.table.data.tree_key(pk, sk);
+ let mut local_counter = match self.local_counter.get(&local_counter_key)? {
+ Some(old_bytes) => {
+ let ent = rmp_serde::decode::from_read_ref::<_, LocalCounterEntry<T>>(
+ &old_bytes,
+ )?;
+ assert!(ent.pk == *pk);
+ assert!(ent.sk == *sk);
+ ent
+ }
+ None => LocalCounterEntry {
+ pk: pk.clone(),
+ sk: sk.clone(),
+ values: BTreeMap::new(),
+ },
+ };
+ for (s, v) in counts.iter() {
+ let mut tv = local_counter.values.entry(s.to_string()).or_insert((0, 0));
+ tv.0 = std::cmp::max(tv.0 + 1, now);
+ tv.1 += v;
+ }
+
+ let local_counter_bytes = rmp_to_vec_all_named(&local_counter)?;
+ self.local_counter
+ .insert(&local_counter_key, local_counter_bytes)?;
+
+ let counter_entry = local_counter.into_counter_entry(self.this_node);
+ save_counter_entry(counter_entry)?;
+
+ next_start = Some(counted_entry_k);
+ }
+ }
+
+ // Done
+ Ok(())
+ }
}
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
-struct LocalCounterEntry {
+struct LocalCounterEntry<T: CountedItem> {
+ pk: T::CP,
+ sk: T::CS,
values: BTreeMap<String, (u64, i64)>,
}
-impl LocalCounterEntry {
- fn into_counter_entry<T: CounterSchema>(
- self,
- this_node: Uuid,
- pk: T::P,
- sk: T::S,
- ) -> CounterEntry<T> {
+impl<T: CountedItem> LocalCounterEntry<T> {
+ fn into_counter_entry(self, this_node: Uuid) -> CounterEntry<T> {
CounterEntry {
- pk,
- sk,
+ pk: self.pk,
+ sk: self.sk,
values: self
.values
.into_iter()