aboutsummaryrefslogblamecommitdiff
path: root/src/model/k2v/seen.rs
blob: b8f4ff27048b1584e6f16b4cf3750528a79b1269 (plain) (tree)




















































































                                                                                                          
//! Implements a RangeSeenMarker, a data type used in the PollRange API
//! to indicate which items in the range have already been seen
//! and which have not been seen yet.
//!
//! It consists of a vector clock that indicates that for each node,
//! all items produced by that node with timestamps <= the value in the
//! vector clock has been seen, as well as a set of causal contexts for
//! individual items.

use std::collections::BTreeMap;

use serde::{Deserialize, Serialize};

use garage_util::data::Uuid;
use garage_util::encode::{nonversioned_decode, nonversioned_encode};
use garage_util::error::{Error, OkOrMessage};

use crate::k2v::causality::*;
use crate::k2v::item_table::*;

#[derive(Debug, Serialize, Deserialize, Default)]
pub struct RangeSeenMarker {
	vector_clock: VectorClock,
	items: BTreeMap<String, VectorClock>,
}

impl RangeSeenMarker {
	pub fn new() -> Self {
		Self::default()
	}

	pub fn mark_seen_node_items<'a, I: IntoIterator<Item = &'a K2VItem>>(
		&mut self,
		node: Uuid,
		items: I,
	) {
		let node = make_node_id(node);
		for item in items.into_iter() {
			let cc = item.causal_context();

			if let Some(ts) = cc.vector_clock.get(&node) {
				let ent = self.vector_clock.entry(node).or_insert(0);
				*ent = std::cmp::max(*ent, *ts);
			}

			if vclock_gt(&cc.vector_clock, &self.vector_clock) {
				match self.items.get_mut(&item.sort_key) {
					None => {
						self.items.insert(item.sort_key.clone(), cc.vector_clock);
					}
					Some(ent) => *ent = vclock_max(&ent, &cc.vector_clock),
				}
			}
		}
	}

	pub fn canonicalize(&mut self) {
		let self_vc = &self.vector_clock;
		self.items.retain(|_sk, vc| vclock_gt(&vc, self_vc))
	}

	pub fn encode(&mut self) -> Result<String, Error> {
		self.canonicalize();

		let bytes = nonversioned_encode(&self)?;
		let bytes = zstd::stream::encode_all(&mut &bytes[..], zstd::DEFAULT_COMPRESSION_LEVEL)?;
		Ok(base64::encode(&bytes))
	}

	pub fn decode(s: &str) -> Result<Self, Error> {
		let bytes = base64::decode(&s).ok_or_message("invalid base64")?;
		let bytes = zstd::stream::decode_all(&mut &bytes[..])?;
		Ok(nonversioned_decode(&bytes)?)
	}

	pub fn is_new_item(&self, item: &K2VItem) -> bool {
		let cc = item.causal_context();
		vclock_gt(&cc.vector_clock, &self.vector_clock)
			&& self
				.items
				.get(&item.sort_key)
				.map(|vc| vclock_gt(&cc.vector_clock, &vc))
				.unwrap_or(true)
	}
}