aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlex <alex@adnab.me>2023-01-03 15:28:24 +0000
committerAlex <alex@adnab.me>2023-01-03 15:28:24 +0000
commit73ed9c74039448c69ebe382e361acf3ecbfef70b (patch)
tree7fb21a559e53557d5dea5efd2b7dafe9f9751367 /src
parent582b0761790b7958a3ba10c4b549b466997d2dcd (diff)
parent1d5bdc17a46648eb3494ff629d0d360d0217c1e2 (diff)
downloadgarage-73ed9c74039448c69ebe382e361acf3ecbfef70b.tar.gz
garage-73ed9c74039448c69ebe382e361acf3ecbfef70b.zip
Merge pull request 'Refactor how things are migrated' (#461) from format-migration into main
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/461
Diffstat (limited to 'src')
-rw-r--r--src/api/router_macros.rs1
-rw-r--r--src/block/Cargo.toml1
-rw-r--r--src/block/repair.rs1
-rw-r--r--src/block/resync.rs1
-rw-r--r--src/garage/Cargo.toml3
-rw-r--r--src/garage/repair/online.rs5
-rw-r--r--src/model/Cargo.toml1
-rw-r--r--src/model/bucket_alias_table.rs24
-rw-r--r--src/model/bucket_table.rs133
-rw-r--r--src/model/index_counter.rs80
-rw-r--r--src/model/k2v/item_table.rs54
-rw-r--r--src/model/key_table.rs154
-rw-r--r--src/model/migrate.rs5
-rw-r--r--src/model/prev/v051/bucket_table.rs2
-rw-r--r--src/model/prev/v051/key_table.rs50
-rw-r--r--src/model/prev/v051/mod.rs3
-rw-r--r--src/model/prev/v051/object_table.rs149
-rw-r--r--src/model/prev/v051/version_table.rs79
-rw-r--r--src/model/s3/block_ref_table.rs29
-rw-r--r--src/model/s3/object_table.rs244
-rw-r--r--src/model/s3/version_table.rs180
-rw-r--r--src/rpc/Cargo.toml1
-rw-r--r--src/rpc/layout.rs13
-rw-r--r--src/rpc/system.rs18
-rw-r--r--src/table/Cargo.toml1
-rw-r--r--src/table/data.rs36
-rw-r--r--src/table/gc.rs32
-rw-r--r--src/table/merkle.rs24
-rw-r--r--src/table/queue.rs10
-rw-r--r--src/table/replication/parameters.rs2
-rw-r--r--src/table/schema.rs22
-rw-r--r--src/table/sync.rs29
-rw-r--r--src/table/table.rs21
-rw-r--r--src/util/Cargo.toml1
-rw-r--r--src/util/data.rs31
-rw-r--r--src/util/encode.rs42
-rw-r--r--src/util/error.rs1
-rw-r--r--src/util/lib.rs2
-rw-r--r--src/util/migrate.rs159
-rw-r--r--src/util/persister.rs38
40 files changed, 826 insertions, 856 deletions
diff --git a/src/api/router_macros.rs b/src/api/router_macros.rs
index 959e69a3..07b5570c 100644
--- a/src/api/router_macros.rs
+++ b/src/api/router_macros.rs
@@ -145,6 +145,7 @@ macro_rules! generateQueryParameters {
) => {
#[derive(Debug)]
#[allow(non_camel_case_types)]
+ #[allow(clippy::upper_case_acronyms)]
enum Keyword {
EMPTY,
$( $kw_name, )*
diff --git a/src/block/Cargo.toml b/src/block/Cargo.toml
index cbd58d32..1e4eb64e 100644
--- a/src/block/Cargo.toml
+++ b/src/block/Cargo.toml
@@ -31,7 +31,6 @@ rand = "0.8"
async-compression = { version = "0.3", features = ["tokio", "zstd"] }
zstd = { version = "0.9", default-features = false }
-rmp-serde = "0.15"
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
serde_bytes = "0.11"
diff --git a/src/block/repair.rs b/src/block/repair.rs
index f5515d4e..a6ded65a 100644
--- a/src/block/repair.rs
+++ b/src/block/repair.rs
@@ -178,6 +178,7 @@ struct ScrubWorkerPersisted {
time_last_complete_scrub: u64,
corruptions_detected: u64,
}
+impl garage_util::migrate::InitialFormat for ScrubWorkerPersisted {}
enum ScrubWorkerState {
Running(BlockStoreIterator),
diff --git a/src/block/resync.rs b/src/block/resync.rs
index 51bb9846..9c7b3b0e 100644
--- a/src/block/resync.rs
+++ b/src/block/resync.rs
@@ -63,6 +63,7 @@ struct ResyncPersistedConfig {
n_workers: usize,
tranquility: u32,
}
+impl garage_util::migrate::InitialFormat for ResyncPersistedConfig {}
enum ResyncIterResult {
BusyDidSomething,
diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml
index cee7060e..b43b0242 100644
--- a/src/garage/Cargo.toml
+++ b/src/garage/Cargo.toml
@@ -42,7 +42,6 @@ rand = "0.8"
async-trait = "0.1.7"
sodiumoxide = { version = "0.2.5-0", package = "kuska-sodiumoxide" }
-rmp-serde = "0.15"
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
serde_bytes = "0.11"
structopt = { version = "0.3", default-features = false }
@@ -74,7 +73,7 @@ base64 = "0.13"
[features]
-default = [ "bundled-libs", "metrics", "sled" ]
+default = [ "bundled-libs", "metrics", "sled", "k2v" ]
k2v = [ "garage_util/k2v", "garage_api/k2v" ]
diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs
index 7120972c..627e3bf3 100644
--- a/src/garage/repair/online.rs
+++ b/src/garage/repair/online.rs
@@ -12,6 +12,7 @@ use garage_model::s3::version_table::*;
use garage_table::*;
use garage_util::background::*;
use garage_util::error::Error;
+use garage_util::migrate::Migrate;
use crate::*;
@@ -100,7 +101,7 @@ impl Worker for RepairVersionsWorker {
}
};
- let version = rmp_serde::decode::from_read_ref::<_, Version>(&item_bytes)?;
+ let version = Version::decode(&item_bytes).ok_or_message("Cannot decode Version")?;
if !version.deleted.get() {
let object = self
.garage
@@ -180,7 +181,7 @@ impl Worker for RepairBlockrefsWorker {
}
};
- let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(&item_bytes)?;
+ let block_ref = BlockRef::decode(&item_bytes).ok_or_message("Cannot decode BlockRef")?;
if !block_ref.deleted.get() {
let version = self
.garage
diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml
index 3d3fb693..323c2d64 100644
--- a/src/model/Cargo.toml
+++ b/src/model/Cargo.toml
@@ -30,7 +30,6 @@ tracing = "0.1.30"
rand = "0.8"
zstd = { version = "0.9", default-features = false }
-rmp-serde = "0.15"
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
serde_bytes = "0.11"
diff --git a/src/model/bucket_alias_table.rs b/src/model/bucket_alias_table.rs
index fcd1536e..54d7fbad 100644
--- a/src/model/bucket_alias_table.rs
+++ b/src/model/bucket_alias_table.rs
@@ -1,18 +1,26 @@
-use serde::{Deserialize, Serialize};
-
use garage_util::data::*;
use garage_table::crdt::*;
use garage_table::*;
-/// The bucket alias table holds the names given to buckets
-/// in the global namespace.
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-pub struct BucketAlias {
- name: String,
- pub state: crdt::Lww<Option<Uuid>>,
+mod v08 {
+ use garage_util::crdt;
+ use garage_util::data::Uuid;
+ use serde::{Deserialize, Serialize};
+
+ /// The bucket alias table holds the names given to buckets
+ /// in the global namespace.
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct BucketAlias {
+ pub(super) name: String,
+ pub state: crdt::Lww<Option<Uuid>>,
+ }
+
+ impl garage_util::migrate::InitialFormat for BucketAlias {}
}
+pub use v08::*;
+
impl BucketAlias {
pub fn new(name: String, ts: u64, bucket_id: Option<Uuid>) -> Option<Self> {
if !is_valid_bucket_name(&name) {
diff --git a/src/model/bucket_table.rs b/src/model/bucket_table.rs
index 7be42702..ac163736 100644
--- a/src/model/bucket_table.rs
+++ b/src/model/bucket_table.rs
@@ -1,5 +1,3 @@
-use serde::{Deserialize, Serialize};
-
use garage_table::crdt::*;
use garage_table::*;
use garage_util::data::*;
@@ -7,72 +5,83 @@ use garage_util::time::*;
use crate::permission::BucketKeyPerm;
-/// A bucket is a collection of objects
-///
-/// Its parameters are not directly accessible as:
-/// - It must be possible to merge paramaters, hence the use of a LWW CRDT.
-/// - A bucket has 2 states, Present or Deleted and parameters make sense only if present.
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-pub struct Bucket {
- /// ID of the bucket
- pub id: Uuid,
- /// State, and configuration if not deleted, of the bucket
- pub state: crdt::Deletable<BucketParams>,
-}
+mod v08 {
+ use crate::permission::BucketKeyPerm;
+ use garage_util::crdt;
+ use garage_util::data::Uuid;
+ use serde::{Deserialize, Serialize};
+
+ /// A bucket is a collection of objects
+ ///
+ /// Its parameters are not directly accessible as:
+ /// - It must be possible to merge paramaters, hence the use of a LWW CRDT.
+ /// - A bucket has 2 states, Present or Deleted and parameters make sense only if present.
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct Bucket {
+ /// ID of the bucket
+ pub id: Uuid,
+ /// State, and configuration if not deleted, of the bucket
+ pub state: crdt::Deletable<BucketParams>,
+ }
-/// Configuration for a bucket
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-pub struct BucketParams {
- /// Bucket's creation date
- pub creation_date: u64,
- /// Map of key with access to the bucket, and what kind of access they give
- pub authorized_keys: crdt::Map<String, BucketKeyPerm>,
-
- /// Map of aliases that are or have been given to this bucket
- /// in the global namespace
- /// (not authoritative: this is just used as an indication to
- /// map back to aliases when doing ListBuckets)
- pub aliases: crdt::LwwMap<String, bool>,
- /// Map of aliases that are or have been given to this bucket
- /// in namespaces local to keys
- /// key = (access key id, alias name)
- pub local_aliases: crdt::LwwMap<(String, String), bool>,
-
- /// Whether this bucket is allowed for website access
- /// (under all of its global alias names),
- /// and if so, the website configuration XML document
- pub website_config: crdt::Lww<Option<WebsiteConfig>>,
- /// CORS rules
- pub cors_config: crdt::Lww<Option<Vec<CorsRule>>>,
- /// Bucket quotas
- #[serde(default)]
- pub quotas: crdt::Lww<BucketQuotas>,
-}
+ /// Configuration for a bucket
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct BucketParams {
+ /// Bucket's creation date
+ pub creation_date: u64,
+ /// Map of key with access to the bucket, and what kind of access they give
+ pub authorized_keys: crdt::Map<String, BucketKeyPerm>,
+
+ /// Map of aliases that are or have been given to this bucket
+ /// in the global namespace
+ /// (not authoritative: this is just used as an indication to
+ /// map back to aliases when doing ListBuckets)
+ pub aliases: crdt::LwwMap<String, bool>,
+ /// Map of aliases that are or have been given to this bucket
+ /// in namespaces local to keys
+ /// key = (access key id, alias name)
+ pub local_aliases: crdt::LwwMap<(String, String), bool>,
+
+ /// Whether this bucket is allowed for website access
+ /// (under all of its global alias names),
+ /// and if so, the website configuration XML document
+ pub website_config: crdt::Lww<Option<WebsiteConfig>>,
+ /// CORS rules
+ pub cors_config: crdt::Lww<Option<Vec<CorsRule>>>,
+ /// Bucket quotas
+ #[serde(default)]
+ pub quotas: crdt::Lww<BucketQuotas>,
+ }
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-pub struct WebsiteConfig {
- pub index_document: String,
- pub error_document: Option<String>,
-}
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct WebsiteConfig {
+ pub index_document: String,
+ pub error_document: Option<String>,
+ }
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-pub struct CorsRule {
- pub id: Option<String>,
- pub max_age_seconds: Option<u64>,
- pub allow_origins: Vec<String>,
- pub allow_methods: Vec<String>,
- pub allow_headers: Vec<String>,
- pub expose_headers: Vec<String>,
-}
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct CorsRule {
+ pub id: Option<String>,
+ pub max_age_seconds: Option<u64>,
+ pub allow_origins: Vec<String>,
+ pub allow_methods: Vec<String>,
+ pub allow_headers: Vec<String>,
+ pub expose_headers: Vec<String>,
+ }
-#[derive(Default, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
-pub struct BucketQuotas {
- /// Maximum size in bytes (bucket size = sum of sizes of objects in the bucket)
- pub max_size: Option<u64>,
- /// Maximum number of non-deleted objects in the bucket
- pub max_objects: Option<u64>,
+ #[derive(Default, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
+ pub struct BucketQuotas {
+ /// Maximum size in bytes (bucket size = sum of sizes of objects in the bucket)
+ pub max_size: Option<u64>,
+ /// Maximum number of non-deleted objects in the bucket
+ pub max_objects: Option<u64>,
+ }
+
+ impl garage_util::migrate::InitialFormat for Bucket {}
}
+pub use v08::*;
+
impl AutoCrdt for BucketQuotas {
const WARN_IF_DIFFERENT: bool = true;
}
diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs
index 6303ea3e..35d6596d 100644
--- a/src/model/index_counter.rs
+++ b/src/model/index_counter.rs
@@ -12,6 +12,7 @@ use garage_rpc::system::System;
use garage_util::background::BackgroundRunner;
use garage_util::data::*;
use garage_util::error::*;
+use garage_util::migrate::Migrate;
use garage_util::time::*;
use garage_table::crdt::*;
@@ -29,14 +30,44 @@ pub trait CountedItem: Clone + PartialEq + Send + Sync + 'static {
fn counts(&self) -> Vec<(&'static str, i64)>;
}
-/// A counter entry in the global table
-#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)]
-pub struct CounterEntry<T: CountedItem> {
- pub pk: T::CP,
- pub sk: T::CS,
- pub values: BTreeMap<String, CounterValue>,
+mod v08 {
+ use super::CountedItem;
+ use garage_util::data::Uuid;
+ use serde::{Deserialize, Serialize};
+ use std::collections::BTreeMap;
+
+ // ---- Global part (the table everyone queries) ----
+
+ /// A counter entry in the global table
+ #[derive(Clone, PartialEq, Debug, Serialize, Deserialize)]
+ pub struct CounterEntry<T: CountedItem> {
+ pub pk: T::CP,
+ pub sk: T::CS,
+ pub values: BTreeMap<String, CounterValue>,
+ }
+
+ /// A counter entry in the global table
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct CounterValue {
+ pub node_values: BTreeMap<Uuid, (u64, i64)>,
+ }
+
+ impl<T: CountedItem> garage_util::migrate::InitialFormat for CounterEntry<T> {}
+
+ // ---- Local part (the counter we maintain transactionnaly on each node) ----
+
+ #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+ pub(super) struct LocalCounterEntry<T: CountedItem> {
+ pub(super) pk: T::CP,
+ pub(super) sk: T::CS,
+ pub(super) values: BTreeMap<String, (u64, i64)>,
+ }
+
+ impl<T: CountedItem> garage_util::migrate::InitialFormat for LocalCounterEntry<T> {}
}
+pub use v08::*;
+
impl<T: CountedItem> Entry<T::CP, T::CS> for CounterEntry<T> {
fn partition_key(&self) -> &T::CP {
&self.pk
@@ -78,12 +109,6 @@ impl<T: CountedItem> CounterEntry<T> {
}
}
-/// A counter entry in the global table
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-pub struct CounterValue {
- pub node_values: BTreeMap<Uuid, (u64, i64)>,
-}
-
impl<T: CountedItem> Crdt for CounterEntry<T> {
fn merge(&mut self, other: &Self) {
for (name, e2) in other.values.iter() {
@@ -195,11 +220,9 @@ impl<T: CountedItem> IndexCounter<T> {
let tree_key = self.table.data.tree_key(pk, sk);
let mut entry = match tx.get(&self.local_counter, &tree_key[..])? {
- Some(old_bytes) => {
- rmp_serde::decode::from_read_ref::<_, LocalCounterEntry<T>>(&old_bytes)
- .map_err(Error::RmpDecode)
- .map_err(db::TxError::Abort)?
- }
+ Some(old_bytes) => LocalCounterEntry::<T>::decode(&old_bytes)
+ .ok_or_message("Cannot decode local counter entry")
+ .map_err(db::TxError::Abort)?,
None => LocalCounterEntry {
pk: pk.clone(),
sk: sk.clone(),
@@ -214,7 +237,8 @@ impl<T: CountedItem> IndexCounter<T> {
ent.1 += *inc;
}
- let new_entry_bytes = rmp_to_vec_all_named(&entry)
+ let new_entry_bytes = entry
+ .encode()
.map_err(Error::RmpEncode)
.map_err(db::TxError::Abort)?;
tx.insert(&self.local_counter, &tree_key[..], new_entry_bytes)?;
@@ -255,15 +279,15 @@ impl<T: CountedItem> IndexCounter<T> {
info!("zeroing old counters... ({})", hex::encode(&batch[0].0));
for (local_counter_k, local_counter) in batch {
- let mut local_counter =
- rmp_serde::decode::from_read_ref::<_, LocalCounterEntry<T>>(&local_counter)?;
+ let mut local_counter = LocalCounterEntry::<T>::decode(&local_counter)
+ .ok_or_message("Cannot decode local counter entry")?;
for (_, tv) in local_counter.values.iter_mut() {
tv.0 = std::cmp::max(tv.0 + 1, now);
tv.1 = 0;
}
- let local_counter_bytes = rmp_to_vec_all_named(&local_counter)?;
+ let local_counter_bytes = local_counter.encode()?;
self.local_counter
.insert(&local_counter_k, &local_counter_bytes)?;
@@ -311,9 +335,8 @@ impl<T: CountedItem> IndexCounter<T> {
let local_counter_key = self.table.data.tree_key(pk, sk);
let mut local_counter = match self.local_counter.get(&local_counter_key)? {
Some(old_bytes) => {
- let ent = rmp_serde::decode::from_read_ref::<_, LocalCounterEntry<T>>(
- &old_bytes,
- )?;
+ let ent = LocalCounterEntry::<T>::decode(&old_bytes)
+ .ok_or_message("Cannot decode local counter entry")?;
assert!(ent.pk == *pk);
assert!(ent.sk == *sk);
ent
@@ -330,7 +353,7 @@ impl<T: CountedItem> IndexCounter<T> {
tv.1 += v;
}
- let local_counter_bytes = rmp_to_vec_all_named(&local_counter)?;
+ let local_counter_bytes = local_counter.encode()?;
self.local_counter
.insert(&local_counter_key, local_counter_bytes)?;
@@ -350,13 +373,6 @@ impl<T: CountedItem> IndexCounter<T> {
// ----
-#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
-struct LocalCounterEntry<T: CountedItem> {
- pk: T::CP,
- sk: T::CS,
- values: BTreeMap<String, (u64, i64)>,
-}
-
impl<T: CountedItem> LocalCounterEntry<T> {
fn into_counter_entry(self, this_node: Uuid) -> CounterEntry<T> {
CounterEntry {
diff --git a/src/model/k2v/item_table.rs b/src/model/k2v/item_table.rs
index 7860cb17..ce3e4129 100644
--- a/src/model/k2v/item_table.rs
+++ b/src/model/k2v/item_table.rs
@@ -1,7 +1,8 @@
-use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::sync::Arc;
+use serde::{Deserialize, Serialize};
+
use garage_db as db;
use garage_util::data::*;
@@ -17,32 +18,43 @@ pub const CONFLICTS: &str = "conflicts";
pub const VALUES: &str = "values";
pub const BYTES: &str = "bytes";
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-pub struct K2VItem {
- pub partition: K2VItemPartition,
- pub sort_key: String,
+mod v08 {
+ use crate::k2v::causality::K2VNodeId;
+ use garage_util::data::Uuid;
+ use serde::{Deserialize, Serialize};
+ use std::collections::BTreeMap;
- items: BTreeMap<K2VNodeId, DvvsEntry>,
-}
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct K2VItem {
+ pub partition: K2VItemPartition,
+ pub sort_key: String,
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize, Hash)]
-pub struct K2VItemPartition {
- pub bucket_id: Uuid,
- pub partition_key: String,
-}
+ pub(super) items: BTreeMap<K2VNodeId, DvvsEntry>,
+ }
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-struct DvvsEntry {
- t_discard: u64,
- values: Vec<(u64, DvvsValue)>,
-}
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize, Hash)]
+ pub struct K2VItemPartition {
+ pub bucket_id: Uuid,
+ pub partition_key: String,
+ }
+
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct DvvsEntry {
+ pub(super) t_discard: u64,
+ pub(super) values: Vec<(u64, DvvsValue)>,
+ }
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-pub enum DvvsValue {
- Value(#[serde(with = "serde_bytes")] Vec<u8>),
- Deleted,
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub enum DvvsValue {
+ Value(#[serde(with = "serde_bytes")] Vec<u8>),
+ Deleted,
+ }
+
+ impl garage_util::migrate::InitialFormat for K2VItem {}
}
+pub use v08::*;
+
impl K2VItem {
/// Creates a new K2VItem when no previous entry existed in the db
pub fn new(bucket_id: Uuid, partition_key: String, sort_key: String) -> Self {
diff --git a/src/model/key_table.rs b/src/model/key_table.rs
index 9d2fc783..bb5334a3 100644
--- a/src/model/key_table.rs
+++ b/src/model/key_table.rs
@@ -1,45 +1,121 @@
use serde::{Deserialize, Serialize};
-use garage_table::crdt::*;
-use garage_table::*;
+use garage_util::crdt::{self, Crdt};
use garage_util::data::*;
+use garage_table::{DeletedFilter, EmptyKey, Entry, TableSchema};
+
use crate::permission::BucketKeyPerm;
-use crate::prev::v051::key_table as old;
+pub(crate) mod v05 {
+ use garage_util::crdt;
+ use serde::{Deserialize, Serialize};
-/// An api key
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-pub struct Key {
- /// The id of the key (immutable), used as partition key
- pub key_id: String,
+ /// An api key
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct Key {
+ /// The id of the key (immutable), used as partition key
+ pub key_id: String,
- /// Internal state of the key
- pub state: crdt::Deletable<KeyParams>,
-}
+ /// The secret_key associated
+ pub secret_key: String,
-/// Configuration for a key
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-pub struct KeyParams {
- /// The secret_key associated (immutable)
- pub secret_key: String,
+ /// Name for the key
+ pub name: crdt::Lww<String>,
- /// Name for the key
- pub name: crdt::Lww<String>,
+ /// Is the key deleted
+ pub deleted: crdt::Bool,
+
+ /// Buckets in which the key is authorized. Empty if `Key` is deleted
+ // CRDT interaction: deleted implies authorized_buckets is empty
+ pub authorized_buckets: crdt::LwwMap<String, PermissionSet>,
+ }
- /// Flag to allow users having this key to create buckets
- pub allow_create_bucket: crdt::Lww<bool>,
+ /// Permission given to a key in a bucket
+ #[derive(PartialOrd, Ord, PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct PermissionSet {
+ /// The key can be used to read the bucket
+ pub allow_read: bool,
+ /// The key can be used to write in the bucket
+ pub allow_write: bool,
+ }
+
+ impl crdt::AutoCrdt for PermissionSet {
+ const WARN_IF_DIFFERENT: bool = true;
+ }
- /// If the key is present: it gives some permissions,
- /// a map of bucket IDs (uuids) to permissions.
- /// Otherwise no permissions are granted to key
- pub authorized_buckets: crdt::Map<Uuid, BucketKeyPerm>,
+ impl garage_util::migrate::InitialFormat for Key {}
+}
- /// A key can have a local view of buckets names it is
- /// the only one to see, this is the namespace for these aliases
- pub local_aliases: crdt::LwwMap<String, Option<Uuid>>,
+mod v08 {
+ use super::v05;
+ use crate::permission::BucketKeyPerm;
+ use garage_util::crdt;
+ use garage_util::data::Uuid;
+ use serde::{Deserialize, Serialize};
+
+ /// An api key
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct Key {
+ /// The id of the key (immutable), used as partition key
+ pub key_id: String,
+
+ /// Internal state of the key
+ pub state: crdt::Deletable<KeyParams>,
+ }
+
+ /// Configuration for a key
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct KeyParams {
+ /// The secret_key associated (immutable)
+ pub secret_key: String,
+
+ /// Name for the key
+ pub name: crdt::Lww<String>,
+
+ /// Flag to allow users having this key to create buckets
+ pub allow_create_bucket: crdt::Lww<bool>,
+
+ /// If the key is present: it gives some permissions,
+ /// a map of bucket IDs (uuids) to permissions.
+ /// Otherwise no permissions are granted to key
+ pub authorized_buckets: crdt::Map<Uuid, BucketKeyPerm>,
+
+ /// A key can have a local view of buckets names it is
+ /// the only one to see, this is the namespace for these aliases
+ pub local_aliases: crdt::LwwMap<String, Option<Uuid>>,
+ }
+
+ impl garage_util::migrate::Migrate for Key {
+ type Previous = v05::Key;
+
+ fn migrate(old_k: v05::Key) -> Key {
+ let name = crdt::Lww::raw(old_k.name.timestamp(), old_k.name.get().clone());
+
+ let state = if old_k.deleted.get() {
+ crdt::Deletable::Deleted
+ } else {
+ // Authorized buckets is ignored here,
+ // migration is performed in specific migration code in
+ // garage/migrate.rs
+ crdt::Deletable::Present(KeyParams {
+ secret_key: old_k.secret_key,
+ name,
+ allow_create_bucket: crdt::Lww::new(false),
+ authorized_buckets: crdt::Map::new(),
+ local_aliases: crdt::LwwMap::new(),
+ })
+ };
+ Key {
+ key_id: old_k.key_id,
+ state,
+ }
+ }
+ }
}
+pub use v08::*;
+
impl KeyParams {
fn new(secret_key: &str, name: &str) -> Self {
KeyParams {
@@ -173,28 +249,4 @@ impl TableSchema for KeyTable {
}
}
}
-
- fn try_migrate(bytes: &[u8]) -> Option<Self::E> {
- let old_k = rmp_serde::decode::from_read_ref::<_, old::Key>(bytes).ok()?;
- let name = crdt::Lww::raw(old_k.name.timestamp(), old_k.name.get().clone());
-
- let state = if old_k.deleted.get() {
- crdt::Deletable::Deleted
- } else {
- // Authorized buckets is ignored here,
- // migration is performed in specific migration code in
- // garage/migrate.rs
- crdt::Deletable::Present(KeyParams {
- secret_key: old_k.secret_key,
- name,
- allow_create_bucket: crdt::Lww::new(false),
- authorized_buckets: crdt::Map::new(),
- local_aliases: crdt::LwwMap::new(),
- })
- };
- Some(Key {
- key_id: old_k.key_id,
- state,
- })
- }
}
diff --git a/src/model/migrate.rs b/src/model/migrate.rs
index cd6ad26a..6b4c3eed 100644
--- a/src/model/migrate.rs
+++ b/src/model/migrate.rs
@@ -2,6 +2,7 @@ use std::sync::Arc;
use garage_util::crdt::*;
use garage_util::data::*;
+use garage_util::encode::nonversioned_decode;
use garage_util::error::Error as GarageError;
use garage_util::time::*;
@@ -28,8 +29,8 @@ impl Migrate {
let mut old_buckets = vec![];
for res in tree.iter().map_err(GarageError::from)? {
let (_k, v) = res.map_err(GarageError::from)?;
- let bucket = rmp_serde::decode::from_read_ref::<_, old_bucket::Bucket>(&v[..])
- .map_err(GarageError::from)?;
+ let bucket =
+ nonversioned_decode::<old_bucket::Bucket>(&v[..]).map_err(GarageError::from)?;
old_buckets.push(bucket);
}
diff --git a/src/model/prev/v051/bucket_table.rs b/src/model/prev/v051/bucket_table.rs
index 628a49dd..19893458 100644
--- a/src/model/prev/v051/bucket_table.rs
+++ b/src/model/prev/v051/bucket_table.rs
@@ -3,7 +3,7 @@ use serde::{Deserialize, Serialize};
use garage_table::crdt::Crdt;
use garage_table::*;
-use super::key_table::PermissionSet;
+use crate::key_table::v05::PermissionSet;
/// A bucket is a collection of objects
///
diff --git a/src/model/prev/v051/key_table.rs b/src/model/prev/v051/key_table.rs
deleted file mode 100644
index 37516b1c..00000000
--- a/src/model/prev/v051/key_table.rs
+++ /dev/null
@@ -1,50 +0,0 @@
-use serde::{Deserialize, Serialize};
-
-use garage_table::crdt::*;
-use garage_table::*;
-
-/// An api key
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-pub struct Key {
- /// The id of the key (immutable), used as partition key
- pub key_id: String,
-
- /// The secret_key associated
- pub secret_key: String,
-
- /// Name for the key
- pub name: crdt::Lww<String>,
-
- /// Is the key deleted
- pub deleted: crdt::Bool,
-
- /// Buckets in which the key is authorized. Empty if `Key` is deleted
- // CRDT interaction: deleted implies authorized_buckets is empty
- pub authorized_buckets: crdt::LwwMap<String, PermissionSet>,
-}
-
-/// Permission given to a key in a bucket
-#[derive(PartialOrd, Ord, PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-pub struct PermissionSet {
- /// The key can be used to read the bucket
- pub allow_read: bool,
- /// The key can be used to write in the bucket
- pub allow_write: bool,
-}
-
-impl AutoCrdt for PermissionSet {
- const WARN_IF_DIFFERENT: bool = true;
-}
-
-impl Crdt for Key {
- fn merge(&mut self, other: &Self) {
- self.name.merge(&other.name);
- self.deleted.merge(&other.deleted);
-
- if self.deleted.get() {
- self.authorized_buckets.clear();
- } else {
- self.authorized_buckets.merge(&other.authorized_buckets);
- }
- }
-}
diff --git a/src/model/prev/v051/mod.rs b/src/model/prev/v051/mod.rs
index 7a954752..8c1335a5 100644
--- a/src/model/prev/v051/mod.rs
+++ b/src/model/prev/v051/mod.rs
@@ -1,4 +1 @@
pub(crate) mod bucket_table;
-pub(crate) mod key_table;
-pub(crate) mod object_table;
-pub(crate) mod version_table;
diff --git a/src/model/prev/v051/object_table.rs b/src/model/prev/v051/object_table.rs
deleted file mode 100644
index e79e5787..00000000
--- a/src/model/prev/v051/object_table.rs
+++ /dev/null
@@ -1,149 +0,0 @@
-use serde::{Deserialize, Serialize};
-use std::collections::BTreeMap;
-
-use garage_util::data::*;
-
-use garage_table::crdt::*;
-
-/// An object
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-pub struct Object {
- /// The bucket in which the object is stored, used as partition key
- pub bucket: String,
-
- /// The key at which the object is stored in its bucket, used as sorting key
- pub key: String,
-
- /// The list of currenty stored versions of the object
- versions: Vec<ObjectVersion>,
-}
-
-impl Object {
- /// Get a list of currently stored versions of `Object`
- pub fn versions(&self) -> &[ObjectVersion] {
- &self.versions[..]
- }
-}
-
-/// Informations about a version of an object
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-pub struct ObjectVersion {
- /// Id of the version
- pub uuid: Uuid,
- /// Timestamp of when the object was created
- pub timestamp: u64,
- /// State of the version
- pub state: ObjectVersionState,
-}
-
-/// State of an object version
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-pub enum ObjectVersionState {
- /// The version is being received
- Uploading(ObjectVersionHeaders),
- /// The version is fully received
- Complete(ObjectVersionData),
- /// The version uploaded containded errors or the upload was explicitly aborted
- Aborted,
-}
-
-impl Crdt for ObjectVersionState {
- fn merge(&mut self, other: &Self) {
- use ObjectVersionState::*;
- match other {
- Aborted => {
- *self = Aborted;
- }
- Complete(b) => match self {
- Aborted => {}
- Complete(a) => {
- a.merge(b);
- }
- Uploading(_) => {
- *self = Complete(b.clone());
- }
- },
- Uploading(_) => {}
- }
- }
-}
-
-/// Data stored in object version
-#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
-pub enum ObjectVersionData {
- /// The object was deleted, this Version is a tombstone to mark it as such
- DeleteMarker,
- /// The object is short, it's stored inlined
- Inline(ObjectVersionMeta, #[serde(with = "serde_bytes")] Vec<u8>),
- /// The object is not short, Hash of first block is stored here, next segments hashes are
- /// stored in the version table
- FirstBlock(ObjectVersionMeta, Hash),
-}
-
-impl AutoCrdt for ObjectVersionData {
- const WARN_IF_DIFFERENT: bool = true;
-}
-
-/// Metadata about the object version
-#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
-pub struct ObjectVersionMeta {
- /// Headers to send to the client
- pub headers: ObjectVersionHeaders,
- /// Size of the object
- pub size: u64,
- /// etag of the object
- pub etag: String,
-}
-
-/// Additional headers for an object
-#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
-pub struct ObjectVersionHeaders {
- /// Content type of the object
- pub content_type: String,
- /// Any other http headers to send
- pub other: BTreeMap<String, String>,
-}
-
-impl ObjectVersion {
- fn cmp_key(&self) -> (u64, Uuid) {
- (self.timestamp, self.uuid)
- }
-
- /// Is the object version completely received
- pub fn is_complete(&self) -> bool {
- matches!(self.state, ObjectVersionState::Complete(_))
- }
-}
-
-impl Crdt for Object {
- fn merge(&mut self, other: &Self) {
- // Merge versions from other into here
- for other_v in other.versions.iter() {
- match self
- .versions
- .binary_search_by(|v| v.cmp_key().cmp(&other_v.cmp_key()))
- {
- Ok(i) => {
- self.versions[i].state.merge(&other_v.state);
- }
- Err(i) => {
- self.versions.insert(i, other_v.clone());
- }
- }
- }
-
- // Remove versions which are obsolete, i.e. those that come
- // before the last version which .is_complete().
- let last_complete = self
- .versions
- .iter()
- .enumerate()
- .rev()
- .find(|(_, v)| v.is_complete())
- .map(|(vi, _)| vi);
-
- if let Some(last_vi) = last_complete {
- self.versions = self.versions.drain(last_vi..).collect::<Vec<_>>();
- }
- }
-}
diff --git a/src/model/prev/v051/version_table.rs b/src/model/prev/v051/version_table.rs
deleted file mode 100644
index c11c62d5..00000000
--- a/src/model/prev/v051/version_table.rs
+++ /dev/null
@@ -1,79 +0,0 @@
-use serde::{Deserialize, Serialize};
-
-use garage_util::data::*;
-
-use garage_table::crdt::*;
-use garage_table::*;
-
-/// A version of an object
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-pub struct Version {
- /// UUID of the version, used as partition key
- pub uuid: Uuid,
-
- // Actual data: the blocks for this version
- // In the case of a multipart upload, also store the etags
- // of individual parts and check them when doing CompleteMultipartUpload
- /// Is this version deleted
- pub deleted: crdt::Bool,
- /// list of blocks of data composing the version
- pub blocks: crdt::Map<VersionBlockKey, VersionBlock>,
- /// Etag of each part in case of a multipart upload, empty otherwise
- pub parts_etags: crdt::Map<u64, String>,
-
- // Back link to bucket+key so that we can figure if
- // this was deleted later on
- /// Bucket in which the related object is stored
- pub bucket: String,
- /// Key in which the related object is stored
- pub key: String,
-}
-
-#[derive(PartialEq, Eq, Clone, Copy, Debug, Serialize, Deserialize)]
-pub struct VersionBlockKey {
- /// Number of the part
- pub part_number: u64,
- /// Offset of this sub-segment in its part
- pub offset: u64,
-}
-
-impl Ord for VersionBlockKey {
- fn cmp(&self, other: &Self) -> std::cmp::Ordering {
- self.part_number
- .cmp(&other.part_number)
- .then(self.offset.cmp(&other.offset))
- }
-}
-
-impl PartialOrd for VersionBlockKey {
- fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
- Some(self.cmp(other))
- }
-}
-
-/// Informations about a single block
-#[derive(PartialEq, Eq, Ord, PartialOrd, Clone, Copy, Debug, Serialize, Deserialize)]
-pub struct VersionBlock {
- /// Blake2 sum of the block
- pub hash: Hash,
- /// Size of the block
- pub size: u64,
-}
-
-impl AutoCrdt for VersionBlock {
- const WARN_IF_DIFFERENT: bool = true;
-}
-
-impl Crdt for Version {
- fn merge(&mut self, other: &Self) {
- self.deleted.merge(&other.deleted);
-
- if self.deleted.get() {
- self.blocks.clear();
- self.parts_etags.clear();
- } else {
- self.blocks.merge(&other.blocks);
- self.parts_etags.merge(&other.parts_etags);
- }
- }
-}
diff --git a/src/model/s3/block_ref_table.rs b/src/model/s3/block_ref_table.rs
index c7017409..7b023d87 100644
--- a/src/model/s3/block_ref_table.rs
+++ b/src/model/s3/block_ref_table.rs
@@ -1,4 +1,3 @@
-use serde::{Deserialize, Serialize};
use std::sync::Arc;
use garage_db as db;
@@ -10,19 +9,29 @@ use garage_table::*;
use garage_block::manager::*;
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-pub struct BlockRef {
- /// Hash (blake2 sum) of the block, used as partition key
- pub block: Hash,
+mod v08 {
+ use garage_util::crdt;
+ use garage_util::data::{Hash, Uuid};
+ use serde::{Deserialize, Serialize};
- /// Id of the Version for the object containing this block, used as sorting key
- pub version: Uuid,
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct BlockRef {
+ /// Hash (blake2 sum) of the block, used as partition key
+ pub block: Hash,
- // Keep track of deleted status
- /// Is the Version that contains this block deleted
- pub deleted: crdt::Bool,
+ /// Id of the Version for the object containing this block, used as sorting key
+ pub version: Uuid,
+
+ // Keep track of deleted status
+ /// Is the Version that contains this block deleted
+ pub deleted: crdt::Bool,
+ }
+
+ impl garage_util::migrate::InitialFormat for BlockRef {}
}
+pub use v08::*;
+
impl Entry<Hash, Uuid> for BlockRef {
fn partition_key(&self) -> &Hash {
&self.block
diff --git a/src/model/s3/object_table.rs b/src/model/s3/object_table.rs
index 1b2f0014..518acc95 100644
--- a/src/model/s3/object_table.rs
+++ b/src/model/s3/object_table.rs
@@ -1,5 +1,4 @@
use serde::{Deserialize, Serialize};
-use std::collections::BTreeMap;
use std::sync::Arc;
use garage_db as db;
@@ -13,25 +12,126 @@ use garage_table::*;
use crate::index_counter::*;
use crate::s3::version_table::*;
-use crate::prev::v051::object_table as old;
-
pub const OBJECTS: &str = "objects";
pub const UNFINISHED_UPLOADS: &str = "unfinished_uploads";
pub const BYTES: &str = "bytes";
-/// An object
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-pub struct Object {
- /// The bucket in which the object is stored, used as partition key
- pub bucket_id: Uuid,
+mod v05 {
+ use garage_util::data::{Hash, Uuid};
+ use serde::{Deserialize, Serialize};
+ use std::collections::BTreeMap;
+
+ /// An object
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct Object {
+ /// The bucket in which the object is stored, used as partition key
+ pub bucket: String,
+
+ /// The key at which the object is stored in its bucket, used as sorting key
+ pub key: String,
+
+ /// The list of currenty stored versions of the object
+ pub(super) versions: Vec<ObjectVersion>,
+ }
+
+ /// Informations about a version of an object
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct ObjectVersion {
+ /// Id of the version
+ pub uuid: Uuid,
+ /// Timestamp of when the object was created
+ pub timestamp: u64,
+ /// State of the version
+ pub state: ObjectVersionState,
+ }
+
+ /// State of an object version
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub enum ObjectVersionState {
+ /// The version is being received
+ Uploading(ObjectVersionHeaders),
+ /// The version is fully received
+ Complete(ObjectVersionData),
+ /// The version uploaded containded errors or the upload was explicitly aborted
+ Aborted,
+ }
+
+ /// Data stored in object version
+ #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
+ pub enum ObjectVersionData {
+ /// The object was deleted, this Version is a tombstone to mark it as such
+ DeleteMarker,
+ /// The object is short, it's stored inlined
+ Inline(ObjectVersionMeta, #[serde(with = "serde_bytes")] Vec<u8>),
+ /// The object is not short, Hash of first block is stored here, next segments hashes are
+ /// stored in the version table
+ FirstBlock(ObjectVersionMeta, Hash),
+ }
+
+ /// Metadata about the object version
+ #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
+ pub struct ObjectVersionMeta {
+ /// Headers to send to the client
+ pub headers: ObjectVersionHeaders,
+ /// Size of the object
+ pub size: u64,
+ /// etag of the object
+ pub etag: String,
+ }
+
+ /// Additional headers for an object
+ #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
+ pub struct ObjectVersionHeaders {
+ /// Content type of the object
+ pub content_type: String,
+ /// Any other http headers to send
+ pub other: BTreeMap<String, String>,
+ }
+
+ impl garage_util::migrate::InitialFormat for Object {}
+}
+
+mod v08 {
+ use garage_util::data::Uuid;
+ use serde::{Deserialize, Serialize};
+
+ use super::v05;
- /// The key at which the object is stored in its bucket, used as sorting key
- pub key: String,
+ pub use v05::{
+ ObjectVersion, ObjectVersionData, ObjectVersionHeaders, ObjectVersionMeta,
+ ObjectVersionState,
+ };
+
+ /// An object
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct Object {
+ /// The bucket in which the object is stored, used as partition key
+ pub bucket_id: Uuid,
+
+ /// The key at which the object is stored in its bucket, used as sorting key
+ pub key: String,
+
+ /// The list of currenty stored versions of the object
+ pub(super) versions: Vec<ObjectVersion>,
+ }
- /// The list of currenty stored versions of the object
- versions: Vec<ObjectVersion>,
+ impl garage_util::migrate::Migrate for Object {
+ type Previous = v05::Object;
+
+ fn migrate(old: v05::Object) -> Object {
+ use garage_util::data::blake2sum;
+
+ Object {
+ bucket_id: blake2sum(old.bucket.as_bytes()),
+ key: old.key,
+ versions: old.versions,
+ }
+ }
+ }
}
+pub use v08::*;
+
impl Object {
/// Initialize an Object struct from parts
pub fn new(bucket_id: Uuid, key: String, versions: Vec<ObjectVersion>) -> Self {
@@ -68,28 +168,6 @@ impl Object {
}
}
-/// Informations about a version of an object
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-pub struct ObjectVersion {
- /// Id of the version
- pub uuid: Uuid,
- /// Timestamp of when the object was created
- pub timestamp: u64,
- /// State of the version
- pub state: ObjectVersionState,
-}
-
-/// State of an object version
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-pub enum ObjectVersionState {
- /// The version is being received
- Uploading(ObjectVersionHeaders),
- /// The version is fully received
- Complete(ObjectVersionData),
- /// The version uploaded containded errors or the upload was explicitly aborted
- Aborted,
-}
-
impl Crdt for ObjectVersionState {
fn merge(&mut self, other: &Self) {
use ObjectVersionState::*;
@@ -111,42 +189,10 @@ impl Crdt for ObjectVersionState {
}
}
-/// Data stored in object version
-#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
-pub enum ObjectVersionData {
- /// The object was deleted, this Version is a tombstone to mark it as such
- DeleteMarker,
- /// The object is short, it's stored inlined
- Inline(ObjectVersionMeta, #[serde(with = "serde_bytes")] Vec<u8>),
- /// The object is not short, Hash of first block is stored here, next segments hashes are
- /// stored in the version table
- FirstBlock(ObjectVersionMeta, Hash),
-}
-
impl AutoCrdt for ObjectVersionData {
const WARN_IF_DIFFERENT: bool = true;
}
-/// Metadata about the object version
-#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
-pub struct ObjectVersionMeta {
- /// Headers to send to the client
- pub headers: ObjectVersionHeaders,
- /// Size of the object
- pub size: u64,
- /// etag of the object
- pub etag: String,
-}
-
-/// Additional headers for an object
-#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
-pub struct ObjectVersionHeaders {
- /// Content type of the object
- pub content_type: String,
- /// Any other http headers to send
- pub other: BTreeMap<String, String>,
-}
-
impl ObjectVersion {
fn cmp_key(&self) -> (u64, Uuid) {
(self.timestamp, self.uuid)
@@ -290,11 +336,6 @@ impl TableSchema for ObjectTable {
ObjectFilter::IsUploading => entry.versions.iter().any(|v| v.is_uploading()),
}
}
-
- fn try_migrate(bytes: &[u8]) -> Option<Self::E> {
- let old_obj = rmp_serde::decode::from_read_ref::<_, old::Object>(bytes).ok()?;
- Some(migrate_object(old_obj))
- }
}
impl CountedItem for Object {
@@ -339,64 +380,3 @@ impl CountedItem for Object {
]
}
}
-
-// vvvvvvvv migration code, stupid stuff vvvvvvvvvvvv
-// (we just want to change bucket into bucket_id by hashing it)
-
-fn migrate_object(o: old::Object) -> Object {
- let versions = o
- .versions()
- .iter()
- .cloned()
- .map(migrate_object_version)
- .collect();
- Object {
- bucket_id: blake2sum(o.bucket.as_bytes()),
- key: o.key,
- versions,
- }
-}
-
-fn migrate_object_version(v: old::ObjectVersion) -> ObjectVersion {
- ObjectVersion {
- uuid: Uuid::try_from(v.uuid.as_slice()).unwrap(),
- timestamp: v.timestamp,
- state: match v.state {
- old::ObjectVersionState::Uploading(h) => {
- ObjectVersionState::Uploading(migrate_object_version_headers(h))
- }
- old::ObjectVersionState::Complete(d) => {
- ObjectVersionState::Complete(migrate_object_version_data(d))
- }
- old::ObjectVersionState::Aborted => ObjectVersionState::Aborted,
- },
- }
-}
-
-fn migrate_object_version_headers(h: old::ObjectVersionHeaders) -> ObjectVersionHeaders {
- ObjectVersionHeaders {
- content_type: h.content_type,
- other: h.other,
- }
-}
-
-fn migrate_object_version_data(d: old::ObjectVersionData) -> ObjectVersionData {
- match d {
- old::ObjectVersionData::DeleteMarker => ObjectVersionData::DeleteMarker,
- old::ObjectVersionData::Inline(m, b) => {
- ObjectVersionData::Inline(migrate_object_version_meta(m), b)
- }
- old::ObjectVersionData::FirstBlock(m, h) => ObjectVersionData::FirstBlock(
- migrate_object_version_meta(m),
- Hash::try_from(h.as_slice()).unwrap(),
- ),
- }
-}
-
-fn migrate_object_version_meta(m: old::ObjectVersionMeta) -> ObjectVersionMeta {
- ObjectVersionMeta {
- headers: migrate_object_version_headers(m.headers),
- size: m.size,
- etag: m.etag,
- }
-}
diff --git a/src/model/s3/version_table.rs b/src/model/s3/version_table.rs
index 0486512b..6edc83f4 100644
--- a/src/model/s3/version_table.rs
+++ b/src/model/s3/version_table.rs
@@ -1,4 +1,3 @@
-use serde::{Deserialize, Serialize};
use std::sync::Arc;
use garage_db as db;
@@ -11,32 +10,108 @@ use garage_table::*;
use crate::s3::block_ref_table::*;
-use crate::prev::v051::version_table as old;
-
-/// A version of an object
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-pub struct Version {
- /// UUID of the version, used as partition key
- pub uuid: Uuid,
-
- // Actual data: the blocks for this version
- // In the case of a multipart upload, also store the etags
- // of individual parts and check them when doing CompleteMultipartUpload
- /// Is this version deleted
- pub deleted: crdt::Bool,
- /// list of blocks of data composing the version
- pub blocks: crdt::Map<VersionBlockKey, VersionBlock>,
- /// Etag of each part in case of a multipart upload, empty otherwise
- pub parts_etags: crdt::Map<u64, String>,
-
- // Back link to bucket+key so that we can figure if
- // this was deleted later on
- /// Bucket in which the related object is stored
- pub bucket_id: Uuid,
- /// Key in which the related object is stored
- pub key: String,
+mod v05 {
+ use garage_util::crdt;
+ use garage_util::data::{Hash, Uuid};
+ use serde::{Deserialize, Serialize};
+
+ /// A version of an object
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct Version {
+ /// UUID of the version, used as partition key
+ pub uuid: Uuid,
+
+ // Actual data: the blocks for this version
+ // In the case of a multipart upload, also store the etags
+ // of individual parts and check them when doing CompleteMultipartUpload
+ /// Is this version deleted
+ pub deleted: crdt::Bool,
+ /// list of blocks of data composing the version
+ pub blocks: crdt::Map<VersionBlockKey, VersionBlock>,
+ /// Etag of each part in case of a multipart upload, empty otherwise
+ pub parts_etags: crdt::Map<u64, String>,
+
+ // Back link to bucket+key so that we can figure if
+ // this was deleted later on
+ /// Bucket in which the related object is stored
+ pub bucket: String,
+ /// Key in which the related object is stored
+ pub key: String,
+ }
+
+ #[derive(PartialEq, Eq, Clone, Copy, Debug, Serialize, Deserialize)]
+ pub struct VersionBlockKey {
+ /// Number of the part
+ pub part_number: u64,
+ /// Offset of this sub-segment in its part
+ pub offset: u64,
+ }
+
+ /// Informations about a single block
+ #[derive(PartialEq, Eq, Ord, PartialOrd, Clone, Copy, Debug, Serialize, Deserialize)]
+ pub struct VersionBlock {
+ /// Blake2 sum of the block
+ pub hash: Hash,
+ /// Size of the block
+ pub size: u64,
+ }
+
+ impl garage_util::migrate::InitialFormat for Version {}
+}
+
+mod v08 {
+ use garage_util::crdt;
+ use garage_util::data::Uuid;
+ use serde::{Deserialize, Serialize};
+
+ use super::v05;
+
+ /// A version of an object
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct Version {
+ /// UUID of the version, used as partition key
+ pub uuid: Uuid,
+
+ // Actual data: the blocks for this version
+ // In the case of a multipart upload, also store the etags
+ // of individual parts and check them when doing CompleteMultipartUpload
+ /// Is this version deleted
+ pub deleted: crdt::Bool,
+ /// list of blocks of data composing the version
+ pub blocks: crdt::Map<VersionBlockKey, VersionBlock>,
+ /// Etag of each part in case of a multipart upload, empty otherwise
+ pub parts_etags: crdt::Map<u64, String>,
+
+ // Back link to bucket+key so that we can figure if
+ // this was deleted later on
+ /// Bucket in which the related object is stored
+ pub bucket_id: Uuid,
+ /// Key in which the related object is stored
+ pub key: String,
+ }
+
+ pub use v05::{VersionBlock, VersionBlockKey};
+
+ impl garage_util::migrate::Migrate for Version {
+ type Previous = v05::Version;
+
+ fn migrate(old: v05::Version) -> Version {
+ use garage_util::data::blake2sum;
+
+ Version {
+ uuid: old.uuid,
+ deleted: old.deleted,
+ blocks: old.blocks,
+ parts_etags: old.parts_etags,
+ bucket_id: blake2sum(old.bucket.as_bytes()),
+ key: old.key,
+ }
+ }
+ }
}
+pub use v08::*;
+
impl Version {
pub fn new(uuid: Uuid, bucket_id: Uuid, key: String, deleted: bool) -> Self {
Self {
@@ -64,14 +139,6 @@ impl Version {
}
}
-#[derive(PartialEq, Eq, Clone, Copy, Debug, Serialize, Deserialize)]
-pub struct VersionBlockKey {
- /// Number of the part
- pub part_number: u64,
- /// Offset of this sub-segment in its part
- pub offset: u64,
-}
-
impl Ord for VersionBlockKey {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.part_number
@@ -86,15 +153,6 @@ impl PartialOrd for VersionBlockKey {
}
}
-/// Informations about a single block
-#[derive(PartialEq, Eq, Ord, PartialOrd, Clone, Copy, Debug, Serialize, Deserialize)]
-pub struct VersionBlock {
- /// Blake2 sum of the block
- pub hash: Hash,
- /// Size of the block
- pub size: u64,
-}
-
impl AutoCrdt for VersionBlock {
const WARN_IF_DIFFERENT: bool = true;
}
@@ -166,42 +224,4 @@ impl TableSchema for VersionTable {
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
filter.apply(entry.deleted.get())
}
-
- fn try_migrate(bytes: &[u8]) -> Option<Self::E> {
- let old = rmp_serde::decode::from_read_ref::<_, old::Version>(bytes).ok()?;
-
- let blocks = old
- .blocks
- .items()
- .iter()
- .map(|(k, v)| {
- (
- VersionBlockKey {
- part_number: k.part_number,
- offset: k.offset,
- },
- VersionBlock {
- hash: Hash::try_from(v.hash.as_slice()).unwrap(),
- size: v.size,
- },
- )
- })
- .collect::<crdt::Map<_, _>>();
-
- let parts_etags = old
- .parts_etags
- .items()
- .iter()
- .map(|(k, v)| (*k, v.clone()))
- .collect::<crdt::Map<_, _>>();
-
- Some(Version {
- uuid: Hash::try_from(old.uuid.as_slice()).unwrap(),
- deleted: crdt::Bool::new(old.deleted.get()),
- blocks,
- parts_etags,
- bucket_id: blake2sum(old.bucket.as_bytes()),
- key: old.key,
- })
- }
}
diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml
index b87374ad..e9a0929a 100644
--- a/src/rpc/Cargo.toml
+++ b/src/rpc/Cargo.toml
@@ -25,7 +25,6 @@ rand = "0.8"
sodiumoxide = { version = "0.2.5-0", package = "kuska-sodiumoxide" }
async-trait = "0.1.7"
-rmp-serde = "0.15"
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
serde_bytes = "0.11"
serde_json = "1.0"
diff --git a/src/rpc/layout.rs b/src/rpc/layout.rs
index 2fd5acfc..1030e3a6 100644
--- a/src/rpc/layout.rs
+++ b/src/rpc/layout.rs
@@ -5,6 +5,7 @@ use serde::{Deserialize, Serialize};
use garage_util::crdt::{AutoCrdt, Crdt, LwwMap};
use garage_util::data::*;
+use garage_util::encode::nonversioned_encode;
use garage_util::error::*;
use crate::ring::*;
@@ -35,6 +36,8 @@ pub struct ClusterLayout {
pub staging_hash: Hash,
}
+impl garage_util::migrate::InitialFormat for ClusterLayout {}
+
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct NodeRoleV(pub Option<NodeRole>);
@@ -68,7 +71,7 @@ impl NodeRole {
impl ClusterLayout {
pub fn new(replication_factor: usize) -> Self {
let empty_lwwmap = LwwMap::new();
- let empty_lwwmap_hash = blake2sum(&rmp_to_vec_all_named(&empty_lwwmap).unwrap()[..]);
+ let empty_lwwmap_hash = blake2sum(&nonversioned_encode(&empty_lwwmap).unwrap()[..]);
ClusterLayout {
version: 0,
@@ -90,7 +93,7 @@ impl ClusterLayout {
Ordering::Equal => {
self.staging.merge(&other.staging);
- let new_staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]);
+ let new_staging_hash = blake2sum(&nonversioned_encode(&self.staging).unwrap()[..]);
let changed = new_staging_hash != self.staging_hash;
self.staging_hash = new_staging_hash;
@@ -125,7 +128,7 @@ To know the correct value of the new layout version, invoke `garage layout show`
}
self.staging.clear();
- self.staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]);
+ self.staging_hash = blake2sum(&nonversioned_encode(&self.staging).unwrap()[..]);
self.version += 1;
@@ -149,7 +152,7 @@ To know the correct value of the new layout version, invoke `garage layout show`
}
self.staging.clear();
- self.staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]);
+ self.staging_hash = blake2sum(&nonversioned_encode(&self.staging).unwrap()[..]);
self.version += 1;
@@ -178,7 +181,7 @@ To know the correct value of the new layout version, invoke `garage layout show`
/// returns true if consistent, false if error
pub fn check(&self) -> bool {
// Check that the hash of the staging data is correct
- let staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]);
+ let staging_hash = blake2sum(&nonversioned_encode(&self.staging).unwrap()[..]);
if staging_hash != self.staging_hash {
return false;
}
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index 8f753b7f..22e23e55 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -73,13 +73,17 @@ impl Rpc for SystemRpc {
type Response = Result<SystemRpc, Error>;
}
+#[derive(Serialize, Deserialize)]
+pub struct PeerList(Vec<(Uuid, SocketAddr)>);
+impl garage_util::migrate::InitialFormat for PeerList {}
+
/// This node's membership manager
pub struct System {
/// The id of this node
pub id: Uuid,
persist_cluster_layout: Persister<ClusterLayout>,
- persist_peer_list: Persister<Vec<(Uuid, SocketAddr)>>,
+ persist_peer_list: Persister<PeerList>,
local_status: ArcSwap<NodeStatus>,
node_status: RwLock<HashMap<Uuid, (u64, NodeStatus)>>,
@@ -721,7 +725,7 @@ impl System {
// Add peer list from list stored on disk
if let Ok(peers) = self.persist_peer_list.load_async().await {
- ping_list.extend(peers.iter().map(|(id, addr)| ((*id).into(), *addr)))
+ ping_list.extend(peers.0.iter().map(|(id, addr)| ((*id).into(), *addr)))
}
// Fetch peer list from Consul
@@ -801,12 +805,16 @@ impl System {
// and append it to the list we are about to save,
// so that no peer ID gets lost in the process.
if let Ok(mut prev_peer_list) = self.persist_peer_list.load_async().await {
- prev_peer_list.retain(|(id, _ip)| peer_list.iter().all(|(id2, _ip2)| id2 != id));
- peer_list.extend(prev_peer_list);
+ prev_peer_list
+ .0
+ .retain(|(id, _ip)| peer_list.iter().all(|(id2, _ip2)| id2 != id));
+ peer_list.extend(prev_peer_list.0);
}
// Save new peer list to file
- self.persist_peer_list.save_async(&peer_list).await
+ self.persist_peer_list
+ .save_async(&PeerList(peer_list))
+ .await
}
async fn pull_cluster_layout(self: Arc<Self>, peer: Uuid) {
diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml
index e1a74553..3911c945 100644
--- a/src/table/Cargo.toml
+++ b/src/table/Cargo.toml
@@ -28,7 +28,6 @@ hexdump = "0.1"
tracing = "0.1.30"
rand = "0.8"
-rmp-serde = "0.15"
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
serde_bytes = "0.11"
diff --git a/src/table/data.rs b/src/table/data.rs
index 40856b02..5c792f1f 100644
--- a/src/table/data.rs
+++ b/src/table/data.rs
@@ -10,6 +10,7 @@ use garage_db::counted_tree_hack::CountedTree;
use garage_util::data::*;
use garage_util::error::*;
+use garage_util::migrate::Migrate;
use garage_rpc::system::System;
@@ -40,11 +41,7 @@ pub struct TableData<F: TableSchema, R: TableReplication> {
pub(crate) metrics: TableMetrics,
}
-impl<F, R> TableData<F, R>
-where
- F: TableSchema,
- R: TableReplication,
-{
+impl<F: TableSchema, R: TableReplication> TableData<F, R> {
pub fn new(system: Arc<System>, instance: F, replication: R, db: &db::Db) -> Arc<Self> {
let store = db
.open_tree(&format!("{}:table", F::TABLE_NAME))
@@ -219,7 +216,8 @@ where
// data format, the messagepack encoding changed. In this case,
// we also have to write the migrated value in the table and update
// the associated Merkle tree entry.
- let new_bytes = rmp_to_vec_all_named(&new_entry)
+ let new_bytes = new_entry
+ .encode()
.map_err(Error::RmpEncode)
.map_err(db::TxError::Abort)?;
let changed = Some(&new_bytes[..]) != old_bytes.as_deref();
@@ -329,9 +327,9 @@ where
Some(old_v) => {
let mut entry = self.decode_entry(&old_v).map_err(db::TxError::Abort)?;
entry.merge(ins);
- rmp_to_vec_all_named(&entry)
+ entry.encode()
}
- None => rmp_to_vec_all_named(ins),
+ None => ins.encode(),
};
let new_entry = new_entry
.map_err(Error::RmpEncode)
@@ -351,18 +349,18 @@ where
}
pub fn decode_entry(&self, bytes: &[u8]) -> Result<F::E, Error> {
- match rmp_serde::decode::from_read_ref::<_, F::E>(bytes) {
- Ok(x) => Ok(x),
- Err(e) => match F::try_migrate(bytes) {
- Some(x) => Ok(x),
- None => {
- warn!("Unable to decode entry of {}: {}", F::TABLE_NAME, e);
- for line in hexdump::hexdump_iter(bytes) {
- debug!("{}", line);
- }
- Err(e.into())
+ match F::E::decode(bytes) {
+ Some(x) => Ok(x),
+ None => {
+ error!("Unable to decode entry of {}", F::TABLE_NAME);
+ for line in hexdump::hexdump_iter(bytes) {
+ debug!("{}", line);
}
- },
+ Err(Error::Message(format!(
+ "Unable to decode entry of {}",
+ F::TABLE_NAME
+ )))
+ }
}
}
diff --git a/src/table/gc.rs b/src/table/gc.rs
index 90594fba..5b9124a7 100644
--- a/src/table/gc.rs
+++ b/src/table/gc.rs
@@ -31,7 +31,7 @@ const TABLE_GC_BATCH_SIZE: usize = 1024;
// and the moment the garbage collection actually happens)
const TABLE_GC_DELAY: Duration = Duration::from_secs(24 * 3600);
-pub(crate) struct TableGc<F: TableSchema + 'static, R: TableReplication + 'static> {
+pub(crate) struct TableGc<F: TableSchema, R: TableReplication> {
system: Arc<System>,
data: Arc<TableData<F, R>>,
@@ -49,11 +49,7 @@ impl Rpc for GcRpc {
type Response = Result<GcRpc, Error>;
}
-impl<F, R> TableGc<F, R>
-where
- F: TableSchema + 'static,
- R: TableReplication + 'static,
-{
+impl<F: TableSchema, R: TableReplication> TableGc<F, R> {
pub(crate) fn new(system: Arc<System>, data: Arc<TableData<F, R>>) -> Arc<Self> {
let endpoint = system
.netapp
@@ -277,11 +273,7 @@ where
}
#[async_trait]
-impl<F, R> EndpointHandler<GcRpc> for TableGc<F, R>
-where
- F: TableSchema + 'static,
- R: TableReplication + 'static,
-{
+impl<F: TableSchema, R: TableReplication> EndpointHandler<GcRpc> for TableGc<F, R> {
async fn handle(self: &Arc<Self>, message: &GcRpc, _from: NodeID) -> Result<GcRpc, Error> {
match message {
GcRpc::Update(items) => {
@@ -299,20 +291,12 @@ where
}
}
-struct GcWorker<F, R>
-where
- F: TableSchema + 'static,
- R: TableReplication + 'static,
-{
+struct GcWorker<F: TableSchema, R: TableReplication> {
gc: Arc<TableGc<F, R>>,
wait_delay: Duration,
}
-impl<F, R> GcWorker<F, R>
-where
- F: TableSchema + 'static,
- R: TableReplication + 'static,
-{
+impl<F: TableSchema, R: TableReplication> GcWorker<F, R> {
fn new(gc: Arc<TableGc<F, R>>) -> Self {
Self {
gc,
@@ -322,11 +306,7 @@ where
}
#[async_trait]
-impl<F, R> Worker for GcWorker<F, R>
-where
- F: TableSchema + 'static,
- R: TableReplication + 'static,
-{
+impl<F: TableSchema, R: TableReplication> Worker for GcWorker<F, R> {
fn name(&self) -> String {
format!("{} GC", F::TABLE_NAME)
}
diff --git a/src/table/merkle.rs b/src/table/merkle.rs
index 736354fa..e86d0251 100644
--- a/src/table/merkle.rs
+++ b/src/table/merkle.rs
@@ -10,6 +10,7 @@ use garage_db as db;
use garage_util::background::*;
use garage_util::data::*;
+use garage_util::encode::{nonversioned_decode, nonversioned_encode};
use garage_util::error::Error;
use garage_rpc::ring::*;
@@ -65,13 +66,9 @@ pub enum MerkleNode {
Leaf(Vec<u8>, Hash),
}
-impl<F, R> MerkleUpdater<F, R>
-where
- F: TableSchema + 'static,
- R: TableReplication + 'static,
-{
+impl<F: TableSchema, R: TableReplication> MerkleUpdater<F, R> {
pub(crate) fn new(data: Arc<TableData<F, R>>) -> Arc<Self> {
- let empty_node_hash = blake2sum(&rmp_to_vec_all_named(&MerkleNode::Empty).unwrap()[..]);
+ let empty_node_hash = blake2sum(&nonversioned_encode(&MerkleNode::Empty).unwrap()[..]);
Arc::new(Self {
data,
@@ -277,7 +274,7 @@ where
tx.remove(&self.data.merkle_tree, k.encode())?;
Ok(self.empty_node_hash)
} else {
- let vby = rmp_to_vec_all_named(v).map_err(|e| db::TxError::Abort(e.into()))?;
+ let vby = nonversioned_encode(v).map_err(|e| db::TxError::Abort(e.into()))?;
let rethash = blake2sum(&vby[..]);
tx.insert(&self.data.merkle_tree, k.encode(), vby)?;
Ok(rethash)
@@ -303,17 +300,10 @@ where
}
}
-struct MerkleWorker<F, R>(Arc<MerkleUpdater<F, R>>)
-where
- F: TableSchema + 'static,
- R: TableReplication + 'static;
+struct MerkleWorker<F: TableSchema, R: TableReplication>(Arc<MerkleUpdater<F, R>>);
#[async_trait]
-impl<F, R> Worker for MerkleWorker<F, R>
-where
- F: TableSchema + 'static,
- R: TableReplication + 'static,
-{
+impl<F: TableSchema, R: TableReplication> Worker for MerkleWorker<F, R> {
fn name(&self) -> String {
format!("{} Merkle", F::TABLE_NAME)
}
@@ -375,7 +365,7 @@ impl MerkleNode {
fn decode_opt(ent: &Option<db::Value>) -> Result<Self, Error> {
match ent {
None => Ok(MerkleNode::Empty),
- Some(v) => Ok(rmp_serde::decode::from_read_ref::<_, MerkleNode>(&v[..])?),
+ Some(v) => Ok(nonversioned_decode::<MerkleNode>(&v[..])?),
}
}
diff --git a/src/table/queue.rs b/src/table/queue.rs
index 860f20d3..0857209b 100644
--- a/src/table/queue.rs
+++ b/src/table/queue.rs
@@ -16,15 +16,11 @@ const BATCH_SIZE: usize = 100;
pub(crate) struct InsertQueueWorker<F, R>(pub(crate) Arc<Table<F, R>>)
where
- F: TableSchema + 'static,
- R: TableReplication + 'static;
+ F: TableSchema,
+ R: TableReplication;
#[async_trait]
-impl<F, R> Worker for InsertQueueWorker<F, R>
-where
- F: TableSchema + 'static,
- R: TableReplication + 'static,
-{
+impl<F: TableSchema, R: TableReplication> Worker for InsertQueueWorker<F, R> {
fn name(&self) -> String {
format!("{} queue", F::TABLE_NAME)
}
diff --git a/src/table/replication/parameters.rs b/src/table/replication/parameters.rs
index 3740d947..f00815a2 100644
--- a/src/table/replication/parameters.rs
+++ b/src/table/replication/parameters.rs
@@ -2,7 +2,7 @@ use garage_rpc::ring::*;
use garage_util::data::*;
/// Trait to describe how a table shall be replicated
-pub trait TableReplication: Send + Sync {
+pub trait TableReplication: Send + Sync + 'static {
// See examples in table_sharded.rs and table_fullcopy.rs
// To understand various replication methods
diff --git a/src/table/schema.rs b/src/table/schema.rs
index f37e98d8..5cbf6c95 100644
--- a/src/table/schema.rs
+++ b/src/table/schema.rs
@@ -2,11 +2,14 @@ use serde::{Deserialize, Serialize};
use garage_db as db;
use garage_util::data::*;
+use garage_util::migrate::Migrate;
use crate::crdt::Crdt;
/// Trait for field used to partition data
-pub trait PartitionKey {
+pub trait PartitionKey:
+ Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static
+{
/// Get the key used to partition
fn hash(&self) -> Hash;
}
@@ -27,7 +30,7 @@ impl PartitionKey for FixedBytes32 {
}
/// Trait for field used to sort data
-pub trait SortKey {
+pub trait SortKey: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static {
/// Get the key used to sort
fn sort_key(&self) -> &[u8];
}
@@ -46,7 +49,7 @@ impl SortKey for FixedBytes32 {
/// Trait for an entry in a table. It must be sortable and partitionnable.
pub trait Entry<P: PartitionKey, S: SortKey>:
- Crdt + PartialEq + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync
+ Crdt + PartialEq + Clone + Migrate + Send + Sync + 'static
{
/// Get the key used to partition
fn partition_key(&self) -> &P;
@@ -65,23 +68,16 @@ pub trait TableSchema: Send + Sync + 'static {
const TABLE_NAME: &'static str;
/// The partition key used in that table
- type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
+ type P: PartitionKey;
/// The sort key used int that table
- type S: SortKey + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync;
+ type S: SortKey;
/// They type for an entry in that table
type E: Entry<Self::P, Self::S>;
/// The type for a filter that can be applied to select entries
/// (e.g. filter out deleted entries)
- type Filter: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync;
-
- // Action to take if not able to decode current version:
- // try loading from an older version
- /// Try migrating an entry from an older version
- fn try_migrate(_bytes: &[u8]) -> Option<Self::E> {
- None
- }
+ type Filter: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static;
/// Actions triggered by data changing in a table. If such actions
/// include updates to the local database that should be applied
diff --git a/src/table/sync.rs b/src/table/sync.rs
index d6d272ab..92a353c6 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -14,6 +14,7 @@ use tokio::sync::{mpsc, watch};
use garage_util::background::*;
use garage_util::data::*;
+use garage_util::encode::{debug_serialize, nonversioned_encode};
use garage_util::error::{Error, OkOrMessage};
use garage_rpc::ring::*;
@@ -28,7 +29,7 @@ use crate::*;
// Do anti-entropy every 10 minutes
const ANTI_ENTROPY_INTERVAL: Duration = Duration::from_secs(10 * 60);
-pub struct TableSyncer<F: TableSchema + 'static, R: TableReplication + 'static> {
+pub struct TableSyncer<F: TableSchema, R: TableReplication> {
system: Arc<System>,
data: Arc<TableData<F, R>>,
merkle: Arc<MerkleUpdater<F, R>>,
@@ -61,11 +62,7 @@ struct TodoPartition {
retain: bool,
}
-impl<F, R> TableSyncer<F, R>
-where
- F: TableSchema + 'static,
- R: TableReplication + 'static,
-{
+impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
pub(crate) fn new(
system: Arc<System>,
data: Arc<TableData<F, R>>,
@@ -302,7 +299,7 @@ where
);
return Ok(());
}
- let root_ck_hash = hash_of::<MerkleNode>(&root_ck)?;
+ let root_ck_hash = hash_of_merkle_node(&root_ck)?;
// Check if they have the same root checksum
// If so, do nothing.
@@ -459,16 +456,12 @@ where
// ======= SYNCHRONIZATION PROCEDURE -- RECEIVER SIDE ======
#[async_trait]
-impl<F, R> EndpointHandler<SyncRpc> for TableSyncer<F, R>
-where
- F: TableSchema + 'static,
- R: TableReplication + 'static,
-{
+impl<F: TableSchema, R: TableReplication> EndpointHandler<SyncRpc> for TableSyncer<F, R> {
async fn handle(self: &Arc<Self>, message: &SyncRpc, from: NodeID) -> Result<SyncRpc, Error> {
match message {
SyncRpc::RootCkHash(range, h) => {
let (_root_ck_key, root_ck) = self.get_root_ck(*range)?;
- let hash = hash_of::<MerkleNode>(&root_ck)?;
+ let hash = hash_of_merkle_node(&root_ck)?;
Ok(SyncRpc::RootCkDifferent(hash != *h))
}
SyncRpc::GetNode(k) => {
@@ -497,7 +490,7 @@ where
// -------- Sync Worker ---------
-struct SyncWorker<F: TableSchema + 'static, R: TableReplication + 'static> {
+struct SyncWorker<F: TableSchema, R: TableReplication> {
syncer: Arc<TableSyncer<F, R>>,
ring_recv: watch::Receiver<Arc<Ring>>,
ring: Arc<Ring>,
@@ -506,7 +499,7 @@ struct SyncWorker<F: TableSchema + 'static, R: TableReplication + 'static> {
next_full_sync: Instant,
}
-impl<F: TableSchema + 'static, R: TableReplication + 'static> SyncWorker<F, R> {
+impl<F: TableSchema, R: TableReplication> SyncWorker<F, R> {
fn add_full_sync(&mut self) {
let system = &self.syncer.system;
let data = &self.syncer.data;
@@ -572,7 +565,7 @@ impl<F: TableSchema + 'static, R: TableReplication + 'static> SyncWorker<F, R> {
}
#[async_trait]
-impl<F: TableSchema + 'static, R: TableReplication + 'static> Worker for SyncWorker<F, R> {
+impl<F: TableSchema, R: TableReplication> Worker for SyncWorker<F, R> {
fn name(&self) -> String {
format!("{} sync", F::TABLE_NAME)
}
@@ -622,8 +615,8 @@ impl<F: TableSchema + 'static, R: TableReplication + 'static> Worker for SyncWor
// ---- UTIL ----
-fn hash_of<T: Serialize>(x: &T) -> Result<Hash, Error> {
- Ok(blake2sum(&rmp_to_vec_all_named(x)?[..]))
+fn hash_of_merkle_node(x: &MerkleNode) -> Result<Hash, Error> {
+ Ok(blake2sum(&nonversioned_encode(x)?[..]))
}
fn join_ordered<'a, K: Ord + Eq, V1, V2>(
diff --git a/src/table/table.rs b/src/table/table.rs
index bbcd5971..7ad79677 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -18,6 +18,7 @@ use garage_util::background::BackgroundRunner;
use garage_util::data::*;
use garage_util::error::Error;
use garage_util::metrics::RecordDuration;
+use garage_util::migrate::Migrate;
use garage_rpc::system::System;
use garage_rpc::*;
@@ -32,7 +33,7 @@ use crate::schema::*;
use crate::sync::*;
use crate::util::*;
-pub struct Table<F: TableSchema + 'static, R: TableReplication + 'static> {
+pub struct Table<F: TableSchema, R: TableReplication> {
pub system: Arc<System>,
pub data: Arc<TableData<F, R>>,
pub merkle_updater: Arc<MerkleUpdater<F, R>>,
@@ -64,11 +65,7 @@ impl<F: TableSchema> Rpc for TableRpc<F> {
type Response = Result<TableRpc<F>, Error>;
}
-impl<F, R> Table<F, R>
-where
- F: TableSchema + 'static,
- R: TableReplication + 'static,
-{
+impl<F: TableSchema, R: TableReplication> Table<F, R> {
// =============== PUBLIC INTERFACE FUNCTIONS (new, insert, get, etc) ===============
pub fn new(instance: F, replication: R, system: Arc<System>, db: &db::Db) -> Arc<Self> {
@@ -122,7 +119,7 @@ where
let hash = e.partition_key().hash();
let who = self.data.replication.write_nodes(&hash);
- let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?));
+ let e_enc = Arc::new(ByteBuf::from(e.encode()?));
let rpc = TableRpc::<F>::Update(vec![e_enc]);
self.system
@@ -173,7 +170,7 @@ where
let entry = entry.borrow();
let hash = entry.partition_key().hash();
let who = self.data.replication.write_nodes(&hash);
- let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(entry)?));
+ let e_enc = Arc::new(ByteBuf::from(entry.encode()?));
for node in who {
call_list.entry(node).or_default().push(e_enc.clone());
}
@@ -412,7 +409,7 @@ where
// =============== UTILITY FUNCTION FOR CLIENT OPERATIONS ===============
async fn repair_on_read(&self, who: &[Uuid], what: F::E) -> Result<(), Error> {
- let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(&what)?));
+ let what_enc = Arc::new(ByteBuf::from(what.encode()?));
self.system
.rpc
.try_call_many(
@@ -427,11 +424,7 @@ where
}
#[async_trait]
-impl<F, R> EndpointHandler<TableRpc<F>> for Table<F, R>
-where
- F: TableSchema + 'static,
- R: TableReplication + 'static,
-{
+impl<F: TableSchema, R: TableReplication> EndpointHandler<TableRpc<F>> for Table<F, R> {
async fn handle(
self: &Arc<Self>,
msg: &TableRpc<F>,
diff --git a/src/util/Cargo.toml b/src/util/Cargo.toml
index 11640027..32e9c851 100644
--- a/src/util/Cargo.toml
+++ b/src/util/Cargo.toml
@@ -23,6 +23,7 @@ bytes = "1.0"
digest = "0.10"
err-derive = "0.3"
git-version = "0.3.4"
+hexdump = "0.1"
xxhash-rust = { version = "0.8", default-features = false, features = ["xxh3"] }
hex = "0.4"
lazy_static = "1.4"
diff --git a/src/util/data.rs b/src/util/data.rs
index 7715c2cc..3f61e301 100644
--- a/src/util/data.rs
+++ b/src/util/data.rs
@@ -140,34 +140,3 @@ pub fn fasthash(data: &[u8]) -> FastHash {
pub fn gen_uuid() -> Uuid {
rand::thread_rng().gen::<[u8; 32]>().into()
}
-
-// RMP serialization with names of fields and variants
-
-/// Serialize to MessagePack
-pub fn rmp_to_vec_all_named<T>(val: &T) -> Result<Vec<u8>, rmp_serde::encode::Error>
-where
- T: Serialize + ?Sized,
-{
- let mut wr = Vec::with_capacity(128);
- let mut se = rmp_serde::Serializer::new(&mut wr)
- .with_struct_map()
- .with_string_variants();
- val.serialize(&mut se)?;
- Ok(wr)
-}
-
-/// Serialize to JSON, truncating long result
-pub fn debug_serialize<T: Serialize>(x: T) -> String {
- match serde_json::to_string(&x) {
- Ok(ss) => {
- if ss.len() > 100 {
- // TODO this can panic if 100 is not a codepoint boundary, but inside a 2 Bytes
- // (or more) codepoint
- ss[..100].to_string()
- } else {
- ss
- }
- }
- Err(e) => format!("<JSON serialization error: {}>", e),
- }
-}
diff --git a/src/util/encode.rs b/src/util/encode.rs
new file mode 100644
index 00000000..1cd3198f
--- /dev/null
+++ b/src/util/encode.rs
@@ -0,0 +1,42 @@
+use serde::{Deserialize, Serialize};
+
+/// Serialize to MessagePacki, without versionning
+/// (see garage_util::migrate for functions that manage versionned
+/// data formats)
+pub fn nonversioned_encode<T>(val: &T) -> Result<Vec<u8>, rmp_serde::encode::Error>
+where
+ T: Serialize + ?Sized,
+{
+ let mut wr = Vec::with_capacity(128);
+ let mut se = rmp_serde::Serializer::new(&mut wr)
+ .with_struct_map()
+ .with_string_variants();
+ val.serialize(&mut se)?;
+ Ok(wr)
+}
+
+/// Deserialize from MessagePacki, without versionning
+/// (see garage_util::migrate for functions that manage versionned
+/// data formats)
+pub fn nonversioned_decode<T>(bytes: &[u8]) -> Result<T, rmp_serde::decode::Error>
+where
+ T: for<'de> Deserialize<'de> + ?Sized,
+{
+ rmp_serde::decode::from_read_ref::<_, T>(bytes)
+}
+
+/// Serialize to JSON, truncating long result
+pub fn debug_serialize<T: Serialize>(x: T) -> String {
+ match serde_json::to_string(&x) {
+ Ok(ss) => {
+ if ss.len() > 100 {
+ // TODO this can panic if 100 is not a codepoint boundary, but inside a 2 Bytes
+ // (or more) codepoint
+ ss[..100].to_string()
+ } else {
+ ss
+ }
+ }
+ Err(e) => format!("<JSON serialization error: {}>", e),
+ }
+}
diff --git a/src/util/error.rs b/src/util/error.rs
index 9995c746..3fcee71d 100644
--- a/src/util/error.rs
+++ b/src/util/error.rs
@@ -7,6 +7,7 @@ use err_derive::Error;
use serde::{de::Visitor, Deserialize, Deserializer, Serialize, Serializer};
use crate::data::*;
+use crate::encode::debug_serialize;
/// Regroup all Garage errors
#[derive(Debug, Error)]
diff --git a/src/util/lib.rs b/src/util/lib.rs
index 264cc192..be82061f 100644
--- a/src/util/lib.rs
+++ b/src/util/lib.rs
@@ -8,9 +8,11 @@ pub mod background;
pub mod config;
pub mod crdt;
pub mod data;
+pub mod encode;
pub mod error;
pub mod formater;
pub mod metrics;
+pub mod migrate;
pub mod persister;
pub mod time;
pub mod token_bucket;
diff --git a/src/util/migrate.rs b/src/util/migrate.rs
new file mode 100644
index 00000000..1229fd9c
--- /dev/null
+++ b/src/util/migrate.rs
@@ -0,0 +1,159 @@
+use serde::{Deserialize, Serialize};
+
+/// Indicates that this type has an encoding that can be migrated from
+/// a previous version upon upgrades of Garage.
+pub trait Migrate: Serialize + for<'de> Deserialize<'de> + 'static {
+ /// A sequence of bytes to add at the beginning of the serialized
+ /// string, to identify that the data is of this version.
+ const VERSION_MARKER: &'static [u8] = b"";
+
+ /// The previous version of this data type, from which items of this version
+ /// can be migrated.
+ type Previous: Migrate;
+
+ /// The migration function that transforms a value decoded in the old format
+ /// to an up-to-date value.
+ fn migrate(previous: Self::Previous) -> Self;
+
+ /// Decode an encoded version of this type, going through a migration if necessary.
+ fn decode(bytes: &[u8]) -> Option<Self> {
+ let marker_len = Self::VERSION_MARKER.len();
+ if bytes.get(..marker_len) == Some(Self::VERSION_MARKER) {
+ if let Ok(value) = rmp_serde::decode::from_read_ref::<_, Self>(&bytes[marker_len..]) {
+ return Some(value);
+ }
+ }
+
+ Self::Previous::decode(bytes).map(Self::migrate)
+ }
+
+ /// Encode this type with optionnal version marker
+ fn encode(&self) -> Result<Vec<u8>, rmp_serde::encode::Error> {
+ let mut wr = Vec::with_capacity(128);
+ wr.extend_from_slice(Self::VERSION_MARKER);
+ let mut se = rmp_serde::Serializer::new(&mut wr)
+ .with_struct_map()
+ .with_string_variants();
+ self.serialize(&mut se)?;
+ Ok(wr)
+ }
+}
+
+/// Indicates that this type has no previous encoding version to be migrated from.
+pub trait InitialFormat: Serialize + for<'de> Deserialize<'de> + 'static {
+ /// A sequence of bytes to add at the beginning of the serialized
+ /// string, to identify that the data is of this version.
+ const VERSION_MARKER: &'static [u8] = b"";
+}
+
+impl<T: InitialFormat> Migrate for T {
+ const VERSION_MARKER: &'static [u8] = <T as InitialFormat>::VERSION_MARKER;
+
+ type Previous = NoPrevious;
+
+ fn migrate(_previous: Self::Previous) -> Self {
+ unreachable!();
+ }
+}
+
+/// Internal type used by InitialFormat, not meant for general use.
+#[derive(Serialize, Deserialize)]
+pub enum NoPrevious {}
+
+impl Migrate for NoPrevious {
+ type Previous = NoPrevious;
+
+ fn migrate(_previous: Self::Previous) -> Self {
+ unreachable!();
+ }
+
+ fn decode(_bytes: &[u8]) -> Option<Self> {
+ None
+ }
+
+ fn encode(&self) -> Result<Vec<u8>, rmp_serde::encode::Error> {
+ unreachable!()
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use super::*;
+
+ #[derive(Serialize, Deserialize, PartialEq, Eq, Debug)]
+ struct V1 {
+ a: usize,
+ b: String,
+ }
+ impl InitialFormat for V1 {}
+
+ #[derive(Serialize, Deserialize, PartialEq, Eq, Debug)]
+ struct V2 {
+ a: usize,
+ b: Vec<String>,
+ c: String,
+ }
+ impl Migrate for V2 {
+ const VERSION_MARKER: &'static [u8] = b"GtestV2";
+ type Previous = V1;
+ fn migrate(prev: V1) -> V2 {
+ V2 {
+ a: prev.a,
+ b: vec![prev.b],
+ c: String::new(),
+ }
+ }
+ }
+
+ #[test]
+ fn test_v1() {
+ let x = V1 {
+ a: 12,
+ b: "hello".into(),
+ };
+ let x_enc = x.encode().unwrap();
+ let y = V1::decode(&x_enc).unwrap();
+ assert_eq!(x, y);
+ }
+
+ #[test]
+ fn test_v2() {
+ let x = V2 {
+ a: 12,
+ b: vec!["hello".into(), "world".into()],
+ c: "plop".into(),
+ };
+ let x_enc = x.encode().unwrap();
+ assert_eq!(&x_enc[..V2::VERSION_MARKER.len()], V2::VERSION_MARKER);
+ let y = V2::decode(&x_enc).unwrap();
+ assert_eq!(x, y);
+ }
+
+ #[test]
+ fn test_migrate() {
+ let x = V1 {
+ a: 12,
+ b: "hello".into(),
+ };
+ let x_enc = x.encode().unwrap();
+
+ let xx = V1::decode(&x_enc).unwrap();
+ assert_eq!(x, xx);
+
+ let y = V2::decode(&x_enc).unwrap();
+ assert_eq!(
+ y,
+ V2 {
+ a: 12,
+ b: vec!["hello".into()],
+ c: "".into(),
+ }
+ );
+
+ let y_enc = y.encode().unwrap();
+ assert_eq!(&y_enc[..V2::VERSION_MARKER.len()], V2::VERSION_MARKER);
+
+ let z = V2::decode(&y_enc).unwrap();
+ assert_eq!(y, z);
+ }
+}
diff --git a/src/util/persister.rs b/src/util/persister.rs
index 9e1a1910..4b9adf51 100644
--- a/src/util/persister.rs
+++ b/src/util/persister.rs
@@ -3,21 +3,16 @@ use std::path::{Path, PathBuf};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
-use serde::{Deserialize, Serialize};
-
-use crate::data::*;
use crate::error::Error;
+use crate::migrate::Migrate;
-pub struct Persister<T: Serialize + for<'de> Deserialize<'de>> {
+pub struct Persister<T: Migrate> {
path: PathBuf,
_marker: std::marker::PhantomData<T>,
}
-impl<T> Persister<T>
-where
- T: Serialize + for<'de> Deserialize<'de>,
-{
+impl<T: Migrate> Persister<T> {
pub fn new(base_dir: &Path, file_name: &str) -> Self {
let mut path = base_dir.to_path_buf();
path.push(file_name);
@@ -27,18 +22,37 @@ where
}
}
+ fn decode(&self, bytes: &[u8]) -> Result<T, Error> {
+ match T::decode(bytes) {
+ Some(v) => Ok(v),
+ None => {
+ error!(
+ "Unable to decode persisted data file {}",
+ self.path.display()
+ );
+ for line in hexdump::hexdump_iter(bytes) {
+ debug!("{}", line);
+ }
+ Err(Error::Message(format!(
+ "Unable to decode persisted data file {}",
+ self.path.display()
+ )))
+ }
+ }
+ }
+
pub fn load(&self) -> Result<T, Error> {
let mut file = std::fs::OpenOptions::new().read(true).open(&self.path)?;
let mut bytes = vec![];
file.read_to_end(&mut bytes)?;
- let value = rmp_serde::decode::from_read_ref(&bytes[..])?;
+ let value = self.decode(&bytes[..])?;
Ok(value)
}
pub fn save(&self, t: &T) -> Result<(), Error> {
- let bytes = rmp_to_vec_all_named(t)?;
+ let bytes = t.encode()?;
let mut file = std::fs::OpenOptions::new()
.write(true)
@@ -57,12 +71,12 @@ where
let mut bytes = vec![];
file.read_to_end(&mut bytes).await?;
- let value = rmp_serde::decode::from_read_ref(&bytes[..])?;
+ let value = self.decode(&bytes[..])?;
Ok(value)
}
pub async fn save_async(&self, t: &T) -> Result<(), Error> {
- let bytes = rmp_to_vec_all_named(t)?;
+ let bytes = t.encode()?;
let mut file = tokio::fs::File::create(&self.path).await?;
file.write_all(&bytes[..]).await?;