aboutsummaryrefslogtreecommitdiff
path: root/src/model/k2v/causality.rs
blob: c80ebd39bce9eda61e1a0475345983e3ee3afa18 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
//! Implements a CausalContext, which is a set of timestamps for each
//! node -- a vector clock --, indicating that the versions with
//! timestamps <= these numbers have been seen and can be
//! overwritten by a subsequent write.
//!
//! The textual representation of a CausalContext, which we call a
//! "causality token", is used in the API and must be sent along with
//! each write or delete operation to indicate the previously seen
//! versions that we want to overwrite or delete.
use base64::prelude::*;

use std::collections::BTreeMap;
use std::convert::TryInto;

use serde::{Deserialize, Serialize};

use garage_util::data::*;

use crate::helper::error::{Error as HelperError, OkOrBadRequest};

/// Node IDs used in K2V are u64 integers that are the abbreviation
/// of full Garage node IDs which are 256-bit UUIDs.
pub type K2VNodeId = u64;

pub type VectorClock = BTreeMap<K2VNodeId, u64>;

pub fn make_node_id(node_id: Uuid) -> K2VNodeId {
	let mut tmp = [0u8; 8];
	tmp.copy_from_slice(&node_id.as_slice()[..8]);
	u64::from_be_bytes(tmp)
}

pub fn vclock_gt(a: &VectorClock, b: &VectorClock) -> bool {
	a.iter().any(|(n, ts)| ts > b.get(n).unwrap_or(&0))
}

pub fn vclock_max(a: &VectorClock, b: &VectorClock) -> VectorClock {
	let mut ret = a.clone();
	for (n, ts) in b.iter() {
		let ent = ret.entry(*n).or_insert(0);
		*ent = std::cmp::max(*ts, *ent);
	}
	ret
}

#[derive(PartialEq, Eq, Debug, Serialize, Deserialize, Default)]
pub struct CausalContext {
	pub vector_clock: VectorClock,
}

impl CausalContext {
	/// Empty causality context
	pub fn new() -> Self {
		Self::default()
	}

	/// Make binary representation and encode in base64
	pub fn serialize(&self) -> String {
		let mut ints = Vec::with_capacity(2 * self.vector_clock.len());
		for (node, time) in self.vector_clock.iter() {
			ints.push(*node);
			ints.push(*time);
		}
		let checksum = ints.iter().fold(0, |acc, v| acc ^ *v);

		let mut bytes = u64::to_be_bytes(checksum).to_vec();
		for i in ints {
			bytes.extend(u64::to_be_bytes(i));
		}

		BASE64_URL_SAFE_NO_PAD.encode(bytes)
	}

	/// Parse from base64-encoded binary representation.
	/// Returns None on error.
	pub fn parse(s: &str) -> Option<Self> {
		let bytes = BASE64_URL_SAFE_NO_PAD.decode(s).ok()?;
		if bytes.len() % 16 != 8 || bytes.len() < 8 {
			return None;
		}

		let checksum = u64::from_be_bytes(bytes[..8].try_into().unwrap());
		let mut ret = CausalContext {
			vector_clock: BTreeMap::new(),
		};

		for i in 0..(bytes.len() / 16) {
			let node_id = u64::from_be_bytes(bytes[8 + i * 16..16 + i * 16].try_into().unwrap());
			let time = u64::from_be_bytes(bytes[16 + i * 16..24 + i * 16].try_into().unwrap());
			ret.vector_clock.insert(node_id, time);
		}

		let check = ret.vector_clock.iter().fold(0, |acc, (n, t)| acc ^ *n ^ *t);

		if check != checksum {
			return None;
		}

		Some(ret)
	}

	pub fn parse_helper(s: &str) -> Result<Self, HelperError> {
		Self::parse(s).ok_or_bad_request("Invalid causality token")
	}

	/// Check if this causal context contains newer items than another one
	pub fn is_newer_than(&self, other: &Self) -> bool {
		vclock_gt(&self.vector_clock, &other.vector_clock)
	}
}

#[cfg(test)]
mod tests {
	use super::*;

	#[test]
	fn test_causality_token_serialization() {
		let ct = CausalContext {
			vector_clock: [(4, 42), (1928131023, 76), (0xefc0c1c47f9de433, 2)]
				.iter()
				.cloned()
				.collect(),
		};

		assert_eq!(CausalContext::parse(&ct.serialize()).unwrap(), ct);
	}
}