aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/api/s3_put.rs2
-rw-r--r--src/garage/admin_rpc.rs105
-rw-r--r--src/model/block.rs2
-rw-r--r--src/model/bucket_table.rs137
-rw-r--r--src/model/key_table.rs8
-rw-r--r--src/table/crdt.rs73
-rw-r--r--src/table/lib.rs4
-rw-r--r--src/table/schema.rs4
8 files changed, 150 insertions, 185 deletions
diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs
index 72613323..a1681d77 100644
--- a/src/api/s3_put.rs
+++ b/src/api/s3_put.rs
@@ -322,7 +322,7 @@ pub async fn handle_put_part(
let (object, first_block) = futures::try_join!(get_object_fut, get_first_block_fut)?;
// Check object is valid and multipart block can be accepted
- let first_block = first_block.ok_or(Error::BadRequest(format!("Empty body")))?;
+ let first_block = first_block.ok_or(Error::BadRequest(format!("Empty body")))?;
let object = object.ok_or(Error::BadRequest(format!("Object not found")))?;
if !object
diff --git a/src/garage/admin_rpc.rs b/src/garage/admin_rpc.rs
index bd9fca49..c2b2f22c 100644
--- a/src/garage/admin_rpc.rs
+++ b/src/garage/admin_rpc.rs
@@ -2,11 +2,10 @@ use std::sync::Arc;
use serde::{Deserialize, Serialize};
-use garage_util::data::*;
use garage_util::error::Error;
-use garage_table::*;
use garage_table::crdt::CRDT;
+use garage_table::*;
use garage_rpc::rpc_client::*;
use garage_rpc::rpc_server::*;
@@ -80,25 +79,26 @@ impl AdminRpcHandler {
Ok(AdminRPC::BucketInfo(bucket))
}
BucketOperation::Create(query) => {
- let bucket = self.garage.bucket_table.get(&EmptyKey, &query.name).await?;
- if bucket.as_ref().filter(|b| !b.deleted).is_some() {
- return Err(Error::BadRPC(format!(
- "Bucket {} already exists",
- query.name
- )));
- }
- let new_time = match bucket {
- Some(b) => std::cmp::max(b.timestamp + 1, now_msec()),
- None => now_msec(),
+ let bucket = match self.garage.bucket_table.get(&EmptyKey, &query.name).await? {
+ Some(mut bucket) => {
+ if !bucket.is_deleted() {
+ return Err(Error::BadRPC(format!(
+ "Bucket {} already exists",
+ query.name
+ )));
+ }
+ bucket
+ .state
+ .update(BucketState::Present(crdt::LWWMap::new()));
+ bucket
+ }
+ None => Bucket::new(query.name.clone()),
};
- self.garage
- .bucket_table
- .insert(&Bucket::new(query.name.clone(), new_time, false, vec![]))
- .await?;
+ self.garage.bucket_table.insert(&bucket).await?;
Ok(AdminRPC::Ok(format!("Bucket {} was created.", query.name)))
}
BucketOperation::Delete(query) => {
- let bucket = self.get_existing_bucket(&query.name).await?;
+ let mut bucket = self.get_existing_bucket(&query.name).await?;
let objects = self
.garage
.object_table
@@ -113,25 +113,18 @@ impl AdminRpcHandler {
)));
}
// --- done checking, now commit ---
- for ak in bucket.authorized_keys() {
- if let Some(key) = self.garage.key_table.get(&EmptyKey, &ak.key_id).await? {
+ for (key_id, _, _) in bucket.authorized_keys() {
+ if let Some(key) = self.garage.key_table.get(&EmptyKey, key_id).await? {
if !key.deleted.get() {
self.update_key_bucket(key, &bucket.name, false, false)
.await?;
}
} else {
- return Err(Error::Message(format!("Key not found: {}", ak.key_id)));
+ return Err(Error::Message(format!("Key not found: {}", key_id)));
}
}
- self.garage
- .bucket_table
- .insert(&Bucket::new(
- query.name.clone(),
- std::cmp::max(bucket.timestamp + 1, now_msec()),
- true,
- vec![],
- ))
- .await?;
+ bucket.state.update(BucketState::Deleted);
+ self.garage.bucket_table.insert(&bucket).await?;
Ok(AdminRPC::Ok(format!("Bucket {} was deleted.", query.name)))
}
BucketOperation::Allow(query) => {
@@ -202,10 +195,8 @@ impl AdminRpcHandler {
}
// --- done checking, now commit ---
for (ab_name, _, _) in key.authorized_buckets.items().iter() {
- if let Some(bucket) =
- self.garage.bucket_table.get(&EmptyKey, ab_name).await?
- {
- if !bucket.deleted {
+ if let Some(bucket) = self.garage.bucket_table.get(&EmptyKey, ab_name).await? {
+ if !bucket.is_deleted() {
self.update_bucket_key(bucket, &key.key_id, false, false)
.await?;
}
@@ -228,7 +219,7 @@ impl AdminRpcHandler {
.bucket_table
.get(&EmptyKey, bucket)
.await?
- .filter(|b| !b.deleted)
+ .filter(|b| !b.is_deleted())
.map(Ok)
.unwrap_or(Err(Error::BadRPC(format!(
"Bucket {} does not exist",
@@ -253,24 +244,20 @@ impl AdminRpcHandler {
allow_read: bool,
allow_write: bool,
) -> Result<(), Error> {
- let timestamp = match bucket
- .authorized_keys()
- .iter()
- .find(|x| x.key_id == *key_id)
- {
- None => now_msec(),
- Some(ab) => std::cmp::max(ab.timestamp + 1, now_msec()),
- };
- bucket.clear_keys();
- bucket
- .add_key(AllowedKey {
- key_id: key_id.clone(),
- timestamp,
- allow_read,
- allow_write,
- })
- .unwrap();
- self.garage.bucket_table.insert(&bucket).await?;
+ if let BucketState::Present(ak) = bucket.state.get_mut() {
+ let old_ak = ak.take_and_clear();
+ ak.merge(&old_ak.update_mutator(
+ key_id.to_string(),
+ PermissionSet {
+ allow_read,
+ allow_write,
+ },
+ ));
+ } else {
+ return Err(Error::Message(format!(
+ "Bucket is deleted in update_bucket_key"
+ )));
+ }
Ok(())
}
@@ -282,13 +269,13 @@ impl AdminRpcHandler {
allow_write: bool,
) -> Result<(), Error> {
let old_map = key.authorized_buckets.take_and_clear();
- key.authorized_buckets.merge(
- &old_map.update_mutator(
- bucket.clone(),
- PermissionSet{
- allow_read, allow_write
- }
- ));
+ key.authorized_buckets.merge(&old_map.update_mutator(
+ bucket.clone(),
+ PermissionSet {
+ allow_read,
+ allow_write,
+ },
+ ));
self.garage.key_table.insert(&key).await?;
Ok(())
}
diff --git a/src/model/block.rs b/src/model/block.rs
index 6a5d9c5b..8a513a3c 100644
--- a/src/model/block.rs
+++ b/src/model/block.rs
@@ -20,7 +20,7 @@ use garage_rpc::rpc_client::*;
use garage_rpc::rpc_server::*;
use garage_table::table_sharded::TableShardedReplication;
-use garage_table::{TableReplication, DeletedFilter};
+use garage_table::{DeletedFilter, TableReplication};
use crate::block_ref_table::*;
diff --git a/src/model/bucket_table.rs b/src/model/bucket_table.rs
index 35c0cc27..93421acb 100644
--- a/src/model/bucket_table.rs
+++ b/src/model/bucket_table.rs
@@ -1,71 +1,61 @@
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
+use garage_table::crdt::CRDT;
use garage_table::*;
+
use garage_util::error::Error;
+use crate::key_table::PermissionSet;
+
+use model010::bucket_table as prev;
+
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct Bucket {
// Primary key
pub name: String,
- // Timestamp and deletion
- // Upon version increment, all info is replaced
- pub timestamp: u64,
- pub deleted: bool,
+ pub state: crdt::LWW<BucketState>,
+}
+
+#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+pub enum BucketState {
+ Deleted,
+ Present(crdt::LWWMap<String, PermissionSet>),
+}
- // Authorized keys
- authorized_keys: Vec<AllowedKey>,
+impl CRDT for BucketState {
+ fn merge(&mut self, o: &Self) {
+ match o {
+ BucketState::Deleted => *self = BucketState::Deleted,
+ BucketState::Present(other_ak) => {
+ if let BucketState::Present(ak) = self {
+ ak.merge(other_ak);
+ }
+ }
+ }
+ }
}
impl Bucket {
- pub fn new(
- name: String,
- timestamp: u64,
- deleted: bool,
- authorized_keys: Vec<AllowedKey>,
- ) -> Self {
- let mut ret = Bucket {
+ pub fn new(name: String) -> Self {
+ let ret = Bucket {
name,
- timestamp,
- deleted,
- authorized_keys: vec![],
+ state: crdt::LWW::new(BucketState::Present(crdt::LWWMap::new())),
};
- for key in authorized_keys {
- ret.add_key(key)
- .expect("Duplicate AllowedKey in Bucket constructor");
- }
ret
}
- /// Add a key only if it is not already present
- pub fn add_key(&mut self, key: AllowedKey) -> Result<(), ()> {
- match self
- .authorized_keys
- .binary_search_by(|k| k.key_id.cmp(&key.key_id))
- {
- Err(i) => {
- self.authorized_keys.insert(i, key);
- Ok(())
- }
- Ok(_) => Err(()),
- }
+ pub fn is_deleted(&self) -> bool {
+ *self.state.get() == BucketState::Deleted
}
- pub fn authorized_keys(&self) -> &[AllowedKey] {
- &self.authorized_keys[..]
- }
- pub fn clear_keys(&mut self) {
- self.authorized_keys.clear();
+ pub fn authorized_keys(&self) -> &[(String, u64, PermissionSet)] {
+ match self.state.get() {
+ BucketState::Deleted => &[],
+ BucketState::Present(ak) => ak.items(),
+ }
}
}
-#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
-pub struct AllowedKey {
- pub key_id: String,
- pub timestamp: u64,
- pub allow_read: bool,
- pub allow_write: bool,
-}
-
impl Entry<EmptyKey, String> for Bucket {
fn partition_key(&self) -> &EmptyKey {
&EmptyKey
@@ -75,36 +65,12 @@ impl Entry<EmptyKey, String> for Bucket {
}
fn merge(&mut self, other: &Self) {
- if other.timestamp > self.timestamp {
- *self = other.clone();
- return;
- }
- if self.timestamp > other.timestamp || self.deleted {
- return;
- }
-
- for ak in other.authorized_keys.iter() {
- match self
- .authorized_keys
- .binary_search_by(|our_ak| our_ak.key_id.cmp(&ak.key_id))
- {
- Ok(i) => {
- let our_ak = &mut self.authorized_keys[i];
- if ak.timestamp > our_ak.timestamp {
- *our_ak = ak.clone();
- }
- }
- Err(i) => {
- self.authorized_keys.insert(i, ak.clone());
- }
- }
- }
+ self.state.merge(&other.state);
}
}
pub struct BucketTable;
-
#[async_trait]
impl TableSchema for BucketTable {
type P = EmptyKey;
@@ -117,6 +83,35 @@ impl TableSchema for BucketTable {
}
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
- filter.apply(entry.deleted)
+ filter.apply(entry.is_deleted())
+ }
+
+ fn try_migrate(bytes: &[u8]) -> Option<Self::E> {
+ let old = match rmp_serde::decode::from_read_ref::<_, prev::Bucket>(bytes) {
+ Ok(x) => x,
+ Err(_) => return None,
+ };
+ if old.deleted {
+ Some(Bucket {
+ name: old.name,
+ state: crdt::LWW::migrate_from_raw(old.timestamp, BucketState::Deleted),
+ })
+ } else {
+ let mut keys = crdt::LWWMap::new();
+ for ak in old.authorized_keys() {
+ keys.merge(&crdt::LWWMap::migrate_from_raw_item(
+ ak.key_id.clone(),
+ ak.timestamp,
+ PermissionSet {
+ allow_read: ak.allow_read,
+ allow_write: ak.allow_write,
+ },
+ ));
+ }
+ Some(Bucket {
+ name: old.name,
+ state: crdt::LWW::migrate_from_raw(old.timestamp, BucketState::Present(keys)),
+ })
+ }
}
}
diff --git a/src/model/key_table.rs b/src/model/key_table.rs
index 2b825aa3..ff9d7b79 100644
--- a/src/model/key_table.rs
+++ b/src/model/key_table.rs
@@ -1,8 +1,8 @@
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
-use garage_table::*;
use garage_table::crdt::CRDT;
+use garage_table::*;
use garage_util::error::Error;
@@ -24,7 +24,6 @@ pub struct Key {
// Authorized keys
pub authorized_buckets: crdt::LWWMap<String, PermissionSet>,
-
// CRDT interaction: deleted implies authorized_buckets is empty
}
@@ -125,10 +124,11 @@ impl TableSchema for KeyTable {
let it = crdt::LWWMap::migrate_from_raw_item(
ab.bucket.clone(),
ab.timestamp,
- PermissionSet{
+ PermissionSet {
allow_read: ab.allow_read,
allow_write: ab.allow_write,
- });
+ },
+ );
new.authorized_buckets.merge(&it);
}
Some(new)
diff --git a/src/table/crdt.rs b/src/table/crdt.rs
index 8f5e4d71..7c888e3a 100644
--- a/src/table/crdt.rs
+++ b/src/table/crdt.rs
@@ -7,7 +7,9 @@ pub trait CRDT {
}
impl<T> CRDT for T
-where T: Ord + Clone {
+where
+ T: Ord + Clone,
+{
fn merge(&mut self, other: &Self) {
if other > self {
*self = other.clone();
@@ -18,14 +20,14 @@ where T: Ord + Clone {
// ---- LWW Register ----
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
-pub struct LWW<T>
-{
+pub struct LWW<T> {
ts: u64,
v: T,
}
impl<T> LWW<T>
-where T: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + Ord
+where
+ T: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + CRDT,
{
pub fn new(value: T) -> Self {
Self {
@@ -34,10 +36,7 @@ where T: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + Part
}
}
pub fn migrate_from_raw(ts: u64, value: T) -> Self {
- Self {
- ts,
- v: value,
- }
+ Self { ts, v: value }
}
pub fn update(&mut self, new_value: T) {
self.ts = std::cmp::max(self.ts + 1, now_msec());
@@ -46,10 +45,14 @@ where T: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + Part
pub fn get(&self) -> &T {
&self.v
}
+ pub fn get_mut(&mut self) -> &mut T {
+ &mut self.v
+ }
}
impl<T> CRDT for LWW<T>
-where T: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + CRDT
+where
+ T: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + CRDT,
{
fn merge(&mut self, other: &Self) {
if other.ts > self.ts {
@@ -61,7 +64,6 @@ where T: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + Part
}
}
-
// ---- Boolean (true as absorbing state) ----
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)]
@@ -85,61 +87,48 @@ impl CRDT for Bool {
}
}
-
// ---- LWW Map ----
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
-pub struct LWWMap<K, V>
-{
+pub struct LWWMap<K, V> {
vals: Vec<(K, u64, V)>,
}
impl<K, V> LWWMap<K, V>
-where K: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + Ord,
- V: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + Ord,
+where
+ K: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + Ord,
+ V: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + Ord,
{
pub fn new() -> Self {
- Self{
- vals: vec![],
- }
+ Self { vals: vec![] }
}
pub fn migrate_from_raw_item(k: K, ts: u64, v: V) -> Self {
- Self{
+ Self {
vals: vec![(k, ts, v)],
}
}
pub fn take_and_clear(&mut self) -> Self {
let vals = std::mem::replace(&mut self.vals, vec![]);
- Self{vals}
+ Self { vals }
}
pub fn clear(&mut self) {
self.vals.clear();
}
pub fn update_mutator(&self, k: K, new_v: V) -> Self {
- let new_vals = match self
- .vals
- .binary_search_by(|(k2, _, _)| k2.cmp(&k))
- {
+ let new_vals = match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) {
Ok(i) => {
let (_, old_ts, _) = self.vals[i];
- let new_ts = std::cmp::max(old_ts+1, now_msec());
+ let new_ts = std::cmp::max(old_ts + 1, now_msec());
vec![(k, new_ts, new_v)]
}
- Err(_) => {
- vec![(k, now_msec(), new_v)]
- }
+ Err(_) => vec![(k, now_msec(), new_v)],
};
- Self{
- vals: new_vals,
- }
+ Self { vals: new_vals }
}
pub fn get(&self, k: &K) -> Option<&V> {
- match self
- .vals
- .binary_search_by(|(k2, _, _)| k2.cmp(&k))
- {
+ match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) {
Ok(i) => Some(&self.vals[i].2),
- Err(_) => None
+ Err(_) => None,
}
}
pub fn items(&self) -> &[(K, u64, V)] {
@@ -148,17 +137,15 @@ where K: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + Part
}
impl<K, V> CRDT for LWWMap<K, V>
-where K: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + Ord,
- V: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + CRDT,
+where
+ K: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + Ord,
+ V: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + CRDT,
{
fn merge(&mut self, other: &Self) {
for (k, ts2, v2) in other.vals.iter() {
- match self
- .vals
- .binary_search_by(|(k2, _, _)| k2.cmp(&k))
- {
+ match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) {
Ok(i) => {
- let (_, ts1, v1) = &self.vals[i];
+ let (_, ts1, _v1) = &self.vals[i];
if ts2 > ts1 {
self.vals[i].1 = *ts2;
self.vals[i].2 = v2.clone();
diff --git a/src/table/lib.rs b/src/table/lib.rs
index e2bf1f46..704f8f1e 100644
--- a/src/table/lib.rs
+++ b/src/table/lib.rs
@@ -3,9 +3,9 @@
#[macro_use]
extern crate log;
+pub mod crdt;
pub mod schema;
pub mod util;
-pub mod crdt;
pub mod table;
pub mod table_fullcopy;
@@ -13,5 +13,5 @@ pub mod table_sharded;
pub mod table_sync;
pub use schema::*;
-pub use util::*;
pub use table::*;
+pub use util::*;
diff --git a/src/table/schema.rs b/src/table/schema.rs
index 49cede0a..d2ec9450 100644
--- a/src/table/schema.rs
+++ b/src/table/schema.rs
@@ -20,7 +20,6 @@ impl PartitionKey for Hash {
}
}
-
pub trait SortKey {
fn sort_key(&self) -> &[u8];
}
@@ -37,7 +36,6 @@ impl SortKey for Hash {
}
}
-
pub trait Entry<P: PartitionKey, S: SortKey>:
PartialEq + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync
{
@@ -47,7 +45,6 @@ pub trait Entry<P: PartitionKey, S: SortKey>:
fn merge(&mut self, other: &Self);
}
-
#[async_trait]
pub trait TableSchema: Send + Sync {
type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
@@ -66,4 +63,3 @@ pub trait TableSchema: Send + Sync {
true
}
}
-