author     Alex Auvolat <alex@adnab.me>   2021-03-10 16:21:56 +0100
committer  Alex Auvolat <alex@adnab.me>   2021-03-10 16:21:56 +0100
commit     f319a7d3740ba8b83c9c0eae27edfda1c1d14c03 (patch)
tree       efde4606ad33dcf5ad357f82553ad3b07d4a9858
parent     6a3dcf39740cda27e61b93582b6fea66991ec4f2 (diff)
download   garage-f319a7d3740ba8b83c9c0eae27edfda1c1d14c03.tar.gz
           garage-f319a7d3740ba8b83c9c0eae27edfda1c1d14c03.zip
Refactor model stuff, including cleaner CRDTs
-rw-r--r--  src/api/s3_copy.rs              13
-rw-r--r--  src/api/s3_get.rs               12
-rw-r--r--  src/api/s3_put.rs               37
-rw-r--r--  src/garage/repair.rs             9
-rw-r--r--  src/model/block.rs               2
-rw-r--r--  src/model/block_ref_table.rs    17
-rw-r--r--  src/model/bucket_table.rs        2
-rw-r--r--  src/model/garage.rs              1
-rw-r--r--  src/model/key_table.rs          13
-rw-r--r--  src/model/object_table.rs       39
-rw-r--r--  src/model/version_table.rs      96
-rw-r--r--  src/table/crdt.rs              327
-rw-r--r--  src/table/crdt/bool.rs          34
-rw-r--r--  src/table/crdt/crdt.rs          73
-rw-r--r--  src/table/crdt/lww.rs          114
-rw-r--r--  src/table/crdt/lww_map.rs      145
-rw-r--r--  src/table/crdt/map.rs           83
-rw-r--r--  src/table/crdt/mod.rs           22
-rw-r--r--  src/table/schema.rs              6
-rw-r--r--  src/table/table.rs               1
20 files changed, 590 insertions, 456 deletions
diff --git a/src/api/s3_copy.rs b/src/api/s3_copy.rs
index b6ec48b0..c6c30095 100644
--- a/src/api/s3_copy.rs
+++ b/src/api/s3_copy.rs
@@ -66,25 +66,28 @@ pub async fn handle_copy(
.await?;
let source_version = source_version.ok_or(Error::NotFound)?;
- let dest_version = Version::new(
+ let mut dest_version = Version::new(
new_uuid,
dest_bucket.to_string(),
dest_key.to_string(),
false,
- source_version.blocks().to_vec(),
);
+ for (bk, bv) in source_version.blocks.items().iter() {
+ dest_version.blocks.put(*bk, *bv);
+ }
let dest_object = Object::new(
dest_bucket.to_string(),
dest_key.to_string(),
vec![dest_object_version],
);
let dest_block_refs = dest_version
- .blocks()
+ .blocks
+ .items()
.iter()
.map(|b| BlockRef {
- block: b.hash,
+ block: b.1.hash,
version: new_uuid,
- deleted: false,
+ deleted: false.into(),
})
.collect::<Vec<_>>();
futures::try_join!(
diff --git a/src/api/s3_get.rs b/src/api/s3_get.rs
index 68e7c66a..22a55b55 100644
--- a/src/api/s3_get.rs
+++ b/src/api/s3_get.rs
@@ -146,9 +146,10 @@ pub async fn handle_get(
let version = version.ok_or(Error::NotFound)?;
let mut blocks = version
- .blocks()
+ .blocks
+ .items()
.iter()
- .map(|vb| (vb.hash, None))
+ .map(|(_, vb)| (vb.hash, None))
.collect::<Vec<_>>();
blocks[0].1 = Some(first_block);
@@ -219,11 +220,12 @@ pub async fn handle_get_range(
// file (whereas block.offset designates the offset of the block WITHIN THE PART
// block.part_number, which is not the same in the case of a multipart upload)
let mut blocks = Vec::with_capacity(std::cmp::min(
- version.blocks().len(),
- 4 + ((end - begin) / std::cmp::max(version.blocks()[0].size as u64, 1024)) as usize,
+ version.blocks.len(),
+ 4 + ((end - begin) / std::cmp::max(version.blocks.items()[0].1.size as u64, 1024))
+ as usize,
));
let mut true_offset = 0;
- for b in version.blocks().iter() {
+ for (_, b) in version.blocks.items().iter() {
if true_offset >= end {
break;
}
diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs
index ec599a05..37a1ece2 100644
--- a/src/api/s3_put.rs
+++ b/src/api/s3_put.rs
@@ -94,7 +94,7 @@ pub async fn handle_put(
garage.object_table.insert(&object).await?;
// Initialize corresponding entry in version table
- let version = Version::new(version_uuid, bucket.into(), key.into(), false, vec![]);
+ let version = Version::new(version_uuid, bucket.into(), key.into(), false);
let first_block_hash = sha256sum(&first_block[..]);
// Transfer data and verify checksum
@@ -242,19 +242,18 @@ async fn put_block_meta(
) -> Result<(), GarageError> {
// TODO: don't clone, restart from empty block list ??
let mut version = version.clone();
- version
- .add_block(VersionBlock {
+ version.blocks.put(
+ VersionBlockKey {
part_number,
offset,
- hash,
- size,
- })
- .unwrap();
+ },
+ VersionBlock { hash, size },
+ );
let block_ref = BlockRef {
block: hash,
version: version.uuid,
- deleted: false,
+ deleted: false.into(),
};
futures::try_join!(
@@ -389,7 +388,7 @@ pub async fn handle_put_part(
}
// Copy block to store
- let version = Version::new(version_uuid, bucket, key, false, vec![]);
+ let version = Version::new(version_uuid, bucket, key, false);
let first_block_hash = sha256sum(&first_block[..]);
let (_, md5sum_arr, sha256sum) = read_and_put_blocks(
&garage,
@@ -454,7 +453,7 @@ pub async fn handle_complete_multipart_upload(
};
let version = version.ok_or(Error::BadRequest(format!("Version not found")))?;
- if version.blocks().len() == 0 {
+ if version.blocks.len() == 0 {
return Err(Error::BadRequest(format!("No data was uploaded")));
}
@@ -466,9 +465,10 @@ pub async fn handle_complete_multipart_upload(
// Check that the list of parts they gave us corresponds to the parts we have here
// TODO: check MD5 sum of all uploaded parts? but that would mean we have to store them somewhere...
let mut parts = version
- .blocks()
+ .blocks
+ .items()
.iter()
- .map(|x| x.part_number)
+ .map(|x| x.0.part_number)
.collect::<Vec<_>>();
parts.dedup();
let same_parts = body_list_of_parts
@@ -485,8 +485,8 @@ pub async fn handle_complete_multipart_upload(
// shouldn't impact compatibility as the S3 docs specify that
// the ETag is an opaque value in case of a multipart upload.
// See also: https://teppen.io/2018/06/23/aws_s3_etags/
- let num_parts = version.blocks().last().unwrap().part_number
- - version.blocks().first().unwrap().part_number
+ let num_parts = version.blocks.items().last().unwrap().0.part_number
+ - version.blocks.items().first().unwrap().0.part_number
+ 1;
let etag = format!(
"{}-{}",
@@ -495,17 +495,18 @@ pub async fn handle_complete_multipart_upload(
);
let total_size = version
- .blocks()
+ .blocks
+ .items()
.iter()
- .map(|x| x.size)
+ .map(|x| x.1.size)
.fold(0, |x, y| x + y);
object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock(
ObjectVersionMeta {
headers,
size: total_size,
- etag: etag,
+ etag,
},
- version.blocks()[0].hash,
+ version.blocks.items()[0].1.hash,
));
let final_object = Object::new(bucket.clone(), key.clone(), vec![object_version]);
diff --git a/src/garage/repair.rs b/src/garage/repair.rs
index 297ae9cd..e330f7bb 100644
--- a/src/garage/repair.rs
+++ b/src/garage/repair.rs
@@ -97,7 +97,7 @@ impl Repair {
pos = item_key.to_vec();
let version = rmp_serde::decode::from_read_ref::<_, Version>(item_bytes.as_ref())?;
- if version.deleted {
+ if version.deleted.get() {
continue;
}
let object = self
@@ -127,7 +127,6 @@ impl Repair {
version.bucket,
version.key,
true,
- vec![],
))
.await?;
}
@@ -146,7 +145,7 @@ impl Repair {
pos = item_key.to_vec();
let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(item_bytes.as_ref())?;
- if block_ref.deleted {
+ if block_ref.deleted.get() {
continue;
}
let version = self
@@ -155,7 +154,7 @@ impl Repair {
.get(&block_ref.version, &EmptyKey)
.await?;
let ref_exists = match version {
- Some(v) => !v.deleted,
+ Some(v) => !v.deleted.get(),
None => {
warn!(
"Block ref repair: version for block ref {:?} not found, skipping.",
@@ -174,7 +173,7 @@ impl Repair {
.insert(&BlockRef {
block: block_ref.block,
version: block_ref.version,
- deleted: true,
+ deleted: true.into(),
})
.await?;
}
diff --git a/src/model/block.rs b/src/model/block.rs
index 56c85c6a..d3957403 100644
--- a/src/model/block.rs
+++ b/src/model/block.rs
@@ -420,7 +420,7 @@ impl BlockManager {
if Some(&block_ref.block) == last_hash.as_ref() {
continue;
}
- if !block_ref.deleted {
+ if !block_ref.deleted.get() {
last_hash = Some(block_ref.block);
self.put_to_resync(&block_ref.block, 0)?;
}
diff --git a/src/model/block_ref_table.rs b/src/model/block_ref_table.rs
index 9ab67737..07fa5144 100644
--- a/src/model/block_ref_table.rs
+++ b/src/model/block_ref_table.rs
@@ -1,9 +1,9 @@
use serde::{Deserialize, Serialize};
use std::sync::Arc;
-use garage_util::background::*;
use garage_util::data::*;
+use garage_table::crdt::CRDT;
use garage_table::*;
use crate::block::*;
@@ -17,7 +17,7 @@ pub struct BlockRef {
pub version: UUID,
// Keep track of deleted status
- pub deleted: bool,
+ pub deleted: crdt::Bool,
}
impl Entry<Hash, UUID> for BlockRef {
@@ -27,16 +27,15 @@ impl Entry<Hash, UUID> for BlockRef {
fn sort_key(&self) -> &UUID {
&self.version
}
+}
+impl CRDT for BlockRef {
fn merge(&mut self, other: &Self) {
- if other.deleted {
- self.deleted = true;
- }
+ self.deleted.merge(&other.deleted);
}
}
pub struct BlockRefTable {
- pub background: Arc<BackgroundRunner>,
pub block_manager: Arc<BlockManager>,
}
@@ -48,8 +47,8 @@ impl TableSchema for BlockRefTable {
fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) {
let block = &old.as_ref().or(new.as_ref()).unwrap().block;
- let was_before = old.as_ref().map(|x| !x.deleted).unwrap_or(false);
- let is_after = new.as_ref().map(|x| !x.deleted).unwrap_or(false);
+ let was_before = old.as_ref().map(|x| !x.deleted.get()).unwrap_or(false);
+ let is_after = new.as_ref().map(|x| !x.deleted.get()).unwrap_or(false);
if is_after && !was_before {
if let Err(e) = self.block_manager.block_incref(block) {
warn!("block_incref failed for block {:?}: {}", block, e);
@@ -63,6 +62,6 @@ impl TableSchema for BlockRefTable {
}
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
- filter.apply(entry.deleted)
+ filter.apply(entry.deleted.get())
}
}
diff --git a/src/model/bucket_table.rs b/src/model/bucket_table.rs
index 2878aa38..5bc8b7f9 100644
--- a/src/model/bucket_table.rs
+++ b/src/model/bucket_table.rs
@@ -89,7 +89,9 @@ impl Entry<EmptyKey, String> for Bucket {
fn sort_key(&self) -> &String {
&self.name
}
+}
+impl CRDT for Bucket {
fn merge(&mut self, other: &Self) {
self.state.merge(&other.state);
}
diff --git a/src/model/garage.rs b/src/model/garage.rs
index 467d0aec..d109fdaa 100644
--- a/src/model/garage.rs
+++ b/src/model/garage.rs
@@ -79,7 +79,6 @@ impl Garage {
info!("Initialize block_ref_table...");
let block_ref_table = Table::new(
BlockRefTable {
- background: background.clone(),
block_manager: block_manager.clone(),
},
data_rep_param.clone(),
diff --git a/src/model/key_table.rs b/src/model/key_table.rs
index 5942df75..6d8cc6c0 100644
--- a/src/model/key_table.rs
+++ b/src/model/key_table.rs
@@ -1,6 +1,6 @@
use serde::{Deserialize, Serialize};
-use garage_table::crdt::CRDT;
+use garage_table::crdt::*;
use garage_table::*;
use model010::key_table as prev;
@@ -66,6 +66,10 @@ pub struct PermissionSet {
pub allow_write: bool,
}
+impl AutoCRDT for PermissionSet {
+ const WARN_IF_DIFFERENT: bool = true;
+}
+
impl Entry<EmptyKey, String> for Key {
fn partition_key(&self) -> &EmptyKey {
&EmptyKey
@@ -73,17 +77,18 @@ impl Entry<EmptyKey, String> for Key {
fn sort_key(&self) -> &String {
&self.key_id
}
+}
+impl CRDT for Key {
fn merge(&mut self, other: &Self) {
self.name.merge(&other.name);
self.deleted.merge(&other.deleted);
if self.deleted.get() {
self.authorized_buckets.clear();
- return;
+ } else {
+ self.authorized_buckets.merge(&other.authorized_buckets);
}
-
- self.authorized_buckets.merge(&other.authorized_buckets);
}
}
diff --git a/src/model/object_table.rs b/src/model/object_table.rs
index 16cce72c..75c37f6d 100644
--- a/src/model/object_table.rs
+++ b/src/model/object_table.rs
@@ -5,6 +5,7 @@ use std::sync::Arc;
use garage_util::background::BackgroundRunner;
use garage_util::data::*;
+use garage_table::crdt::*;
use garage_table::table_sharded::*;
use garage_table::*;
@@ -70,7 +71,7 @@ pub enum ObjectVersionState {
Aborted,
}
-impl ObjectVersionState {
+impl CRDT for ObjectVersionState {
fn merge(&mut self, other: &Self) {
use ObjectVersionState::*;
match other {
@@ -91,37 +92,30 @@ impl ObjectVersionState {
}
}
-#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub enum ObjectVersionData {
DeleteMarker,
Inline(ObjectVersionMeta, #[serde(with = "serde_bytes")] Vec<u8>),
FirstBlock(ObjectVersionMeta, Hash),
}
-#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+impl AutoCRDT for ObjectVersionData {
+ const WARN_IF_DIFFERENT: bool = true;
+}
+
+#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct ObjectVersionMeta {
pub headers: ObjectVersionHeaders,
pub size: u64,
pub etag: String,
}
-#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct ObjectVersionHeaders {
pub content_type: String,
pub other: BTreeMap<String, String>,
}
-impl ObjectVersionData {
- fn merge(&mut self, b: &Self) {
- if *self != *b {
- warn!(
- "Inconsistent object version data: {:?} (local) vs {:?} (remote)",
- self, b
- );
- }
- }
-}
-
impl ObjectVersion {
fn cmp_key(&self) -> (u64, UUID) {
(self.timestamp, self.uuid)
@@ -154,8 +148,11 @@ impl Entry<String, String> for Object {
fn sort_key(&self) -> &String {
&self.key
}
+}
+impl CRDT for Object {
fn merge(&mut self, other: &Self) {
+ // Merge versions from other into here
for other_v in other.versions.iter() {
match self
.versions
@@ -169,6 +166,9 @@ impl Entry<String, String> for Object {
}
}
}
+
+ // Remove versions which are obsolete, i.e. those that come
+ // before the last version which .is_complete().
let last_complete = self
.versions
.iter()
@@ -212,13 +212,8 @@ impl TableSchema for ObjectTable {
}
};
if newly_deleted {
- let deleted_version = Version::new(
- v.uuid,
- old_v.bucket.clone(),
- old_v.key.clone(),
- true,
- vec![],
- );
+ let deleted_version =
+ Version::new(v.uuid, old_v.bucket.clone(), old_v.key.clone(), true);
version_table.insert(&deleted_version).await?;
}
}
diff --git a/src/model/version_table.rs b/src/model/version_table.rs
index cf9fbe98..26abb64e 100644
--- a/src/model/version_table.rs
+++ b/src/model/version_table.rs
@@ -4,6 +4,7 @@ use std::sync::Arc;
use garage_util::background::BackgroundRunner;
use garage_util::data::*;
+use garage_table::crdt::*;
use garage_table::table_sharded::*;
use garage_table::*;
@@ -15,8 +16,8 @@ pub struct Version {
pub uuid: UUID,
// Actual data: the blocks for this version
- pub deleted: bool,
- blocks: Vec<VersionBlock>,
+ pub deleted: crdt::Bool,
+ pub blocks: crdt::Map<VersionBlockKey, VersionBlock>,
// Back link to bucket+key so that we can figure if
// this was deleted later on
@@ -25,56 +26,45 @@ pub struct Version {
}
impl Version {
- pub fn new(
- uuid: UUID,
- bucket: String,
- key: String,
- deleted: bool,
- blocks: Vec<VersionBlock>,
- ) -> Self {
- let mut ret = Self {
+ pub fn new(uuid: UUID, bucket: String, key: String, deleted: bool) -> Self {
+ Self {
uuid,
- deleted,
- blocks: vec![],
+ deleted: deleted.into(),
+ blocks: crdt::Map::new(),
bucket,
key,
- };
- for b in blocks {
- ret.add_block(b)
- .expect("Twice the same VersionBlock in Version constructor");
}
- ret
}
- /// Adds a block if it wasn't already present
- pub fn add_block(&mut self, new: VersionBlock) -> Result<(), ()> {
- match self
- .blocks
- .binary_search_by(|b| b.cmp_key().cmp(&new.cmp_key()))
- {
- Err(i) => {
- self.blocks.insert(i, new);
- Ok(())
- }
- Ok(_) => Err(()),
- }
+}
+
+#[derive(PartialEq, Eq, Clone, Copy, Debug, Serialize, Deserialize)]
+pub struct VersionBlockKey {
+ pub part_number: u64,
+ pub offset: u64,
+}
+
+impl Ord for VersionBlockKey {
+ fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+ self.part_number
+ .cmp(&other.part_number)
+ .then(self.offset.cmp(&other.offset))
}
- pub fn blocks(&self) -> &[VersionBlock] {
- &self.blocks[..]
+}
+
+impl PartialOrd for VersionBlockKey {
+ fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+ Some(self.cmp(other))
}
}
-#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+#[derive(PartialEq, Eq, Ord, PartialOrd, Clone, Copy, Debug, Serialize, Deserialize)]
pub struct VersionBlock {
- pub part_number: u64,
- pub offset: u64,
pub hash: Hash,
pub size: u64,
}
-impl VersionBlock {
- fn cmp_key(&self) -> (u64, u64) {
- (self.part_number, self.offset)
- }
+impl AutoCRDT for VersionBlock {
+ const WARN_IF_DIFFERENT: bool = true;
}
impl Entry<Hash, EmptyKey> for Version {
@@ -84,23 +74,16 @@ impl Entry<Hash, EmptyKey> for Version {
fn sort_key(&self) -> &EmptyKey {
&EmptyKey
}
+}
+impl CRDT for Version {
fn merge(&mut self, other: &Self) {
- if other.deleted {
- self.deleted = true;
+ self.deleted.merge(&other.deleted);
+
+ if self.deleted.get() {
self.blocks.clear();
- } else if !self.deleted {
- for bi in other.blocks.iter() {
- match self
- .blocks
- .binary_search_by(|x| x.cmp_key().cmp(&bi.cmp_key()))
- {
- Ok(_) => (),
- Err(pos) => {
- self.blocks.insert(pos, bi.clone());
- }
- }
- }
+ } else {
+ self.blocks.merge(&other.blocks);
}
}
}
@@ -121,14 +104,15 @@ impl TableSchema for VersionTable {
self.background.spawn(async move {
if let (Some(old_v), Some(new_v)) = (old, new) {
// Propagate deletion of version blocks
- if new_v.deleted && !old_v.deleted {
+ if new_v.deleted.get() && !old_v.deleted.get() {
let deleted_block_refs = old_v
.blocks
+ .items()
.iter()
- .map(|vb| BlockRef {
+ .map(|(_k, vb)| BlockRef {
block: vb.hash,
version: old_v.uuid,
- deleted: true,
+ deleted: true.into(),
})
.collect::<Vec<_>>();
block_ref_table.insert_many(&deleted_block_refs[..]).await?;
@@ -139,6 +123,6 @@ impl TableSchema for VersionTable {
}
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
- filter.apply(entry.deleted)
+ filter.apply(entry.deleted.get())
}
}
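To make the new Version merge rule above easier to follow, here is a minimal stand-alone sketch of the same pattern built from the crdt primitives, using simple stand-in types (u64 keys, String values) instead of VersionBlockKey/VersionBlock. ToyVersion is purely illustrative and not part of this commit; the garage_table crate path is assumed from the imports shown above.

```rust
use garage_table::crdt::{self, CRDT};

// Toy stand-in for Version (illustrative only): `deleted` is an absorbing
// Bool, and the block map is only merged while the version is not deleted.
struct ToyVersion {
    deleted: crdt::Bool,
    blocks: crdt::Map<u64, String>,
}

impl CRDT for ToyVersion {
    fn merge(&mut self, other: &Self) {
        self.deleted.merge(&other.deleted);
        if self.deleted.get() {
            // A deleted version drops its block list, as in Version::merge above.
            self.blocks.clear();
        } else {
            self.blocks.merge(&other.blocks);
        }
    }
}

fn main() {
    let mut a = ToyVersion {
        deleted: false.into(),
        blocks: crdt::Map::new(),
    };
    a.blocks.put(1, "block-1".to_string());

    let b = ToyVersion {
        deleted: true.into(),
        blocks: crdt::Map::new(),
    };
    a.merge(&b);
    assert!(a.deleted.get());
    assert_eq!(a.blocks.len(), 0); // deletion wins, blocks are dropped
}
```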
diff --git a/src/table/crdt.rs b/src/table/crdt.rs
deleted file mode 100644
index 4cba10ce..00000000
--- a/src/table/crdt.rs
+++ /dev/null
@@ -1,327 +0,0 @@
-//! This package provides a simple implementation of conflict-free replicated data types (CRDTs)
-//!
-//! CRDTs are a type of data structures that do not require coordination. In other words, we can
-//! edit them in parallel, we will always find a way to merge it.
-//!
-//! A general example is a counter. Its initial value is 0. Alice and Bob get a copy of the
-//! counter. Alice does +1 on her copy, she reads 1. Bob does +3 on his copy, he reads 3. Now,
-//! it is easy to merge their counters, order does not count: we always get 4.
-//!
-//! Learn more about CRDT [on Wikipedia](https://en.wikipedia.org/wiki/Conflict-free_replicated_data_type)
-
-use serde::{Deserialize, Serialize};
-
-use garage_util::data::*;
-
-/// Definition of a CRDT - all CRDT Rust types implement this.
-///
-/// A CRDT is defined as a merge operator that respects a certain set of axioms.
-///
-/// In particular, the merge operator must be commutative, associative,
-/// idempotent, and monotonic.
-/// In other words, if `a`, `b` and `c` are CRDTs, and `⊔` denotes the merge operator,
-/// the following axioms must apply:
-///
-/// ```text
-/// a ⊔ b = b ⊔ a (commutativity)
-/// (a ⊔ b) ⊔ c = a ⊔ (b ⊔ c) (associativity)
-/// (a ⊔ b) ⊔ b = a ⊔ b (idempotence)
-/// ```
-///
-/// Moreover, the relationship `≥` defined by `a ≥ b ⇔ ∃c. a = b ⊔ c` must be a partial order.
-/// This implies a few properties such as: if `a ⊔ b ≠ a`, then there is no `c` such that `(a ⊔ b) ⊔ c = a`,
-/// as this would imply a cycle in the partial order.
-pub trait CRDT {
- /// Merge the two datastructures according to the CRDT rules.
- /// `self` is modified to contain the merged CRDT value. `other` is not modified.
- ///
- /// # Arguments
- ///
- /// * `other` - the other CRDT we wish to merge with
- fn merge(&mut self, other: &Self);
-}
-
-/// All types that implement `Ord` (a total order) also implement a trivial CRDT
-/// defined by the merge rule: `a ⊔ b = max(a, b)`.
-impl<T> CRDT for T
-where
- T: Ord + Clone,
-{
- fn merge(&mut self, other: &Self) {
- if other > self {
- *self = other.clone();
- }
- }
-}
-
-// ---- LWW Register ----
-
-/// Last Write Win (LWW)
-///
-/// An LWW CRDT associates a timestamp with a value, in order to implement a
-/// time-based reconciliation rule: the most recent write wins.
-/// For completeness, the LWW reconciliation rule must also be defined for two LWW CRDTs
-/// with the same timestamp but different values.
-///
-/// In our case, we add the constraint that the value that is wrapped inside the LWW CRDT must
-/// itself be a CRDT: in the case when the timestamp does not allow us to decide on which value to
-/// keep, the merge rule of the inner CRDT is applied on the wrapped values. (Note that all types
-/// that implement the `Ord` trait get a default CRDT implemetnation that keeps the maximum value.
-/// This enables us to use LWW directly with primitive data types such as numbers or strings. It is
-/// generally desirable in this case to never explicitly produce LWW values with the same timestamp
-/// but different inner values, as the rule to keep the maximum value isn't generally the desired
-/// semantics.)
-///
-/// As multiple computers clocks are always desynchronized,
-/// when operations are close enough, it is equivalent to
-/// take one copy and drop the other one.
-///
-/// Given that clocks are not too desynchronized, this assumption
-/// is enough for most cases, as there is few chance that two humans
-/// coordonate themself faster than the time difference between two NTP servers.
-///
-/// As a more concret example, let's suppose you want to upload a file
-/// with the same key (path) in the same bucket at the very same time.
-/// For each request, the file will be timestamped by the receiving server
-/// and may differ from what you observed with your atomic clock!
-///
-/// This scheme is used by AWS S3 or Soundcloud and often without knowing
-/// in entreprise when reconciliating databases with ad-hoc scripts.
-#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
-pub struct LWW<T> {
- ts: u64,
- v: T,
-}
-
-impl<T> LWW<T>
-where
- T: CRDT,
-{
- /// Creates a new CRDT
- ///
- /// CRDT's internal timestamp is set with current node's clock.
- pub fn new(value: T) -> Self {
- Self {
- ts: now_msec(),
- v: value,
- }
- }
-
- /// Build a new CRDT from a previous non-compatible one
- ///
- /// Compared to new, the CRDT's timestamp is not set to now
- /// but must be set to the previous, non-compatible, CRDT's timestamp.
- pub fn migrate_from_raw(ts: u64, value: T) -> Self {
- Self { ts, v: value }
- }
-
- /// Update the LWW CRDT while keeping some causal ordering.
- ///
- /// The timestamp of the LWW CRDT is updated to be the current node's clock
- /// at time of update, or the previous timestamp + 1 if that's bigger,
- /// so that the new timestamp is always strictly larger than the previous one.
- /// This ensures that merging the update with the old value will result in keeping
- /// the updated value.
- pub fn update(&mut self, new_value: T) {
- self.ts = std::cmp::max(self.ts + 1, now_msec());
- self.v = new_value;
- }
-
- /// Get the CRDT value
- pub fn get(&self) -> &T {
- &self.v
- }
-
- /// Get a mutable reference to the CRDT's value
- ///
- /// This is usefull to mutate the inside value without changing the LWW timestamp.
- /// When such mutation is done, the merge between two LWW values is done using the inner
- /// CRDT's merge operation. This is usefull in the case where the inner CRDT is a large
- /// data type, such as a map, and we only want to change a single item in the map.
- /// To do this, we can produce a "CRDT delta", i.e. a LWW that contains only the modification.
- /// This delta consists in a LWW with the same timestamp, and the map
- /// inside only contains the updated value.
- /// The advantage of such a delta is that it is much smaller than the whole map.
- ///
- /// Avoid using this if the inner data type is a primitive type such as a number or a string,
- /// as you will then rely on the merge function defined on `Ord` types by keeping the maximum
- /// of both values.
- pub fn get_mut(&mut self) -> &mut T {
- &mut self.v
- }
-}
-
-impl<T> CRDT for LWW<T>
-where
- T: Clone + CRDT,
-{
- fn merge(&mut self, other: &Self) {
- if other.ts > self.ts {
- self.ts = other.ts;
- self.v = other.v.clone();
- } else if other.ts == self.ts {
- self.v.merge(&other.v);
- }
- }
-}
-
-/// Boolean, where `true` is an absorbing state
-#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)]
-pub struct Bool(bool);
-
-impl Bool {
- /// Create a new boolean with the specified value
- pub fn new(b: bool) -> Self {
- Self(b)
- }
- /// Set the boolean to true
- pub fn set(&mut self) {
- self.0 = true;
- }
- /// Get the boolean value
- pub fn get(&self) -> bool {
- self.0
- }
-}
-
-impl CRDT for Bool {
- fn merge(&mut self, other: &Self) {
- self.0 = self.0 || other.0;
- }
-}
-
-/// Last Write Win Map
-///
-/// This types defines a CRDT for a map from keys to values.
-/// The values have an associated timestamp, such that the last written value
-/// takes precedence over previous ones. As for the simpler `LWW` type, the value
-/// type `V` is also required to implement the CRDT trait.
-/// We do not encourage mutating the values associated with a given key
-/// without updating the timestamp, in fact at the moment we do not provide a `.get_mut()`
-/// method that would allow that.
-///
-/// Internally, the map is stored as a vector of keys and values, sorted by ascending key order.
-/// This is why the key type `K` must implement `Ord` (and also to ensure a unique serialization,
-/// such that two values can be compared for equality based on their hashes). As a consequence,
-/// insertions take `O(n)` time. This means that LWWMap should be used for reasonably small maps.
-/// However, note that even if we were using a more efficient data structure such as a `BTreeMap`,
-/// the serialization cost `O(n)` would still have to be paid at each modification, so we are
-/// actually not losing anything here.
-#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
-pub struct LWWMap<K, V> {
- vals: Vec<(K, u64, V)>,
-}
-
-impl<K, V> LWWMap<K, V>
-where
- K: Ord,
- V: CRDT,
-{
- /// Create a new empty map CRDT
- pub fn new() -> Self {
- Self { vals: vec![] }
- }
- /// Used to migrate from a map defined in an incompatible format. This produces
- /// a map that contains a single item with the specified timestamp (copied from
- /// the incompatible format). Do this as many times as you have items to migrate,
- /// and put them all together using the CRDT merge operator.
- pub fn migrate_from_raw_item(k: K, ts: u64, v: V) -> Self {
- Self {
- vals: vec![(k, ts, v)],
- }
- }
- /// Returns a map that contains a single mapping from the specified key to the specified value.
- /// This map is a mutator, or a delta-CRDT, such that when it is merged with the original map,
- /// the previous value will be replaced with the one specified here.
- /// The timestamp in the provided mutator is set to the maximum of the current system's clock
- /// and 1 + the previous value's timestamp (if there is one), so that the new value will always
- /// take precedence (LWW rule).
- ///
- /// Typically, to update the value associated to a key in the map, you would do the following:
- ///
- /// ```ignore
- /// let my_update = my_crdt.update_mutator(key_to_modify, new_value);
- /// my_crdt.merge(&my_update);
- /// ```
- ///
- /// However extracting the mutator on its own and only sending that on the network is very
- /// interesting as it is much smaller than the whole map.
- pub fn update_mutator(&self, k: K, new_v: V) -> Self {
- let new_vals = match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) {
- Ok(i) => {
- let (_, old_ts, _) = self.vals[i];
- let new_ts = std::cmp::max(old_ts + 1, now_msec());
- vec![(k, new_ts, new_v)]
- }
- Err(_) => vec![(k, now_msec(), new_v)],
- };
- Self { vals: new_vals }
- }
- /// Takes all of the values of the map and returns them. The current map is reset to the
- /// empty map. This is very usefull to produce in-place a new map that contains only a delta
- /// that modifies a certain value:
- ///
- /// ```ignore
- /// let mut a = get_my_crdt_value();
- /// let old_a = a.take_and_clear();
- /// a.merge(&old_a.update_mutator(key_to_modify, new_value));
- /// put_my_crdt_value(a);
- /// ```
- ///
- /// Of course in this simple example we could have written simply
- /// `pyt_my_crdt_value(a.update_mutator(key_to_modify, new_value))`,
- /// but in the case where the map is a field in a struct for instance (as is always the case),
- /// this becomes very handy:
- ///
- /// ```ignore
- /// let mut a = get_my_crdt_value();
- /// let old_a_map = a.map_field.take_and_clear();
- /// a.map_field.merge(&old_a_map.update_mutator(key_to_modify, new_value));
- /// put_my_crdt_value(a);
- /// ```
- pub fn take_and_clear(&mut self) -> Self {
- let vals = std::mem::replace(&mut self.vals, vec![]);
- Self { vals }
- }
- /// Removes all values from the map
- pub fn clear(&mut self) {
- self.vals.clear();
- }
- /// Get a reference to the value assigned to a key
- pub fn get(&self, k: &K) -> Option<&V> {
- match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) {
- Ok(i) => Some(&self.vals[i].2),
- Err(_) => None,
- }
- }
- /// Gets a reference to all of the items, as a slice. Usefull to iterate on all map values.
- /// In most case you will want to ignore the timestamp (second item of the tuple).
- pub fn items(&self) -> &[(K, u64, V)] {
- &self.vals[..]
- }
-}
-
-impl<K, V> CRDT for LWWMap<K, V>
-where
- K: Clone + Ord,
- V: Clone + CRDT,
-{
- fn merge(&mut self, other: &Self) {
- for (k, ts2, v2) in other.vals.iter() {
- match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) {
- Ok(i) => {
- let (_, ts1, _v1) = &self.vals[i];
- if ts2 > ts1 {
- self.vals[i].1 = *ts2;
- self.vals[i].2 = v2.clone();
- } else if ts1 == ts2 {
- self.vals[i].2.merge(&v2);
- }
- }
- Err(i) => {
- self.vals.insert(i, (k.clone(), *ts2, v2.clone()));
- }
- }
- }
- }
-}
diff --git a/src/table/crdt/bool.rs b/src/table/crdt/bool.rs
new file mode 100644
index 00000000..1989c92e
--- /dev/null
+++ b/src/table/crdt/bool.rs
@@ -0,0 +1,34 @@
+use serde::{Deserialize, Serialize};
+
+use crate::crdt::crdt::*;
+
+/// Boolean, where `true` is an absorbing state
+#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)]
+pub struct Bool(bool);
+
+impl Bool {
+ /// Create a new boolean with the specified value
+ pub fn new(b: bool) -> Self {
+ Self(b)
+ }
+ /// Set the boolean to true
+ pub fn set(&mut self) {
+ self.0 = true;
+ }
+ /// Get the boolean value
+ pub fn get(&self) -> bool {
+ self.0
+ }
+}
+
+impl From<bool> for Bool {
+ fn from(b: bool) -> Bool {
+ Bool::new(b)
+ }
+}
+
+impl CRDT for Bool {
+ fn merge(&mut self, other: &Self) {
+ self.0 = self.0 || other.0;
+ }
+}
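A short usage sketch of the new Bool type (illustrative only, assuming the garage_table crate path used elsewhere in this commit). The From<bool> conversion added here is what makes the `deleted: false.into()` and `deleted: true.into()` calls in the model code work.

```rust
use garage_table::crdt::{Bool, CRDT};

fn main() {
    // `true` is absorbing: once any replica has seen true, merges never undo it.
    let mut deleted = Bool::new(false);
    deleted.merge(&true.into());
    deleted.merge(&Bool::new(false));
    assert!(deleted.get());
}
```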
diff --git a/src/table/crdt/crdt.rs b/src/table/crdt/crdt.rs
new file mode 100644
index 00000000..636b6df6
--- /dev/null
+++ b/src/table/crdt/crdt.rs
@@ -0,0 +1,73 @@
+use garage_util::data::*;
+
+/// Definition of a CRDT - all CRDT Rust types implement this.
+///
+/// A CRDT is defined as a merge operator that respects a certain set of axioms.
+///
+/// In particular, the merge operator must be commutative, associative,
+/// idempotent, and monotonic.
+/// In other words, if `a`, `b` and `c` are CRDTs, and `⊔` denotes the merge operator,
+/// the following axioms must apply:
+///
+/// ```text
+/// a ⊔ b = b ⊔ a (commutativity)
+/// (a ⊔ b) ⊔ c = a ⊔ (b ⊔ c) (associativity)
+/// (a ⊔ b) ⊔ b = a ⊔ b (idempotence)
+/// ```
+///
+/// Moreover, the relationship `≥` defined by `a ≥ b ⇔ ∃c. a = b ⊔ c` must be a partial order.
+/// This implies a few properties such as: if `a ⊔ b ≠ a`, then there is no `c` such that `(a ⊔ b) ⊔ c = a`,
+/// as this would imply a cycle in the partial order.
+pub trait CRDT {
+ /// Merge the two data structures according to the CRDT rules.
+ /// `self` is modified to contain the merged CRDT value. `other` is not modified.
+ ///
+ /// # Arguments
+ ///
+ /// * `other` - the other CRDT we wish to merge with
+ fn merge(&mut self, other: &Self);
+}
+
+/// All types that implement `Ord` (a total order) can also implement a trivial CRDT
+/// defined by the merge rule: `a ⊔ b = max(a, b)`. Implement this trait for your type
+/// to enable this behavior.
+pub trait AutoCRDT: Ord + Clone + std::fmt::Debug {
+ /// WARN_IF_DIFFERENT: emit a warning when values differ. Set this to true if
+ /// different values in your application should never happen. Set this to false
+ /// if you are actually relying on the semantics of `a ⊔ b = max(a, b)`.
+ const WARN_IF_DIFFERENT: bool;
+}
+
+impl<T> CRDT for T
+where
+ T: AutoCRDT,
+{
+ fn merge(&mut self, other: &Self) {
+ if Self::WARN_IF_DIFFERENT && self != other {
+ warn!(
+ "Different CRDT values should be the same (logic error!): {:?} vs {:?}",
+ self, other
+ );
+ if other > self {
+ *self = other.clone();
+ }
+ warn!("Making an arbitrary choice: {:?}", self);
+ } else {
+ if other > self {
+ *self = other.clone();
+ }
+ }
+ }
+}
+
+impl AutoCRDT for String {
+ const WARN_IF_DIFFERENT: bool = true;
+}
+
+impl AutoCRDT for bool {
+ const WARN_IF_DIFFERENT: bool = true;
+}
+
+impl AutoCRDT for FixedBytes32 {
+ const WARN_IF_DIFFERENT: bool = true;
+}
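AutoCRDT replaces the old blanket `impl<T: Ord> CRDT for T`: a type must now opt in explicitly and state whether diverging values should be reported. A minimal sketch of opting in a custom type; GenerationNumber is a made-up example, not part of this commit.

```rust
use garage_table::crdt::{AutoCRDT, CRDT};

// Opting into max-merge semantics: the blanket `impl<T: AutoCRDT> CRDT for T`
// then provides the merge operator a ⊔ b = max(a, b).
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug)]
struct GenerationNumber(u64);

impl AutoCRDT for GenerationNumber {
    // We rely on the max rule on purpose, so differing values are expected.
    const WARN_IF_DIFFERENT: bool = false;
}

fn main() {
    let mut a = GenerationNumber(3);
    a.merge(&GenerationNumber(5));
    assert_eq!(a, GenerationNumber(5));
}
```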
diff --git a/src/table/crdt/lww.rs b/src/table/crdt/lww.rs
new file mode 100644
index 00000000..9a3ab671
--- /dev/null
+++ b/src/table/crdt/lww.rs
@@ -0,0 +1,114 @@
+use serde::{Deserialize, Serialize};
+
+use garage_util::data::now_msec;
+
+use crate::crdt::crdt::*;
+
+/// Last Write Win (LWW)
+///
+/// An LWW CRDT associates a timestamp with a value, in order to implement a
+/// time-based reconciliation rule: the most recent write wins.
+/// For completeness, the LWW reconciliation rule must also be defined for two LWW CRDTs
+/// with the same timestamp but different values.
+///
+/// In our case, we add the constraint that the value that is wrapped inside the LWW CRDT must
+/// itself be a CRDT: in the case when the timestamp does not allow us to decide on which value to
+/// keep, the merge rule of the inner CRDT is applied on the wrapped values. (Note that all types
+/// that implement the `Ord` trait can get a default CRDT implementation that keeps the maximum value.
+/// This enables us to use LWW directly with primitive data types such as numbers or strings. It is
+/// generally desirable in this case to never explicitly produce LWW values with the same timestamp
+/// but different inner values, as the rule to keep the maximum value isn't generally the desired
+/// semantics.)
+///
+/// As the clocks of different computers are always somewhat desynchronized,
+/// when two operations happen close enough in time, the merge amounts to
+/// keeping one copy and dropping the other one.
+///
+/// Given that clocks are not too desynchronized, this assumption
+/// is enough for most cases, as there is little chance that two humans
+/// coordinate with each other faster than the time difference between two NTP servers.
+///
+/// As a more concrete example, let's suppose you want to upload a file
+/// with the same key (path) in the same bucket at the very same time.
+/// For each request, the file will be timestamped by the receiving server
+/// and may differ from what you observed with your atomic clock!
+///
+/// This scheme is used by AWS S3 and Soundcloud, and is often used unknowingly
+/// in enterprise settings when reconciling databases with ad-hoc scripts.
+#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
+pub struct LWW<T> {
+ ts: u64,
+ v: T,
+}
+
+impl<T> LWW<T>
+where
+ T: CRDT,
+{
+ /// Creates a new CRDT
+ ///
+ /// CRDT's internal timestamp is set with current node's clock.
+ pub fn new(value: T) -> Self {
+ Self {
+ ts: now_msec(),
+ v: value,
+ }
+ }
+
+ /// Build a new CRDT from a previous non-compatible one
+ ///
+ /// Compared to new, the CRDT's timestamp is not set to now
+ /// but must be set to the previous, non-compatible, CRDT's timestamp.
+ pub fn migrate_from_raw(ts: u64, value: T) -> Self {
+ Self { ts, v: value }
+ }
+
+ /// Update the LWW CRDT while keeping some causal ordering.
+ ///
+ /// The timestamp of the LWW CRDT is updated to be the current node's clock
+ /// at time of update, or the previous timestamp + 1 if that's bigger,
+ /// so that the new timestamp is always strictly larger than the previous one.
+ /// This ensures that merging the update with the old value will result in keeping
+ /// the updated value.
+ pub fn update(&mut self, new_value: T) {
+ self.ts = std::cmp::max(self.ts + 1, now_msec());
+ self.v = new_value;
+ }
+
+ /// Get the CRDT value
+ pub fn get(&self) -> &T {
+ &self.v
+ }
+
+ /// Get a mutable reference to the CRDT's value
+ ///
+ /// This is useful to mutate the inner value without changing the LWW timestamp.
+ /// When such a mutation is done, the merge between two LWW values is done using the inner
+ /// CRDT's merge operation. This is useful in the case where the inner CRDT is a large
+ /// data type, such as a map, and we only want to change a single item in the map.
+ /// To do this, we can produce a "CRDT delta", i.e. a LWW that contains only the modification.
+ /// This delta consists in a LWW with the same timestamp, and the map
+ /// inside only contains the updated value.
+ /// The advantage of such a delta is that it is much smaller than the whole map.
+ ///
+ /// Avoid using this if the inner data type is a primitive type such as a number or a string,
+ /// as you will then rely on the merge function defined on `Ord` types by keeping the maximum
+ /// of both values.
+ pub fn get_mut(&mut self) -> &mut T {
+ &mut self.v
+ }
+}
+
+impl<T> CRDT for LWW<T>
+where
+ T: Clone + CRDT,
+{
+ fn merge(&mut self, other: &Self) {
+ if other.ts > self.ts {
+ self.ts = other.ts;
+ self.v = other.v.clone();
+ } else if other.ts == self.ts {
+ self.v.merge(&other.v);
+ }
+ }
+}
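A minimal sketch of the LWW register in use (illustrative only). String already gets a CRDT implementation through AutoCRDT, so it can be wrapped directly.

```rust
use garage_table::crdt::{CRDT, LWW};

fn main() {
    // Two replicas of the same register.
    let mut a = LWW::new("v1".to_string());
    let mut b = a.clone();

    // `update` bumps the timestamp, so this write wins when the replicas merge.
    b.update("v2".to_string());

    a.merge(&b);
    assert_eq!(a.get().as_str(), "v2");
}
```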
diff --git a/src/table/crdt/lww_map.rs b/src/table/crdt/lww_map.rs
new file mode 100644
index 00000000..bd40f368
--- /dev/null
+++ b/src/table/crdt/lww_map.rs
@@ -0,0 +1,145 @@
+use serde::{Deserialize, Serialize};
+
+use garage_util::data::now_msec;
+
+use crate::crdt::crdt::*;
+
+/// Last Write Win Map
+///
+/// This type defines a CRDT for a map from keys to values.
+/// The values have an associated timestamp, such that the last written value
+/// takes precedence over previous ones. As for the simpler `LWW` type, the value
+/// type `V` is also required to implement the CRDT trait.
+/// We do not encourage mutating the values associated with a given key
+/// without updating the timestamp; in fact, at the moment we do not provide a `.get_mut()`
+/// method that would allow that.
+///
+/// Internally, the map is stored as a vector of keys and values, sorted by ascending key order.
+/// This is why the key type `K` must implement `Ord` (and also to ensure a unique serialization,
+/// such that two values can be compared for equality based on their hashes). As a consequence,
+/// insertions take `O(n)` time. This means that LWWMap should be used for reasonably small maps.
+/// However, note that even if we were using a more efficient data structure such as a `BTreeMap`,
+/// the serialization cost `O(n)` would still have to be paid at each modification, so we are
+/// actually not losing anything here.
+#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
+pub struct LWWMap<K, V> {
+ vals: Vec<(K, u64, V)>,
+}
+
+impl<K, V> LWWMap<K, V>
+where
+ K: Ord,
+ V: CRDT,
+{
+ /// Create a new empty map CRDT
+ pub fn new() -> Self {
+ Self { vals: vec![] }
+ }
+ /// Used to migrate from a map defined in an incompatible format. This produces
+ /// a map that contains a single item with the specified timestamp (copied from
+ /// the incompatible format). Do this as many times as you have items to migrate,
+ /// and put them all together using the CRDT merge operator.
+ pub fn migrate_from_raw_item(k: K, ts: u64, v: V) -> Self {
+ Self {
+ vals: vec![(k, ts, v)],
+ }
+ }
+ /// Returns a map that contains a single mapping from the specified key to the specified value.
+ /// This map is a mutator, or a delta-CRDT, such that when it is merged with the original map,
+ /// the previous value will be replaced with the one specified here.
+ /// The timestamp in the provided mutator is set to the maximum of the current system's clock
+ /// and 1 + the previous value's timestamp (if there is one), so that the new value will always
+ /// take precedence (LWW rule).
+ ///
+ /// Typically, to update the value associated to a key in the map, you would do the following:
+ ///
+ /// ```ignore
+ /// let my_update = my_crdt.update_mutator(key_to_modify, new_value);
+ /// my_crdt.merge(&my_update);
+ /// ```
+ ///
+ /// However extracting the mutator on its own and only sending that on the network is very
+ /// interesting as it is much smaller than the whole map.
+ pub fn update_mutator(&self, k: K, new_v: V) -> Self {
+ let new_vals = match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) {
+ Ok(i) => {
+ let (_, old_ts, _) = self.vals[i];
+ let new_ts = std::cmp::max(old_ts + 1, now_msec());
+ vec![(k, new_ts, new_v)]
+ }
+ Err(_) => vec![(k, now_msec(), new_v)],
+ };
+ Self { vals: new_vals }
+ }
+ /// Takes all of the values of the map and returns them. The current map is reset to the
+ /// empty map. This is very useful to produce in-place a new map that contains only a delta
+ /// that modifies a certain value:
+ ///
+ /// ```ignore
+ /// let mut a = get_my_crdt_value();
+ /// let old_a = a.take_and_clear();
+ /// a.merge(&old_a.update_mutator(key_to_modify, new_value));
+ /// put_my_crdt_value(a);
+ /// ```
+ ///
+ /// Of course in this simple example we could have written simply
+ /// `put_my_crdt_value(a.update_mutator(key_to_modify, new_value))`,
+ /// but in the case where the map is a field in a struct for instance (as is always the case),
+ /// this becomes very handy:
+ ///
+ /// ```ignore
+ /// let mut a = get_my_crdt_value();
+ /// let old_a_map = a.map_field.take_and_clear();
+ /// a.map_field.merge(&old_a_map.update_mutator(key_to_modify, new_value));
+ /// put_my_crdt_value(a);
+ /// ```
+ pub fn take_and_clear(&mut self) -> Self {
+ let vals = std::mem::replace(&mut self.vals, vec![]);
+ Self { vals }
+ }
+ /// Removes all values from the map
+ pub fn clear(&mut self) {
+ self.vals.clear();
+ }
+ /// Get a reference to the value assigned to a key
+ pub fn get(&self, k: &K) -> Option<&V> {
+ match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) {
+ Ok(i) => Some(&self.vals[i].2),
+ Err(_) => None,
+ }
+ }
+ /// Gets a reference to all of the items, as a slice. Useful to iterate over all map values.
+ /// In most cases you will want to ignore the timestamp (the second item of the tuple).
+ pub fn items(&self) -> &[(K, u64, V)] {
+ &self.vals[..]
+ }
+ /// Returns the number of items in the map
+ pub fn len(&self) -> usize {
+ self.vals.len()
+ }
+}
+
+impl<K, V> CRDT for LWWMap<K, V>
+where
+ K: Clone + Ord,
+ V: Clone + CRDT,
+{
+ fn merge(&mut self, other: &Self) {
+ for (k, ts2, v2) in other.vals.iter() {
+ match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) {
+ Ok(i) => {
+ let (_, ts1, _v1) = &self.vals[i];
+ if ts2 > ts1 {
+ self.vals[i].1 = *ts2;
+ self.vals[i].2 = v2.clone();
+ } else if ts1 == ts2 {
+ self.vals[i].2.merge(&v2);
+ }
+ }
+ Err(i) => {
+ self.vals.insert(i, (k.clone(), *ts2, v2.clone()));
+ }
+ }
+ }
+ }
+}
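A sketch of the delta-mutator workflow described in the LWWMap documentation, with plain String keys and values (illustrative only).

```rust
use garage_table::crdt::{CRDT, LWWMap};

fn main() {
    let mut map: LWWMap<String, String> = LWWMap::new();

    // Build a small delta containing only the modified entry, then merge it in.
    // In Garage, only such a delta needs to travel over the network.
    let delta = map.update_mutator("alice".to_string(), "rw".to_string());
    map.merge(&delta);

    assert_eq!(map.len(), 1);
    assert_eq!(map.get(&"alice".to_string()).map(String::as_str), Some("rw"));
}
```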
diff --git a/src/table/crdt/map.rs b/src/table/crdt/map.rs
new file mode 100644
index 00000000..1193e6db
--- /dev/null
+++ b/src/table/crdt/map.rs
@@ -0,0 +1,83 @@
+use serde::{Deserialize, Serialize};
+
+use crate::crdt::crdt::*;
+
+/// Simple CRDT Map
+///
+/// This type defines a CRDT for a map from keys to values. Values are CRDT types which
+/// can have their own updating logic.
+///
+/// Internally, the map is stored as a vector of keys and values, sorted by ascending key order.
+/// This is why the key type `K` must implement `Ord` (and also to ensure a unique serialization,
+/// such that two values can be compared for equality based on their hashes). As a consequence,
+/// insertions take `O(n)` time. This means that Map should be used for reasonably small maps.
+/// However, note that even if we were using a more efficient data structure such as a `BTreeMap`,
+/// the serialization cost `O(n)` would still have to be paid at each modification, so we are
+/// actually not losing anything here.
+#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
+pub struct Map<K, V> {
+ vals: Vec<(K, V)>,
+}
+
+impl<K, V> Map<K, V>
+where
+ K: Clone + Ord,
+ V: Clone + CRDT,
+{
+ /// Create a new empty map CRDT
+ pub fn new() -> Self {
+ Self { vals: vec![] }
+ }
+
+ /// Returns a map that contains a single mapping from the specified key to the specified value.
+ /// This can be used to build a delta-mutator:
+ /// when merged with another map, the value will be added or CRDT-merged if a previous
+ /// value already exists.
+ pub fn put_mutator(k: K, v: V) -> Self {
+ Self { vals: vec![(k, v)] }
+ }
+
+ pub fn put(&mut self, k: K, v: V) {
+ self.merge(&Self::put_mutator(k, v));
+ }
+
+ /// Removes all values from the map
+ pub fn clear(&mut self) {
+ self.vals.clear();
+ }
+
+ /// Get a reference to the value assigned to a key
+ pub fn get(&self, k: &K) -> Option<&V> {
+ match self.vals.binary_search_by(|(k2, _)| k2.cmp(&k)) {
+ Ok(i) => Some(&self.vals[i].1),
+ Err(_) => None,
+ }
+ }
+ /// Gets a reference to all of the items, as a slice. Useful to iterate over all map values.
+ pub fn items(&self) -> &[(K, V)] {
+ &self.vals[..]
+ }
+ /// Returns the number of items in the map
+ pub fn len(&self) -> usize {
+ self.vals.len()
+ }
+}
+
+impl<K, V> CRDT for Map<K, V>
+where
+ K: Clone + Ord,
+ V: Clone + CRDT,
+{
+ fn merge(&mut self, other: &Self) {
+ for (k, v2) in other.vals.iter() {
+ match self.vals.binary_search_by(|(k2, _)| k2.cmp(&k)) {
+ Ok(i) => {
+ self.vals[i].1.merge(&v2);
+ }
+ Err(i) => {
+ self.vals.insert(i, (k.clone(), v2.clone()));
+ }
+ }
+ }
+ }
+}
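A sketch of the new Map type, which Version now uses for its block list: put is simply a merge with a single-entry delta, so values (which are CRDTs themselves) are merged rather than overwritten (illustrative only).

```rust
use garage_table::crdt::{Bool, Map};

fn main() {
    let mut m: Map<String, Bool> = Map::new();
    m.put("deleted".to_string(), Bool::new(false));
    m.put("deleted".to_string(), Bool::new(true));
    m.put("deleted".to_string(), Bool::new(false)); // no effect: true absorbs

    assert_eq!(m.len(), 1);
    assert!(m.get(&"deleted".to_string()).unwrap().get());
}
```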
diff --git a/src/table/crdt/mod.rs b/src/table/crdt/mod.rs
new file mode 100644
index 00000000..eb75d061
--- /dev/null
+++ b/src/table/crdt/mod.rs
@@ -0,0 +1,22 @@
+//! This package provides a simple implementation of conflict-free replicated data types (CRDTs)
+//!
+//! CRDTs are a type of data structure that does not require coordination. In other words, we can
+//! edit them in parallel, and we will always find a way to merge the edits.
+//!
+//! A general example is a counter. Its initial value is 0. Alice and Bob get a copy of the
+//! counter. Alice does +1 on her copy, she reads 1. Bob does +3 on his copy, he reads 3. Now,
+//! it is easy to merge their counters, order does not matter: we always get 4.
+//!
+//! Learn more about CRDT [on Wikipedia](https://en.wikipedia.org/wiki/Conflict-free_replicated_data_type)
+
+mod bool;
+mod crdt;
+mod lww;
+mod lww_map;
+mod map;
+
+pub use self::bool::*;
+pub use crdt::*;
+pub use lww::*;
+pub use lww_map::*;
+pub use map::*;
diff --git a/src/table/schema.rs b/src/table/schema.rs
index edd04000..5b789a02 100644
--- a/src/table/schema.rs
+++ b/src/table/schema.rs
@@ -2,6 +2,8 @@ use serde::{Deserialize, Serialize};
use garage_util::data::*;
+use crate::crdt::CRDT;
+
pub trait PartitionKey {
fn hash(&self) -> Hash;
}
@@ -35,12 +37,10 @@ impl SortKey for Hash {
}
pub trait Entry<P: PartitionKey, S: SortKey>:
- PartialEq + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync
+ CRDT + PartialEq + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync
{
fn partition_key(&self) -> &P;
fn sort_key(&self) -> &S;
-
- fn merge(&mut self, other: &Self);
}
pub trait TableSchema: Send + Sync {
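With this change, a table entry implements Entry for its keys and CRDT for its merge logic in two separate impl blocks, as the model types above now do. A minimal hypothetical entry sketched against the traits shown in this diff (Note is a made-up type, not part of the commit).

```rust
use serde::{Deserialize, Serialize};

use garage_table::crdt::{self, CRDT};
use garage_table::*;

// Hypothetical table entry, for illustration only.
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
struct Note {
    name: String,
    deleted: crdt::Bool,
}

// Key accessors live on Entry...
impl Entry<EmptyKey, String> for Note {
    fn partition_key(&self) -> &EmptyKey {
        &EmptyKey
    }
    fn sort_key(&self) -> &String {
        &self.name
    }
}

// ...while the merge rule now lives on a separate CRDT impl.
impl CRDT for Note {
    fn merge(&mut self, other: &Self) {
        self.deleted.merge(&other.deleted);
    }
}
```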
diff --git a/src/table/table.rs b/src/table/table.rs
index 1f6b7d25..366ce925 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -17,6 +17,7 @@ use garage_rpc::ring::Ring;
use garage_rpc::rpc_client::*;
use garage_rpc::rpc_server::*;
+use crate::crdt::CRDT;
use crate::schema::*;
use crate::table_sync::*;