aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/api/s3_put.rs2
-rw-r--r--src/api/signature.rs2
-rw-r--r--src/garage/admin_rpc.rs128
-rw-r--r--src/model/block.rs2
-rw-r--r--src/model/bucket_table.rs138
-rw-r--r--src/model/key_table.rs122
-rw-r--r--src/table/Cargo.toml1
-rw-r--r--src/table/crdt.rs162
-rw-r--r--src/table/lib.rs3
-rw-r--r--src/table/schema.rs4
-rw-r--r--src/table/table.rs24
11 files changed, 355 insertions, 233 deletions
diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs
index 72613323..a1681d77 100644
--- a/src/api/s3_put.rs
+++ b/src/api/s3_put.rs
@@ -322,7 +322,7 @@ pub async fn handle_put_part(
let (object, first_block) = futures::try_join!(get_object_fut, get_first_block_fut)?;
// Check object is valid and multipart block can be accepted
- let first_block = first_block.ok_or(Error::BadRequest(format!("Empty body")))?;
+ let first_block = first_block.ok_or(Error::BadRequest(format!("Empty body")))?;
let object = object.ok_or(Error::BadRequest(format!("Object not found")))?;
if !object
diff --git a/src/api/signature.rs b/src/api/signature.rs
index 402b1881..0ee47961 100644
--- a/src/api/signature.rs
+++ b/src/api/signature.rs
@@ -68,7 +68,7 @@ pub async fn check_signature(
.key_table
.get(&EmptyKey, &authorization.key_id)
.await?
- .filter(|k| !k.deleted)
+ .filter(|k| !k.deleted.get())
.ok_or(Error::Forbidden(format!(
"No such key: {}",
authorization.key_id
diff --git a/src/garage/admin_rpc.rs b/src/garage/admin_rpc.rs
index 778e4a1d..a23d3e95 100644
--- a/src/garage/admin_rpc.rs
+++ b/src/garage/admin_rpc.rs
@@ -2,9 +2,9 @@ use std::sync::Arc;
use serde::{Deserialize, Serialize};
-use garage_util::data::*;
use garage_util::error::Error;
+use garage_table::crdt::CRDT;
use garage_table::*;
use garage_rpc::rpc_client::*;
@@ -79,25 +79,26 @@ impl AdminRpcHandler {
Ok(AdminRPC::BucketInfo(bucket))
}
BucketOperation::Create(query) => {
- let bucket = self.garage.bucket_table.get(&EmptyKey, &query.name).await?;
- if bucket.as_ref().filter(|b| !b.deleted).is_some() {
- return Err(Error::BadRPC(format!(
- "Bucket {} already exists",
- query.name
- )));
- }
- let new_time = match bucket {
- Some(b) => std::cmp::max(b.timestamp + 1, now_msec()),
- None => now_msec(),
+ let bucket = match self.garage.bucket_table.get(&EmptyKey, &query.name).await? {
+ Some(mut bucket) => {
+ if !bucket.is_deleted() {
+ return Err(Error::BadRPC(format!(
+ "Bucket {} already exists",
+ query.name
+ )));
+ }
+ bucket
+ .state
+ .update(BucketState::Present(crdt::LWWMap::new()));
+ bucket
+ }
+ None => Bucket::new(query.name.clone()),
};
- self.garage
- .bucket_table
- .insert(&Bucket::new(query.name.clone(), new_time, false, vec![]))
- .await?;
+ self.garage.bucket_table.insert(&bucket).await?;
Ok(AdminRPC::Ok(format!("Bucket {} was created.", query.name)))
}
BucketOperation::Delete(query) => {
- let bucket = self.get_existing_bucket(&query.name).await?;
+ let mut bucket = self.get_existing_bucket(&query.name).await?;
let objects = self
.garage
.object_table
@@ -112,25 +113,18 @@ impl AdminRpcHandler {
)));
}
// --- done checking, now commit ---
- for ak in bucket.authorized_keys() {
- if let Some(key) = self.garage.key_table.get(&EmptyKey, &ak.key_id).await? {
- if !key.deleted {
+ for (key_id, _, _) in bucket.authorized_keys() {
+ if let Some(key) = self.garage.key_table.get(&EmptyKey, key_id).await? {
+ if !key.deleted.get() {
self.update_key_bucket(key, &bucket.name, false, false)
.await?;
}
} else {
- return Err(Error::Message(format!("Key not found: {}", ak.key_id)));
+ return Err(Error::Message(format!("Key not found: {}", key_id)));
}
}
- self.garage
- .bucket_table
- .insert(&Bucket::new(
- query.name.clone(),
- std::cmp::max(bucket.timestamp + 1, now_msec()),
- true,
- vec![],
- ))
- .await?;
+ bucket.state.update(BucketState::Deleted);
+ self.garage.bucket_table.insert(&bucket).await?;
Ok(AdminRPC::Ok(format!("Bucket {} was deleted.", query.name)))
}
BucketOperation::Allow(query) => {
@@ -173,7 +167,7 @@ impl AdminRpcHandler {
.get_range(&EmptyKey, None, Some(DeletedFilter::NotDeleted), 10000)
.await?
.iter()
- .map(|k| (k.key_id.to_string(), k.name.to_string()))
+ .map(|k| (k.key_id.to_string(), k.name.get().clone()))
.collect::<Vec<_>>();
Ok(AdminRPC::KeyList(key_ids))
}
@@ -182,14 +176,13 @@ impl AdminRpcHandler {
Ok(AdminRPC::KeyInfo(key))
}
KeyOperation::New(query) => {
- let key = Key::new(query.name, vec![]);
+ let key = Key::new(query.name);
self.garage.key_table.insert(&key).await?;
Ok(AdminRPC::KeyInfo(key))
}
KeyOperation::Rename(query) => {
let mut key = self.get_existing_key(&query.key_id).await?;
- key.name_timestamp = std::cmp::max(key.name_timestamp + 1, now_msec());
- key.name = query.new_name;
+ key.name.update(query.new_name);
self.garage.key_table.insert(&key).await?;
Ok(AdminRPC::KeyInfo(key))
}
@@ -201,16 +194,14 @@ impl AdminRpcHandler {
)));
}
// --- done checking, now commit ---
- for ab in key.authorized_buckets().iter() {
- if let Some(bucket) =
- self.garage.bucket_table.get(&EmptyKey, &ab.bucket).await?
- {
- if !bucket.deleted {
+ for (ab_name, _, _) in key.authorized_buckets.items().iter() {
+ if let Some(bucket) = self.garage.bucket_table.get(&EmptyKey, ab_name).await? {
+ if !bucket.is_deleted() {
self.update_bucket_key(bucket, &key.key_id, false, false)
.await?;
}
} else {
- return Err(Error::Message(format!("Bucket not found: {}", ab.bucket)));
+ return Err(Error::Message(format!("Bucket not found: {}", ab_name)));
}
}
let del_key = Key::delete(key.key_id);
@@ -228,7 +219,7 @@ impl AdminRpcHandler {
.bucket_table
.get(&EmptyKey, bucket)
.await?
- .filter(|b| !b.deleted)
+ .filter(|b| !b.is_deleted())
.map(Ok)
.unwrap_or(Err(Error::BadRPC(format!(
"Bucket {} does not exist",
@@ -241,7 +232,7 @@ impl AdminRpcHandler {
.key_table
.get(&EmptyKey, id)
.await?
- .filter(|k| !k.deleted)
+ .filter(|k| !k.deleted.get())
.map(Ok)
.unwrap_or(Err(Error::BadRPC(format!("Key {} does not exist", id))))
}
@@ -253,23 +244,20 @@ impl AdminRpcHandler {
allow_read: bool,
allow_write: bool,
) -> Result<(), Error> {
- let timestamp = match bucket
- .authorized_keys()
- .iter()
- .find(|x| x.key_id == *key_id)
- {
- None => now_msec(),
- Some(ab) => std::cmp::max(ab.timestamp + 1, now_msec()),
- };
- bucket.clear_keys();
- bucket
- .add_key(AllowedKey {
- key_id: key_id.clone(),
- timestamp,
- allow_read,
- allow_write,
- })
- .unwrap();
+ if let BucketState::Present(ak) = bucket.state.get_mut() {
+ let old_ak = ak.take_and_clear();
+ ak.merge(&old_ak.update_mutator(
+ key_id.to_string(),
+ PermissionSet {
+ allow_read,
+ allow_write,
+ },
+ ));
+ } else {
+ return Err(Error::Message(format!(
+ "Bucket is deleted in update_bucket_key"
+ )));
+ }
self.garage.bucket_table.insert(&bucket).await?;
Ok(())
}
@@ -281,22 +269,14 @@ impl AdminRpcHandler {
allow_read: bool,
allow_write: bool,
) -> Result<(), Error> {
- let timestamp = match key
- .authorized_buckets()
- .iter()
- .find(|x| x.bucket == *bucket)
- {
- None => now_msec(),
- Some(ab) => std::cmp::max(ab.timestamp + 1, now_msec()),
- };
- key.clear_buckets();
- key.add_bucket(AllowedBucket {
- bucket: bucket.clone(),
- timestamp,
- allow_read,
- allow_write,
- })
- .unwrap();
+ let old_map = key.authorized_buckets.take_and_clear();
+ key.authorized_buckets.merge(&old_map.update_mutator(
+ bucket.clone(),
+ PermissionSet {
+ allow_read,
+ allow_write,
+ },
+ ));
self.garage.key_table.insert(&key).await?;
Ok(())
}
diff --git a/src/model/block.rs b/src/model/block.rs
index 6a5d9c5b..8a513a3c 100644
--- a/src/model/block.rs
+++ b/src/model/block.rs
@@ -20,7 +20,7 @@ use garage_rpc::rpc_client::*;
use garage_rpc::rpc_server::*;
use garage_table::table_sharded::TableShardedReplication;
-use garage_table::{TableReplication, DeletedFilter};
+use garage_table::{DeletedFilter, TableReplication};
use crate::block_ref_table::*;
diff --git a/src/model/bucket_table.rs b/src/model/bucket_table.rs
index 35c0cc27..b7f24d71 100644
--- a/src/model/bucket_table.rs
+++ b/src/model/bucket_table.rs
@@ -1,71 +1,60 @@
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
+use garage_table::crdt::CRDT;
use garage_table::*;
+
use garage_util::error::Error;
+use crate::key_table::PermissionSet;
+
+use model010::bucket_table as prev;
+
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct Bucket {
// Primary key
pub name: String,
- // Timestamp and deletion
- // Upon version increment, all info is replaced
- pub timestamp: u64,
- pub deleted: bool,
+ pub state: crdt::LWW<BucketState>,
+}
- // Authorized keys
- authorized_keys: Vec<AllowedKey>,
+#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+pub enum BucketState {
+ Deleted,
+ Present(crdt::LWWMap<String, PermissionSet>),
}
-impl Bucket {
- pub fn new(
- name: String,
- timestamp: u64,
- deleted: bool,
- authorized_keys: Vec<AllowedKey>,
- ) -> Self {
- let mut ret = Bucket {
- name,
- timestamp,
- deleted,
- authorized_keys: vec![],
- };
- for key in authorized_keys {
- ret.add_key(key)
- .expect("Duplicate AllowedKey in Bucket constructor");
+impl CRDT for BucketState {
+ fn merge(&mut self, o: &Self) {
+ match o {
+ BucketState::Deleted => *self = BucketState::Deleted,
+ BucketState::Present(other_ak) => {
+ if let BucketState::Present(ak) = self {
+ ak.merge(other_ak);
+ }
+ }
}
- ret
}
- /// Add a key only if it is not already present
- pub fn add_key(&mut self, key: AllowedKey) -> Result<(), ()> {
- match self
- .authorized_keys
- .binary_search_by(|k| k.key_id.cmp(&key.key_id))
- {
- Err(i) => {
- self.authorized_keys.insert(i, key);
- Ok(())
- }
- Ok(_) => Err(()),
+}
+
+impl Bucket {
+ pub fn new(name: String) -> Self {
+ Bucket {
+ name,
+ state: crdt::LWW::new(BucketState::Present(crdt::LWWMap::new())),
}
}
- pub fn authorized_keys(&self) -> &[AllowedKey] {
- &self.authorized_keys[..]
+ pub fn is_deleted(&self) -> bool {
+ *self.state.get() == BucketState::Deleted
}
- pub fn clear_keys(&mut self) {
- self.authorized_keys.clear();
+ pub fn authorized_keys(&self) -> &[(String, u64, PermissionSet)] {
+ match self.state.get() {
+ BucketState::Deleted => &[],
+ BucketState::Present(ak) => ak.items(),
+ }
}
}
-#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
-pub struct AllowedKey {
- pub key_id: String,
- pub timestamp: u64,
- pub allow_read: bool,
- pub allow_write: bool,
-}
-
impl Entry<EmptyKey, String> for Bucket {
fn partition_key(&self) -> &EmptyKey {
&EmptyKey
@@ -75,36 +64,12 @@ impl Entry<EmptyKey, String> for Bucket {
}
fn merge(&mut self, other: &Self) {
- if other.timestamp > self.timestamp {
- *self = other.clone();
- return;
- }
- if self.timestamp > other.timestamp || self.deleted {
- return;
- }
-
- for ak in other.authorized_keys.iter() {
- match self
- .authorized_keys
- .binary_search_by(|our_ak| our_ak.key_id.cmp(&ak.key_id))
- {
- Ok(i) => {
- let our_ak = &mut self.authorized_keys[i];
- if ak.timestamp > our_ak.timestamp {
- *our_ak = ak.clone();
- }
- }
- Err(i) => {
- self.authorized_keys.insert(i, ak.clone());
- }
- }
- }
+ self.state.merge(&other.state);
}
}
pub struct BucketTable;
-
#[async_trait]
impl TableSchema for BucketTable {
type P = EmptyKey;
@@ -117,6 +82,35 @@ impl TableSchema for BucketTable {
}
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
- filter.apply(entry.deleted)
+ filter.apply(entry.is_deleted())
+ }
+
+ fn try_migrate(bytes: &[u8]) -> Option<Self::E> {
+ let old = match rmp_serde::decode::from_read_ref::<_, prev::Bucket>(bytes) {
+ Ok(x) => x,
+ Err(_) => return None,
+ };
+ if old.deleted {
+ Some(Bucket {
+ name: old.name,
+ state: crdt::LWW::migrate_from_raw(old.timestamp, BucketState::Deleted),
+ })
+ } else {
+ let mut keys = crdt::LWWMap::new();
+ for ak in old.authorized_keys() {
+ keys.merge(&crdt::LWWMap::migrate_from_raw_item(
+ ak.key_id.clone(),
+ ak.timestamp,
+ PermissionSet {
+ allow_read: ak.allow_read,
+ allow_write: ak.allow_write,
+ },
+ ));
+ }
+ Some(Bucket {
+ name: old.name,
+ state: crdt::LWW::migrate_from_raw(old.timestamp, BucketState::Present(keys)),
+ })
+ }
}
}
diff --git a/src/model/key_table.rs b/src/model/key_table.rs
index 05b938ce..20da3cc6 100644
--- a/src/model/key_table.rs
+++ b/src/model/key_table.rs
@@ -1,10 +1,13 @@
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
+use garage_table::crdt::CRDT;
use garage_table::*;
-use garage_util::data::*;
+
use garage_util::error::Error;
+use model010::key_table as prev;
+
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct Key {
// Primary key
@@ -14,83 +17,54 @@ pub struct Key {
pub secret_key: String,
// Name
- pub name: String,
- pub name_timestamp: u64,
+ pub name: crdt::LWW<String>,
// Deletion
- pub deleted: bool,
+ pub deleted: crdt::Bool,
// Authorized keys
- authorized_buckets: Vec<AllowedBucket>,
+ pub authorized_buckets: crdt::LWWMap<String, PermissionSet>,
+ // CRDT interaction: deleted implies authorized_buckets is empty
}
impl Key {
- pub fn new(name: String, buckets: Vec<AllowedBucket>) -> Self {
+ pub fn new(name: String) -> Self {
let key_id = format!("GK{}", hex::encode(&rand::random::<[u8; 12]>()[..]));
let secret_key = hex::encode(&rand::random::<[u8; 32]>()[..]);
- let mut ret = Self {
+ Self {
key_id,
secret_key,
- name,
- name_timestamp: now_msec(),
- deleted: false,
- authorized_buckets: vec![],
- };
- for b in buckets {
- ret.add_bucket(b)
- .expect("Duplicate AllowedBucket in Key constructor");
+ name: crdt::LWW::new(name),
+ deleted: crdt::Bool::new(false),
+ authorized_buckets: crdt::LWWMap::new(),
}
- ret
}
pub fn delete(key_id: String) -> Self {
Self {
key_id,
secret_key: "".into(),
- name: "".into(),
- name_timestamp: now_msec(),
- deleted: true,
- authorized_buckets: vec![],
+ name: crdt::LWW::new("".to_string()),
+ deleted: crdt::Bool::new(true),
+ authorized_buckets: crdt::LWWMap::new(),
}
}
/// Add an authorized bucket, only if it wasn't there before
- pub fn add_bucket(&mut self, new: AllowedBucket) -> Result<(), ()> {
- match self
- .authorized_buckets
- .binary_search_by(|b| b.bucket.cmp(&new.bucket))
- {
- Err(i) => {
- self.authorized_buckets.insert(i, new);
- Ok(())
- }
- Ok(_) => Err(()),
- }
- }
- pub fn authorized_buckets(&self) -> &[AllowedBucket] {
- &self.authorized_buckets[..]
- }
- pub fn clear_buckets(&mut self) {
- self.authorized_buckets.clear();
- }
pub fn allow_read(&self, bucket: &str) -> bool {
self.authorized_buckets
- .iter()
- .find(|x| x.bucket.as_str() == bucket)
+ .get(&bucket.to_string())
.map(|x| x.allow_read)
.unwrap_or(false)
}
pub fn allow_write(&self, bucket: &str) -> bool {
self.authorized_buckets
- .iter()
- .find(|x| x.bucket.as_str() == bucket)
+ .get(&bucket.to_string())
.map(|x| x.allow_write)
.unwrap_or(false)
}
}
-#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
-pub struct AllowedBucket {
- pub bucket: String,
- pub timestamp: u64,
+#[derive(PartialOrd, Ord, PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+pub struct PermissionSet {
pub allow_read: bool,
pub allow_write: bool,
}
@@ -104,35 +78,15 @@ impl Entry<EmptyKey, String> for Key {
}
fn merge(&mut self, other: &Self) {
- if other.name_timestamp > self.name_timestamp {
- self.name_timestamp = other.name_timestamp;
- self.name = other.name.clone();
- }
+ self.name.merge(&other.name);
+ self.deleted.merge(&other.deleted);
- if other.deleted {
- self.deleted = true;
- }
- if self.deleted {
+ if self.deleted.get() {
self.authorized_buckets.clear();
return;
}
- for ab in other.authorized_buckets.iter() {
- match self
- .authorized_buckets
- .binary_search_by(|our_ab| our_ab.bucket.cmp(&ab.bucket))
- {
- Ok(i) => {
- let our_ab = &mut self.authorized_buckets[i];
- if ab.timestamp > our_ab.timestamp {
- *our_ab = ab.clone();
- }
- }
- Err(i) => {
- self.authorized_buckets.insert(i, ab.clone());
- }
- }
- }
+ self.authorized_buckets.merge(&other.authorized_buckets);
}
}
@@ -150,6 +104,32 @@ impl TableSchema for KeyTable {
}
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
- filter.apply(entry.deleted)
+ filter.apply(entry.deleted.get())
+ }
+
+ fn try_migrate(bytes: &[u8]) -> Option<Self::E> {
+ let old = match rmp_serde::decode::from_read_ref::<_, prev::Key>(bytes) {
+ Ok(x) => x,
+ Err(_) => return None,
+ };
+ let mut new = Self::E {
+ key_id: old.key_id.clone(),
+ secret_key: old.secret_key.clone(),
+ name: crdt::LWW::migrate_from_raw(old.name_timestamp, old.name.clone()),
+ deleted: crdt::Bool::new(old.deleted),
+ authorized_buckets: crdt::LWWMap::new(),
+ };
+ for ab in old.authorized_buckets() {
+ let it = crdt::LWWMap::migrate_from_raw_item(
+ ab.bucket.clone(),
+ ab.timestamp,
+ PermissionSet {
+ allow_read: ab.allow_read,
+ allow_write: ab.allow_write,
+ },
+ );
+ new.authorized_buckets.merge(&it);
+ }
+ Some(new)
}
}
diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml
index 1963f3da..945763fa 100644
--- a/src/table/Cargo.toml
+++ b/src/table/Cargo.toml
@@ -21,6 +21,7 @@ rand = "0.7"
hex = "0.3"
arc-swap = "0.4"
log = "0.4"
+hexdump = "0.1"
sled = "0.31"
diff --git a/src/table/crdt.rs b/src/table/crdt.rs
new file mode 100644
index 00000000..2b903cf0
--- /dev/null
+++ b/src/table/crdt.rs
@@ -0,0 +1,162 @@
+use serde::{Deserialize, Serialize};
+
+use garage_util::data::*;
+
+pub trait CRDT {
+ fn merge(&mut self, other: &Self);
+}
+
+impl<T> CRDT for T
+where
+ T: Ord + Clone,
+{
+ fn merge(&mut self, other: &Self) {
+ if other > self {
+ *self = other.clone();
+ }
+ }
+}
+
+// ---- LWW Register ----
+
+#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
+pub struct LWW<T> {
+ ts: u64,
+ v: T,
+}
+
+impl<T> LWW<T>
+where
+ T: CRDT,
+{
+ pub fn new(value: T) -> Self {
+ Self {
+ ts: now_msec(),
+ v: value,
+ }
+ }
+ pub fn migrate_from_raw(ts: u64, value: T) -> Self {
+ Self { ts, v: value }
+ }
+ pub fn update(&mut self, new_value: T) {
+ self.ts = std::cmp::max(self.ts + 1, now_msec());
+ self.v = new_value;
+ }
+ pub fn get(&self) -> &T {
+ &self.v
+ }
+ pub fn get_mut(&mut self) -> &mut T {
+ &mut self.v
+ }
+}
+
+impl<T> CRDT for LWW<T>
+where
+ T: Clone + CRDT,
+{
+ fn merge(&mut self, other: &Self) {
+ if other.ts > self.ts {
+ self.ts = other.ts;
+ self.v = other.v.clone();
+ } else if other.ts == self.ts {
+ self.v.merge(&other.v);
+ }
+ }
+}
+
+// ---- Boolean (true as absorbing state) ----
+
+#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)]
+pub struct Bool(bool);
+
+impl Bool {
+ pub fn new(b: bool) -> Self {
+ Self(b)
+ }
+ pub fn set(&mut self) {
+ self.0 = true;
+ }
+ pub fn get(&self) -> bool {
+ self.0
+ }
+}
+
+impl CRDT for Bool {
+ fn merge(&mut self, other: &Self) {
+ self.0 = self.0 || other.0;
+ }
+}
+
+// ---- LWW Map ----
+
+#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
+pub struct LWWMap<K, V> {
+ vals: Vec<(K, u64, V)>,
+}
+
+impl<K, V> LWWMap<K, V>
+where
+ K: Ord,
+ V: CRDT,
+{
+ pub fn new() -> Self {
+ Self { vals: vec![] }
+ }
+ pub fn migrate_from_raw_item(k: K, ts: u64, v: V) -> Self {
+ Self {
+ vals: vec![(k, ts, v)],
+ }
+ }
+ pub fn take_and_clear(&mut self) -> Self {
+ let vals = std::mem::replace(&mut self.vals, vec![]);
+ Self { vals }
+ }
+ pub fn clear(&mut self) {
+ self.vals.clear();
+ }
+ pub fn update_mutator(&self, k: K, new_v: V) -> Self {
+ let new_vals = match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) {
+ Ok(i) => {
+ let (_, old_ts, _) = self.vals[i];
+ let new_ts = std::cmp::max(old_ts + 1, now_msec());
+ vec![(k, new_ts, new_v)]
+ }
+ Err(_) => vec![(k, now_msec(), new_v)],
+ };
+ Self { vals: new_vals }
+ }
+ pub fn get(&self, k: &K) -> Option<&V> {
+ match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) {
+ Ok(i) => Some(&self.vals[i].2),
+ Err(_) => None,
+ }
+ }
+ pub fn items(&self) -> &[(K, u64, V)] {
+ &self.vals[..]
+ }
+}
+
+impl<K, V> CRDT for LWWMap<K, V>
+where
+ K: Clone + Ord,
+ V: Clone + CRDT,
+{
+ fn merge(&mut self, other: &Self) {
+ for (k, ts2, v2) in other.vals.iter() {
+ match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) {
+ Ok(i) => {
+ let (_, ts1, _v1) = &self.vals[i];
+ if ts2 > ts1 {
+ self.vals[i].1 = *ts2;
+ self.vals[i].2 = v2.clone();
+ } else if ts1 == ts2 {
+ self.vals[i].2.merge(&v2);
+ }
+ }
+ Err(i) => {
+ self.vals.insert(i, (k.clone(), *ts2, v2.clone()));
+ }
+ }
+ }
+ }
+}
diff --git a/src/table/lib.rs b/src/table/lib.rs
index 7684fe9d..704f8f1e 100644
--- a/src/table/lib.rs
+++ b/src/table/lib.rs
@@ -3,6 +3,7 @@
#[macro_use]
extern crate log;
+pub mod crdt;
pub mod schema;
pub mod util;
@@ -12,5 +13,5 @@ pub mod table_sharded;
pub mod table_sync;
pub use schema::*;
-pub use util::*;
pub use table::*;
+pub use util::*;
diff --git a/src/table/schema.rs b/src/table/schema.rs
index 49cede0a..d2ec9450 100644
--- a/src/table/schema.rs
+++ b/src/table/schema.rs
@@ -20,7 +20,6 @@ impl PartitionKey for Hash {
}
}
-
pub trait SortKey {
fn sort_key(&self) -> &[u8];
}
@@ -37,7 +36,6 @@ impl SortKey for Hash {
}
}
-
pub trait Entry<P: PartitionKey, S: SortKey>:
PartialEq + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync
{
@@ -47,7 +45,6 @@ pub trait Entry<P: PartitionKey, S: SortKey>:
fn merge(&mut self, other: &Self);
}
-
#[async_trait]
pub trait TableSchema: Send + Sync {
type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
@@ -66,4 +63,3 @@ pub trait TableSchema: Send + Sync {
true
}
}
-
diff --git a/src/table/table.rs b/src/table/table.rs
index 2beac3f4..5dfee3c8 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -2,6 +2,8 @@ use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use std::time::Duration;
+use log::warn;
+
use arc_swap::ArcSwapOption;
use futures::stream::*;
use serde::{Deserialize, Serialize};
@@ -185,7 +187,7 @@ where
for resp in resps {
if let TableRPC::ReadEntryResponse(value) = resp {
if let Some(v_bytes) = value {
- let v = Self::decode_entry(v_bytes.as_slice())?;
+ let v = self.decode_entry(v_bytes.as_slice())?;
ret = match ret {
None => Some(v),
Some(mut x) => {
@@ -241,7 +243,7 @@ where
for resp in resps {
if let TableRPC::Update(entries) = resp {
for entry_bytes in entries.iter() {
- let entry = Self::decode_entry(entry_bytes.as_slice())?;
+ let entry = self.decode_entry(entry_bytes.as_slice())?;
let entry_key = self.tree_key(entry.partition_key(), entry.sort_key());
match ret.remove(&entry_key) {
None => {
@@ -363,7 +365,7 @@ where
let keep = match filter {
None => true,
Some(f) => {
- let entry = Self::decode_entry(value.as_ref())?;
+ let entry = self.decode_entry(value.as_ref())?;
F::matches_filter(&entry, f)
}
};
@@ -382,14 +384,14 @@ where
let mut epidemic_propagate = vec![];
for update_bytes in entries.iter() {
- let update = Self::decode_entry(update_bytes.as_slice())?;
+ let update = self.decode_entry(update_bytes.as_slice())?;
let tree_key = self.tree_key(update.partition_key(), update.sort_key());
let (old_entry, new_entry) = self.store.transaction(|db| {
let (old_entry, new_entry) = match db.get(&tree_key)? {
Some(prev_bytes) => {
- let old_entry = Self::decode_entry(&prev_bytes)
+ let old_entry = self.decode_entry(&prev_bytes)
.map_err(sled::ConflictableTransactionError::Abort)?;
let mut new_entry = old_entry.clone();
new_entry.merge(&update);
@@ -437,7 +439,7 @@ where
break;
}
if let Some(old_val) = self.store.remove(&key)? {
- let old_entry = Self::decode_entry(&old_val)?;
+ let old_entry = self.decode_entry(&old_val)?;
self.instance.updated(Some(old_entry), None).await?;
self.system
.background
@@ -455,12 +457,18 @@ where
ret
}
- fn decode_entry(bytes: &[u8]) -> Result<F::E, Error> {
+ fn decode_entry(&self, bytes: &[u8]) -> Result<F::E, Error> {
match rmp_serde::decode::from_read_ref::<_, F::E>(bytes) {
Ok(x) => Ok(x),
Err(e) => match F::try_migrate(bytes) {
Some(x) => Ok(x),
- None => Err(e.into()),
+ None => {
+ warn!("Unable to decode entry of {}: {}", self.name, e);
+ for line in hexdump::hexdump_iter(bytes) {
+ debug!("{}", line);
+ }
+ Err(e.into())
+ }
},
}
}