aboutsummaryrefslogtreecommitdiff
path: root/src/table
diff options
context:
space:
mode:
authorQuentin <quentin@deuxfleurs.fr>2020-11-21 18:01:50 +0100
committerQuentin <quentin@deuxfleurs.fr>2020-11-21 18:01:50 +0100
commit28efe341cbb4d96b5f81f5fe756f1a0995461e77 (patch)
tree27664b3c8519df6cea381218d3542f19e0c04126 /src/table
parentb7a377308bbcbb7285a5b11cdcb07361eff93a28 (diff)
parentb3814b15ccc233d7c4233b43816cce20db278f17 (diff)
downloadgarage-28efe341cbb4d96b5f81f5fe756f1a0995461e77.tar.gz
garage-28efe341cbb4d96b5f81f5fe756f1a0995461e77.zip
Merge branch 'master' into feature/website
Diffstat (limited to 'src/table')
-rw-r--r--src/table/Cargo.toml1
-rw-r--r--src/table/crdt.rs162
-rw-r--r--src/table/lib.rs1
-rw-r--r--src/table/table.rs24
4 files changed, 180 insertions, 8 deletions
diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml
index 1963f3da..945763fa 100644
--- a/src/table/Cargo.toml
+++ b/src/table/Cargo.toml
@@ -21,6 +21,7 @@ rand = "0.7"
hex = "0.3"
arc-swap = "0.4"
log = "0.4"
+hexdump = "0.1"
sled = "0.31"
diff --git a/src/table/crdt.rs b/src/table/crdt.rs
new file mode 100644
index 00000000..2b903cf0
--- /dev/null
+++ b/src/table/crdt.rs
@@ -0,0 +1,162 @@
+use serde::{Deserialize, Serialize};
+
+use garage_util::data::*;
+
+pub trait CRDT {
+ fn merge(&mut self, other: &Self);
+}
+
+impl<T> CRDT for T
+where
+ T: Ord + Clone,
+{
+ fn merge(&mut self, other: &Self) {
+ if other > self {
+ *self = other.clone();
+ }
+ }
+}
+
+// ---- LWW Register ----
+
+#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
+pub struct LWW<T> {
+ ts: u64,
+ v: T,
+}
+
+impl<T> LWW<T>
+where
+ T: CRDT,
+{
+ pub fn new(value: T) -> Self {
+ Self {
+ ts: now_msec(),
+ v: value,
+ }
+ }
+ pub fn migrate_from_raw(ts: u64, value: T) -> Self {
+ Self { ts, v: value }
+ }
+ pub fn update(&mut self, new_value: T) {
+ self.ts = std::cmp::max(self.ts + 1, now_msec());
+ self.v = new_value;
+ }
+ pub fn get(&self) -> &T {
+ &self.v
+ }
+ pub fn get_mut(&mut self) -> &mut T {
+ &mut self.v
+ }
+}
+
+impl<T> CRDT for LWW<T>
+where
+ T: Clone + CRDT,
+{
+ fn merge(&mut self, other: &Self) {
+ if other.ts > self.ts {
+ self.ts = other.ts;
+ self.v = other.v.clone();
+ } else if other.ts == self.ts {
+ self.v.merge(&other.v);
+ }
+ }
+}
+
+// ---- Boolean (true as absorbing state) ----
+
+#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)]
+pub struct Bool(bool);
+
+impl Bool {
+ pub fn new(b: bool) -> Self {
+ Self(b)
+ }
+ pub fn set(&mut self) {
+ self.0 = true;
+ }
+ pub fn get(&self) -> bool {
+ self.0
+ }
+}
+
+impl CRDT for Bool {
+ fn merge(&mut self, other: &Self) {
+ self.0 = self.0 || other.0;
+ }
+}
+
+// ---- LWW Map ----
+
+#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
+pub struct LWWMap<K, V> {
+ vals: Vec<(K, u64, V)>,
+}
+
+impl<K, V> LWWMap<K, V>
+where
+ K: Ord,
+ V: CRDT,
+{
+ pub fn new() -> Self {
+ Self { vals: vec![] }
+ }
+ pub fn migrate_from_raw_item(k: K, ts: u64, v: V) -> Self {
+ Self {
+ vals: vec![(k, ts, v)],
+ }
+ }
+ pub fn take_and_clear(&mut self) -> Self {
+ let vals = std::mem::replace(&mut self.vals, vec![]);
+ Self { vals }
+ }
+ pub fn clear(&mut self) {
+ self.vals.clear();
+ }
+ pub fn update_mutator(&self, k: K, new_v: V) -> Self {
+ let new_vals = match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) {
+ Ok(i) => {
+ let (_, old_ts, _) = self.vals[i];
+ let new_ts = std::cmp::max(old_ts + 1, now_msec());
+ vec![(k, new_ts, new_v)]
+ }
+ Err(_) => vec![(k, now_msec(), new_v)],
+ };
+ Self { vals: new_vals }
+ }
+ pub fn get(&self, k: &K) -> Option<&V> {
+ match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) {
+ Ok(i) => Some(&self.vals[i].2),
+ Err(_) => None,
+ }
+ }
+ pub fn items(&self) -> &[(K, u64, V)] {
+ &self.vals[..]
+ }
+}
+
+impl<K, V> CRDT for LWWMap<K, V>
+where
+ K: Clone + Ord,
+ V: Clone + CRDT,
+{
+ fn merge(&mut self, other: &Self) {
+ for (k, ts2, v2) in other.vals.iter() {
+ match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) {
+ Ok(i) => {
+ let (_, ts1, _v1) = &self.vals[i];
+ if ts2 > ts1 {
+ self.vals[i].1 = *ts2;
+ self.vals[i].2 = v2.clone();
+ } else if ts1 == ts2 {
+ self.vals[i].2.merge(&v2);
+ }
+ }
+ Err(i) => {
+ self.vals.insert(i, (k.clone(), *ts2, v2.clone()));
+ }
+ }
+ }
+ }
+}
diff --git a/src/table/lib.rs b/src/table/lib.rs
index a10f78c2..704f8f1e 100644
--- a/src/table/lib.rs
+++ b/src/table/lib.rs
@@ -3,6 +3,7 @@
#[macro_use]
extern crate log;
+pub mod crdt;
pub mod schema;
pub mod util;
diff --git a/src/table/table.rs b/src/table/table.rs
index 2beac3f4..5dfee3c8 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -2,6 +2,8 @@ use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use std::time::Duration;
+use log::warn;
+
use arc_swap::ArcSwapOption;
use futures::stream::*;
use serde::{Deserialize, Serialize};
@@ -185,7 +187,7 @@ where
for resp in resps {
if let TableRPC::ReadEntryResponse(value) = resp {
if let Some(v_bytes) = value {
- let v = Self::decode_entry(v_bytes.as_slice())?;
+ let v = self.decode_entry(v_bytes.as_slice())?;
ret = match ret {
None => Some(v),
Some(mut x) => {
@@ -241,7 +243,7 @@ where
for resp in resps {
if let TableRPC::Update(entries) = resp {
for entry_bytes in entries.iter() {
- let entry = Self::decode_entry(entry_bytes.as_slice())?;
+ let entry = self.decode_entry(entry_bytes.as_slice())?;
let entry_key = self.tree_key(entry.partition_key(), entry.sort_key());
match ret.remove(&entry_key) {
None => {
@@ -363,7 +365,7 @@ where
let keep = match filter {
None => true,
Some(f) => {
- let entry = Self::decode_entry(value.as_ref())?;
+ let entry = self.decode_entry(value.as_ref())?;
F::matches_filter(&entry, f)
}
};
@@ -382,14 +384,14 @@ where
let mut epidemic_propagate = vec![];
for update_bytes in entries.iter() {
- let update = Self::decode_entry(update_bytes.as_slice())?;
+ let update = self.decode_entry(update_bytes.as_slice())?;
let tree_key = self.tree_key(update.partition_key(), update.sort_key());
let (old_entry, new_entry) = self.store.transaction(|db| {
let (old_entry, new_entry) = match db.get(&tree_key)? {
Some(prev_bytes) => {
- let old_entry = Self::decode_entry(&prev_bytes)
+ let old_entry = self.decode_entry(&prev_bytes)
.map_err(sled::ConflictableTransactionError::Abort)?;
let mut new_entry = old_entry.clone();
new_entry.merge(&update);
@@ -437,7 +439,7 @@ where
break;
}
if let Some(old_val) = self.store.remove(&key)? {
- let old_entry = Self::decode_entry(&old_val)?;
+ let old_entry = self.decode_entry(&old_val)?;
self.instance.updated(Some(old_entry), None).await?;
self.system
.background
@@ -455,12 +457,18 @@ where
ret
}
- fn decode_entry(bytes: &[u8]) -> Result<F::E, Error> {
+ fn decode_entry(&self, bytes: &[u8]) -> Result<F::E, Error> {
match rmp_serde::decode::from_read_ref::<_, F::E>(bytes) {
Ok(x) => Ok(x),
Err(e) => match F::try_migrate(bytes) {
Some(x) => Ok(x),
- None => Err(e.into()),
+ None => {
+ warn!("Unable to decode entry of {}: {}", self.name, e);
+ for line in hexdump::hexdump_iter(bytes) {
+ debug!("{}", line);
+ }
+ Err(e.into())
+ }
},
}
}