aboutsummaryrefslogtreecommitdiff
path: root/src/model
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2020-11-20 23:01:12 +0100
committerAlex Auvolat <alex@adnab.me>2020-11-20 23:53:54 +0100
commitf8a04852a2cda84d4fd12ec19e90bbd8224bf4e9 (patch)
treebab388b5958fa0ae2bd8135ed137a54a0d97fc80 /src/model
parente02e9e035e269cd6b660c92706db424a2edb5306 (diff)
downloadgarage-f8a04852a2cda84d4fd12ec19e90bbd8224bf4e9.tar.gz
garage-f8a04852a2cda84d4fd12ec19e90bbd8224bf4e9.zip
Convert bucket table to better CRDT representation
Diffstat (limited to 'src/model')
-rw-r--r--src/model/block.rs2
-rw-r--r--src/model/bucket_table.rs137
-rw-r--r--src/model/key_table.rs8
3 files changed, 71 insertions, 76 deletions
diff --git a/src/model/block.rs b/src/model/block.rs
index 6a5d9c5b..8a513a3c 100644
--- a/src/model/block.rs
+++ b/src/model/block.rs
@@ -20,7 +20,7 @@ use garage_rpc::rpc_client::*;
use garage_rpc::rpc_server::*;
use garage_table::table_sharded::TableShardedReplication;
-use garage_table::{TableReplication, DeletedFilter};
+use garage_table::{DeletedFilter, TableReplication};
use crate::block_ref_table::*;
diff --git a/src/model/bucket_table.rs b/src/model/bucket_table.rs
index 35c0cc27..93421acb 100644
--- a/src/model/bucket_table.rs
+++ b/src/model/bucket_table.rs
@@ -1,71 +1,61 @@
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
+use garage_table::crdt::CRDT;
use garage_table::*;
+
use garage_util::error::Error;
+use crate::key_table::PermissionSet;
+
+use model010::bucket_table as prev;
+
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct Bucket {
// Primary key
pub name: String,
- // Timestamp and deletion
- // Upon version increment, all info is replaced
- pub timestamp: u64,
- pub deleted: bool,
+ pub state: crdt::LWW<BucketState>,
+}
+
+#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+pub enum BucketState {
+ Deleted,
+ Present(crdt::LWWMap<String, PermissionSet>),
+}
- // Authorized keys
- authorized_keys: Vec<AllowedKey>,
+impl CRDT for BucketState {
+ fn merge(&mut self, o: &Self) {
+ match o {
+ BucketState::Deleted => *self = BucketState::Deleted,
+ BucketState::Present(other_ak) => {
+ if let BucketState::Present(ak) = self {
+ ak.merge(other_ak);
+ }
+ }
+ }
+ }
}
impl Bucket {
- pub fn new(
- name: String,
- timestamp: u64,
- deleted: bool,
- authorized_keys: Vec<AllowedKey>,
- ) -> Self {
- let mut ret = Bucket {
+ pub fn new(name: String) -> Self {
+ let ret = Bucket {
name,
- timestamp,
- deleted,
- authorized_keys: vec![],
+ state: crdt::LWW::new(BucketState::Present(crdt::LWWMap::new())),
};
- for key in authorized_keys {
- ret.add_key(key)
- .expect("Duplicate AllowedKey in Bucket constructor");
- }
ret
}
- /// Add a key only if it is not already present
- pub fn add_key(&mut self, key: AllowedKey) -> Result<(), ()> {
- match self
- .authorized_keys
- .binary_search_by(|k| k.key_id.cmp(&key.key_id))
- {
- Err(i) => {
- self.authorized_keys.insert(i, key);
- Ok(())
- }
- Ok(_) => Err(()),
- }
+ pub fn is_deleted(&self) -> bool {
+ *self.state.get() == BucketState::Deleted
}
- pub fn authorized_keys(&self) -> &[AllowedKey] {
- &self.authorized_keys[..]
- }
- pub fn clear_keys(&mut self) {
- self.authorized_keys.clear();
+ pub fn authorized_keys(&self) -> &[(String, u64, PermissionSet)] {
+ match self.state.get() {
+ BucketState::Deleted => &[],
+ BucketState::Present(ak) => ak.items(),
+ }
}
}
-#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
-pub struct AllowedKey {
- pub key_id: String,
- pub timestamp: u64,
- pub allow_read: bool,
- pub allow_write: bool,
-}
-
impl Entry<EmptyKey, String> for Bucket {
fn partition_key(&self) -> &EmptyKey {
&EmptyKey
@@ -75,36 +65,12 @@ impl Entry<EmptyKey, String> for Bucket {
}
fn merge(&mut self, other: &Self) {
- if other.timestamp > self.timestamp {
- *self = other.clone();
- return;
- }
- if self.timestamp > other.timestamp || self.deleted {
- return;
- }
-
- for ak in other.authorized_keys.iter() {
- match self
- .authorized_keys
- .binary_search_by(|our_ak| our_ak.key_id.cmp(&ak.key_id))
- {
- Ok(i) => {
- let our_ak = &mut self.authorized_keys[i];
- if ak.timestamp > our_ak.timestamp {
- *our_ak = ak.clone();
- }
- }
- Err(i) => {
- self.authorized_keys.insert(i, ak.clone());
- }
- }
- }
+ self.state.merge(&other.state);
}
}
pub struct BucketTable;
-
#[async_trait]
impl TableSchema for BucketTable {
type P = EmptyKey;
@@ -117,6 +83,35 @@ impl TableSchema for BucketTable {
}
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
- filter.apply(entry.deleted)
+ filter.apply(entry.is_deleted())
+ }
+
+ fn try_migrate(bytes: &[u8]) -> Option<Self::E> {
+ let old = match rmp_serde::decode::from_read_ref::<_, prev::Bucket>(bytes) {
+ Ok(x) => x,
+ Err(_) => return None,
+ };
+ if old.deleted {
+ Some(Bucket {
+ name: old.name,
+ state: crdt::LWW::migrate_from_raw(old.timestamp, BucketState::Deleted),
+ })
+ } else {
+ let mut keys = crdt::LWWMap::new();
+ for ak in old.authorized_keys() {
+ keys.merge(&crdt::LWWMap::migrate_from_raw_item(
+ ak.key_id.clone(),
+ ak.timestamp,
+ PermissionSet {
+ allow_read: ak.allow_read,
+ allow_write: ak.allow_write,
+ },
+ ));
+ }
+ Some(Bucket {
+ name: old.name,
+ state: crdt::LWW::migrate_from_raw(old.timestamp, BucketState::Present(keys)),
+ })
+ }
}
}
diff --git a/src/model/key_table.rs b/src/model/key_table.rs
index 2b825aa3..ff9d7b79 100644
--- a/src/model/key_table.rs
+++ b/src/model/key_table.rs
@@ -1,8 +1,8 @@
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
-use garage_table::*;
use garage_table::crdt::CRDT;
+use garage_table::*;
use garage_util::error::Error;
@@ -24,7 +24,6 @@ pub struct Key {
// Authorized keys
pub authorized_buckets: crdt::LWWMap<String, PermissionSet>,
-
// CRDT interaction: deleted implies authorized_buckets is empty
}
@@ -125,10 +124,11 @@ impl TableSchema for KeyTable {
let it = crdt::LWWMap::migrate_from_raw_item(
ab.bucket.clone(),
ab.timestamp,
- PermissionSet{
+ PermissionSet {
allow_read: ab.allow_read,
allow_write: ab.allow_write,
- });
+ },
+ );
new.authorized_buckets.merge(&it);
}
Some(new)