aboutsummaryrefslogblamecommitdiff
path: root/src/model/index_counter.rs
blob: 4fec1138230decbb5d2ddb5d7c7b0740a5807578 (plain) (tree)
1
2
3
4
5
6
7
8
9
10







                                                    

                    











































































































                                                                                                       





                                          
                                  
                      


















                                                                           
                                







                                                                             
                            




























                                                                                               






                                         

                                                                













                                                                                                               
 



                                                                                
 
                                                                                         


































































































                                                                                                                                                                               
use std::collections::{hash_map, BTreeMap, HashMap};
use std::marker::PhantomData;
use std::sync::Arc;
use std::time::Duration;

use serde::{Deserialize, Serialize};
use tokio::sync::{mpsc, watch};

use garage_db as db;

use garage_rpc::ring::Ring;
use garage_rpc::system::System;
use garage_util::data::*;
use garage_util::error::*;

use garage_table::crdt::*;
use garage_table::replication::TableShardedReplication;
use garage_table::*;

pub trait CounterSchema: Clone + PartialEq + Send + Sync + 'static {
	const NAME: &'static str;
	type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
	type S: SortKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
}

/// A counter entry in the global table
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct CounterEntry<T: CounterSchema> {
	pub pk: T::P,
	pub sk: T::S,
	pub values: BTreeMap<String, CounterValue>,
}

impl<T: CounterSchema> Entry<T::P, T::S> for CounterEntry<T> {
	fn partition_key(&self) -> &T::P {
		&self.pk
	}
	fn sort_key(&self) -> &T::S {
		&self.sk
	}
	fn is_tombstone(&self) -> bool {
		self.values
			.iter()
			.all(|(_, v)| v.node_values.iter().all(|(_, (_, v))| *v == 0))
	}
}

impl<T: CounterSchema> CounterEntry<T> {
	pub fn filtered_values(&self, ring: &Ring) -> HashMap<String, i64> {
		let nodes = &ring.layout.node_id_vec[..];
		self.filtered_values_with_nodes(nodes)
	}

	pub fn filtered_values_with_nodes(&self, nodes: &[Uuid]) -> HashMap<String, i64> {
		let mut ret = HashMap::new();
		for (name, vals) in self.values.iter() {
			let new_vals = vals
				.node_values
				.iter()
				.filter(|(n, _)| nodes.contains(n))
				.map(|(_, (_, v))| *v)
				.collect::<Vec<_>>();
			if !new_vals.is_empty() {
				ret.insert(
					name.clone(),
					new_vals.iter().fold(i64::MIN, |a, b| std::cmp::max(a, *b)),
				);
			}
		}

		ret
	}
}

/// A counter entry in the global table
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct CounterValue {
	pub node_values: BTreeMap<Uuid, (u64, i64)>,
}

impl<T: CounterSchema> Crdt for CounterEntry<T> {
	fn merge(&mut self, other: &Self) {
		for (name, e2) in other.values.iter() {
			if let Some(e) = self.values.get_mut(name) {
				e.merge(e2);
			} else {
				self.values.insert(name.clone(), e2.clone());
			}
		}
	}
}

impl Crdt for CounterValue {
	fn merge(&mut self, other: &Self) {
		for (node, (t2, e2)) in other.node_values.iter() {
			if let Some((t, e)) = self.node_values.get_mut(node) {
				if t2 > t {
					*e = *e2;
				}
			} else {
				self.node_values.insert(*node, (*t2, *e2));
			}
		}
	}
}

pub struct CounterTable<T: CounterSchema> {
	_phantom_t: PhantomData<T>,
}

impl<T: CounterSchema> TableSchema for CounterTable<T> {
	const TABLE_NAME: &'static str = T::NAME;

	type P = T::P;
	type S = T::S;
	type E = CounterEntry<T>;
	type Filter = (DeletedFilter, Vec<Uuid>);

	fn updated(
		&self,
		_tx: &mut db::Transaction,
		_old: Option<&Self::E>,
		_new: Option<&Self::E>,
	) -> db::Result<()> {
		// nothing for now
		Ok(())
	}

	fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
		if filter.0 == DeletedFilter::Any {
			return true;
		}

		let is_tombstone = entry
			.filtered_values_with_nodes(&filter.1[..])
			.iter()
			.all(|(_, v)| *v == 0);
		filter.0.apply(is_tombstone)
	}
}

// ----

pub struct IndexCounter<T: CounterSchema> {
	this_node: Uuid,
	local_counter: db::Tree,
	propagate_tx: mpsc::UnboundedSender<(T::P, T::S, LocalCounterEntry)>,
	pub table: Arc<Table<CounterTable<T>, TableShardedReplication>>,
}

impl<T: CounterSchema> IndexCounter<T> {
	pub fn new(
		system: Arc<System>,
		replication: TableShardedReplication,
		db: &db::Db,
	) -> Arc<Self> {
		let background = system.background.clone();

		let (propagate_tx, propagate_rx) = mpsc::unbounded_channel();

		let this = Arc::new(Self {
			this_node: system.id,
			local_counter: db
				.open_tree(format!("local_counter:{}", T::NAME))
				.expect("Unable to open local counter tree"),
			propagate_tx,
			table: Table::new(
				CounterTable {
					_phantom_t: Default::default(),
				},
				replication,
				system,
				db,
			),
		});

		let this2 = this.clone();
		background.spawn_worker(
			format!("{} index counter propagator", T::NAME),
			move |must_exit| this2.clone().propagate_loop(propagate_rx, must_exit),
		);
		this
	}

	pub fn count(
		&self,
		tx: &mut db::Transaction,
		pk: &T::P,
		sk: &T::S,
		counts: &[(&str, i64)],
	) -> db::TxResult<(), Error> {
		let tree_key = self.table.data.tree_key(pk, sk);

		let mut entry = match tx.get(&self.local_counter, &tree_key[..])? {
			Some(old_bytes) => rmp_serde::decode::from_read_ref::<_, LocalCounterEntry>(&old_bytes)
				.map_err(Error::RmpDecode)
				.map_err(db::TxError::Abort)?,
			None => LocalCounterEntry {
				values: BTreeMap::new(),
			},
		};

		for (s, inc) in counts.iter() {
			let mut ent = entry.values.entry(s.to_string()).or_insert((0, 0));
			ent.0 += 1;
			ent.1 += *inc;
		}

		let new_entry_bytes = rmp_to_vec_all_named(&entry)
			.map_err(Error::RmpEncode)
			.map_err(db::TxError::Abort)?;
		tx.insert(&self.local_counter, &tree_key[..], new_entry_bytes)?;

		if let Err(e) = self.propagate_tx.send((pk.clone(), sk.clone(), entry)) {
			error!(
				"Could not propagate updated counter values, failed to send to channel: {}",
				e
			);
		}

		Ok(())
	}

	async fn propagate_loop(
		self: Arc<Self>,
		mut propagate_rx: mpsc::UnboundedReceiver<(T::P, T::S, LocalCounterEntry)>,
		must_exit: watch::Receiver<bool>,
	) {
		// This loop batches updates to counters to be sent all at once.
		// They are sent once the propagate_rx channel has been emptied (or is closed).
		let mut buf = HashMap::new();
		let mut errors = 0;

		loop {
			let (ent, closed) = match propagate_rx.try_recv() {
				Ok(ent) => (Some(ent), false),
				Err(mpsc::error::TryRecvError::Empty) if buf.is_empty() => {
					match propagate_rx.recv().await {
						Some(ent) => (Some(ent), false),
						None => (None, true),
					}
				}
				Err(mpsc::error::TryRecvError::Empty) => (None, false),
				Err(mpsc::error::TryRecvError::Disconnected) => (None, true),
			};

			if let Some((pk, sk, counters)) = ent {
				let tree_key = self.table.data.tree_key(&pk, &sk);
				let dist_entry = counters.into_counter_entry::<T>(self.this_node, pk, sk);
				match buf.entry(tree_key) {
					hash_map::Entry::Vacant(e) => {
						e.insert(dist_entry);
					}
					hash_map::Entry::Occupied(mut e) => {
						e.get_mut().merge(&dist_entry);
					}
				}
				// As long as we can add entries, loop back and add them to batch
				// before sending batch to other nodes
				continue;
			}

			if !buf.is_empty() {
				let entries = buf.iter().map(|(_k, v)| v);
				if let Err(e) = self.table.insert_many(entries).await {
					errors += 1;
					if errors >= 2 && *must_exit.borrow() {
						error!("({}) Could not propagate {} counter values: {}, these counters will not be updated correctly.", T::NAME, buf.len(), e);
						break;
					}
					warn!("({}) Could not propagate {} counter values: {}, retrying in 5 seconds (retry #{})", T::NAME, buf.len(), e, errors);
					tokio::time::sleep(Duration::from_secs(5)).await;
					continue;
				}

				buf.clear();
				errors = 0;
			}

			if closed || *must_exit.borrow() {
				break;
			}
		}
	}
}

#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
struct LocalCounterEntry {
	values: BTreeMap<String, (u64, i64)>,
}

impl LocalCounterEntry {
	fn into_counter_entry<T: CounterSchema>(
		self,
		this_node: Uuid,
		pk: T::P,
		sk: T::S,
	) -> CounterEntry<T> {
		CounterEntry {
			pk,
			sk,
			values: self
				.values
				.into_iter()
				.map(|(name, (ts, v))| {
					let mut node_values = BTreeMap::new();
					node_values.insert(this_node, (ts, v));
					(name, CounterValue { node_values })
				})
				.collect(),
		}
	}
}