aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/api/signature.rs2
-rw-r--r--src/garage/admin_rpc.rs42
-rw-r--r--src/model/key_table.rs121
-rw-r--r--src/table/crdt.rs175
-rw-r--r--src/table/lib.rs1
5 files changed, 245 insertions, 96 deletions
diff --git a/src/api/signature.rs b/src/api/signature.rs
index 402b1881..0ee47961 100644
--- a/src/api/signature.rs
+++ b/src/api/signature.rs
@@ -68,7 +68,7 @@ pub async fn check_signature(
.key_table
.get(&EmptyKey, &authorization.key_id)
.await?
- .filter(|k| !k.deleted)
+ .filter(|k| !k.deleted.get())
.ok_or(Error::Forbidden(format!(
"No such key: {}",
authorization.key_id
diff --git a/src/garage/admin_rpc.rs b/src/garage/admin_rpc.rs
index 778e4a1d..bd9fca49 100644
--- a/src/garage/admin_rpc.rs
+++ b/src/garage/admin_rpc.rs
@@ -6,6 +6,7 @@ use garage_util::data::*;
use garage_util::error::Error;
use garage_table::*;
+use garage_table::crdt::CRDT;
use garage_rpc::rpc_client::*;
use garage_rpc::rpc_server::*;
@@ -114,7 +115,7 @@ impl AdminRpcHandler {
// --- done checking, now commit ---
for ak in bucket.authorized_keys() {
if let Some(key) = self.garage.key_table.get(&EmptyKey, &ak.key_id).await? {
- if !key.deleted {
+ if !key.deleted.get() {
self.update_key_bucket(key, &bucket.name, false, false)
.await?;
}
@@ -173,7 +174,7 @@ impl AdminRpcHandler {
.get_range(&EmptyKey, None, Some(DeletedFilter::NotDeleted), 10000)
.await?
.iter()
- .map(|k| (k.key_id.to_string(), k.name.to_string()))
+ .map(|k| (k.key_id.to_string(), k.name.get().clone()))
.collect::<Vec<_>>();
Ok(AdminRPC::KeyList(key_ids))
}
@@ -182,14 +183,13 @@ impl AdminRpcHandler {
Ok(AdminRPC::KeyInfo(key))
}
KeyOperation::New(query) => {
- let key = Key::new(query.name, vec![]);
+ let key = Key::new(query.name);
self.garage.key_table.insert(&key).await?;
Ok(AdminRPC::KeyInfo(key))
}
KeyOperation::Rename(query) => {
let mut key = self.get_existing_key(&query.key_id).await?;
- key.name_timestamp = std::cmp::max(key.name_timestamp + 1, now_msec());
- key.name = query.new_name;
+ key.name.update(query.new_name);
self.garage.key_table.insert(&key).await?;
Ok(AdminRPC::KeyInfo(key))
}
@@ -201,16 +201,16 @@ impl AdminRpcHandler {
)));
}
// --- done checking, now commit ---
- for ab in key.authorized_buckets().iter() {
+ for (ab_name, _, _) in key.authorized_buckets.items().iter() {
if let Some(bucket) =
- self.garage.bucket_table.get(&EmptyKey, &ab.bucket).await?
+ self.garage.bucket_table.get(&EmptyKey, ab_name).await?
{
if !bucket.deleted {
self.update_bucket_key(bucket, &key.key_id, false, false)
.await?;
}
} else {
- return Err(Error::Message(format!("Bucket not found: {}", ab.bucket)));
+ return Err(Error::Message(format!("Bucket not found: {}", ab_name)));
}
}
let del_key = Key::delete(key.key_id);
@@ -241,7 +241,7 @@ impl AdminRpcHandler {
.key_table
.get(&EmptyKey, id)
.await?
- .filter(|k| !k.deleted)
+ .filter(|k| !k.deleted.get())
.map(Ok)
.unwrap_or(Err(Error::BadRPC(format!("Key {} does not exist", id))))
}
@@ -281,22 +281,14 @@ impl AdminRpcHandler {
allow_read: bool,
allow_write: bool,
) -> Result<(), Error> {
- let timestamp = match key
- .authorized_buckets()
- .iter()
- .find(|x| x.bucket == *bucket)
- {
- None => now_msec(),
- Some(ab) => std::cmp::max(ab.timestamp + 1, now_msec()),
- };
- key.clear_buckets();
- key.add_bucket(AllowedBucket {
- bucket: bucket.clone(),
- timestamp,
- allow_read,
- allow_write,
- })
- .unwrap();
+ let old_map = key.authorized_buckets.take_and_clear();
+ key.authorized_buckets.merge(
+ &old_map.update_mutator(
+ bucket.clone(),
+ PermissionSet{
+ allow_read, allow_write
+ }
+ ));
self.garage.key_table.insert(&key).await?;
Ok(())
}
diff --git a/src/model/key_table.rs b/src/model/key_table.rs
index 05b938ce..2b825aa3 100644
--- a/src/model/key_table.rs
+++ b/src/model/key_table.rs
@@ -2,9 +2,12 @@ use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use garage_table::*;
-use garage_util::data::*;
+use garage_table::crdt::CRDT;
+
use garage_util::error::Error;
+use model010::key_table as prev;
+
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct Key {
// Primary key
@@ -14,83 +17,56 @@ pub struct Key {
pub secret_key: String,
// Name
- pub name: String,
- pub name_timestamp: u64,
+ pub name: crdt::LWW<String>,
// Deletion
- pub deleted: bool,
+ pub deleted: crdt::Bool,
// Authorized keys
- authorized_buckets: Vec<AllowedBucket>,
+ pub authorized_buckets: crdt::LWWMap<String, PermissionSet>,
+
+ // CRDT interaction: deleted implies authorized_buckets is empty
}
impl Key {
- pub fn new(name: String, buckets: Vec<AllowedBucket>) -> Self {
+ pub fn new(name: String) -> Self {
let key_id = format!("GK{}", hex::encode(&rand::random::<[u8; 12]>()[..]));
let secret_key = hex::encode(&rand::random::<[u8; 32]>()[..]);
- let mut ret = Self {
+ let ret = Self {
key_id,
secret_key,
- name,
- name_timestamp: now_msec(),
- deleted: false,
- authorized_buckets: vec![],
+ name: crdt::LWW::new(name),
+ deleted: crdt::Bool::new(false),
+ authorized_buckets: crdt::LWWMap::new(),
};
- for b in buckets {
- ret.add_bucket(b)
- .expect("Duplicate AllowedBucket in Key constructor");
- }
ret
}
pub fn delete(key_id: String) -> Self {
Self {
key_id,
secret_key: "".into(),
- name: "".into(),
- name_timestamp: now_msec(),
- deleted: true,
- authorized_buckets: vec![],
+ name: crdt::LWW::new("".to_string()),
+ deleted: crdt::Bool::new(true),
+ authorized_buckets: crdt::LWWMap::new(),
}
}
/// Add an authorized bucket, only if it wasn't there before
- pub fn add_bucket(&mut self, new: AllowedBucket) -> Result<(), ()> {
- match self
- .authorized_buckets
- .binary_search_by(|b| b.bucket.cmp(&new.bucket))
- {
- Err(i) => {
- self.authorized_buckets.insert(i, new);
- Ok(())
- }
- Ok(_) => Err(()),
- }
- }
- pub fn authorized_buckets(&self) -> &[AllowedBucket] {
- &self.authorized_buckets[..]
- }
- pub fn clear_buckets(&mut self) {
- self.authorized_buckets.clear();
- }
pub fn allow_read(&self, bucket: &str) -> bool {
self.authorized_buckets
- .iter()
- .find(|x| x.bucket.as_str() == bucket)
+ .get(&bucket.to_string())
.map(|x| x.allow_read)
.unwrap_or(false)
}
pub fn allow_write(&self, bucket: &str) -> bool {
self.authorized_buckets
- .iter()
- .find(|x| x.bucket.as_str() == bucket)
+ .get(&bucket.to_string())
.map(|x| x.allow_write)
.unwrap_or(false)
}
}
-#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
-pub struct AllowedBucket {
- pub bucket: String,
- pub timestamp: u64,
+#[derive(PartialOrd, Ord, PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+pub struct PermissionSet {
pub allow_read: bool,
pub allow_write: bool,
}
@@ -104,35 +80,15 @@ impl Entry<EmptyKey, String> for Key {
}
fn merge(&mut self, other: &Self) {
- if other.name_timestamp > self.name_timestamp {
- self.name_timestamp = other.name_timestamp;
- self.name = other.name.clone();
- }
+ self.name.merge(&other.name);
+ self.deleted.merge(&other.deleted);
- if other.deleted {
- self.deleted = true;
- }
- if self.deleted {
+ if self.deleted.get() {
self.authorized_buckets.clear();
return;
}
- for ab in other.authorized_buckets.iter() {
- match self
- .authorized_buckets
- .binary_search_by(|our_ab| our_ab.bucket.cmp(&ab.bucket))
- {
- Ok(i) => {
- let our_ab = &mut self.authorized_buckets[i];
- if ab.timestamp > our_ab.timestamp {
- *our_ab = ab.clone();
- }
- }
- Err(i) => {
- self.authorized_buckets.insert(i, ab.clone());
- }
- }
- }
+ self.authorized_buckets.merge(&other.authorized_buckets);
}
}
@@ -150,6 +106,31 @@ impl TableSchema for KeyTable {
}
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
- filter.apply(entry.deleted)
+ filter.apply(entry.deleted.get())
+ }
+
+ fn try_migrate(bytes: &[u8]) -> Option<Self::E> {
+ let old = match rmp_serde::decode::from_read_ref::<_, prev::Key>(bytes) {
+ Ok(x) => x,
+ Err(_) => return None,
+ };
+ let mut new = Self::E {
+ key_id: old.key_id.clone(),
+ secret_key: old.secret_key.clone(),
+ name: crdt::LWW::migrate_from_raw(old.name_timestamp, old.name.clone()),
+ deleted: crdt::Bool::new(old.deleted),
+ authorized_buckets: crdt::LWWMap::new(),
+ };
+ for ab in old.authorized_buckets() {
+ let it = crdt::LWWMap::migrate_from_raw_item(
+ ab.bucket.clone(),
+ ab.timestamp,
+ PermissionSet{
+ allow_read: ab.allow_read,
+ allow_write: ab.allow_write,
+ });
+ new.authorized_buckets.merge(&it);
+ }
+ Some(new)
}
}
diff --git a/src/table/crdt.rs b/src/table/crdt.rs
new file mode 100644
index 00000000..8f5e4d71
--- /dev/null
+++ b/src/table/crdt.rs
@@ -0,0 +1,175 @@
+use serde::{Deserialize, Serialize};
+
+use garage_util::data::*;
+
+pub trait CRDT {
+ fn merge(&mut self, other: &Self);
+}
+
+impl<T> CRDT for T
+where T: Ord + Clone {
+ fn merge(&mut self, other: &Self) {
+ if other > self {
+ *self = other.clone();
+ }
+ }
+}
+
+// ---- LWW Register ----
+
+#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
+pub struct LWW<T>
+{
+ ts: u64,
+ v: T,
+}
+
+impl<T> LWW<T>
+where T: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + Ord
+{
+ pub fn new(value: T) -> Self {
+ Self {
+ ts: now_msec(),
+ v: value,
+ }
+ }
+ pub fn migrate_from_raw(ts: u64, value: T) -> Self {
+ Self {
+ ts,
+ v: value,
+ }
+ }
+ pub fn update(&mut self, new_value: T) {
+ self.ts = std::cmp::max(self.ts + 1, now_msec());
+ self.v = new_value;
+ }
+ pub fn get(&self) -> &T {
+ &self.v
+ }
+}
+
+impl<T> CRDT for LWW<T>
+where T: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + CRDT
+{
+ fn merge(&mut self, other: &Self) {
+ if other.ts > self.ts {
+ self.ts = other.ts;
+ self.v = other.v.clone();
+ } else if other.ts == self.ts {
+ self.v.merge(&other.v);
+ }
+ }
+}
+
+
+// ---- Boolean (true as absorbing state) ----
+
+#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)]
+pub struct Bool(bool);
+
+impl Bool {
+ pub fn new(b: bool) -> Self {
+ Self(b)
+ }
+ pub fn set(&mut self) {
+ self.0 = true;
+ }
+ pub fn get(&self) -> bool {
+ self.0
+ }
+}
+
+impl CRDT for Bool {
+ fn merge(&mut self, other: &Self) {
+ self.0 = self.0 || other.0;
+ }
+}
+
+
+// ---- LWW Map ----
+
+#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
+pub struct LWWMap<K, V>
+{
+ vals: Vec<(K, u64, V)>,
+}
+
+impl<K, V> LWWMap<K, V>
+where K: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + Ord,
+ V: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + Ord,
+{
+ pub fn new() -> Self {
+ Self{
+ vals: vec![],
+ }
+ }
+ pub fn migrate_from_raw_item(k: K, ts: u64, v: V) -> Self {
+ Self{
+ vals: vec![(k, ts, v)],
+ }
+ }
+ pub fn take_and_clear(&mut self) -> Self {
+ let vals = std::mem::replace(&mut self.vals, vec![]);
+ Self{vals}
+ }
+ pub fn clear(&mut self) {
+ self.vals.clear();
+ }
+ pub fn update_mutator(&self, k: K, new_v: V) -> Self {
+ let new_vals = match self
+ .vals
+ .binary_search_by(|(k2, _, _)| k2.cmp(&k))
+ {
+ Ok(i) => {
+ let (_, old_ts, _) = self.vals[i];
+ let new_ts = std::cmp::max(old_ts+1, now_msec());
+ vec![(k, new_ts, new_v)]
+ }
+ Err(_) => {
+ vec![(k, now_msec(), new_v)]
+ }
+ };
+ Self{
+ vals: new_vals,
+ }
+ }
+ pub fn get(&self, k: &K) -> Option<&V> {
+ match self
+ .vals
+ .binary_search_by(|(k2, _, _)| k2.cmp(&k))
+ {
+ Ok(i) => Some(&self.vals[i].2),
+ Err(_) => None
+ }
+ }
+ pub fn items(&self) -> &[(K, u64, V)] {
+ &self.vals[..]
+ }
+}
+
+impl<K, V> CRDT for LWWMap<K, V>
+where K: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + Ord,
+ V: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + CRDT,
+{
+ fn merge(&mut self, other: &Self) {
+ for (k, ts2, v2) in other.vals.iter() {
+ match self
+ .vals
+ .binary_search_by(|(k2, _, _)| k2.cmp(&k))
+ {
+ Ok(i) => {
+ let (_, ts1, v1) = &self.vals[i];
+ if ts2 > ts1 {
+ self.vals[i].1 = *ts2;
+ self.vals[i].2 = v2.clone();
+ } else if ts1 == ts2 {
+ self.vals[i].2.merge(&v2);
+ }
+ }
+ Err(i) => {
+ self.vals.insert(i, (k.clone(), *ts2, v2.clone()));
+ }
+ }
+ }
+ }
+}
diff --git a/src/table/lib.rs b/src/table/lib.rs
index 7684fe9d..e2bf1f46 100644
--- a/src/table/lib.rs
+++ b/src/table/lib.rs
@@ -5,6 +5,7 @@ extern crate log;
pub mod schema;
pub mod util;
+pub mod crdt;
pub mod table;
pub mod table_fullcopy;