aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2023-01-03 14:44:47 +0100
committerAlex Auvolat <alex@adnab.me>2023-01-03 14:44:47 +0100
commitcdb2a591e9d393d24ab5c49bb905b0589b193299 (patch)
tree10c95206d0bd7b30c1fcd14ccc188be374cb1066
parent582b0761790b7958a3ba10c4b549b466997d2dcd (diff)
downloadgarage-cdb2a591e9d393d24ab5c49bb905b0589b193299.tar.gz
garage-cdb2a591e9d393d24ab5c49bb905b0589b193299.zip
Refactor how things are migrated
-rw-r--r--Cargo.lock1
-rw-r--r--src/block/repair.rs1
-rw-r--r--src/block/resync.rs1
-rw-r--r--src/garage/Cargo.toml2
-rw-r--r--src/model/bucket_alias_table.rs20
-rw-r--r--src/model/bucket_table.rs128
-rw-r--r--src/model/index_counter.rs50
-rw-r--r--src/model/k2v/item_table.rs54
-rw-r--r--src/model/key_table.rs154
-rw-r--r--src/model/prev/v051/bucket_table.rs2
-rw-r--r--src/model/prev/v051/key_table.rs50
-rw-r--r--src/model/prev/v051/mod.rs3
-rw-r--r--src/model/prev/v051/object_table.rs149
-rw-r--r--src/model/prev/v051/version_table.rs79
-rw-r--r--src/model/s3/block_ref_table.rs29
-rw-r--r--src/model/s3/object_table.rs241
-rw-r--r--src/model/s3/version_table.rs180
-rw-r--r--src/rpc/layout.rs2
-rw-r--r--src/rpc/system.rs18
-rw-r--r--src/table/data.rs30
-rw-r--r--src/table/schema.rs23
-rw-r--r--src/table/sync.rs6
-rw-r--r--src/table/table.rs7
-rw-r--r--src/util/Cargo.toml1
-rw-r--r--src/util/lib.rs1
-rw-r--r--src/util/migrate.rs75
-rw-r--r--src/util/persister.rs38
27 files changed, 637 insertions, 708 deletions
diff --git a/Cargo.lock b/Cargo.lock
index b1bbf35e..2a89fa1a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1277,6 +1277,7 @@ dependencies = [
"garage_db",
"git-version",
"hex",
+ "hexdump",
"http",
"hyper",
"lazy_static",
diff --git a/src/block/repair.rs b/src/block/repair.rs
index f5515d4e..a6ded65a 100644
--- a/src/block/repair.rs
+++ b/src/block/repair.rs
@@ -178,6 +178,7 @@ struct ScrubWorkerPersisted {
time_last_complete_scrub: u64,
corruptions_detected: u64,
}
+impl garage_util::migrate::InitialFormat for ScrubWorkerPersisted {}
enum ScrubWorkerState {
Running(BlockStoreIterator),
diff --git a/src/block/resync.rs b/src/block/resync.rs
index 51bb9846..9c7b3b0e 100644
--- a/src/block/resync.rs
+++ b/src/block/resync.rs
@@ -63,6 +63,7 @@ struct ResyncPersistedConfig {
n_workers: usize,
tranquility: u32,
}
+impl garage_util::migrate::InitialFormat for ResyncPersistedConfig {}
enum ResyncIterResult {
BusyDidSomething,
diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml
index cee7060e..337699ca 100644
--- a/src/garage/Cargo.toml
+++ b/src/garage/Cargo.toml
@@ -74,7 +74,7 @@ base64 = "0.13"
[features]
-default = [ "bundled-libs", "metrics", "sled" ]
+default = [ "bundled-libs", "metrics", "sled", "k2v" ]
k2v = [ "garage_util/k2v", "garage_api/k2v" ]
diff --git a/src/model/bucket_alias_table.rs b/src/model/bucket_alias_table.rs
index fcd1536e..d07394f6 100644
--- a/src/model/bucket_alias_table.rs
+++ b/src/model/bucket_alias_table.rs
@@ -5,14 +5,22 @@ use garage_util::data::*;
use garage_table::crdt::*;
use garage_table::*;
-/// The bucket alias table holds the names given to buckets
-/// in the global namespace.
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-pub struct BucketAlias {
- name: String,
- pub state: crdt::Lww<Option<Uuid>>,
+mod v08 {
+ use super::*;
+
+ /// The bucket alias table holds the names given to buckets
+ /// in the global namespace.
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct BucketAlias {
+ pub(super) name: String,
+ pub state: crdt::Lww<Option<Uuid>>,
+ }
+
+ impl garage_util::migrate::InitialFormat for BucketAlias {}
}
+pub use v08::*;
+
impl BucketAlias {
pub fn new(name: String, ts: u64, bucket_id: Option<Uuid>) -> Option<Self> {
if !is_valid_bucket_name(&name) {
diff --git a/src/model/bucket_table.rs b/src/model/bucket_table.rs
index 7be42702..38ed88ee 100644
--- a/src/model/bucket_table.rs
+++ b/src/model/bucket_table.rs
@@ -7,72 +7,80 @@ use garage_util::time::*;
use crate::permission::BucketKeyPerm;
-/// A bucket is a collection of objects
-///
-/// Its parameters are not directly accessible as:
-/// - It must be possible to merge paramaters, hence the use of a LWW CRDT.
-/// - A bucket has 2 states, Present or Deleted and parameters make sense only if present.
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-pub struct Bucket {
- /// ID of the bucket
- pub id: Uuid,
- /// State, and configuration if not deleted, of the bucket
- pub state: crdt::Deletable<BucketParams>,
-}
+mod v08 {
+ use super::*;
+
+ /// A bucket is a collection of objects
+ ///
+ /// Its parameters are not directly accessible as:
+ /// - It must be possible to merge paramaters, hence the use of a LWW CRDT.
+ /// - A bucket has 2 states, Present or Deleted and parameters make sense only if present.
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct Bucket {
+ /// ID of the bucket
+ pub id: Uuid,
+ /// State, and configuration if not deleted, of the bucket
+ pub state: crdt::Deletable<BucketParams>,
+ }
-/// Configuration for a bucket
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-pub struct BucketParams {
- /// Bucket's creation date
- pub creation_date: u64,
- /// Map of key with access to the bucket, and what kind of access they give
- pub authorized_keys: crdt::Map<String, BucketKeyPerm>,
-
- /// Map of aliases that are or have been given to this bucket
- /// in the global namespace
- /// (not authoritative: this is just used as an indication to
- /// map back to aliases when doing ListBuckets)
- pub aliases: crdt::LwwMap<String, bool>,
- /// Map of aliases that are or have been given to this bucket
- /// in namespaces local to keys
- /// key = (access key id, alias name)
- pub local_aliases: crdt::LwwMap<(String, String), bool>,
-
- /// Whether this bucket is allowed for website access
- /// (under all of its global alias names),
- /// and if so, the website configuration XML document
- pub website_config: crdt::Lww<Option<WebsiteConfig>>,
- /// CORS rules
- pub cors_config: crdt::Lww<Option<Vec<CorsRule>>>,
- /// Bucket quotas
- #[serde(default)]
- pub quotas: crdt::Lww<BucketQuotas>,
-}
+ /// Configuration for a bucket
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct BucketParams {
+ /// Bucket's creation date
+ pub creation_date: u64,
+ /// Map of key with access to the bucket, and what kind of access they give
+ pub authorized_keys: crdt::Map<String, BucketKeyPerm>,
+
+ /// Map of aliases that are or have been given to this bucket
+ /// in the global namespace
+ /// (not authoritative: this is just used as an indication to
+ /// map back to aliases when doing ListBuckets)
+ pub aliases: crdt::LwwMap<String, bool>,
+ /// Map of aliases that are or have been given to this bucket
+ /// in namespaces local to keys
+ /// key = (access key id, alias name)
+ pub local_aliases: crdt::LwwMap<(String, String), bool>,
+
+ /// Whether this bucket is allowed for website access
+ /// (under all of its global alias names),
+ /// and if so, the website configuration XML document
+ pub website_config: crdt::Lww<Option<WebsiteConfig>>,
+ /// CORS rules
+ pub cors_config: crdt::Lww<Option<Vec<CorsRule>>>,
+ /// Bucket quotas
+ #[serde(default)]
+ pub quotas: crdt::Lww<BucketQuotas>,
+ }
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-pub struct WebsiteConfig {
- pub index_document: String,
- pub error_document: Option<String>,
-}
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct WebsiteConfig {
+ pub index_document: String,
+ pub error_document: Option<String>,
+ }
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-pub struct CorsRule {
- pub id: Option<String>,
- pub max_age_seconds: Option<u64>,
- pub allow_origins: Vec<String>,
- pub allow_methods: Vec<String>,
- pub allow_headers: Vec<String>,
- pub expose_headers: Vec<String>,
-}
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct CorsRule {
+ pub id: Option<String>,
+ pub max_age_seconds: Option<u64>,
+ pub allow_origins: Vec<String>,
+ pub allow_methods: Vec<String>,
+ pub allow_headers: Vec<String>,
+ pub expose_headers: Vec<String>,
+ }
-#[derive(Default, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
-pub struct BucketQuotas {
- /// Maximum size in bytes (bucket size = sum of sizes of objects in the bucket)
- pub max_size: Option<u64>,
- /// Maximum number of non-deleted objects in the bucket
- pub max_objects: Option<u64>,
+ #[derive(Default, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
+ pub struct BucketQuotas {
+ /// Maximum size in bytes (bucket size = sum of sizes of objects in the bucket)
+ pub max_size: Option<u64>,
+ /// Maximum number of non-deleted objects in the bucket
+ pub max_objects: Option<u64>,
+ }
+
+ impl garage_util::migrate::InitialFormat for Bucket {}
}
+pub use v08::*;
+
impl AutoCrdt for BucketQuotas {
const WARN_IF_DIFFERENT: bool = true;
}
diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs
index 6303ea3e..c3ed29c7 100644
--- a/src/model/index_counter.rs
+++ b/src/model/index_counter.rs
@@ -12,6 +12,7 @@ use garage_rpc::system::System;
use garage_util::background::BackgroundRunner;
use garage_util::data::*;
use garage_util::error::*;
+use garage_util::migrate::Migrate;
use garage_util::time::*;
use garage_table::crdt::*;
@@ -29,14 +30,28 @@ pub trait CountedItem: Clone + PartialEq + Send + Sync + 'static {
fn counts(&self) -> Vec<(&'static str, i64)>;
}
-/// A counter entry in the global table
-#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)]
-pub struct CounterEntry<T: CountedItem> {
- pub pk: T::CP,
- pub sk: T::CS,
- pub values: BTreeMap<String, CounterValue>,
+mod v08 {
+ use super::*;
+
+ /// A counter entry in the global table
+ #[derive(Clone, PartialEq, Debug, Serialize, Deserialize)]
+ pub struct CounterEntry<T: CountedItem> {
+ pub pk: T::CP,
+ pub sk: T::CS,
+ pub values: BTreeMap<String, CounterValue>,
+ }
+
+ /// A counter entry in the global table
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct CounterValue {
+ pub node_values: BTreeMap<Uuid, (u64, i64)>,
+ }
+
+ impl<T: CountedItem> garage_util::migrate::InitialFormat for CounterEntry<T> {}
}
+pub use v08::*;
+
impl<T: CountedItem> Entry<T::CP, T::CS> for CounterEntry<T> {
fn partition_key(&self) -> &T::CP {
&self.pk
@@ -78,12 +93,6 @@ impl<T: CountedItem> CounterEntry<T> {
}
}
-/// A counter entry in the global table
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-pub struct CounterValue {
- pub node_values: BTreeMap<Uuid, (u64, i64)>,
-}
-
impl<T: CountedItem> Crdt for CounterEntry<T> {
fn merge(&mut self, other: &Self) {
for (name, e2) in other.values.iter() {
@@ -195,11 +204,9 @@ impl<T: CountedItem> IndexCounter<T> {
let tree_key = self.table.data.tree_key(pk, sk);
let mut entry = match tx.get(&self.local_counter, &tree_key[..])? {
- Some(old_bytes) => {
- rmp_serde::decode::from_read_ref::<_, LocalCounterEntry<T>>(&old_bytes)
- .map_err(Error::RmpDecode)
- .map_err(db::TxError::Abort)?
- }
+ Some(old_bytes) => LocalCounterEntry::<T>::decode(&old_bytes)
+ .ok_or_message("Cannot decode local counter entry")
+ .map_err(db::TxError::Abort)?,
None => LocalCounterEntry {
pk: pk.clone(),
sk: sk.clone(),
@@ -214,7 +221,8 @@ impl<T: CountedItem> IndexCounter<T> {
ent.1 += *inc;
}
- let new_entry_bytes = rmp_to_vec_all_named(&entry)
+ let new_entry_bytes = entry
+ .encode()
.map_err(Error::RmpEncode)
.map_err(db::TxError::Abort)?;
tx.insert(&self.local_counter, &tree_key[..], new_entry_bytes)?;
@@ -263,7 +271,7 @@ impl<T: CountedItem> IndexCounter<T> {
tv.1 = 0;
}
- let local_counter_bytes = rmp_to_vec_all_named(&local_counter)?;
+ let local_counter_bytes = local_counter.encode()?;
self.local_counter
.insert(&local_counter_k, &local_counter_bytes)?;
@@ -330,7 +338,7 @@ impl<T: CountedItem> IndexCounter<T> {
tv.1 += v;
}
- let local_counter_bytes = rmp_to_vec_all_named(&local_counter)?;
+ let local_counter_bytes = local_counter.encode()?;
self.local_counter
.insert(&local_counter_key, local_counter_bytes)?;
@@ -357,6 +365,8 @@ struct LocalCounterEntry<T: CountedItem> {
values: BTreeMap<String, (u64, i64)>,
}
+impl<T: CountedItem> garage_util::migrate::InitialFormat for LocalCounterEntry<T> {}
+
impl<T: CountedItem> LocalCounterEntry<T> {
fn into_counter_entry(self, this_node: Uuid) -> CounterEntry<T> {
CounterEntry {
diff --git a/src/model/k2v/item_table.rs b/src/model/k2v/item_table.rs
index 7860cb17..ce3e4129 100644
--- a/src/model/k2v/item_table.rs
+++ b/src/model/k2v/item_table.rs
@@ -1,7 +1,8 @@
-use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::sync::Arc;
+use serde::{Deserialize, Serialize};
+
use garage_db as db;
use garage_util::data::*;
@@ -17,32 +18,43 @@ pub const CONFLICTS: &str = "conflicts";
pub const VALUES: &str = "values";
pub const BYTES: &str = "bytes";
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-pub struct K2VItem {
- pub partition: K2VItemPartition,
- pub sort_key: String,
+mod v08 {
+ use crate::k2v::causality::K2VNodeId;
+ use garage_util::data::Uuid;
+ use serde::{Deserialize, Serialize};
+ use std::collections::BTreeMap;
- items: BTreeMap<K2VNodeId, DvvsEntry>,
-}
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct K2VItem {
+ pub partition: K2VItemPartition,
+ pub sort_key: String,
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize, Hash)]
-pub struct K2VItemPartition {
- pub bucket_id: Uuid,
- pub partition_key: String,
-}
+ pub(super) items: BTreeMap<K2VNodeId, DvvsEntry>,
+ }
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-struct DvvsEntry {
- t_discard: u64,
- values: Vec<(u64, DvvsValue)>,
-}
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize, Hash)]
+ pub struct K2VItemPartition {
+ pub bucket_id: Uuid,
+ pub partition_key: String,
+ }
+
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct DvvsEntry {
+ pub(super) t_discard: u64,
+ pub(super) values: Vec<(u64, DvvsValue)>,
+ }
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-pub enum DvvsValue {
- Value(#[serde(with = "serde_bytes")] Vec<u8>),
- Deleted,
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub enum DvvsValue {
+ Value(#[serde(with = "serde_bytes")] Vec<u8>),
+ Deleted,
+ }
+
+ impl garage_util::migrate::InitialFormat for K2VItem {}
}
+pub use v08::*;
+
impl K2VItem {
/// Creates a new K2VItem when no previous entry existed in the db
pub fn new(bucket_id: Uuid, partition_key: String, sort_key: String) -> Self {
diff --git a/src/model/key_table.rs b/src/model/key_table.rs
index 9d2fc783..bb5334a3 100644
--- a/src/model/key_table.rs
+++ b/src/model/key_table.rs
@@ -1,45 +1,121 @@
use serde::{Deserialize, Serialize};
-use garage_table::crdt::*;
-use garage_table::*;
+use garage_util::crdt::{self, Crdt};
use garage_util::data::*;
+use garage_table::{DeletedFilter, EmptyKey, Entry, TableSchema};
+
use crate::permission::BucketKeyPerm;
-use crate::prev::v051::key_table as old;
+pub(crate) mod v05 {
+ use garage_util::crdt;
+ use serde::{Deserialize, Serialize};
-/// An api key
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-pub struct Key {
- /// The id of the key (immutable), used as partition key
- pub key_id: String,
+ /// An api key
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct Key {
+ /// The id of the key (immutable), used as partition key
+ pub key_id: String,
- /// Internal state of the key
- pub state: crdt::Deletable<KeyParams>,
-}
+ /// The secret_key associated
+ pub secret_key: String,
-/// Configuration for a key
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-pub struct KeyParams {
- /// The secret_key associated (immutable)
- pub secret_key: String,
+ /// Name for the key
+ pub name: crdt::Lww<String>,
- /// Name for the key
- pub name: crdt::Lww<String>,
+ /// Is the key deleted
+ pub deleted: crdt::Bool,
+
+ /// Buckets in which the key is authorized. Empty if `Key` is deleted
+ // CRDT interaction: deleted implies authorized_buckets is empty
+ pub authorized_buckets: crdt::LwwMap<String, PermissionSet>,
+ }
- /// Flag to allow users having this key to create buckets
- pub allow_create_bucket: crdt::Lww<bool>,
+ /// Permission given to a key in a bucket
+ #[derive(PartialOrd, Ord, PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct PermissionSet {
+ /// The key can be used to read the bucket
+ pub allow_read: bool,
+ /// The key can be used to write in the bucket
+ pub allow_write: bool,
+ }
+
+ impl crdt::AutoCrdt for PermissionSet {
+ const WARN_IF_DIFFERENT: bool = true;
+ }
- /// If the key is present: it gives some permissions,
- /// a map of bucket IDs (uuids) to permissions.
- /// Otherwise no permissions are granted to key
- pub authorized_buckets: crdt::Map<Uuid, BucketKeyPerm>,
+ impl garage_util::migrate::InitialFormat for Key {}
+}
- /// A key can have a local view of buckets names it is
- /// the only one to see, this is the namespace for these aliases
- pub local_aliases: crdt::LwwMap<String, Option<Uuid>>,
+mod v08 {
+ use super::v05;
+ use crate::permission::BucketKeyPerm;
+ use garage_util::crdt;
+ use garage_util::data::Uuid;
+ use serde::{Deserialize, Serialize};
+
+ /// An api key
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct Key {
+ /// The id of the key (immutable), used as partition key
+ pub key_id: String,
+
+ /// Internal state of the key
+ pub state: crdt::Deletable<KeyParams>,
+ }
+
+ /// Configuration for a key
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct KeyParams {
+ /// The secret_key associated (immutable)
+ pub secret_key: String,
+
+ /// Name for the key
+ pub name: crdt::Lww<String>,
+
+ /// Flag to allow users having this key to create buckets
+ pub allow_create_bucket: crdt::Lww<bool>,
+
+ /// If the key is present: it gives some permissions,
+ /// a map of bucket IDs (uuids) to permissions.
+ /// Otherwise no permissions are granted to key
+ pub authorized_buckets: crdt::Map<Uuid, BucketKeyPerm>,
+
+ /// A key can have a local view of buckets names it is
+ /// the only one to see, this is the namespace for these aliases
+ pub local_aliases: crdt::LwwMap<String, Option<Uuid>>,
+ }
+
+ impl garage_util::migrate::Migrate for Key {
+ type Previous = v05::Key;
+
+ fn migrate(old_k: v05::Key) -> Key {
+ let name = crdt::Lww::raw(old_k.name.timestamp(), old_k.name.get().clone());
+
+ let state = if old_k.deleted.get() {
+ crdt::Deletable::Deleted
+ } else {
+ // Authorized buckets is ignored here,
+ // migration is performed in specific migration code in
+ // garage/migrate.rs
+ crdt::Deletable::Present(KeyParams {
+ secret_key: old_k.secret_key,
+ name,
+ allow_create_bucket: crdt::Lww::new(false),
+ authorized_buckets: crdt::Map::new(),
+ local_aliases: crdt::LwwMap::new(),
+ })
+ };
+ Key {
+ key_id: old_k.key_id,
+ state,
+ }
+ }
+ }
}
+pub use v08::*;
+
impl KeyParams {
fn new(secret_key: &str, name: &str) -> Self {
KeyParams {
@@ -173,28 +249,4 @@ impl TableSchema for KeyTable {
}
}
}
-
- fn try_migrate(bytes: &[u8]) -> Option<Self::E> {
- let old_k = rmp_serde::decode::from_read_ref::<_, old::Key>(bytes).ok()?;
- let name = crdt::Lww::raw(old_k.name.timestamp(), old_k.name.get().clone());
-
- let state = if old_k.deleted.get() {
- crdt::Deletable::Deleted
- } else {
- // Authorized buckets is ignored here,
- // migration is performed in specific migration code in
- // garage/migrate.rs
- crdt::Deletable::Present(KeyParams {
- secret_key: old_k.secret_key,
- name,
- allow_create_bucket: crdt::Lww::new(false),
- authorized_buckets: crdt::Map::new(),
- local_aliases: crdt::LwwMap::new(),
- })
- };
- Some(Key {
- key_id: old_k.key_id,
- state,
- })
- }
}
diff --git a/src/model/prev/v051/bucket_table.rs b/src/model/prev/v051/bucket_table.rs
index 628a49dd..19893458 100644
--- a/src/model/prev/v051/bucket_table.rs
+++ b/src/model/prev/v051/bucket_table.rs
@@ -3,7 +3,7 @@ use serde::{Deserialize, Serialize};
use garage_table::crdt::Crdt;
use garage_table::*;
-use super::key_table::PermissionSet;
+use crate::key_table::v05::PermissionSet;
/// A bucket is a collection of objects
///
diff --git a/src/model/prev/v051/key_table.rs b/src/model/prev/v051/key_table.rs
deleted file mode 100644
index 37516b1c..00000000
--- a/src/model/prev/v051/key_table.rs
+++ /dev/null
@@ -1,50 +0,0 @@
-use serde::{Deserialize, Serialize};
-
-use garage_table::crdt::*;
-use garage_table::*;
-
-/// An api key
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-pub struct Key {
- /// The id of the key (immutable), used as partition key
- pub key_id: String,
-
- /// The secret_key associated
- pub secret_key: String,
-
- /// Name for the key
- pub name: crdt::Lww<String>,
-
- /// Is the key deleted
- pub deleted: crdt::Bool,
-
- /// Buckets in which the key is authorized. Empty if `Key` is deleted
- // CRDT interaction: deleted implies authorized_buckets is empty
- pub authorized_buckets: crdt::LwwMap<String, PermissionSet>,
-}
-
-/// Permission given to a key in a bucket
-#[derive(PartialOrd, Ord, PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-pub struct PermissionSet {
- /// The key can be used to read the bucket
- pub allow_read: bool,
- /// The key can be used to write in the bucket
- pub allow_write: bool,
-}
-
-impl AutoCrdt for PermissionSet {
- const WARN_IF_DIFFERENT: bool = true;
-}
-
-impl Crdt for Key {
- fn merge(&mut self, other: &Self) {
- self.name.merge(&other.name);
- self.deleted.merge(&other.deleted);
-
- if self.deleted.get() {
- self.authorized_buckets.clear();
- } else {
- self.authorized_buckets.merge(&other.authorized_buckets);
- }
- }
-}
diff --git a/src/model/prev/v051/mod.rs b/src/model/prev/v051/mod.rs
index 7a954752..8c1335a5 100644
--- a/src/model/prev/v051/mod.rs
+++ b/src/model/prev/v051/mod.rs
@@ -1,4 +1 @@
pub(crate) mod bucket_table;
-pub(crate) mod key_table;
-pub(crate) mod object_table;
-pub(crate) mod version_table;
diff --git a/src/model/prev/v051/object_table.rs b/src/model/prev/v051/object_table.rs
deleted file mode 100644
index e79e5787..00000000
--- a/src/model/prev/v051/object_table.rs
+++ /dev/null
@@ -1,149 +0,0 @@
-use serde::{Deserialize, Serialize};
-use std::collections::BTreeMap;
-
-use garage_util::data::*;
-
-use garage_table::crdt::*;
-
-/// An object
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-pub struct Object {
- /// The bucket in which the object is stored, used as partition key
- pub bucket: String,
-
- /// The key at which the object is stored in its bucket, used as sorting key
- pub key: String,
-
- /// The list of currenty stored versions of the object
- versions: Vec<ObjectVersion>,
-}
-
-impl Object {
- /// Get a list of currently stored versions of `Object`
- pub fn versions(&self) -> &[ObjectVersion] {
- &self.versions[..]
- }
-}
-
-/// Informations about a version of an object
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-pub struct ObjectVersion {
- /// Id of the version
- pub uuid: Uuid,
- /// Timestamp of when the object was created
- pub timestamp: u64,
- /// State of the version
- pub state: ObjectVersionState,
-}
-
-/// State of an object version
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-pub enum ObjectVersionState {
- /// The version is being received
- Uploading(ObjectVersionHeaders),
- /// The version is fully received
- Complete(ObjectVersionData),
- /// The version uploaded containded errors or the upload was explicitly aborted
- Aborted,
-}
-
-impl Crdt for ObjectVersionState {
- fn merge(&mut self, other: &Self) {
- use ObjectVersionState::*;
- match other {
- Aborted => {
- *self = Aborted;
- }
- Complete(b) => match self {
- Aborted => {}
- Complete(a) => {
- a.merge(b);
- }
- Uploading(_) => {
- *self = Complete(b.clone());
- }
- },
- Uploading(_) => {}
- }
- }
-}
-
-/// Data stored in object version
-#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
-pub enum ObjectVersionData {
- /// The object was deleted, this Version is a tombstone to mark it as such
- DeleteMarker,
- /// The object is short, it's stored inlined
- Inline(ObjectVersionMeta, #[serde(with = "serde_bytes")] Vec<u8>),
- /// The object is not short, Hash of first block is stored here, next segments hashes are
- /// stored in the version table
- FirstBlock(ObjectVersionMeta, Hash),
-}
-
-impl AutoCrdt for ObjectVersionData {
- const WARN_IF_DIFFERENT: bool = true;
-}
-
-/// Metadata about the object version
-#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
-pub struct ObjectVersionMeta {
- /// Headers to send to the client
- pub headers: ObjectVersionHeaders,
- /// Size of the object
- pub size: u64,
- /// etag of the object
- pub etag: String,
-}
-
-/// Additional headers for an object
-#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
-pub struct ObjectVersionHeaders {
- /// Content type of the object
- pub content_type: String,
- /// Any other http headers to send
- pub other: BTreeMap<String, String>,
-}
-
-impl ObjectVersion {
- fn cmp_key(&self) -> (u64, Uuid) {
- (self.timestamp, self.uuid)
- }
-
- /// Is the object version completely received
- pub fn is_complete(&self) -> bool {
- matches!(self.state, ObjectVersionState::Complete(_))
- }
-}
-
-impl Crdt for Object {
- fn merge(&mut self, other: &Self) {
- // Merge versions from other into here
- for other_v in other.versions.iter() {
- match self
- .versions
- .binary_search_by(|v| v.cmp_key().cmp(&other_v.cmp_key()))
- {
- Ok(i) => {
- self.versions[i].state.merge(&other_v.state);
- }
- Err(i) => {
- self.versions.insert(i, other_v.clone());
- }
- }
- }
-
- // Remove versions which are obsolete, i.e. those that come
- // before the last version which .is_complete().
- let last_complete = self
- .versions
- .iter()
- .enumerate()
- .rev()
- .find(|(_, v)| v.is_complete())
- .map(|(vi, _)| vi);
-
- if let Some(last_vi) = last_complete {
- self.versions = self.versions.drain(last_vi..).collect::<Vec<_>>();
- }
- }
-}
diff --git a/src/model/prev/v051/version_table.rs b/src/model/prev/v051/version_table.rs
deleted file mode 100644
index c11c62d5..00000000
--- a/src/model/prev/v051/version_table.rs
+++ /dev/null
@@ -1,79 +0,0 @@
-use serde::{Deserialize, Serialize};
-
-use garage_util::data::*;
-
-use garage_table::crdt::*;
-use garage_table::*;
-
-/// A version of an object
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-pub struct Version {
- /// UUID of the version, used as partition key
- pub uuid: Uuid,
-
- // Actual data: the blocks for this version
- // In the case of a multipart upload, also store the etags
- // of individual parts and check them when doing CompleteMultipartUpload
- /// Is this version deleted
- pub deleted: crdt::Bool,
- /// list of blocks of data composing the version
- pub blocks: crdt::Map<VersionBlockKey, VersionBlock>,
- /// Etag of each part in case of a multipart upload, empty otherwise
- pub parts_etags: crdt::Map<u64, String>,
-
- // Back link to bucket+key so that we can figure if
- // this was deleted later on
- /// Bucket in which the related object is stored
- pub bucket: String,
- /// Key in which the related object is stored
- pub key: String,
-}
-
-#[derive(PartialEq, Eq, Clone, Copy, Debug, Serialize, Deserialize)]
-pub struct VersionBlockKey {
- /// Number of the part
- pub part_number: u64,
- /// Offset of this sub-segment in its part
- pub offset: u64,
-}
-
-impl Ord for VersionBlockKey {
- fn cmp(&self, other: &Self) -> std::cmp::Ordering {
- self.part_number
- .cmp(&other.part_number)
- .then(self.offset.cmp(&other.offset))
- }
-}
-
-impl PartialOrd for VersionBlockKey {
- fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
- Some(self.cmp(other))
- }
-}
-
-/// Informations about a single block
-#[derive(PartialEq, Eq, Ord, PartialOrd, Clone, Copy, Debug, Serialize, Deserialize)]
-pub struct VersionBlock {
- /// Blake2 sum of the block
- pub hash: Hash,
- /// Size of the block
- pub size: u64,
-}
-
-impl AutoCrdt for VersionBlock {
- const WARN_IF_DIFFERENT: bool = true;
-}
-
-impl Crdt for Version {
- fn merge(&mut self, other: &Self) {
- self.deleted.merge(&other.deleted);
-
- if self.deleted.get() {
- self.blocks.clear();
- self.parts_etags.clear();
- } else {
- self.blocks.merge(&other.blocks);
- self.parts_etags.merge(&other.parts_etags);
- }
- }
-}
diff --git a/src/model/s3/block_ref_table.rs b/src/model/s3/block_ref_table.rs
index c7017409..7b023d87 100644
--- a/src/model/s3/block_ref_table.rs
+++ b/src/model/s3/block_ref_table.rs
@@ -1,4 +1,3 @@
-use serde::{Deserialize, Serialize};
use std::sync::Arc;
use garage_db as db;
@@ -10,19 +9,29 @@ use garage_table::*;
use garage_block::manager::*;
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-pub struct BlockRef {
- /// Hash (blake2 sum) of the block, used as partition key
- pub block: Hash,
+mod v08 {
+ use garage_util::crdt;
+ use garage_util::data::{Hash, Uuid};
+ use serde::{Deserialize, Serialize};
- /// Id of the Version for the object containing this block, used as sorting key
- pub version: Uuid,
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct BlockRef {
+ /// Hash (blake2 sum) of the block, used as partition key
+ pub block: Hash,
- // Keep track of deleted status
- /// Is the Version that contains this block deleted
- pub deleted: crdt::Bool,
+ /// Id of the Version for the object containing this block, used as sorting key
+ pub version: Uuid,
+
+ // Keep track of deleted status
+ /// Is the Version that contains this block deleted
+ pub deleted: crdt::Bool,
+ }
+
+ impl garage_util::migrate::InitialFormat for BlockRef {}
}
+pub use v08::*;
+
impl Entry<Hash, Uuid> for BlockRef {
fn partition_key(&self) -> &Hash {
&self.block
diff --git a/src/model/s3/object_table.rs b/src/model/s3/object_table.rs
index 1b2f0014..616e0d35 100644
--- a/src/model/s3/object_table.rs
+++ b/src/model/s3/object_table.rs
@@ -1,5 +1,4 @@
use serde::{Deserialize, Serialize};
-use std::collections::BTreeMap;
use std::sync::Arc;
use garage_db as db;
@@ -13,25 +12,126 @@ use garage_table::*;
use crate::index_counter::*;
use crate::s3::version_table::*;
-use crate::prev::v051::object_table as old;
-
pub const OBJECTS: &str = "objects";
pub const UNFINISHED_UPLOADS: &str = "unfinished_uploads";
pub const BYTES: &str = "bytes";
-/// An object
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-pub struct Object {
- /// The bucket in which the object is stored, used as partition key
- pub bucket_id: Uuid,
+mod v05 {
+ use garage_util::data::{Hash, Uuid};
+ use serde::{Deserialize, Serialize};
+ use std::collections::BTreeMap;
+
+ /// An object
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct Object {
+ /// The bucket in which the object is stored, used as partition key
+ pub bucket: String,
+
+ /// The key at which the object is stored in its bucket, used as sorting key
+ pub key: String,
+
+ /// The list of currenty stored versions of the object
+ pub(super) versions: Vec<ObjectVersion>,
+ }
+
+ /// Informations about a version of an object
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct ObjectVersion {
+ /// Id of the version
+ pub uuid: Uuid,
+ /// Timestamp of when the object was created
+ pub timestamp: u64,
+ /// State of the version
+ pub state: ObjectVersionState,
+ }
+
+ /// State of an object version
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub enum ObjectVersionState {
+ /// The version is being received
+ Uploading(ObjectVersionHeaders),
+ /// The version is fully received
+ Complete(ObjectVersionData),
+ /// The version uploaded containded errors or the upload was explicitly aborted
+ Aborted,
+ }
+
+ /// Data stored in object version
+ #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
+ pub enum ObjectVersionData {
+ /// The object was deleted, this Version is a tombstone to mark it as such
+ DeleteMarker,
+ /// The object is short, it's stored inlined
+ Inline(ObjectVersionMeta, #[serde(with = "serde_bytes")] Vec<u8>),
+ /// The object is not short, Hash of first block is stored here, next segments hashes are
+ /// stored in the version table
+ FirstBlock(ObjectVersionMeta, Hash),
+ }
+
+ /// Metadata about the object version
+ #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
+ pub struct ObjectVersionMeta {
+ /// Headers to send to the client
+ pub headers: ObjectVersionHeaders,
+ /// Size of the object
+ pub size: u64,
+ /// etag of the object
+ pub etag: String,
+ }
+
+ /// Additional headers for an object
+ #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
+ pub struct ObjectVersionHeaders {
+ /// Content type of the object
+ pub content_type: String,
+ /// Any other http headers to send
+ pub other: BTreeMap<String, String>,
+ }
+
+ impl garage_util::migrate::InitialFormat for Object {}
+}
+
+mod v08 {
+ use garage_util::data::Uuid;
+ use serde::{Deserialize, Serialize};
+
+ use super::v05;
- /// The key at which the object is stored in its bucket, used as sorting key
- pub key: String,
+ pub use v05::{
+ ObjectVersion, ObjectVersionData, ObjectVersionHeaders, ObjectVersionMeta,
+ ObjectVersionState,
+ };
+
+ /// An object
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct Object {
+ /// The bucket in which the object is stored, used as partition key
+ pub bucket_id: Uuid,
+
+ /// The key at which the object is stored in its bucket, used as sorting key
+ pub key: String,
+
+ /// The list of currenty stored versions of the object
+ pub(super) versions: Vec<ObjectVersion>,
+ }
- /// The list of currenty stored versions of the object
- versions: Vec<ObjectVersion>,
+ impl garage_util::migrate::Migrate for Object {
+ type Previous = v05::Object;
+
+ fn migrate(old: v05::Object) -> Object {
+ use garage_util::data::blake2sum;
+
+ Object {
+ bucket_id: blake2sum(old.bucket.as_bytes()),
+ key: old.key,
+ versions: old.versions,
+ }
+ }
+ }
}
+pub use v08::*;
+
impl Object {
/// Initialize an Object struct from parts
pub fn new(bucket_id: Uuid, key: String, versions: Vec<ObjectVersion>) -> Self {
@@ -68,28 +168,6 @@ impl Object {
}
}
-/// Informations about a version of an object
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-pub struct ObjectVersion {
- /// Id of the version
- pub uuid: Uuid,
- /// Timestamp of when the object was created
- pub timestamp: u64,
- /// State of the version
- pub state: ObjectVersionState,
-}
-
-/// State of an object version
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-pub enum ObjectVersionState {
- /// The version is being received
- Uploading(ObjectVersionHeaders),
- /// The version is fully received
- Complete(ObjectVersionData),
- /// The version uploaded containded errors or the upload was explicitly aborted
- Aborted,
-}
-
impl Crdt for ObjectVersionState {
fn merge(&mut self, other: &Self) {
use ObjectVersionState::*;
@@ -111,42 +189,10 @@ impl Crdt for ObjectVersionState {
}
}
-/// Data stored in object version
-#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
-pub enum ObjectVersionData {
- /// The object was deleted, this Version is a tombstone to mark it as such
- DeleteMarker,
- /// The object is short, it's stored inlined
- Inline(ObjectVersionMeta, #[serde(with = "serde_bytes")] Vec<u8>),
- /// The object is not short, Hash of first block is stored here, next segments hashes are
- /// stored in the version table
- FirstBlock(ObjectVersionMeta, Hash),
-}
-
impl AutoCrdt for ObjectVersionData {
const WARN_IF_DIFFERENT: bool = true;
}
-/// Metadata about the object version
-#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
-pub struct ObjectVersionMeta {
- /// Headers to send to the client
- pub headers: ObjectVersionHeaders,
- /// Size of the object
- pub size: u64,
- /// etag of the object
- pub etag: String,
-}
-
-/// Additional headers for an object
-#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
-pub struct ObjectVersionHeaders {
- /// Content type of the object
- pub content_type: String,
- /// Any other http headers to send
- pub other: BTreeMap<String, String>,
-}
-
impl ObjectVersion {
fn cmp_key(&self) -> (u64, Uuid) {
(self.timestamp, self.uuid)
@@ -290,11 +336,6 @@ impl TableSchema for ObjectTable {
ObjectFilter::IsUploading => entry.versions.iter().any(|v| v.is_uploading()),
}
}
-
- fn try_migrate(bytes: &[u8]) -> Option<Self::E> {
- let old_obj = rmp_serde::decode::from_read_ref::<_, old::Object>(bytes).ok()?;
- Some(migrate_object(old_obj))
- }
}
impl CountedItem for Object {
@@ -342,61 +383,3 @@ impl CountedItem for Object {
// vvvvvvvv migration code, stupid stuff vvvvvvvvvvvv
// (we just want to change bucket into bucket_id by hashing it)
-
-fn migrate_object(o: old::Object) -> Object {
- let versions = o
- .versions()
- .iter()
- .cloned()
- .map(migrate_object_version)
- .collect();
- Object {
- bucket_id: blake2sum(o.bucket.as_bytes()),
- key: o.key,
- versions,
- }
-}
-
-fn migrate_object_version(v: old::ObjectVersion) -> ObjectVersion {
- ObjectVersion {
- uuid: Uuid::try_from(v.uuid.as_slice()).unwrap(),
- timestamp: v.timestamp,
- state: match v.state {
- old::ObjectVersionState::Uploading(h) => {
- ObjectVersionState::Uploading(migrate_object_version_headers(h))
- }
- old::ObjectVersionState::Complete(d) => {
- ObjectVersionState::Complete(migrate_object_version_data(d))
- }
- old::ObjectVersionState::Aborted => ObjectVersionState::Aborted,
- },
- }
-}
-
-fn migrate_object_version_headers(h: old::ObjectVersionHeaders) -> ObjectVersionHeaders {
- ObjectVersionHeaders {
- content_type: h.content_type,
- other: h.other,
- }
-}
-
-fn migrate_object_version_data(d: old::ObjectVersionData) -> ObjectVersionData {
- match d {
- old::ObjectVersionData::DeleteMarker => ObjectVersionData::DeleteMarker,
- old::ObjectVersionData::Inline(m, b) => {
- ObjectVersionData::Inline(migrate_object_version_meta(m), b)
- }
- old::ObjectVersionData::FirstBlock(m, h) => ObjectVersionData::FirstBlock(
- migrate_object_version_meta(m),
- Hash::try_from(h.as_slice()).unwrap(),
- ),
- }
-}
-
-fn migrate_object_version_meta(m: old::ObjectVersionMeta) -> ObjectVersionMeta {
- ObjectVersionMeta {
- headers: migrate_object_version_headers(m.headers),
- size: m.size,
- etag: m.etag,
- }
-}
diff --git a/src/model/s3/version_table.rs b/src/model/s3/version_table.rs
index 0486512b..6edc83f4 100644
--- a/src/model/s3/version_table.rs
+++ b/src/model/s3/version_table.rs
@@ -1,4 +1,3 @@
-use serde::{Deserialize, Serialize};
use std::sync::Arc;
use garage_db as db;
@@ -11,32 +10,108 @@ use garage_table::*;
use crate::s3::block_ref_table::*;
-use crate::prev::v051::version_table as old;
-
-/// A version of an object
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-pub struct Version {
- /// UUID of the version, used as partition key
- pub uuid: Uuid,
-
- // Actual data: the blocks for this version
- // In the case of a multipart upload, also store the etags
- // of individual parts and check them when doing CompleteMultipartUpload
- /// Is this version deleted
- pub deleted: crdt::Bool,
- /// list of blocks of data composing the version
- pub blocks: crdt::Map<VersionBlockKey, VersionBlock>,
- /// Etag of each part in case of a multipart upload, empty otherwise
- pub parts_etags: crdt::Map<u64, String>,
-
- // Back link to bucket+key so that we can figure if
- // this was deleted later on
- /// Bucket in which the related object is stored
- pub bucket_id: Uuid,
- /// Key in which the related object is stored
- pub key: String,
+mod v05 {
+ use garage_util::crdt;
+ use garage_util::data::{Hash, Uuid};
+ use serde::{Deserialize, Serialize};
+
+ /// A version of an object
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct Version {
+ /// UUID of the version, used as partition key
+ pub uuid: Uuid,
+
+ // Actual data: the blocks for this version
+ // In the case of a multipart upload, also store the etags
+ // of individual parts and check them when doing CompleteMultipartUpload
+ /// Is this version deleted
+ pub deleted: crdt::Bool,
+ /// list of blocks of data composing the version
+ pub blocks: crdt::Map<VersionBlockKey, VersionBlock>,
+ /// Etag of each part in case of a multipart upload, empty otherwise
+ pub parts_etags: crdt::Map<u64, String>,
+
+ // Back link to bucket+key so that we can figure if
+ // this was deleted later on
+ /// Bucket in which the related object is stored
+ pub bucket: String,
+ /// Key in which the related object is stored
+ pub key: String,
+ }
+
+ #[derive(PartialEq, Eq, Clone, Copy, Debug, Serialize, Deserialize)]
+ pub struct VersionBlockKey {
+ /// Number of the part
+ pub part_number: u64,
+ /// Offset of this sub-segment in its part
+ pub offset: u64,
+ }
+
+ /// Informations about a single block
+ #[derive(PartialEq, Eq, Ord, PartialOrd, Clone, Copy, Debug, Serialize, Deserialize)]
+ pub struct VersionBlock {
+ /// Blake2 sum of the block
+ pub hash: Hash,
+ /// Size of the block
+ pub size: u64,
+ }
+
+ impl garage_util::migrate::InitialFormat for Version {}
+}
+
+mod v08 {
+ use garage_util::crdt;
+ use garage_util::data::Uuid;
+ use serde::{Deserialize, Serialize};
+
+ use super::v05;
+
+ /// A version of an object
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct Version {
+ /// UUID of the version, used as partition key
+ pub uuid: Uuid,
+
+ // Actual data: the blocks for this version
+ // In the case of a multipart upload, also store the etags
+ // of individual parts and check them when doing CompleteMultipartUpload
+ /// Is this version deleted
+ pub deleted: crdt::Bool,
+ /// list of blocks of data composing the version
+ pub blocks: crdt::Map<VersionBlockKey, VersionBlock>,
+ /// Etag of each part in case of a multipart upload, empty otherwise
+ pub parts_etags: crdt::Map<u64, String>,
+
+ // Back link to bucket+key so that we can figure if
+ // this was deleted later on
+ /// Bucket in which the related object is stored
+ pub bucket_id: Uuid,
+ /// Key in which the related object is stored
+ pub key: String,
+ }
+
+ pub use v05::{VersionBlock, VersionBlockKey};
+
+ impl garage_util::migrate::Migrate for Version {
+ type Previous = v05::Version;
+
+ fn migrate(old: v05::Version) -> Version {
+ use garage_util::data::blake2sum;
+
+ Version {
+ uuid: old.uuid,
+ deleted: old.deleted,
+ blocks: old.blocks,
+ parts_etags: old.parts_etags,
+ bucket_id: blake2sum(old.bucket.as_bytes()),
+ key: old.key,
+ }
+ }
+ }
}
+pub use v08::*;
+
impl Version {
pub fn new(uuid: Uuid, bucket_id: Uuid, key: String, deleted: bool) -> Self {
Self {
@@ -64,14 +139,6 @@ impl Version {
}
}
-#[derive(PartialEq, Eq, Clone, Copy, Debug, Serialize, Deserialize)]
-pub struct VersionBlockKey {
- /// Number of the part
- pub part_number: u64,
- /// Offset of this sub-segment in its part
- pub offset: u64,
-}
-
impl Ord for VersionBlockKey {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.part_number
@@ -86,15 +153,6 @@ impl PartialOrd for VersionBlockKey {
}
}
-/// Informations about a single block
-#[derive(PartialEq, Eq, Ord, PartialOrd, Clone, Copy, Debug, Serialize, Deserialize)]
-pub struct VersionBlock {
- /// Blake2 sum of the block
- pub hash: Hash,
- /// Size of the block
- pub size: u64,
-}
-
impl AutoCrdt for VersionBlock {
const WARN_IF_DIFFERENT: bool = true;
}
@@ -166,42 +224,4 @@ impl TableSchema for VersionTable {
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
filter.apply(entry.deleted.get())
}
-
- fn try_migrate(bytes: &[u8]) -> Option<Self::E> {
- let old = rmp_serde::decode::from_read_ref::<_, old::Version>(bytes).ok()?;
-
- let blocks = old
- .blocks
- .items()
- .iter()
- .map(|(k, v)| {
- (
- VersionBlockKey {
- part_number: k.part_number,
- offset: k.offset,
- },
- VersionBlock {
- hash: Hash::try_from(v.hash.as_slice()).unwrap(),
- size: v.size,
- },
- )
- })
- .collect::<crdt::Map<_, _>>();
-
- let parts_etags = old
- .parts_etags
- .items()
- .iter()
- .map(|(k, v)| (*k, v.clone()))
- .collect::<crdt::Map<_, _>>();
-
- Some(Version {
- uuid: Hash::try_from(old.uuid.as_slice()).unwrap(),
- deleted: crdt::Bool::new(old.deleted.get()),
- blocks,
- parts_etags,
- bucket_id: blake2sum(old.bucket.as_bytes()),
- key: old.key,
- })
- }
}
diff --git a/src/rpc/layout.rs b/src/rpc/layout.rs
index 2fd5acfc..de4117a6 100644
--- a/src/rpc/layout.rs
+++ b/src/rpc/layout.rs
@@ -35,6 +35,8 @@ pub struct ClusterLayout {
pub staging_hash: Hash,
}
+impl garage_util::migrate::InitialFormat for ClusterLayout {}
+
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct NodeRoleV(pub Option<NodeRole>);
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index 8f753b7f..22e23e55 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -73,13 +73,17 @@ impl Rpc for SystemRpc {
type Response = Result<SystemRpc, Error>;
}
+#[derive(Serialize, Deserialize)]
+pub struct PeerList(Vec<(Uuid, SocketAddr)>);
+impl garage_util::migrate::InitialFormat for PeerList {}
+
/// This node's membership manager
pub struct System {
/// The id of this node
pub id: Uuid,
persist_cluster_layout: Persister<ClusterLayout>,
- persist_peer_list: Persister<Vec<(Uuid, SocketAddr)>>,
+ persist_peer_list: Persister<PeerList>,
local_status: ArcSwap<NodeStatus>,
node_status: RwLock<HashMap<Uuid, (u64, NodeStatus)>>,
@@ -721,7 +725,7 @@ impl System {
// Add peer list from list stored on disk
if let Ok(peers) = self.persist_peer_list.load_async().await {
- ping_list.extend(peers.iter().map(|(id, addr)| ((*id).into(), *addr)))
+ ping_list.extend(peers.0.iter().map(|(id, addr)| ((*id).into(), *addr)))
}
// Fetch peer list from Consul
@@ -801,12 +805,16 @@ impl System {
// and append it to the list we are about to save,
// so that no peer ID gets lost in the process.
if let Ok(mut prev_peer_list) = self.persist_peer_list.load_async().await {
- prev_peer_list.retain(|(id, _ip)| peer_list.iter().all(|(id2, _ip2)| id2 != id));
- peer_list.extend(prev_peer_list);
+ prev_peer_list
+ .0
+ .retain(|(id, _ip)| peer_list.iter().all(|(id2, _ip2)| id2 != id));
+ peer_list.extend(prev_peer_list.0);
}
// Save new peer list to file
- self.persist_peer_list.save_async(&peer_list).await
+ self.persist_peer_list
+ .save_async(&PeerList(peer_list))
+ .await
}
async fn pull_cluster_layout(self: Arc<Self>, peer: Uuid) {
diff --git a/src/table/data.rs b/src/table/data.rs
index 40856b02..f93ed00d 100644
--- a/src/table/data.rs
+++ b/src/table/data.rs
@@ -10,6 +10,7 @@ use garage_db::counted_tree_hack::CountedTree;
use garage_util::data::*;
use garage_util::error::*;
+use garage_util::migrate::Migrate;
use garage_rpc::system::System;
@@ -219,7 +220,8 @@ where
// data format, the messagepack encoding changed. In this case,
// we also have to write the migrated value in the table and update
// the associated Merkle tree entry.
- let new_bytes = rmp_to_vec_all_named(&new_entry)
+ let new_bytes = new_entry
+ .encode()
.map_err(Error::RmpEncode)
.map_err(db::TxError::Abort)?;
let changed = Some(&new_bytes[..]) != old_bytes.as_deref();
@@ -329,9 +331,9 @@ where
Some(old_v) => {
let mut entry = self.decode_entry(&old_v).map_err(db::TxError::Abort)?;
entry.merge(ins);
- rmp_to_vec_all_named(&entry)
+ entry.encode()
}
- None => rmp_to_vec_all_named(ins),
+ None => ins.encode(),
};
let new_entry = new_entry
.map_err(Error::RmpEncode)
@@ -351,18 +353,18 @@ where
}
pub fn decode_entry(&self, bytes: &[u8]) -> Result<F::E, Error> {
- match rmp_serde::decode::from_read_ref::<_, F::E>(bytes) {
- Ok(x) => Ok(x),
- Err(e) => match F::try_migrate(bytes) {
- Some(x) => Ok(x),
- None => {
- warn!("Unable to decode entry of {}: {}", F::TABLE_NAME, e);
- for line in hexdump::hexdump_iter(bytes) {
- debug!("{}", line);
- }
- Err(e.into())
+ match F::E::decode(bytes) {
+ Some(x) => Ok(x),
+ None => {
+ error!("Unable to decode entry of {}", F::TABLE_NAME);
+ for line in hexdump::hexdump_iter(bytes) {
+ debug!("{}", line);
}
- },
+ Err(Error::Message(format!(
+ "Unable to decode entry of {}",
+ F::TABLE_NAME
+ )))
+ }
}
}
diff --git a/src/table/schema.rs b/src/table/schema.rs
index f37e98d8..6538a32f 100644
--- a/src/table/schema.rs
+++ b/src/table/schema.rs
@@ -2,6 +2,7 @@ use serde::{Deserialize, Serialize};
use garage_db as db;
use garage_util::data::*;
+use garage_util::migrate::Migrate;
use crate::crdt::Crdt;
@@ -46,7 +47,7 @@ impl SortKey for FixedBytes32 {
/// Trait for an entry in a table. It must be sortable and partitionnable.
pub trait Entry<P: PartitionKey, S: SortKey>:
- Crdt + PartialEq + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync
+ Crdt + PartialEq + Clone + Migrate + Send + Sync + 'static
{
/// Get the key used to partition
fn partition_key(&self) -> &P;
@@ -65,23 +66,23 @@ pub trait TableSchema: Send + Sync + 'static {
const TABLE_NAME: &'static str;
/// The partition key used in that table
- type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
+ type P: PartitionKey
+ + Clone
+ + PartialEq
+ + Serialize
+ + for<'de> Deserialize<'de>
+ + Send
+ + Sync
+ + 'static;
/// The sort key used int that table
- type S: SortKey + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync;
+ type S: SortKey + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static;
/// They type for an entry in that table
type E: Entry<Self::P, Self::S>;
/// The type for a filter that can be applied to select entries
/// (e.g. filter out deleted entries)
- type Filter: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync;
-
- // Action to take if not able to decode current version:
- // try loading from an older version
- /// Try migrating an entry from an older version
- fn try_migrate(_bytes: &[u8]) -> Option<Self::E> {
- None
- }
+ type Filter: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static;
/// Actions triggered by data changing in a table. If such actions
/// include updates to the local database that should be applied
diff --git a/src/table/sync.rs b/src/table/sync.rs
index d6d272ab..abc034f8 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -302,7 +302,7 @@ where
);
return Ok(());
}
- let root_ck_hash = hash_of::<MerkleNode>(&root_ck)?;
+ let root_ck_hash = hash_of_merkle_node(&root_ck)?;
// Check if they have the same root checksum
// If so, do nothing.
@@ -468,7 +468,7 @@ where
match message {
SyncRpc::RootCkHash(range, h) => {
let (_root_ck_key, root_ck) = self.get_root_ck(*range)?;
- let hash = hash_of::<MerkleNode>(&root_ck)?;
+ let hash = hash_of_merkle_node(&root_ck)?;
Ok(SyncRpc::RootCkDifferent(hash != *h))
}
SyncRpc::GetNode(k) => {
@@ -622,7 +622,7 @@ impl<F: TableSchema + 'static, R: TableReplication + 'static> Worker for SyncWor
// ---- UTIL ----
-fn hash_of<T: Serialize>(x: &T) -> Result<Hash, Error> {
+fn hash_of_merkle_node(x: &MerkleNode) -> Result<Hash, Error> {
Ok(blake2sum(&rmp_to_vec_all_named(x)?[..]))
}
diff --git a/src/table/table.rs b/src/table/table.rs
index bbcd5971..7f158314 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -18,6 +18,7 @@ use garage_util::background::BackgroundRunner;
use garage_util::data::*;
use garage_util::error::Error;
use garage_util::metrics::RecordDuration;
+use garage_util::migrate::Migrate;
use garage_rpc::system::System;
use garage_rpc::*;
@@ -122,7 +123,7 @@ where
let hash = e.partition_key().hash();
let who = self.data.replication.write_nodes(&hash);
- let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?));
+ let e_enc = Arc::new(ByteBuf::from(e.encode()?));
let rpc = TableRpc::<F>::Update(vec![e_enc]);
self.system
@@ -173,7 +174,7 @@ where
let entry = entry.borrow();
let hash = entry.partition_key().hash();
let who = self.data.replication.write_nodes(&hash);
- let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(entry)?));
+ let e_enc = Arc::new(ByteBuf::from(entry.encode()?));
for node in who {
call_list.entry(node).or_default().push(e_enc.clone());
}
@@ -412,7 +413,7 @@ where
// =============== UTILITY FUNCTION FOR CLIENT OPERATIONS ===============
async fn repair_on_read(&self, who: &[Uuid], what: F::E) -> Result<(), Error> {
- let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(&what)?));
+ let what_enc = Arc::new(ByteBuf::from(what.encode()?));
self.system
.rpc
.try_call_many(
diff --git a/src/util/Cargo.toml b/src/util/Cargo.toml
index 11640027..32e9c851 100644
--- a/src/util/Cargo.toml
+++ b/src/util/Cargo.toml
@@ -23,6 +23,7 @@ bytes = "1.0"
digest = "0.10"
err-derive = "0.3"
git-version = "0.3.4"
+hexdump = "0.1"
xxhash-rust = { version = "0.8", default-features = false, features = ["xxh3"] }
hex = "0.4"
lazy_static = "1.4"
diff --git a/src/util/lib.rs b/src/util/lib.rs
index 264cc192..fd3d5c7b 100644
--- a/src/util/lib.rs
+++ b/src/util/lib.rs
@@ -11,6 +11,7 @@ pub mod data;
pub mod error;
pub mod formater;
pub mod metrics;
+pub mod migrate;
pub mod persister;
pub mod time;
pub mod token_bucket;
diff --git a/src/util/migrate.rs b/src/util/migrate.rs
new file mode 100644
index 00000000..199c68f6
--- /dev/null
+++ b/src/util/migrate.rs
@@ -0,0 +1,75 @@
+use serde::{Deserialize, Serialize};
+
+pub trait Migrate: Serialize + for<'de> Deserialize<'de> + 'static {
+ /// A sequence of bytes to add at the beginning of the serialized
+ /// string, to identify that the data is of this version.
+ const MARKER: &'static [u8] = b"";
+
+ /// The previous version of this data type, from which items of this version
+ /// can be migrated. Set `type Previous = NoPrevious` to indicate that this datatype
+ /// is the initial schema and cannot be migrated.
+ type Previous: Migrate;
+
+ /// This function must be filled in by implementors to migrate from a previons iteration
+ /// of the data format.
+ fn migrate(previous: Self::Previous) -> Self;
+
+ fn decode(bytes: &[u8]) -> Option<Self> {
+ if bytes.len() >= Self::MARKER.len() && &bytes[..Self::MARKER.len()] == Self::MARKER {
+ if let Ok(value) =
+ rmp_serde::decode::from_read_ref::<_, Self>(&bytes[Self::MARKER.len()..])
+ {
+ return Some(value);
+ }
+ }
+
+ Self::Previous::decode(bytes).map(Self::migrate)
+ }
+
+ fn encode(&self) -> Result<Vec<u8>, rmp_serde::encode::Error> {
+ let mut wr = Vec::with_capacity(128);
+ wr.extend_from_slice(Self::MARKER);
+ let mut se = rmp_serde::Serializer::new(&mut wr)
+ .with_struct_map()
+ .with_string_variants();
+ self.serialize(&mut se)?;
+ Ok(wr)
+ }
+}
+
+pub trait InitialFormat: Serialize + for<'de> Deserialize<'de> + 'static {
+ /// A sequence of bytes to add at the beginning of the serialized
+ /// string, to identify that the data is of this version.
+ const MARKER: &'static [u8] = b"";
+}
+
+// ----
+
+impl<T: InitialFormat> Migrate for T {
+ const MARKER: &'static [u8] = <T as InitialFormat>::MARKER;
+
+ type Previous = NoPrevious;
+
+ fn migrate(_previous: Self::Previous) -> Self {
+ unreachable!();
+ }
+}
+
+#[derive(Serialize, Deserialize)]
+pub struct NoPrevious;
+
+impl Migrate for NoPrevious {
+ type Previous = NoPrevious;
+
+ fn migrate(_previous: Self::Previous) -> Self {
+ unreachable!();
+ }
+
+ fn decode(_bytes: &[u8]) -> Option<Self> {
+ None
+ }
+
+ fn encode(&self) -> Result<Vec<u8>, rmp_serde::encode::Error> {
+ unreachable!()
+ }
+}
diff --git a/src/util/persister.rs b/src/util/persister.rs
index 9e1a1910..4b9adf51 100644
--- a/src/util/persister.rs
+++ b/src/util/persister.rs
@@ -3,21 +3,16 @@ use std::path::{Path, PathBuf};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
-use serde::{Deserialize, Serialize};
-
-use crate::data::*;
use crate::error::Error;
+use crate::migrate::Migrate;
-pub struct Persister<T: Serialize + for<'de> Deserialize<'de>> {
+pub struct Persister<T: Migrate> {
path: PathBuf,
_marker: std::marker::PhantomData<T>,
}
-impl<T> Persister<T>
-where
- T: Serialize + for<'de> Deserialize<'de>,
-{
+impl<T: Migrate> Persister<T> {
pub fn new(base_dir: &Path, file_name: &str) -> Self {
let mut path = base_dir.to_path_buf();
path.push(file_name);
@@ -27,18 +22,37 @@ where
}
}
+ fn decode(&self, bytes: &[u8]) -> Result<T, Error> {
+ match T::decode(bytes) {
+ Some(v) => Ok(v),
+ None => {
+ error!(
+ "Unable to decode persisted data file {}",
+ self.path.display()
+ );
+ for line in hexdump::hexdump_iter(bytes) {
+ debug!("{}", line);
+ }
+ Err(Error::Message(format!(
+ "Unable to decode persisted data file {}",
+ self.path.display()
+ )))
+ }
+ }
+ }
+
pub fn load(&self) -> Result<T, Error> {
let mut file = std::fs::OpenOptions::new().read(true).open(&self.path)?;
let mut bytes = vec![];
file.read_to_end(&mut bytes)?;
- let value = rmp_serde::decode::from_read_ref(&bytes[..])?;
+ let value = self.decode(&bytes[..])?;
Ok(value)
}
pub fn save(&self, t: &T) -> Result<(), Error> {
- let bytes = rmp_to_vec_all_named(t)?;
+ let bytes = t.encode()?;
let mut file = std::fs::OpenOptions::new()
.write(true)
@@ -57,12 +71,12 @@ where
let mut bytes = vec![];
file.read_to_end(&mut bytes).await?;
- let value = rmp_serde::decode::from_read_ref(&bytes[..])?;
+ let value = self.decode(&bytes[..])?;
Ok(value)
}
pub async fn save_async(&self, t: &T) -> Result<(), Error> {
- let bytes = rmp_to_vec_all_named(t)?;
+ let bytes = t.encode()?;
let mut file = tokio::fs::File::create(&self.path).await?;
file.write_all(&bytes[..]).await?;