author	Alex <alex@adnab.me>	2022-05-10 13:16:57 +0200
committer	Alex <alex@adnab.me>	2022-05-10 13:16:57 +0200
commit	5768bf362262f78376af14517c4921941986192e (patch)
tree	b4baf3051eade0f63649443278bb3a3f4c38ec25 /src/model/index_counter.rs
parent	def78c5e6f5da37a0d17b5652c525fbeccbc2e86 (diff)
First implementation of K2V (#293)
**Specification:** View spec at [this URL](https://git.deuxfleurs.fr/Deuxfleurs/garage/src/branch/k2v/doc/drafts/k2v-spec.md)

- [x] Specify the structure of K2V triples
- [x] Specify the DVVS format used for causality detection
- [x] Specify the K2V index (just a counter of number of values per partition key)
- [x] Specify single-item endpoints: ReadItem, InsertItem, DeleteItem
- [x] Specify index endpoint: ReadIndex
- [x] Specify multi-item endpoints: InsertBatch, ReadBatch, DeleteBatch
- [x] Move to JSON objects instead of tuples
- [x] Specify endpoints for polling for updates on single values (PollItem)

**Implementation:**

- [x] Table for K2V items, causal contexts
- [x] Indexing mechanism and table for K2V index
- [x] Make API handlers a bit more generic
- [x] K2V API endpoint
- [x] K2V API router
- [x] ReadItem
- [x] InsertItem
- [x] DeleteItem
- [x] PollItem
- [x] ReadIndex
- [x] InsertBatch
- [x] ReadBatch
- [x] DeleteBatch

**Testing:**

- [x] Just a simple Python script that does some requests to check visually that things are going right (does not contain parsing of results or assertions on returned values)
- [x] Actual tests:
  - [x] Adapt testing framework
  - [x] Simple test with InsertItem + ReadItem
  - [x] Test with several Insert/Read/DeleteItem + ReadIndex
  - [x] Test all combinations of return formats for ReadItem
  - [x] Test with ReadBatch, InsertBatch, DeleteBatch
  - [x] Test with PollItem
  - [x] Test error codes
- [ ] Fix most broken stuff
  - [x] test PollItem broken randomly
  - [x] when invalid causality tokens are given, errors should be 4xx not 5xx

**Improvements:**

- [x] Descending range queries
  - [x] Specify
  - [x] Implement
  - [x] Add test
- [x] Batch updates to index counter
- [x] Put K2V behind `k2v` feature flag

Co-authored-by: Alex Auvolat <alex@adnab.me>
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/293
Co-authored-by: Alex <alex@adnab.me>
Co-committed-by: Alex <alex@adnab.me>
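
Below is a minimal sketch of how the new index counter is driven, assuming a hypothetical `MyCounterSchema: CounterSchema` with `String` partition and sort keys (the function and key names are illustrative, not part of this patch):

```rust
// Count one new item and 42 new bytes for a partition; a background
// worker batches these updates and propagates them to the global table.
fn record_item_write(
	counter: &IndexCounter<MyCounterSchema>,
) -> Result<(), garage_util::error::Error> {
	counter.count(
		&"bucket".to_string(),
		&"partition_key".to_string(),
		&[("items", 1), ("bytes", 42)],
	)
}
```

A deletion would pass negative increments, e.g. `("items", -1)`.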
Diffstat (limited to 'src/model/index_counter.rs')
-rw-r--r--	src/model/index_counter.rs	305
1 file changed, 305 insertions, 0 deletions
diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs
new file mode 100644
index 00000000..123154d4
--- /dev/null
+++ b/src/model/index_counter.rs
@@ -0,0 +1,305 @@
+use std::collections::{hash_map, BTreeMap, HashMap};
+use std::marker::PhantomData;
+use std::sync::Arc;
+use std::time::Duration;
+
+use serde::{Deserialize, Serialize};
+use tokio::sync::{mpsc, watch};
+
+use garage_rpc::ring::Ring;
+use garage_rpc::system::System;
+use garage_util::data::*;
+use garage_util::error::*;
+
+use garage_table::crdt::*;
+use garage_table::replication::TableShardedReplication;
+use garage_table::*;
+
+pub trait CounterSchema: Clone + PartialEq + Send + Sync + 'static {
+ const NAME: &'static str;
+ type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
+ type S: SortKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
+}
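+
+// A hypothetical example schema (illustrative only, not part of this patch):
+// counting K2V items per partition, with plain String keys:
+//
+//     pub struct K2VCounterSchema;
+//     impl CounterSchema for K2VCounterSchema {
+//         const NAME: &'static str = "k2v_index_counter";
+//         type P = String; // e.g. bucket
+//         type S = String; // e.g. partition key
+//     }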
+
+/// A counter entry in the global table
+#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+pub struct CounterEntry<T: CounterSchema> {
+ pub pk: T::P,
+ pub sk: T::S,
+ pub values: BTreeMap<String, CounterValue>,
+}
+
+impl<T: CounterSchema> Entry<T::P, T::S> for CounterEntry<T> {
+ fn partition_key(&self) -> &T::P {
+ &self.pk
+ }
+ fn sort_key(&self) -> &T::S {
+ &self.sk
+ }
+ fn is_tombstone(&self) -> bool {
+ self.values
+ .iter()
+ .all(|(_, v)| v.node_values.iter().all(|(_, (_, v))| *v == 0))
+ }
+}
+
+impl<T: CounterSchema> CounterEntry<T> {
+ pub fn filtered_values(&self, ring: &Ring) -> HashMap<String, i64> {
+ let nodes = &ring.layout.node_id_vec[..];
+ self.filtered_values_with_nodes(nodes)
+ }
+
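+	// Every node that stores this partition counts all the updates it sees,
+	// so each node's value is an estimate of the same total; we keep the
+	// max over the given nodes as the most up-to-date estimate.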
+ pub fn filtered_values_with_nodes(&self, nodes: &[Uuid]) -> HashMap<String, i64> {
+ let mut ret = HashMap::new();
+ for (name, vals) in self.values.iter() {
+ let new_vals = vals
+ .node_values
+ .iter()
+ .filter(|(n, _)| nodes.contains(n))
+ .map(|(_, (_, v))| *v)
+ .collect::<Vec<_>>();
+ if !new_vals.is_empty() {
+ ret.insert(
+ name.clone(),
+ new_vals.iter().fold(i64::MIN, |a, b| std::cmp::max(a, *b)),
+ );
+ }
+ }
+
+ ret
+ }
+}
+
+/// The value of a counter: for each node, a logical timestamp and the
+/// counter value as seen by that node
+#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+pub struct CounterValue {
+ pub node_values: BTreeMap<Uuid, (u64, i64)>,
+}
+
+impl<T: CounterSchema> Crdt for CounterEntry<T> {
+ fn merge(&mut self, other: &Self) {
+ for (name, e2) in other.values.iter() {
+ if let Some(e) = self.values.get_mut(name) {
+ e.merge(e2);
+ } else {
+ self.values.insert(name.clone(), e2.clone());
+ }
+ }
+ }
+}
+
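+// A CounterValue is a set of LWW registers, one per node: merging keeps,
+// for each node, the (timestamp, value) pair with the newest timestamp.
+// E.g. merging {A: (3, 10)} with {A: (5, 12), B: (1, 4)} gives
+// {A: (5, 12), B: (1, 4)}.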
+impl Crdt for CounterValue {
+	fn merge(&mut self, other: &Self) {
+		for (node, (t2, e2)) in other.node_values.iter() {
+			if let Some((t, e)) = self.node_values.get_mut(node) {
+				if t2 > t {
+					// Keep the value with the newest timestamp, and also
+					// update the timestamp itself, so that later merges
+					// compare against the timestamp of the value we kept
+					*t = *t2;
+					*e = *e2;
+				}
+			} else {
+				self.node_values.insert(*node, (*t2, *e2));
+			}
+		}
+	}
+}
+
+pub struct CounterTable<T: CounterSchema> {
+ _phantom_t: PhantomData<T>,
+}
+
+impl<T: CounterSchema> TableSchema for CounterTable<T> {
+ const TABLE_NAME: &'static str = T::NAME;
+
+ type P = T::P;
+ type S = T::S;
+ type E = CounterEntry<T>;
+ type Filter = (DeletedFilter, Vec<Uuid>);
+
+ fn updated(&self, _old: Option<&Self::E>, _new: Option<&Self::E>) {
+ // nothing for now
+ }
+
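+	// An entry matches a (filter, nodes) pair according to whether it is a
+	// tombstone from the point of view of the given nodes, i.e. whether all
+	// of its counter values restricted to those nodes are zero.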
+ fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
+ if filter.0 == DeletedFilter::Any {
+ return true;
+ }
+
+ let is_tombstone = entry
+ .filtered_values_with_nodes(&filter.1[..])
+ .iter()
+ .all(|(_, v)| *v == 0);
+ filter.0.apply(is_tombstone)
+ }
+}
+
+// ----
+
+pub struct IndexCounter<T: CounterSchema> {
+ this_node: Uuid,
+ local_counter: sled::Tree,
+ propagate_tx: mpsc::UnboundedSender<(T::P, T::S, LocalCounterEntry)>,
+ pub table: Arc<Table<CounterTable<T>, TableShardedReplication>>,
+}
+
+impl<T: CounterSchema> IndexCounter<T> {
+ pub fn new(
+ system: Arc<System>,
+ replication: TableShardedReplication,
+ db: &sled::Db,
+ ) -> Arc<Self> {
+ let background = system.background.clone();
+
+ let (propagate_tx, propagate_rx) = mpsc::unbounded_channel();
+
+ let this = Arc::new(Self {
+ this_node: system.id,
+ local_counter: db
+ .open_tree(format!("local_counter:{}", T::NAME))
+ .expect("Unable to open local counter tree"),
+ propagate_tx,
+ table: Table::new(
+ CounterTable {
+ _phantom_t: Default::default(),
+ },
+ replication,
+ system,
+ db,
+ ),
+ });
+
+ let this2 = this.clone();
+ background.spawn_worker(
+ format!("{} index counter propagator", T::NAME),
+ move |must_exit| this2.clone().propagate_loop(propagate_rx, must_exit),
+ );
+ this
+ }
+
+ pub fn count(&self, pk: &T::P, sk: &T::S, counts: &[(&str, i64)]) -> Result<(), Error> {
+ let tree_key = self.table.data.tree_key(pk, sk);
+
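+		// Atomically update this node's local counter in a sled transaction;
+		// the updated entry is then handed to propagate_loop, which batches
+		// updates and writes them to the globally replicated counter table.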
+ let new_entry = self.local_counter.transaction(|tx| {
+ let mut entry = match tx.get(&tree_key[..])? {
+ Some(old_bytes) => {
+ rmp_serde::decode::from_read_ref::<_, LocalCounterEntry>(&old_bytes)
+ .map_err(Error::RmpDecode)
+ .map_err(sled::transaction::ConflictableTransactionError::Abort)?
+ }
+ None => LocalCounterEntry {
+ values: BTreeMap::new(),
+ },
+ };
+
+		for (s, inc) in counts.iter() {
+			let ent = entry.values.entry(s.to_string()).or_insert((0, 0));
+			// bump the logical timestamp and apply the increment
+			ent.0 += 1;
+			ent.1 += *inc;
+		}
+
+ let new_entry_bytes = rmp_to_vec_all_named(&entry)
+ .map_err(Error::RmpEncode)
+ .map_err(sled::transaction::ConflictableTransactionError::Abort)?;
+ tx.insert(&tree_key[..], new_entry_bytes)?;
+
+ Ok(entry)
+ })?;
+
+ if let Err(e) = self.propagate_tx.send((pk.clone(), sk.clone(), new_entry)) {
+ error!(
+ "Could not propagate updated counter values, failed to send to channel: {}",
+ e
+ );
+ }
+
+ Ok(())
+ }
+
+ async fn propagate_loop(
+ self: Arc<Self>,
+ mut propagate_rx: mpsc::UnboundedReceiver<(T::P, T::S, LocalCounterEntry)>,
+ must_exit: watch::Receiver<bool>,
+ ) {
+ // This loop batches updates to counters to be sent all at once.
+ // They are sent once the propagate_rx channel has been emptied (or is closed).
+ let mut buf = HashMap::new();
+ let mut errors = 0;
+
+ loop {
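+			// Drain the channel without blocking while there are queued
+			// updates; block on recv() only when our batch buffer is empty,
+			// so that consecutive updates are coalesced into a single batch.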
+ let (ent, closed) = match propagate_rx.try_recv() {
+ Ok(ent) => (Some(ent), false),
+ Err(mpsc::error::TryRecvError::Empty) if buf.is_empty() => {
+ match propagate_rx.recv().await {
+ Some(ent) => (Some(ent), false),
+ None => (None, true),
+ }
+ }
+ Err(mpsc::error::TryRecvError::Empty) => (None, false),
+ Err(mpsc::error::TryRecvError::Disconnected) => (None, true),
+ };
+
+ if let Some((pk, sk, counters)) = ent {
+ let tree_key = self.table.data.tree_key(&pk, &sk);
+ let dist_entry = counters.into_counter_entry::<T>(self.this_node, pk, sk);
+ match buf.entry(tree_key) {
+ hash_map::Entry::Vacant(e) => {
+ e.insert(dist_entry);
+ }
+ hash_map::Entry::Occupied(mut e) => {
+ e.get_mut().merge(&dist_entry);
+ }
+ }
+ // As long as we can add entries, loop back and add them to batch
+ // before sending batch to other nodes
+ continue;
+ }
+
+ if !buf.is_empty() {
+ let entries = buf.iter().map(|(_k, v)| v);
+ if let Err(e) = self.table.insert_many(entries).await {
+ errors += 1;
+ if errors >= 2 && *must_exit.borrow() {
+ error!("({}) Could not propagate {} counter values: {}, these counters will not be updated correctly.", T::NAME, buf.len(), e);
+ break;
+ }
+ warn!("({}) Could not propagate {} counter values: {}, retrying in 5 seconds (retry #{})", T::NAME, buf.len(), e, errors);
+ tokio::time::sleep(Duration::from_secs(5)).await;
+ continue;
+ }
+
+ buf.clear();
+ errors = 0;
+ }
+
+ if closed || *must_exit.borrow() {
+ break;
+ }
+ }
+ }
+}
+
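+// Node-local counter state: for each counter name, a logical timestamp
+// (bumped on every local update) and the current value on this node.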
+#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+struct LocalCounterEntry {
+ values: BTreeMap<String, (u64, i64)>,
+}
+
+impl LocalCounterEntry {
+ fn into_counter_entry<T: CounterSchema>(
+ self,
+ this_node: Uuid,
+ pk: T::P,
+ sk: T::S,
+ ) -> CounterEntry<T> {
+ CounterEntry {
+ pk,
+ sk,
+ values: self
+ .values
+ .into_iter()
+ .map(|(name, (ts, v))| {
+ let mut node_values = BTreeMap::new();
+ node_values.insert(this_node, (ts, v));
+ (name, CounterValue { node_values })
+ })
+ .collect(),
+ }
+ }
+}