aboutsummaryrefslogtreecommitdiff
path: root/src/model
diff options
context:
space:
mode:
Diffstat (limited to 'src/model')
-rw-r--r--src/model/block.rs2
-rw-r--r--src/model/block_ref_table.rs17
-rw-r--r--src/model/bucket_table.rs2
-rw-r--r--src/model/garage.rs1
-rw-r--r--src/model/key_table.rs13
-rw-r--r--src/model/object_table.rs39
-rw-r--r--src/model/version_table.rs96
7 files changed, 77 insertions, 93 deletions
diff --git a/src/model/block.rs b/src/model/block.rs
index 56c85c6a..d3957403 100644
--- a/src/model/block.rs
+++ b/src/model/block.rs
@@ -420,7 +420,7 @@ impl BlockManager {
if Some(&block_ref.block) == last_hash.as_ref() {
continue;
}
- if !block_ref.deleted {
+ if !block_ref.deleted.get() {
last_hash = Some(block_ref.block);
self.put_to_resync(&block_ref.block, 0)?;
}
diff --git a/src/model/block_ref_table.rs b/src/model/block_ref_table.rs
index 9ab67737..07fa5144 100644
--- a/src/model/block_ref_table.rs
+++ b/src/model/block_ref_table.rs
@@ -1,9 +1,9 @@
use serde::{Deserialize, Serialize};
use std::sync::Arc;
-use garage_util::background::*;
use garage_util::data::*;
+use garage_table::crdt::CRDT;
use garage_table::*;
use crate::block::*;
@@ -17,7 +17,7 @@ pub struct BlockRef {
pub version: UUID,
// Keep track of deleted status
- pub deleted: bool,
+ pub deleted: crdt::Bool,
}
impl Entry<Hash, UUID> for BlockRef {
@@ -27,16 +27,15 @@ impl Entry<Hash, UUID> for BlockRef {
fn sort_key(&self) -> &UUID {
&self.version
}
+}
+impl CRDT for BlockRef {
fn merge(&mut self, other: &Self) {
- if other.deleted {
- self.deleted = true;
- }
+ self.deleted.merge(&other.deleted);
}
}
pub struct BlockRefTable {
- pub background: Arc<BackgroundRunner>,
pub block_manager: Arc<BlockManager>,
}
@@ -48,8 +47,8 @@ impl TableSchema for BlockRefTable {
fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) {
let block = &old.as_ref().or(new.as_ref()).unwrap().block;
- let was_before = old.as_ref().map(|x| !x.deleted).unwrap_or(false);
- let is_after = new.as_ref().map(|x| !x.deleted).unwrap_or(false);
+ let was_before = old.as_ref().map(|x| !x.deleted.get()).unwrap_or(false);
+ let is_after = new.as_ref().map(|x| !x.deleted.get()).unwrap_or(false);
if is_after && !was_before {
if let Err(e) = self.block_manager.block_incref(block) {
warn!("block_incref failed for block {:?}: {}", block, e);
@@ -63,6 +62,6 @@ impl TableSchema for BlockRefTable {
}
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
- filter.apply(entry.deleted)
+ filter.apply(entry.deleted.get())
}
}
diff --git a/src/model/bucket_table.rs b/src/model/bucket_table.rs
index 2878aa38..5bc8b7f9 100644
--- a/src/model/bucket_table.rs
+++ b/src/model/bucket_table.rs
@@ -89,7 +89,9 @@ impl Entry<EmptyKey, String> for Bucket {
fn sort_key(&self) -> &String {
&self.name
}
+}
+impl CRDT for Bucket {
fn merge(&mut self, other: &Self) {
self.state.merge(&other.state);
}
diff --git a/src/model/garage.rs b/src/model/garage.rs
index 467d0aec..d109fdaa 100644
--- a/src/model/garage.rs
+++ b/src/model/garage.rs
@@ -79,7 +79,6 @@ impl Garage {
info!("Initialize block_ref_table...");
let block_ref_table = Table::new(
BlockRefTable {
- background: background.clone(),
block_manager: block_manager.clone(),
},
data_rep_param.clone(),
diff --git a/src/model/key_table.rs b/src/model/key_table.rs
index 5942df75..6d8cc6c0 100644
--- a/src/model/key_table.rs
+++ b/src/model/key_table.rs
@@ -1,6 +1,6 @@
use serde::{Deserialize, Serialize};
-use garage_table::crdt::CRDT;
+use garage_table::crdt::*;
use garage_table::*;
use model010::key_table as prev;
@@ -66,6 +66,10 @@ pub struct PermissionSet {
pub allow_write: bool,
}
+impl AutoCRDT for PermissionSet {
+ const WARN_IF_DIFFERENT: bool = true;
+}
+
impl Entry<EmptyKey, String> for Key {
fn partition_key(&self) -> &EmptyKey {
&EmptyKey
@@ -73,17 +77,18 @@ impl Entry<EmptyKey, String> for Key {
fn sort_key(&self) -> &String {
&self.key_id
}
+}
+impl CRDT for Key {
fn merge(&mut self, other: &Self) {
self.name.merge(&other.name);
self.deleted.merge(&other.deleted);
if self.deleted.get() {
self.authorized_buckets.clear();
- return;
+ } else {
+ self.authorized_buckets.merge(&other.authorized_buckets);
}
-
- self.authorized_buckets.merge(&other.authorized_buckets);
}
}
diff --git a/src/model/object_table.rs b/src/model/object_table.rs
index 16cce72c..75c37f6d 100644
--- a/src/model/object_table.rs
+++ b/src/model/object_table.rs
@@ -5,6 +5,7 @@ use std::sync::Arc;
use garage_util::background::BackgroundRunner;
use garage_util::data::*;
+use garage_table::crdt::*;
use garage_table::table_sharded::*;
use garage_table::*;
@@ -70,7 +71,7 @@ pub enum ObjectVersionState {
Aborted,
}
-impl ObjectVersionState {
+impl CRDT for ObjectVersionState {
fn merge(&mut self, other: &Self) {
use ObjectVersionState::*;
match other {
@@ -91,37 +92,30 @@ impl ObjectVersionState {
}
}
-#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub enum ObjectVersionData {
DeleteMarker,
Inline(ObjectVersionMeta, #[serde(with = "serde_bytes")] Vec<u8>),
FirstBlock(ObjectVersionMeta, Hash),
}
-#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+impl AutoCRDT for ObjectVersionData {
+ const WARN_IF_DIFFERENT: bool = true;
+}
+
+#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct ObjectVersionMeta {
pub headers: ObjectVersionHeaders,
pub size: u64,
pub etag: String,
}
-#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct ObjectVersionHeaders {
pub content_type: String,
pub other: BTreeMap<String, String>,
}
-impl ObjectVersionData {
- fn merge(&mut self, b: &Self) {
- if *self != *b {
- warn!(
- "Inconsistent object version data: {:?} (local) vs {:?} (remote)",
- self, b
- );
- }
- }
-}
-
impl ObjectVersion {
fn cmp_key(&self) -> (u64, UUID) {
(self.timestamp, self.uuid)
@@ -154,8 +148,11 @@ impl Entry<String, String> for Object {
fn sort_key(&self) -> &String {
&self.key
}
+}
+impl CRDT for Object {
fn merge(&mut self, other: &Self) {
+ // Merge versions from other into here
for other_v in other.versions.iter() {
match self
.versions
@@ -169,6 +166,9 @@ impl Entry<String, String> for Object {
}
}
}
+
+ // Remove versions which are obsolete, i.e. those that come
+ // before the last version which .is_complete().
let last_complete = self
.versions
.iter()
@@ -212,13 +212,8 @@ impl TableSchema for ObjectTable {
}
};
if newly_deleted {
- let deleted_version = Version::new(
- v.uuid,
- old_v.bucket.clone(),
- old_v.key.clone(),
- true,
- vec![],
- );
+ let deleted_version =
+ Version::new(v.uuid, old_v.bucket.clone(), old_v.key.clone(), true);
version_table.insert(&deleted_version).await?;
}
}
diff --git a/src/model/version_table.rs b/src/model/version_table.rs
index cf9fbe98..26abb64e 100644
--- a/src/model/version_table.rs
+++ b/src/model/version_table.rs
@@ -4,6 +4,7 @@ use std::sync::Arc;
use garage_util::background::BackgroundRunner;
use garage_util::data::*;
+use garage_table::crdt::*;
use garage_table::table_sharded::*;
use garage_table::*;
@@ -15,8 +16,8 @@ pub struct Version {
pub uuid: UUID,
// Actual data: the blocks for this version
- pub deleted: bool,
- blocks: Vec<VersionBlock>,
+ pub deleted: crdt::Bool,
+ pub blocks: crdt::Map<VersionBlockKey, VersionBlock>,
// Back link to bucket+key so that we can figure if
// this was deleted later on
@@ -25,56 +26,45 @@ pub struct Version {
}
impl Version {
- pub fn new(
- uuid: UUID,
- bucket: String,
- key: String,
- deleted: bool,
- blocks: Vec<VersionBlock>,
- ) -> Self {
- let mut ret = Self {
+ pub fn new(uuid: UUID, bucket: String, key: String, deleted: bool) -> Self {
+ Self {
uuid,
- deleted,
- blocks: vec![],
+ deleted: deleted.into(),
+ blocks: crdt::Map::new(),
bucket,
key,
- };
- for b in blocks {
- ret.add_block(b)
- .expect("Twice the same VersionBlock in Version constructor");
}
- ret
}
- /// Adds a block if it wasn't already present
- pub fn add_block(&mut self, new: VersionBlock) -> Result<(), ()> {
- match self
- .blocks
- .binary_search_by(|b| b.cmp_key().cmp(&new.cmp_key()))
- {
- Err(i) => {
- self.blocks.insert(i, new);
- Ok(())
- }
- Ok(_) => Err(()),
- }
+}
+
+#[derive(PartialEq, Eq, Clone, Copy, Debug, Serialize, Deserialize)]
+pub struct VersionBlockKey {
+ pub part_number: u64,
+ pub offset: u64,
+}
+
+impl Ord for VersionBlockKey {
+ fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+ self.part_number
+ .cmp(&other.part_number)
+ .then(self.offset.cmp(&other.offset))
}
- pub fn blocks(&self) -> &[VersionBlock] {
- &self.blocks[..]
+}
+
+impl PartialOrd for VersionBlockKey {
+ fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+ Some(self.cmp(other))
}
}
-#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+#[derive(PartialEq, Eq, Ord, PartialOrd, Clone, Copy, Debug, Serialize, Deserialize)]
pub struct VersionBlock {
- pub part_number: u64,
- pub offset: u64,
pub hash: Hash,
pub size: u64,
}
-impl VersionBlock {
- fn cmp_key(&self) -> (u64, u64) {
- (self.part_number, self.offset)
- }
+impl AutoCRDT for VersionBlock {
+ const WARN_IF_DIFFERENT: bool = true;
}
impl Entry<Hash, EmptyKey> for Version {
@@ -84,23 +74,16 @@ impl Entry<Hash, EmptyKey> for Version {
fn sort_key(&self) -> &EmptyKey {
&EmptyKey
}
+}
+impl CRDT for Version {
fn merge(&mut self, other: &Self) {
- if other.deleted {
- self.deleted = true;
+ self.deleted.merge(&other.deleted);
+
+ if self.deleted.get() {
self.blocks.clear();
- } else if !self.deleted {
- for bi in other.blocks.iter() {
- match self
- .blocks
- .binary_search_by(|x| x.cmp_key().cmp(&bi.cmp_key()))
- {
- Ok(_) => (),
- Err(pos) => {
- self.blocks.insert(pos, bi.clone());
- }
- }
- }
+ } else {
+ self.blocks.merge(&other.blocks);
}
}
}
@@ -121,14 +104,15 @@ impl TableSchema for VersionTable {
self.background.spawn(async move {
if let (Some(old_v), Some(new_v)) = (old, new) {
// Propagate deletion of version blocks
- if new_v.deleted && !old_v.deleted {
+ if new_v.deleted.get() && !old_v.deleted.get() {
let deleted_block_refs = old_v
.blocks
+ .items()
.iter()
- .map(|vb| BlockRef {
+ .map(|(_k, vb)| BlockRef {
block: vb.hash,
version: old_v.uuid,
- deleted: true,
+ deleted: true.into(),
})
.collect::<Vec<_>>();
block_ref_table.insert_many(&deleted_block_refs[..]).await?;
@@ -139,6 +123,6 @@ impl TableSchema for VersionTable {
}
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
- filter.apply(entry.deleted)
+ filter.apply(entry.deleted.get())
}
}