aboutsummaryrefslogtreecommitdiff
path: root/src/model/index_counter.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/model/index_counter.rs')
-rw-r--r--src/model/index_counter.rs85
1 files changed, 53 insertions, 32 deletions
diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs
index 2602d5d9..109b9828 100644
--- a/src/model/index_counter.rs
+++ b/src/model/index_counter.rs
@@ -17,25 +17,30 @@ use garage_table::crdt::*;
use garage_table::replication::TableShardedReplication;
use garage_table::*;
-pub trait CounterSchema: Clone + PartialEq + Send + Sync + 'static {
- const NAME: &'static str;
- type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
- type S: SortKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
+pub trait CountedItem: Clone + PartialEq + Send + Sync + 'static {
+ const COUNTER_TABLE_NAME: &'static str;
+
+ type CP: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
+ type CS: SortKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
+
+ fn counter_partition_key(&self) -> &Self::CP;
+ fn counter_sort_key(&self) -> &Self::CS;
+ fn counts(&self) -> Vec<(&'static str, i64)>;
}
/// A counter entry in the global table
-#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
-pub struct CounterEntry<T: CounterSchema> {
- pub pk: T::P,
- pub sk: T::S,
+#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)]
+pub struct CounterEntry<T: CountedItem> {
+ pub pk: T::CP,
+ pub sk: T::CS,
pub values: BTreeMap<String, CounterValue>,
}
-impl<T: CounterSchema> Entry<T::P, T::S> for CounterEntry<T> {
- fn partition_key(&self) -> &T::P {
+impl<T: CountedItem> Entry<T::CP, T::CS> for CounterEntry<T> {
+ fn partition_key(&self) -> &T::CP {
&self.pk
}
- fn sort_key(&self) -> &T::S {
+ fn sort_key(&self) -> &T::CS {
&self.sk
}
fn is_tombstone(&self) -> bool {
@@ -45,7 +50,7 @@ impl<T: CounterSchema> Entry<T::P, T::S> for CounterEntry<T> {
}
}
-impl<T: CounterSchema> CounterEntry<T> {
+impl<T: CountedItem> CounterEntry<T> {
pub fn filtered_values(&self, ring: &Ring) -> HashMap<String, i64> {
let nodes = &ring.layout.node_id_vec[..];
self.filtered_values_with_nodes(nodes)
@@ -78,7 +83,7 @@ pub struct CounterValue {
pub node_values: BTreeMap<Uuid, (u64, i64)>,
}
-impl<T: CounterSchema> Crdt for CounterEntry<T> {
+impl<T: CountedItem> Crdt for CounterEntry<T> {
fn merge(&mut self, other: &Self) {
for (name, e2) in other.values.iter() {
if let Some(e) = self.values.get_mut(name) {
@@ -104,15 +109,15 @@ impl Crdt for CounterValue {
}
}
-pub struct CounterTable<T: CounterSchema> {
+pub struct CounterTable<T: CountedItem> {
_phantom_t: PhantomData<T>,
}
-impl<T: CounterSchema> TableSchema for CounterTable<T> {
- const TABLE_NAME: &'static str = T::NAME;
+impl<T: CountedItem> TableSchema for CounterTable<T> {
+ const TABLE_NAME: &'static str = T::COUNTER_TABLE_NAME;
- type P = T::P;
- type S = T::S;
+ type P = T::CP;
+ type S = T::CS;
type E = CounterEntry<T>;
type Filter = (DeletedFilter, Vec<Uuid>);
@@ -131,14 +136,14 @@ impl<T: CounterSchema> TableSchema for CounterTable<T> {
// ----
-pub struct IndexCounter<T: CounterSchema> {
+pub struct IndexCounter<T: CountedItem> {
this_node: Uuid,
local_counter: db::Tree,
- propagate_tx: mpsc::UnboundedSender<(T::P, T::S, LocalCounterEntry)>,
+ propagate_tx: mpsc::UnboundedSender<(T::CP, T::CS, LocalCounterEntry)>,
pub table: Arc<Table<CounterTable<T>, TableShardedReplication>>,
}
-impl<T: CounterSchema> IndexCounter<T> {
+impl<T: CountedItem> IndexCounter<T> {
pub fn new(
system: Arc<System>,
replication: TableShardedReplication,
@@ -151,7 +156,7 @@ impl<T: CounterSchema> IndexCounter<T> {
let this = Arc::new(Self {
this_node: system.id,
local_counter: db
- .open_tree(format!("local_counter:{}", T::NAME))
+ .open_tree(format!("local_counter:{}", T::COUNTER_TABLE_NAME))
.expect("Unable to open local counter tree"),
propagate_tx,
table: Table::new(
@@ -166,7 +171,7 @@ impl<T: CounterSchema> IndexCounter<T> {
let this2 = this.clone();
background.spawn_worker(
- format!("{} index counter propagator", T::NAME),
+ format!("{} index counter propagator", T::COUNTER_TABLE_NAME),
move |must_exit| this2.clone().propagate_loop(propagate_rx, must_exit),
);
this
@@ -175,10 +180,26 @@ impl<T: CounterSchema> IndexCounter<T> {
pub fn count(
&self,
tx: &mut db::Transaction,
- pk: &T::P,
- sk: &T::S,
- counts: &[(&str, i64)],
+ old: Option<&T>,
+ new: Option<&T>,
) -> db::TxResult<(), Error> {
+ let pk = old
+ .map(|e| e.counter_partition_key())
+ .unwrap_or_else(|| new.unwrap().counter_partition_key());
+ let sk = old
+ .map(|e| e.counter_sort_key())
+ .unwrap_or_else(|| new.unwrap().counter_sort_key());
+
+ // calculate counter differences
+ let mut counts = HashMap::new();
+ for (k, v) in old.map(|x| x.counts()).unwrap_or_default() {
+ *counts.entry(k).or_insert(0) -= v;
+ }
+ for (k, v) in new.map(|x| x.counts()).unwrap_or_default() {
+ *counts.entry(k).or_insert(0) += v;
+ }
+
+ // update local counter table
let tree_key = self.table.data.tree_key(pk, sk);
let mut entry = match tx.get(&self.local_counter, &tree_key[..])? {
@@ -213,7 +234,7 @@ impl<T: CounterSchema> IndexCounter<T> {
async fn propagate_loop(
self: Arc<Self>,
- mut propagate_rx: mpsc::UnboundedReceiver<(T::P, T::S, LocalCounterEntry)>,
+ mut propagate_rx: mpsc::UnboundedReceiver<(T::CP, T::CS, LocalCounterEntry)>,
must_exit: watch::Receiver<bool>,
) {
// This loop batches updates to counters to be sent all at once.
@@ -255,10 +276,10 @@ impl<T: CounterSchema> IndexCounter<T> {
if let Err(e) = self.table.insert_many(entries).await {
errors += 1;
if errors >= 2 && *must_exit.borrow() {
- error!("({}) Could not propagate {} counter values: {}, these counters will not be updated correctly.", T::NAME, buf.len(), e);
+ error!("({}) Could not propagate {} counter values: {}, these counters will not be updated correctly.", T::COUNTER_TABLE_NAME, buf.len(), e);
break;
}
- warn!("({}) Could not propagate {} counter values: {}, retrying in 5 seconds (retry #{})", T::NAME, buf.len(), e, errors);
+ warn!("({}) Could not propagate {} counter values: {}, retrying in 5 seconds (retry #{})", T::COUNTER_TABLE_NAME, buf.len(), e, errors);
tokio::time::sleep(Duration::from_secs(5)).await;
continue;
}
@@ -280,11 +301,11 @@ struct LocalCounterEntry {
}
impl LocalCounterEntry {
- fn into_counter_entry<T: CounterSchema>(
+ fn into_counter_entry<T: CountedItem>(
self,
this_node: Uuid,
- pk: T::P,
- sk: T::S,
+ pk: T::CP,
+ sk: T::CS,
) -> CounterEntry<T> {
CounterEntry {
pk,