From cdb2a591e9d393d24ab5c49bb905b0589b193299 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 3 Jan 2023 14:44:47 +0100 Subject: Refactor how things are migrated --- Cargo.lock | 1 + src/block/repair.rs | 1 + src/block/resync.rs | 1 + src/garage/Cargo.toml | 2 +- src/model/bucket_alias_table.rs | 20 ++- src/model/bucket_table.rs | 128 ++++++++++--------- src/model/index_counter.rs | 50 +++++--- src/model/k2v/item_table.rs | 54 +++++--- src/model/key_table.rs | 154 ++++++++++++++-------- src/model/prev/v051/bucket_table.rs | 2 +- src/model/prev/v051/key_table.rs | 50 -------- src/model/prev/v051/mod.rs | 3 - src/model/prev/v051/object_table.rs | 149 ---------------------- src/model/prev/v051/version_table.rs | 79 ------------ src/model/s3/block_ref_table.rs | 29 +++-- src/model/s3/object_table.rs | 241 ++++++++++++++++------------------- src/model/s3/version_table.rs | 180 ++++++++++++++------------ src/rpc/layout.rs | 2 + src/rpc/system.rs | 18 ++- src/table/data.rs | 30 +++-- src/table/schema.rs | 23 ++-- src/table/sync.rs | 6 +- src/table/table.rs | 7 +- src/util/Cargo.toml | 1 + src/util/lib.rs | 1 + src/util/migrate.rs | 75 +++++++++++ src/util/persister.rs | 38 ++++-- 27 files changed, 637 insertions(+), 708 deletions(-) delete mode 100644 src/model/prev/v051/key_table.rs delete mode 100644 src/model/prev/v051/object_table.rs delete mode 100644 src/model/prev/v051/version_table.rs create mode 100644 src/util/migrate.rs diff --git a/Cargo.lock b/Cargo.lock index b1bbf35e..2a89fa1a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1277,6 +1277,7 @@ dependencies = [ "garage_db", "git-version", "hex", + "hexdump", "http", "hyper", "lazy_static", diff --git a/src/block/repair.rs b/src/block/repair.rs index f5515d4e..a6ded65a 100644 --- a/src/block/repair.rs +++ b/src/block/repair.rs @@ -178,6 +178,7 @@ struct ScrubWorkerPersisted { time_last_complete_scrub: u64, corruptions_detected: u64, } +impl garage_util::migrate::InitialFormat for ScrubWorkerPersisted {} enum ScrubWorkerState { Running(BlockStoreIterator), diff --git a/src/block/resync.rs b/src/block/resync.rs index 51bb9846..9c7b3b0e 100644 --- a/src/block/resync.rs +++ b/src/block/resync.rs @@ -63,6 +63,7 @@ struct ResyncPersistedConfig { n_workers: usize, tranquility: u32, } +impl garage_util::migrate::InitialFormat for ResyncPersistedConfig {} enum ResyncIterResult { BusyDidSomething, diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml index cee7060e..337699ca 100644 --- a/src/garage/Cargo.toml +++ b/src/garage/Cargo.toml @@ -74,7 +74,7 @@ base64 = "0.13" [features] -default = [ "bundled-libs", "metrics", "sled" ] +default = [ "bundled-libs", "metrics", "sled", "k2v" ] k2v = [ "garage_util/k2v", "garage_api/k2v" ] diff --git a/src/model/bucket_alias_table.rs b/src/model/bucket_alias_table.rs index fcd1536e..d07394f6 100644 --- a/src/model/bucket_alias_table.rs +++ b/src/model/bucket_alias_table.rs @@ -5,14 +5,22 @@ use garage_util::data::*; use garage_table::crdt::*; use garage_table::*; -/// The bucket alias table holds the names given to buckets -/// in the global namespace. -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -pub struct BucketAlias { - name: String, - pub state: crdt::Lww>, +mod v08 { + use super::*; + + /// The bucket alias table holds the names given to buckets + /// in the global namespace. + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct BucketAlias { + pub(super) name: String, + pub state: crdt::Lww>, + } + + impl garage_util::migrate::InitialFormat for BucketAlias {} } +pub use v08::*; + impl BucketAlias { pub fn new(name: String, ts: u64, bucket_id: Option) -> Option { if !is_valid_bucket_name(&name) { diff --git a/src/model/bucket_table.rs b/src/model/bucket_table.rs index 7be42702..38ed88ee 100644 --- a/src/model/bucket_table.rs +++ b/src/model/bucket_table.rs @@ -7,72 +7,80 @@ use garage_util::time::*; use crate::permission::BucketKeyPerm; -/// A bucket is a collection of objects -/// -/// Its parameters are not directly accessible as: -/// - It must be possible to merge paramaters, hence the use of a LWW CRDT. -/// - A bucket has 2 states, Present or Deleted and parameters make sense only if present. -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -pub struct Bucket { - /// ID of the bucket - pub id: Uuid, - /// State, and configuration if not deleted, of the bucket - pub state: crdt::Deletable, -} +mod v08 { + use super::*; + + /// A bucket is a collection of objects + /// + /// Its parameters are not directly accessible as: + /// - It must be possible to merge paramaters, hence the use of a LWW CRDT. + /// - A bucket has 2 states, Present or Deleted and parameters make sense only if present. + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct Bucket { + /// ID of the bucket + pub id: Uuid, + /// State, and configuration if not deleted, of the bucket + pub state: crdt::Deletable, + } -/// Configuration for a bucket -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -pub struct BucketParams { - /// Bucket's creation date - pub creation_date: u64, - /// Map of key with access to the bucket, and what kind of access they give - pub authorized_keys: crdt::Map, - - /// Map of aliases that are or have been given to this bucket - /// in the global namespace - /// (not authoritative: this is just used as an indication to - /// map back to aliases when doing ListBuckets) - pub aliases: crdt::LwwMap, - /// Map of aliases that are or have been given to this bucket - /// in namespaces local to keys - /// key = (access key id, alias name) - pub local_aliases: crdt::LwwMap<(String, String), bool>, - - /// Whether this bucket is allowed for website access - /// (under all of its global alias names), - /// and if so, the website configuration XML document - pub website_config: crdt::Lww>, - /// CORS rules - pub cors_config: crdt::Lww>>, - /// Bucket quotas - #[serde(default)] - pub quotas: crdt::Lww, -} + /// Configuration for a bucket + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct BucketParams { + /// Bucket's creation date + pub creation_date: u64, + /// Map of key with access to the bucket, and what kind of access they give + pub authorized_keys: crdt::Map, + + /// Map of aliases that are or have been given to this bucket + /// in the global namespace + /// (not authoritative: this is just used as an indication to + /// map back to aliases when doing ListBuckets) + pub aliases: crdt::LwwMap, + /// Map of aliases that are or have been given to this bucket + /// in namespaces local to keys + /// key = (access key id, alias name) + pub local_aliases: crdt::LwwMap<(String, String), bool>, + + /// Whether this bucket is allowed for website access + /// (under all of its global alias names), + /// and if so, the website configuration XML document + pub website_config: crdt::Lww>, + /// CORS rules + pub cors_config: crdt::Lww>>, + /// Bucket quotas + #[serde(default)] + pub quotas: crdt::Lww, + } -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -pub struct WebsiteConfig { - pub index_document: String, - pub error_document: Option, -} + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct WebsiteConfig { + pub index_document: String, + pub error_document: Option, + } -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -pub struct CorsRule { - pub id: Option, - pub max_age_seconds: Option, - pub allow_origins: Vec, - pub allow_methods: Vec, - pub allow_headers: Vec, - pub expose_headers: Vec, -} + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct CorsRule { + pub id: Option, + pub max_age_seconds: Option, + pub allow_origins: Vec, + pub allow_methods: Vec, + pub allow_headers: Vec, + pub expose_headers: Vec, + } -#[derive(Default, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] -pub struct BucketQuotas { - /// Maximum size in bytes (bucket size = sum of sizes of objects in the bucket) - pub max_size: Option, - /// Maximum number of non-deleted objects in the bucket - pub max_objects: Option, + #[derive(Default, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] + pub struct BucketQuotas { + /// Maximum size in bytes (bucket size = sum of sizes of objects in the bucket) + pub max_size: Option, + /// Maximum number of non-deleted objects in the bucket + pub max_objects: Option, + } + + impl garage_util::migrate::InitialFormat for Bucket {} } +pub use v08::*; + impl AutoCrdt for BucketQuotas { const WARN_IF_DIFFERENT: bool = true; } diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs index 6303ea3e..c3ed29c7 100644 --- a/src/model/index_counter.rs +++ b/src/model/index_counter.rs @@ -12,6 +12,7 @@ use garage_rpc::system::System; use garage_util::background::BackgroundRunner; use garage_util::data::*; use garage_util::error::*; +use garage_util::migrate::Migrate; use garage_util::time::*; use garage_table::crdt::*; @@ -29,14 +30,28 @@ pub trait CountedItem: Clone + PartialEq + Send + Sync + 'static { fn counts(&self) -> Vec<(&'static str, i64)>; } -/// A counter entry in the global table -#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)] -pub struct CounterEntry { - pub pk: T::CP, - pub sk: T::CS, - pub values: BTreeMap, +mod v08 { + use super::*; + + /// A counter entry in the global table + #[derive(Clone, PartialEq, Debug, Serialize, Deserialize)] + pub struct CounterEntry { + pub pk: T::CP, + pub sk: T::CS, + pub values: BTreeMap, + } + + /// A counter entry in the global table + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct CounterValue { + pub node_values: BTreeMap, + } + + impl garage_util::migrate::InitialFormat for CounterEntry {} } +pub use v08::*; + impl Entry for CounterEntry { fn partition_key(&self) -> &T::CP { &self.pk @@ -78,12 +93,6 @@ impl CounterEntry { } } -/// A counter entry in the global table -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -pub struct CounterValue { - pub node_values: BTreeMap, -} - impl Crdt for CounterEntry { fn merge(&mut self, other: &Self) { for (name, e2) in other.values.iter() { @@ -195,11 +204,9 @@ impl IndexCounter { let tree_key = self.table.data.tree_key(pk, sk); let mut entry = match tx.get(&self.local_counter, &tree_key[..])? { - Some(old_bytes) => { - rmp_serde::decode::from_read_ref::<_, LocalCounterEntry>(&old_bytes) - .map_err(Error::RmpDecode) - .map_err(db::TxError::Abort)? - } + Some(old_bytes) => LocalCounterEntry::::decode(&old_bytes) + .ok_or_message("Cannot decode local counter entry") + .map_err(db::TxError::Abort)?, None => LocalCounterEntry { pk: pk.clone(), sk: sk.clone(), @@ -214,7 +221,8 @@ impl IndexCounter { ent.1 += *inc; } - let new_entry_bytes = rmp_to_vec_all_named(&entry) + let new_entry_bytes = entry + .encode() .map_err(Error::RmpEncode) .map_err(db::TxError::Abort)?; tx.insert(&self.local_counter, &tree_key[..], new_entry_bytes)?; @@ -263,7 +271,7 @@ impl IndexCounter { tv.1 = 0; } - let local_counter_bytes = rmp_to_vec_all_named(&local_counter)?; + let local_counter_bytes = local_counter.encode()?; self.local_counter .insert(&local_counter_k, &local_counter_bytes)?; @@ -330,7 +338,7 @@ impl IndexCounter { tv.1 += v; } - let local_counter_bytes = rmp_to_vec_all_named(&local_counter)?; + let local_counter_bytes = local_counter.encode()?; self.local_counter .insert(&local_counter_key, local_counter_bytes)?; @@ -357,6 +365,8 @@ struct LocalCounterEntry { values: BTreeMap, } +impl garage_util::migrate::InitialFormat for LocalCounterEntry {} + impl LocalCounterEntry { fn into_counter_entry(self, this_node: Uuid) -> CounterEntry { CounterEntry { diff --git a/src/model/k2v/item_table.rs b/src/model/k2v/item_table.rs index 7860cb17..ce3e4129 100644 --- a/src/model/k2v/item_table.rs +++ b/src/model/k2v/item_table.rs @@ -1,7 +1,8 @@ -use serde::{Deserialize, Serialize}; use std::collections::BTreeMap; use std::sync::Arc; +use serde::{Deserialize, Serialize}; + use garage_db as db; use garage_util::data::*; @@ -17,32 +18,43 @@ pub const CONFLICTS: &str = "conflicts"; pub const VALUES: &str = "values"; pub const BYTES: &str = "bytes"; -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -pub struct K2VItem { - pub partition: K2VItemPartition, - pub sort_key: String, +mod v08 { + use crate::k2v::causality::K2VNodeId; + use garage_util::data::Uuid; + use serde::{Deserialize, Serialize}; + use std::collections::BTreeMap; - items: BTreeMap, -} + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct K2VItem { + pub partition: K2VItemPartition, + pub sort_key: String, -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize, Hash)] -pub struct K2VItemPartition { - pub bucket_id: Uuid, - pub partition_key: String, -} + pub(super) items: BTreeMap, + } -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -struct DvvsEntry { - t_discard: u64, - values: Vec<(u64, DvvsValue)>, -} + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize, Hash)] + pub struct K2VItemPartition { + pub bucket_id: Uuid, + pub partition_key: String, + } + + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct DvvsEntry { + pub(super) t_discard: u64, + pub(super) values: Vec<(u64, DvvsValue)>, + } -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -pub enum DvvsValue { - Value(#[serde(with = "serde_bytes")] Vec), - Deleted, + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub enum DvvsValue { + Value(#[serde(with = "serde_bytes")] Vec), + Deleted, + } + + impl garage_util::migrate::InitialFormat for K2VItem {} } +pub use v08::*; + impl K2VItem { /// Creates a new K2VItem when no previous entry existed in the db pub fn new(bucket_id: Uuid, partition_key: String, sort_key: String) -> Self { diff --git a/src/model/key_table.rs b/src/model/key_table.rs index 9d2fc783..bb5334a3 100644 --- a/src/model/key_table.rs +++ b/src/model/key_table.rs @@ -1,45 +1,121 @@ use serde::{Deserialize, Serialize}; -use garage_table::crdt::*; -use garage_table::*; +use garage_util::crdt::{self, Crdt}; use garage_util::data::*; +use garage_table::{DeletedFilter, EmptyKey, Entry, TableSchema}; + use crate::permission::BucketKeyPerm; -use crate::prev::v051::key_table as old; +pub(crate) mod v05 { + use garage_util::crdt; + use serde::{Deserialize, Serialize}; -/// An api key -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -pub struct Key { - /// The id of the key (immutable), used as partition key - pub key_id: String, + /// An api key + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct Key { + /// The id of the key (immutable), used as partition key + pub key_id: String, - /// Internal state of the key - pub state: crdt::Deletable, -} + /// The secret_key associated + pub secret_key: String, -/// Configuration for a key -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -pub struct KeyParams { - /// The secret_key associated (immutable) - pub secret_key: String, + /// Name for the key + pub name: crdt::Lww, - /// Name for the key - pub name: crdt::Lww, + /// Is the key deleted + pub deleted: crdt::Bool, + + /// Buckets in which the key is authorized. Empty if `Key` is deleted + // CRDT interaction: deleted implies authorized_buckets is empty + pub authorized_buckets: crdt::LwwMap, + } - /// Flag to allow users having this key to create buckets - pub allow_create_bucket: crdt::Lww, + /// Permission given to a key in a bucket + #[derive(PartialOrd, Ord, PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct PermissionSet { + /// The key can be used to read the bucket + pub allow_read: bool, + /// The key can be used to write in the bucket + pub allow_write: bool, + } + + impl crdt::AutoCrdt for PermissionSet { + const WARN_IF_DIFFERENT: bool = true; + } - /// If the key is present: it gives some permissions, - /// a map of bucket IDs (uuids) to permissions. - /// Otherwise no permissions are granted to key - pub authorized_buckets: crdt::Map, + impl garage_util::migrate::InitialFormat for Key {} +} - /// A key can have a local view of buckets names it is - /// the only one to see, this is the namespace for these aliases - pub local_aliases: crdt::LwwMap>, +mod v08 { + use super::v05; + use crate::permission::BucketKeyPerm; + use garage_util::crdt; + use garage_util::data::Uuid; + use serde::{Deserialize, Serialize}; + + /// An api key + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct Key { + /// The id of the key (immutable), used as partition key + pub key_id: String, + + /// Internal state of the key + pub state: crdt::Deletable, + } + + /// Configuration for a key + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct KeyParams { + /// The secret_key associated (immutable) + pub secret_key: String, + + /// Name for the key + pub name: crdt::Lww, + + /// Flag to allow users having this key to create buckets + pub allow_create_bucket: crdt::Lww, + + /// If the key is present: it gives some permissions, + /// a map of bucket IDs (uuids) to permissions. + /// Otherwise no permissions are granted to key + pub authorized_buckets: crdt::Map, + + /// A key can have a local view of buckets names it is + /// the only one to see, this is the namespace for these aliases + pub local_aliases: crdt::LwwMap>, + } + + impl garage_util::migrate::Migrate for Key { + type Previous = v05::Key; + + fn migrate(old_k: v05::Key) -> Key { + let name = crdt::Lww::raw(old_k.name.timestamp(), old_k.name.get().clone()); + + let state = if old_k.deleted.get() { + crdt::Deletable::Deleted + } else { + // Authorized buckets is ignored here, + // migration is performed in specific migration code in + // garage/migrate.rs + crdt::Deletable::Present(KeyParams { + secret_key: old_k.secret_key, + name, + allow_create_bucket: crdt::Lww::new(false), + authorized_buckets: crdt::Map::new(), + local_aliases: crdt::LwwMap::new(), + }) + }; + Key { + key_id: old_k.key_id, + state, + } + } + } } +pub use v08::*; + impl KeyParams { fn new(secret_key: &str, name: &str) -> Self { KeyParams { @@ -173,28 +249,4 @@ impl TableSchema for KeyTable { } } } - - fn try_migrate(bytes: &[u8]) -> Option { - let old_k = rmp_serde::decode::from_read_ref::<_, old::Key>(bytes).ok()?; - let name = crdt::Lww::raw(old_k.name.timestamp(), old_k.name.get().clone()); - - let state = if old_k.deleted.get() { - crdt::Deletable::Deleted - } else { - // Authorized buckets is ignored here, - // migration is performed in specific migration code in - // garage/migrate.rs - crdt::Deletable::Present(KeyParams { - secret_key: old_k.secret_key, - name, - allow_create_bucket: crdt::Lww::new(false), - authorized_buckets: crdt::Map::new(), - local_aliases: crdt::LwwMap::new(), - }) - }; - Some(Key { - key_id: old_k.key_id, - state, - }) - } } diff --git a/src/model/prev/v051/bucket_table.rs b/src/model/prev/v051/bucket_table.rs index 628a49dd..19893458 100644 --- a/src/model/prev/v051/bucket_table.rs +++ b/src/model/prev/v051/bucket_table.rs @@ -3,7 +3,7 @@ use serde::{Deserialize, Serialize}; use garage_table::crdt::Crdt; use garage_table::*; -use super::key_table::PermissionSet; +use crate::key_table::v05::PermissionSet; /// A bucket is a collection of objects /// diff --git a/src/model/prev/v051/key_table.rs b/src/model/prev/v051/key_table.rs deleted file mode 100644 index 37516b1c..00000000 --- a/src/model/prev/v051/key_table.rs +++ /dev/null @@ -1,50 +0,0 @@ -use serde::{Deserialize, Serialize}; - -use garage_table::crdt::*; -use garage_table::*; - -/// An api key -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -pub struct Key { - /// The id of the key (immutable), used as partition key - pub key_id: String, - - /// The secret_key associated - pub secret_key: String, - - /// Name for the key - pub name: crdt::Lww, - - /// Is the key deleted - pub deleted: crdt::Bool, - - /// Buckets in which the key is authorized. Empty if `Key` is deleted - // CRDT interaction: deleted implies authorized_buckets is empty - pub authorized_buckets: crdt::LwwMap, -} - -/// Permission given to a key in a bucket -#[derive(PartialOrd, Ord, PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -pub struct PermissionSet { - /// The key can be used to read the bucket - pub allow_read: bool, - /// The key can be used to write in the bucket - pub allow_write: bool, -} - -impl AutoCrdt for PermissionSet { - const WARN_IF_DIFFERENT: bool = true; -} - -impl Crdt for Key { - fn merge(&mut self, other: &Self) { - self.name.merge(&other.name); - self.deleted.merge(&other.deleted); - - if self.deleted.get() { - self.authorized_buckets.clear(); - } else { - self.authorized_buckets.merge(&other.authorized_buckets); - } - } -} diff --git a/src/model/prev/v051/mod.rs b/src/model/prev/v051/mod.rs index 7a954752..8c1335a5 100644 --- a/src/model/prev/v051/mod.rs +++ b/src/model/prev/v051/mod.rs @@ -1,4 +1 @@ pub(crate) mod bucket_table; -pub(crate) mod key_table; -pub(crate) mod object_table; -pub(crate) mod version_table; diff --git a/src/model/prev/v051/object_table.rs b/src/model/prev/v051/object_table.rs deleted file mode 100644 index e79e5787..00000000 --- a/src/model/prev/v051/object_table.rs +++ /dev/null @@ -1,149 +0,0 @@ -use serde::{Deserialize, Serialize}; -use std::collections::BTreeMap; - -use garage_util::data::*; - -use garage_table::crdt::*; - -/// An object -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -pub struct Object { - /// The bucket in which the object is stored, used as partition key - pub bucket: String, - - /// The key at which the object is stored in its bucket, used as sorting key - pub key: String, - - /// The list of currenty stored versions of the object - versions: Vec, -} - -impl Object { - /// Get a list of currently stored versions of `Object` - pub fn versions(&self) -> &[ObjectVersion] { - &self.versions[..] - } -} - -/// Informations about a version of an object -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -pub struct ObjectVersion { - /// Id of the version - pub uuid: Uuid, - /// Timestamp of when the object was created - pub timestamp: u64, - /// State of the version - pub state: ObjectVersionState, -} - -/// State of an object version -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -pub enum ObjectVersionState { - /// The version is being received - Uploading(ObjectVersionHeaders), - /// The version is fully received - Complete(ObjectVersionData), - /// The version uploaded containded errors or the upload was explicitly aborted - Aborted, -} - -impl Crdt for ObjectVersionState { - fn merge(&mut self, other: &Self) { - use ObjectVersionState::*; - match other { - Aborted => { - *self = Aborted; - } - Complete(b) => match self { - Aborted => {} - Complete(a) => { - a.merge(b); - } - Uploading(_) => { - *self = Complete(b.clone()); - } - }, - Uploading(_) => {} - } - } -} - -/// Data stored in object version -#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] -pub enum ObjectVersionData { - /// The object was deleted, this Version is a tombstone to mark it as such - DeleteMarker, - /// The object is short, it's stored inlined - Inline(ObjectVersionMeta, #[serde(with = "serde_bytes")] Vec), - /// The object is not short, Hash of first block is stored here, next segments hashes are - /// stored in the version table - FirstBlock(ObjectVersionMeta, Hash), -} - -impl AutoCrdt for ObjectVersionData { - const WARN_IF_DIFFERENT: bool = true; -} - -/// Metadata about the object version -#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] -pub struct ObjectVersionMeta { - /// Headers to send to the client - pub headers: ObjectVersionHeaders, - /// Size of the object - pub size: u64, - /// etag of the object - pub etag: String, -} - -/// Additional headers for an object -#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] -pub struct ObjectVersionHeaders { - /// Content type of the object - pub content_type: String, - /// Any other http headers to send - pub other: BTreeMap, -} - -impl ObjectVersion { - fn cmp_key(&self) -> (u64, Uuid) { - (self.timestamp, self.uuid) - } - - /// Is the object version completely received - pub fn is_complete(&self) -> bool { - matches!(self.state, ObjectVersionState::Complete(_)) - } -} - -impl Crdt for Object { - fn merge(&mut self, other: &Self) { - // Merge versions from other into here - for other_v in other.versions.iter() { - match self - .versions - .binary_search_by(|v| v.cmp_key().cmp(&other_v.cmp_key())) - { - Ok(i) => { - self.versions[i].state.merge(&other_v.state); - } - Err(i) => { - self.versions.insert(i, other_v.clone()); - } - } - } - - // Remove versions which are obsolete, i.e. those that come - // before the last version which .is_complete(). - let last_complete = self - .versions - .iter() - .enumerate() - .rev() - .find(|(_, v)| v.is_complete()) - .map(|(vi, _)| vi); - - if let Some(last_vi) = last_complete { - self.versions = self.versions.drain(last_vi..).collect::>(); - } - } -} diff --git a/src/model/prev/v051/version_table.rs b/src/model/prev/v051/version_table.rs deleted file mode 100644 index c11c62d5..00000000 --- a/src/model/prev/v051/version_table.rs +++ /dev/null @@ -1,79 +0,0 @@ -use serde::{Deserialize, Serialize}; - -use garage_util::data::*; - -use garage_table::crdt::*; -use garage_table::*; - -/// A version of an object -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -pub struct Version { - /// UUID of the version, used as partition key - pub uuid: Uuid, - - // Actual data: the blocks for this version - // In the case of a multipart upload, also store the etags - // of individual parts and check them when doing CompleteMultipartUpload - /// Is this version deleted - pub deleted: crdt::Bool, - /// list of blocks of data composing the version - pub blocks: crdt::Map, - /// Etag of each part in case of a multipart upload, empty otherwise - pub parts_etags: crdt::Map, - - // Back link to bucket+key so that we can figure if - // this was deleted later on - /// Bucket in which the related object is stored - pub bucket: String, - /// Key in which the related object is stored - pub key: String, -} - -#[derive(PartialEq, Eq, Clone, Copy, Debug, Serialize, Deserialize)] -pub struct VersionBlockKey { - /// Number of the part - pub part_number: u64, - /// Offset of this sub-segment in its part - pub offset: u64, -} - -impl Ord for VersionBlockKey { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - self.part_number - .cmp(&other.part_number) - .then(self.offset.cmp(&other.offset)) - } -} - -impl PartialOrd for VersionBlockKey { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -/// Informations about a single block -#[derive(PartialEq, Eq, Ord, PartialOrd, Clone, Copy, Debug, Serialize, Deserialize)] -pub struct VersionBlock { - /// Blake2 sum of the block - pub hash: Hash, - /// Size of the block - pub size: u64, -} - -impl AutoCrdt for VersionBlock { - const WARN_IF_DIFFERENT: bool = true; -} - -impl Crdt for Version { - fn merge(&mut self, other: &Self) { - self.deleted.merge(&other.deleted); - - if self.deleted.get() { - self.blocks.clear(); - self.parts_etags.clear(); - } else { - self.blocks.merge(&other.blocks); - self.parts_etags.merge(&other.parts_etags); - } - } -} diff --git a/src/model/s3/block_ref_table.rs b/src/model/s3/block_ref_table.rs index c7017409..7b023d87 100644 --- a/src/model/s3/block_ref_table.rs +++ b/src/model/s3/block_ref_table.rs @@ -1,4 +1,3 @@ -use serde::{Deserialize, Serialize}; use std::sync::Arc; use garage_db as db; @@ -10,19 +9,29 @@ use garage_table::*; use garage_block::manager::*; -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -pub struct BlockRef { - /// Hash (blake2 sum) of the block, used as partition key - pub block: Hash, +mod v08 { + use garage_util::crdt; + use garage_util::data::{Hash, Uuid}; + use serde::{Deserialize, Serialize}; - /// Id of the Version for the object containing this block, used as sorting key - pub version: Uuid, + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct BlockRef { + /// Hash (blake2 sum) of the block, used as partition key + pub block: Hash, - // Keep track of deleted status - /// Is the Version that contains this block deleted - pub deleted: crdt::Bool, + /// Id of the Version for the object containing this block, used as sorting key + pub version: Uuid, + + // Keep track of deleted status + /// Is the Version that contains this block deleted + pub deleted: crdt::Bool, + } + + impl garage_util::migrate::InitialFormat for BlockRef {} } +pub use v08::*; + impl Entry for BlockRef { fn partition_key(&self) -> &Hash { &self.block diff --git a/src/model/s3/object_table.rs b/src/model/s3/object_table.rs index 1b2f0014..616e0d35 100644 --- a/src/model/s3/object_table.rs +++ b/src/model/s3/object_table.rs @@ -1,5 +1,4 @@ use serde::{Deserialize, Serialize}; -use std::collections::BTreeMap; use std::sync::Arc; use garage_db as db; @@ -13,25 +12,126 @@ use garage_table::*; use crate::index_counter::*; use crate::s3::version_table::*; -use crate::prev::v051::object_table as old; - pub const OBJECTS: &str = "objects"; pub const UNFINISHED_UPLOADS: &str = "unfinished_uploads"; pub const BYTES: &str = "bytes"; -/// An object -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -pub struct Object { - /// The bucket in which the object is stored, used as partition key - pub bucket_id: Uuid, +mod v05 { + use garage_util::data::{Hash, Uuid}; + use serde::{Deserialize, Serialize}; + use std::collections::BTreeMap; + + /// An object + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct Object { + /// The bucket in which the object is stored, used as partition key + pub bucket: String, + + /// The key at which the object is stored in its bucket, used as sorting key + pub key: String, + + /// The list of currenty stored versions of the object + pub(super) versions: Vec, + } + + /// Informations about a version of an object + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct ObjectVersion { + /// Id of the version + pub uuid: Uuid, + /// Timestamp of when the object was created + pub timestamp: u64, + /// State of the version + pub state: ObjectVersionState, + } + + /// State of an object version + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub enum ObjectVersionState { + /// The version is being received + Uploading(ObjectVersionHeaders), + /// The version is fully received + Complete(ObjectVersionData), + /// The version uploaded containded errors or the upload was explicitly aborted + Aborted, + } + + /// Data stored in object version + #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] + pub enum ObjectVersionData { + /// The object was deleted, this Version is a tombstone to mark it as such + DeleteMarker, + /// The object is short, it's stored inlined + Inline(ObjectVersionMeta, #[serde(with = "serde_bytes")] Vec), + /// The object is not short, Hash of first block is stored here, next segments hashes are + /// stored in the version table + FirstBlock(ObjectVersionMeta, Hash), + } + + /// Metadata about the object version + #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] + pub struct ObjectVersionMeta { + /// Headers to send to the client + pub headers: ObjectVersionHeaders, + /// Size of the object + pub size: u64, + /// etag of the object + pub etag: String, + } + + /// Additional headers for an object + #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] + pub struct ObjectVersionHeaders { + /// Content type of the object + pub content_type: String, + /// Any other http headers to send + pub other: BTreeMap, + } + + impl garage_util::migrate::InitialFormat for Object {} +} + +mod v08 { + use garage_util::data::Uuid; + use serde::{Deserialize, Serialize}; + + use super::v05; - /// The key at which the object is stored in its bucket, used as sorting key - pub key: String, + pub use v05::{ + ObjectVersion, ObjectVersionData, ObjectVersionHeaders, ObjectVersionMeta, + ObjectVersionState, + }; + + /// An object + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct Object { + /// The bucket in which the object is stored, used as partition key + pub bucket_id: Uuid, + + /// The key at which the object is stored in its bucket, used as sorting key + pub key: String, + + /// The list of currenty stored versions of the object + pub(super) versions: Vec, + } - /// The list of currenty stored versions of the object - versions: Vec, + impl garage_util::migrate::Migrate for Object { + type Previous = v05::Object; + + fn migrate(old: v05::Object) -> Object { + use garage_util::data::blake2sum; + + Object { + bucket_id: blake2sum(old.bucket.as_bytes()), + key: old.key, + versions: old.versions, + } + } + } } +pub use v08::*; + impl Object { /// Initialize an Object struct from parts pub fn new(bucket_id: Uuid, key: String, versions: Vec) -> Self { @@ -68,28 +168,6 @@ impl Object { } } -/// Informations about a version of an object -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -pub struct ObjectVersion { - /// Id of the version - pub uuid: Uuid, - /// Timestamp of when the object was created - pub timestamp: u64, - /// State of the version - pub state: ObjectVersionState, -} - -/// State of an object version -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -pub enum ObjectVersionState { - /// The version is being received - Uploading(ObjectVersionHeaders), - /// The version is fully received - Complete(ObjectVersionData), - /// The version uploaded containded errors or the upload was explicitly aborted - Aborted, -} - impl Crdt for ObjectVersionState { fn merge(&mut self, other: &Self) { use ObjectVersionState::*; @@ -111,42 +189,10 @@ impl Crdt for ObjectVersionState { } } -/// Data stored in object version -#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] -pub enum ObjectVersionData { - /// The object was deleted, this Version is a tombstone to mark it as such - DeleteMarker, - /// The object is short, it's stored inlined - Inline(ObjectVersionMeta, #[serde(with = "serde_bytes")] Vec), - /// The object is not short, Hash of first block is stored here, next segments hashes are - /// stored in the version table - FirstBlock(ObjectVersionMeta, Hash), -} - impl AutoCrdt for ObjectVersionData { const WARN_IF_DIFFERENT: bool = true; } -/// Metadata about the object version -#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] -pub struct ObjectVersionMeta { - /// Headers to send to the client - pub headers: ObjectVersionHeaders, - /// Size of the object - pub size: u64, - /// etag of the object - pub etag: String, -} - -/// Additional headers for an object -#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] -pub struct ObjectVersionHeaders { - /// Content type of the object - pub content_type: String, - /// Any other http headers to send - pub other: BTreeMap, -} - impl ObjectVersion { fn cmp_key(&self) -> (u64, Uuid) { (self.timestamp, self.uuid) @@ -290,11 +336,6 @@ impl TableSchema for ObjectTable { ObjectFilter::IsUploading => entry.versions.iter().any(|v| v.is_uploading()), } } - - fn try_migrate(bytes: &[u8]) -> Option { - let old_obj = rmp_serde::decode::from_read_ref::<_, old::Object>(bytes).ok()?; - Some(migrate_object(old_obj)) - } } impl CountedItem for Object { @@ -342,61 +383,3 @@ impl CountedItem for Object { // vvvvvvvv migration code, stupid stuff vvvvvvvvvvvv // (we just want to change bucket into bucket_id by hashing it) - -fn migrate_object(o: old::Object) -> Object { - let versions = o - .versions() - .iter() - .cloned() - .map(migrate_object_version) - .collect(); - Object { - bucket_id: blake2sum(o.bucket.as_bytes()), - key: o.key, - versions, - } -} - -fn migrate_object_version(v: old::ObjectVersion) -> ObjectVersion { - ObjectVersion { - uuid: Uuid::try_from(v.uuid.as_slice()).unwrap(), - timestamp: v.timestamp, - state: match v.state { - old::ObjectVersionState::Uploading(h) => { - ObjectVersionState::Uploading(migrate_object_version_headers(h)) - } - old::ObjectVersionState::Complete(d) => { - ObjectVersionState::Complete(migrate_object_version_data(d)) - } - old::ObjectVersionState::Aborted => ObjectVersionState::Aborted, - }, - } -} - -fn migrate_object_version_headers(h: old::ObjectVersionHeaders) -> ObjectVersionHeaders { - ObjectVersionHeaders { - content_type: h.content_type, - other: h.other, - } -} - -fn migrate_object_version_data(d: old::ObjectVersionData) -> ObjectVersionData { - match d { - old::ObjectVersionData::DeleteMarker => ObjectVersionData::DeleteMarker, - old::ObjectVersionData::Inline(m, b) => { - ObjectVersionData::Inline(migrate_object_version_meta(m), b) - } - old::ObjectVersionData::FirstBlock(m, h) => ObjectVersionData::FirstBlock( - migrate_object_version_meta(m), - Hash::try_from(h.as_slice()).unwrap(), - ), - } -} - -fn migrate_object_version_meta(m: old::ObjectVersionMeta) -> ObjectVersionMeta { - ObjectVersionMeta { - headers: migrate_object_version_headers(m.headers), - size: m.size, - etag: m.etag, - } -} diff --git a/src/model/s3/version_table.rs b/src/model/s3/version_table.rs index 0486512b..6edc83f4 100644 --- a/src/model/s3/version_table.rs +++ b/src/model/s3/version_table.rs @@ -1,4 +1,3 @@ -use serde::{Deserialize, Serialize}; use std::sync::Arc; use garage_db as db; @@ -11,32 +10,108 @@ use garage_table::*; use crate::s3::block_ref_table::*; -use crate::prev::v051::version_table as old; - -/// A version of an object -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -pub struct Version { - /// UUID of the version, used as partition key - pub uuid: Uuid, - - // Actual data: the blocks for this version - // In the case of a multipart upload, also store the etags - // of individual parts and check them when doing CompleteMultipartUpload - /// Is this version deleted - pub deleted: crdt::Bool, - /// list of blocks of data composing the version - pub blocks: crdt::Map, - /// Etag of each part in case of a multipart upload, empty otherwise - pub parts_etags: crdt::Map, - - // Back link to bucket+key so that we can figure if - // this was deleted later on - /// Bucket in which the related object is stored - pub bucket_id: Uuid, - /// Key in which the related object is stored - pub key: String, +mod v05 { + use garage_util::crdt; + use garage_util::data::{Hash, Uuid}; + use serde::{Deserialize, Serialize}; + + /// A version of an object + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct Version { + /// UUID of the version, used as partition key + pub uuid: Uuid, + + // Actual data: the blocks for this version + // In the case of a multipart upload, also store the etags + // of individual parts and check them when doing CompleteMultipartUpload + /// Is this version deleted + pub deleted: crdt::Bool, + /// list of blocks of data composing the version + pub blocks: crdt::Map, + /// Etag of each part in case of a multipart upload, empty otherwise + pub parts_etags: crdt::Map, + + // Back link to bucket+key so that we can figure if + // this was deleted later on + /// Bucket in which the related object is stored + pub bucket: String, + /// Key in which the related object is stored + pub key: String, + } + + #[derive(PartialEq, Eq, Clone, Copy, Debug, Serialize, Deserialize)] + pub struct VersionBlockKey { + /// Number of the part + pub part_number: u64, + /// Offset of this sub-segment in its part + pub offset: u64, + } + + /// Informations about a single block + #[derive(PartialEq, Eq, Ord, PartialOrd, Clone, Copy, Debug, Serialize, Deserialize)] + pub struct VersionBlock { + /// Blake2 sum of the block + pub hash: Hash, + /// Size of the block + pub size: u64, + } + + impl garage_util::migrate::InitialFormat for Version {} +} + +mod v08 { + use garage_util::crdt; + use garage_util::data::Uuid; + use serde::{Deserialize, Serialize}; + + use super::v05; + + /// A version of an object + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct Version { + /// UUID of the version, used as partition key + pub uuid: Uuid, + + // Actual data: the blocks for this version + // In the case of a multipart upload, also store the etags + // of individual parts and check them when doing CompleteMultipartUpload + /// Is this version deleted + pub deleted: crdt::Bool, + /// list of blocks of data composing the version + pub blocks: crdt::Map, + /// Etag of each part in case of a multipart upload, empty otherwise + pub parts_etags: crdt::Map, + + // Back link to bucket+key so that we can figure if + // this was deleted later on + /// Bucket in which the related object is stored + pub bucket_id: Uuid, + /// Key in which the related object is stored + pub key: String, + } + + pub use v05::{VersionBlock, VersionBlockKey}; + + impl garage_util::migrate::Migrate for Version { + type Previous = v05::Version; + + fn migrate(old: v05::Version) -> Version { + use garage_util::data::blake2sum; + + Version { + uuid: old.uuid, + deleted: old.deleted, + blocks: old.blocks, + parts_etags: old.parts_etags, + bucket_id: blake2sum(old.bucket.as_bytes()), + key: old.key, + } + } + } } +pub use v08::*; + impl Version { pub fn new(uuid: Uuid, bucket_id: Uuid, key: String, deleted: bool) -> Self { Self { @@ -64,14 +139,6 @@ impl Version { } } -#[derive(PartialEq, Eq, Clone, Copy, Debug, Serialize, Deserialize)] -pub struct VersionBlockKey { - /// Number of the part - pub part_number: u64, - /// Offset of this sub-segment in its part - pub offset: u64, -} - impl Ord for VersionBlockKey { fn cmp(&self, other: &Self) -> std::cmp::Ordering { self.part_number @@ -86,15 +153,6 @@ impl PartialOrd for VersionBlockKey { } } -/// Informations about a single block -#[derive(PartialEq, Eq, Ord, PartialOrd, Clone, Copy, Debug, Serialize, Deserialize)] -pub struct VersionBlock { - /// Blake2 sum of the block - pub hash: Hash, - /// Size of the block - pub size: u64, -} - impl AutoCrdt for VersionBlock { const WARN_IF_DIFFERENT: bool = true; } @@ -166,42 +224,4 @@ impl TableSchema for VersionTable { fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { filter.apply(entry.deleted.get()) } - - fn try_migrate(bytes: &[u8]) -> Option { - let old = rmp_serde::decode::from_read_ref::<_, old::Version>(bytes).ok()?; - - let blocks = old - .blocks - .items() - .iter() - .map(|(k, v)| { - ( - VersionBlockKey { - part_number: k.part_number, - offset: k.offset, - }, - VersionBlock { - hash: Hash::try_from(v.hash.as_slice()).unwrap(), - size: v.size, - }, - ) - }) - .collect::>(); - - let parts_etags = old - .parts_etags - .items() - .iter() - .map(|(k, v)| (*k, v.clone())) - .collect::>(); - - Some(Version { - uuid: Hash::try_from(old.uuid.as_slice()).unwrap(), - deleted: crdt::Bool::new(old.deleted.get()), - blocks, - parts_etags, - bucket_id: blake2sum(old.bucket.as_bytes()), - key: old.key, - }) - } } diff --git a/src/rpc/layout.rs b/src/rpc/layout.rs index 2fd5acfc..de4117a6 100644 --- a/src/rpc/layout.rs +++ b/src/rpc/layout.rs @@ -35,6 +35,8 @@ pub struct ClusterLayout { pub staging_hash: Hash, } +impl garage_util::migrate::InitialFormat for ClusterLayout {} + #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] pub struct NodeRoleV(pub Option); diff --git a/src/rpc/system.rs b/src/rpc/system.rs index 8f753b7f..22e23e55 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -73,13 +73,17 @@ impl Rpc for SystemRpc { type Response = Result; } +#[derive(Serialize, Deserialize)] +pub struct PeerList(Vec<(Uuid, SocketAddr)>); +impl garage_util::migrate::InitialFormat for PeerList {} + /// This node's membership manager pub struct System { /// The id of this node pub id: Uuid, persist_cluster_layout: Persister, - persist_peer_list: Persister>, + persist_peer_list: Persister, local_status: ArcSwap, node_status: RwLock>, @@ -721,7 +725,7 @@ impl System { // Add peer list from list stored on disk if let Ok(peers) = self.persist_peer_list.load_async().await { - ping_list.extend(peers.iter().map(|(id, addr)| ((*id).into(), *addr))) + ping_list.extend(peers.0.iter().map(|(id, addr)| ((*id).into(), *addr))) } // Fetch peer list from Consul @@ -801,12 +805,16 @@ impl System { // and append it to the list we are about to save, // so that no peer ID gets lost in the process. if let Ok(mut prev_peer_list) = self.persist_peer_list.load_async().await { - prev_peer_list.retain(|(id, _ip)| peer_list.iter().all(|(id2, _ip2)| id2 != id)); - peer_list.extend(prev_peer_list); + prev_peer_list + .0 + .retain(|(id, _ip)| peer_list.iter().all(|(id2, _ip2)| id2 != id)); + peer_list.extend(prev_peer_list.0); } // Save new peer list to file - self.persist_peer_list.save_async(&peer_list).await + self.persist_peer_list + .save_async(&PeerList(peer_list)) + .await } async fn pull_cluster_layout(self: Arc, peer: Uuid) { diff --git a/src/table/data.rs b/src/table/data.rs index 40856b02..f93ed00d 100644 --- a/src/table/data.rs +++ b/src/table/data.rs @@ -10,6 +10,7 @@ use garage_db::counted_tree_hack::CountedTree; use garage_util::data::*; use garage_util::error::*; +use garage_util::migrate::Migrate; use garage_rpc::system::System; @@ -219,7 +220,8 @@ where // data format, the messagepack encoding changed. In this case, // we also have to write the migrated value in the table and update // the associated Merkle tree entry. - let new_bytes = rmp_to_vec_all_named(&new_entry) + let new_bytes = new_entry + .encode() .map_err(Error::RmpEncode) .map_err(db::TxError::Abort)?; let changed = Some(&new_bytes[..]) != old_bytes.as_deref(); @@ -329,9 +331,9 @@ where Some(old_v) => { let mut entry = self.decode_entry(&old_v).map_err(db::TxError::Abort)?; entry.merge(ins); - rmp_to_vec_all_named(&entry) + entry.encode() } - None => rmp_to_vec_all_named(ins), + None => ins.encode(), }; let new_entry = new_entry .map_err(Error::RmpEncode) @@ -351,18 +353,18 @@ where } pub fn decode_entry(&self, bytes: &[u8]) -> Result { - match rmp_serde::decode::from_read_ref::<_, F::E>(bytes) { - Ok(x) => Ok(x), - Err(e) => match F::try_migrate(bytes) { - Some(x) => Ok(x), - None => { - warn!("Unable to decode entry of {}: {}", F::TABLE_NAME, e); - for line in hexdump::hexdump_iter(bytes) { - debug!("{}", line); - } - Err(e.into()) + match F::E::decode(bytes) { + Some(x) => Ok(x), + None => { + error!("Unable to decode entry of {}", F::TABLE_NAME); + for line in hexdump::hexdump_iter(bytes) { + debug!("{}", line); } - }, + Err(Error::Message(format!( + "Unable to decode entry of {}", + F::TABLE_NAME + ))) + } } } diff --git a/src/table/schema.rs b/src/table/schema.rs index f37e98d8..6538a32f 100644 --- a/src/table/schema.rs +++ b/src/table/schema.rs @@ -2,6 +2,7 @@ use serde::{Deserialize, Serialize}; use garage_db as db; use garage_util::data::*; +use garage_util::migrate::Migrate; use crate::crdt::Crdt; @@ -46,7 +47,7 @@ impl SortKey for FixedBytes32 { /// Trait for an entry in a table. It must be sortable and partitionnable. pub trait Entry: - Crdt + PartialEq + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync + Crdt + PartialEq + Clone + Migrate + Send + Sync + 'static { /// Get the key used to partition fn partition_key(&self) -> &P; @@ -65,23 +66,23 @@ pub trait TableSchema: Send + Sync + 'static { const TABLE_NAME: &'static str; /// The partition key used in that table - type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync; + type P: PartitionKey + + Clone + + PartialEq + + Serialize + + for<'de> Deserialize<'de> + + Send + + Sync + + 'static; /// The sort key used int that table - type S: SortKey + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync; + type S: SortKey + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static; /// They type for an entry in that table type E: Entry; /// The type for a filter that can be applied to select entries /// (e.g. filter out deleted entries) - type Filter: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync; - - // Action to take if not able to decode current version: - // try loading from an older version - /// Try migrating an entry from an older version - fn try_migrate(_bytes: &[u8]) -> Option { - None - } + type Filter: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static; /// Actions triggered by data changing in a table. If such actions /// include updates to the local database that should be applied diff --git a/src/table/sync.rs b/src/table/sync.rs index d6d272ab..abc034f8 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -302,7 +302,7 @@ where ); return Ok(()); } - let root_ck_hash = hash_of::(&root_ck)?; + let root_ck_hash = hash_of_merkle_node(&root_ck)?; // Check if they have the same root checksum // If so, do nothing. @@ -468,7 +468,7 @@ where match message { SyncRpc::RootCkHash(range, h) => { let (_root_ck_key, root_ck) = self.get_root_ck(*range)?; - let hash = hash_of::(&root_ck)?; + let hash = hash_of_merkle_node(&root_ck)?; Ok(SyncRpc::RootCkDifferent(hash != *h)) } SyncRpc::GetNode(k) => { @@ -622,7 +622,7 @@ impl Worker for SyncWor // ---- UTIL ---- -fn hash_of(x: &T) -> Result { +fn hash_of_merkle_node(x: &MerkleNode) -> Result { Ok(blake2sum(&rmp_to_vec_all_named(x)?[..])) } diff --git a/src/table/table.rs b/src/table/table.rs index bbcd5971..7f158314 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -18,6 +18,7 @@ use garage_util::background::BackgroundRunner; use garage_util::data::*; use garage_util::error::Error; use garage_util::metrics::RecordDuration; +use garage_util::migrate::Migrate; use garage_rpc::system::System; use garage_rpc::*; @@ -122,7 +123,7 @@ where let hash = e.partition_key().hash(); let who = self.data.replication.write_nodes(&hash); - let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?)); + let e_enc = Arc::new(ByteBuf::from(e.encode()?)); let rpc = TableRpc::::Update(vec![e_enc]); self.system @@ -173,7 +174,7 @@ where let entry = entry.borrow(); let hash = entry.partition_key().hash(); let who = self.data.replication.write_nodes(&hash); - let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(entry)?)); + let e_enc = Arc::new(ByteBuf::from(entry.encode()?)); for node in who { call_list.entry(node).or_default().push(e_enc.clone()); } @@ -412,7 +413,7 @@ where // =============== UTILITY FUNCTION FOR CLIENT OPERATIONS =============== async fn repair_on_read(&self, who: &[Uuid], what: F::E) -> Result<(), Error> { - let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(&what)?)); + let what_enc = Arc::new(ByteBuf::from(what.encode()?)); self.system .rpc .try_call_many( diff --git a/src/util/Cargo.toml b/src/util/Cargo.toml index 11640027..32e9c851 100644 --- a/src/util/Cargo.toml +++ b/src/util/Cargo.toml @@ -23,6 +23,7 @@ bytes = "1.0" digest = "0.10" err-derive = "0.3" git-version = "0.3.4" +hexdump = "0.1" xxhash-rust = { version = "0.8", default-features = false, features = ["xxh3"] } hex = "0.4" lazy_static = "1.4" diff --git a/src/util/lib.rs b/src/util/lib.rs index 264cc192..fd3d5c7b 100644 --- a/src/util/lib.rs +++ b/src/util/lib.rs @@ -11,6 +11,7 @@ pub mod data; pub mod error; pub mod formater; pub mod metrics; +pub mod migrate; pub mod persister; pub mod time; pub mod token_bucket; diff --git a/src/util/migrate.rs b/src/util/migrate.rs new file mode 100644 index 00000000..199c68f6 --- /dev/null +++ b/src/util/migrate.rs @@ -0,0 +1,75 @@ +use serde::{Deserialize, Serialize}; + +pub trait Migrate: Serialize + for<'de> Deserialize<'de> + 'static { + /// A sequence of bytes to add at the beginning of the serialized + /// string, to identify that the data is of this version. + const MARKER: &'static [u8] = b""; + + /// The previous version of this data type, from which items of this version + /// can be migrated. Set `type Previous = NoPrevious` to indicate that this datatype + /// is the initial schema and cannot be migrated. + type Previous: Migrate; + + /// This function must be filled in by implementors to migrate from a previons iteration + /// of the data format. + fn migrate(previous: Self::Previous) -> Self; + + fn decode(bytes: &[u8]) -> Option { + if bytes.len() >= Self::MARKER.len() && &bytes[..Self::MARKER.len()] == Self::MARKER { + if let Ok(value) = + rmp_serde::decode::from_read_ref::<_, Self>(&bytes[Self::MARKER.len()..]) + { + return Some(value); + } + } + + Self::Previous::decode(bytes).map(Self::migrate) + } + + fn encode(&self) -> Result, rmp_serde::encode::Error> { + let mut wr = Vec::with_capacity(128); + wr.extend_from_slice(Self::MARKER); + let mut se = rmp_serde::Serializer::new(&mut wr) + .with_struct_map() + .with_string_variants(); + self.serialize(&mut se)?; + Ok(wr) + } +} + +pub trait InitialFormat: Serialize + for<'de> Deserialize<'de> + 'static { + /// A sequence of bytes to add at the beginning of the serialized + /// string, to identify that the data is of this version. + const MARKER: &'static [u8] = b""; +} + +// ---- + +impl Migrate for T { + const MARKER: &'static [u8] = ::MARKER; + + type Previous = NoPrevious; + + fn migrate(_previous: Self::Previous) -> Self { + unreachable!(); + } +} + +#[derive(Serialize, Deserialize)] +pub struct NoPrevious; + +impl Migrate for NoPrevious { + type Previous = NoPrevious; + + fn migrate(_previous: Self::Previous) -> Self { + unreachable!(); + } + + fn decode(_bytes: &[u8]) -> Option { + None + } + + fn encode(&self) -> Result, rmp_serde::encode::Error> { + unreachable!() + } +} diff --git a/src/util/persister.rs b/src/util/persister.rs index 9e1a1910..4b9adf51 100644 --- a/src/util/persister.rs +++ b/src/util/persister.rs @@ -3,21 +3,16 @@ use std::path::{Path, PathBuf}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; -use serde::{Deserialize, Serialize}; - -use crate::data::*; use crate::error::Error; +use crate::migrate::Migrate; -pub struct Persister Deserialize<'de>> { +pub struct Persister { path: PathBuf, _marker: std::marker::PhantomData, } -impl Persister -where - T: Serialize + for<'de> Deserialize<'de>, -{ +impl Persister { pub fn new(base_dir: &Path, file_name: &str) -> Self { let mut path = base_dir.to_path_buf(); path.push(file_name); @@ -27,18 +22,37 @@ where } } + fn decode(&self, bytes: &[u8]) -> Result { + match T::decode(bytes) { + Some(v) => Ok(v), + None => { + error!( + "Unable to decode persisted data file {}", + self.path.display() + ); + for line in hexdump::hexdump_iter(bytes) { + debug!("{}", line); + } + Err(Error::Message(format!( + "Unable to decode persisted data file {}", + self.path.display() + ))) + } + } + } + pub fn load(&self) -> Result { let mut file = std::fs::OpenOptions::new().read(true).open(&self.path)?; let mut bytes = vec![]; file.read_to_end(&mut bytes)?; - let value = rmp_serde::decode::from_read_ref(&bytes[..])?; + let value = self.decode(&bytes[..])?; Ok(value) } pub fn save(&self, t: &T) -> Result<(), Error> { - let bytes = rmp_to_vec_all_named(t)?; + let bytes = t.encode()?; let mut file = std::fs::OpenOptions::new() .write(true) @@ -57,12 +71,12 @@ where let mut bytes = vec![]; file.read_to_end(&mut bytes).await?; - let value = rmp_serde::decode::from_read_ref(&bytes[..])?; + let value = self.decode(&bytes[..])?; Ok(value) } pub async fn save_async(&self, t: &T) -> Result<(), Error> { - let bytes = rmp_to_vec_all_named(t)?; + let bytes = t.encode()?; let mut file = tokio::fs::File::create(&self.path).await?; file.write_all(&bytes[..]).await?; -- cgit v1.2.3 From a81200d345e23b93b8f6e1d879b6b14efbdfb2bb Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 3 Jan 2023 14:45:47 +0100 Subject: Update cargo.nix --- Cargo.nix | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/Cargo.nix b/Cargo.nix index 60e48034..e0064d08 100644 --- a/Cargo.nix +++ b/Cargo.nix @@ -32,7 +32,7 @@ args@{ ignoreLockHash, }: let - nixifiedLockHash = "4639f63ff4c54c01f66ec3d0d362f6905456dd768d6e94df1a7367c763721fd7"; + nixifiedLockHash = "f277e0cfb8bb74ed78c99c8e75821c5066b4b8250e9c14d34d0cca70a38c6cab"; workspaceSrc = if args.workspaceSrc == null then ./. else args.workspaceSrc; currentLockHash = builtins.hashFile "sha256" (workspaceSrc + /Cargo.lock); lockHashIgnored = if ignoreLockHash @@ -1503,7 +1503,7 @@ in (lib.optional (rootFeatures' ? "garage/bundled-libs" || rootFeatures' ? "garage/default") "bundled-libs") (lib.optional (rootFeatures' ? "garage/consul-discovery") "consul-discovery") (lib.optional (rootFeatures' ? "garage/default") "default") - (lib.optional (rootFeatures' ? "garage/k2v") "k2v") + (lib.optional (rootFeatures' ? "garage/default" || rootFeatures' ? "garage/k2v") "k2v") (lib.optional (rootFeatures' ? "garage/kubernetes-discovery") "kubernetes-discovery") (lib.optional (rootFeatures' ? "garage/lmdb") "lmdb") (lib.optional (rootFeatures' ? "garage/default" || rootFeatures' ? "garage/metrics") "metrics") @@ -1569,7 +1569,7 @@ in registry = "unknown"; src = fetchCrateLocal (workspaceSrc + "/src/api"); features = builtins.concatLists [ - (lib.optional (rootFeatures' ? "garage/k2v" || rootFeatures' ? "garage_api/k2v") "k2v") + (lib.optional (rootFeatures' ? "garage/default" || rootFeatures' ? "garage/k2v" || rootFeatures' ? "garage_api/k2v") "k2v") (lib.optional (rootFeatures' ? "garage/default" || rootFeatures' ? "garage/metrics" || rootFeatures' ? "garage_api/metrics") "metrics") (lib.optional (rootFeatures' ? "garage/default" || rootFeatures' ? "garage/metrics" || rootFeatures' ? "garage_api/metrics" || rootFeatures' ? "garage_api/opentelemetry-prometheus") "opentelemetry-prometheus") (lib.optional (rootFeatures' ? "garage/default" || rootFeatures' ? "garage/metrics" || rootFeatures' ? "garage_api/metrics" || rootFeatures' ? "garage_api/prometheus") "prometheus") @@ -1688,7 +1688,7 @@ in src = fetchCrateLocal (workspaceSrc + "/src/model"); features = builtins.concatLists [ [ "default" ] - (lib.optional (rootFeatures' ? "garage/k2v" || rootFeatures' ? "garage_api/k2v" || rootFeatures' ? "garage_model/k2v") "k2v") + (lib.optional (rootFeatures' ? "garage/default" || rootFeatures' ? "garage/k2v" || rootFeatures' ? "garage_api/k2v" || rootFeatures' ? "garage_model/k2v") "k2v") (lib.optional (rootFeatures' ? "garage/lmdb" || rootFeatures' ? "garage_model/lmdb") "lmdb") [ "sled" ] (lib.optional (rootFeatures' ? "garage/sqlite" || rootFeatures' ? "garage_model/sqlite") "sqlite") @@ -1795,7 +1795,7 @@ in registry = "unknown"; src = fetchCrateLocal (workspaceSrc + "/src/util"); features = builtins.concatLists [ - (lib.optional (rootFeatures' ? "garage/k2v" || rootFeatures' ? "garage_api/k2v" || rootFeatures' ? "garage_model/k2v" || rootFeatures' ? "garage_util/k2v") "k2v") + (lib.optional (rootFeatures' ? "garage/default" || rootFeatures' ? "garage/k2v" || rootFeatures' ? "garage_api/k2v" || rootFeatures' ? "garage_model/k2v" || rootFeatures' ? "garage_util/k2v") "k2v") ]; dependencies = { arc_swap = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".arc-swap."1.5.0" { inherit profileName; }).out; @@ -1809,6 +1809,7 @@ in garage_db = (rustPackages."unknown".garage_db."0.8.1" { inherit profileName; }).out; git_version = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".git-version."0.3.5" { inherit profileName; }).out; hex = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".hex."0.4.3" { inherit profileName; }).out; + hexdump = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".hexdump."0.1.1" { inherit profileName; }).out; http = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".http."0.2.8" { inherit profileName; }).out; hyper = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".hyper."0.14.18" { inherit profileName; }).out; lazy_static = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".lazy_static."1.4.0" { inherit profileName; }).out; -- cgit v1.2.3 From 426d8784dac0e39879af52d980887d3692fc907c Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 3 Jan 2023 15:08:37 +0100 Subject: cleanup --- src/api/router_macros.rs | 1 + src/model/bucket_alias_table.rs | 6 +++--- src/model/bucket_table.rs | 7 ++++--- src/model/index_counter.rs | 27 +++++++++++++++++---------- src/model/s3/object_table.rs | 3 --- src/table/data.rs | 6 +----- src/table/gc.rs | 32 ++++++-------------------------- src/table/merkle.rs | 17 +++-------------- src/table/queue.rs | 10 +++------- src/table/replication/parameters.rs | 2 +- src/table/schema.rs | 17 ++++++----------- src/table/sync.rs | 20 ++++++-------------- src/table/table.rs | 14 +++----------- 13 files changed, 54 insertions(+), 108 deletions(-) diff --git a/src/api/router_macros.rs b/src/api/router_macros.rs index 959e69a3..07b5570c 100644 --- a/src/api/router_macros.rs +++ b/src/api/router_macros.rs @@ -145,6 +145,7 @@ macro_rules! generateQueryParameters { ) => { #[derive(Debug)] #[allow(non_camel_case_types)] + #[allow(clippy::upper_case_acronyms)] enum Keyword { EMPTY, $( $kw_name, )* diff --git a/src/model/bucket_alias_table.rs b/src/model/bucket_alias_table.rs index d07394f6..54d7fbad 100644 --- a/src/model/bucket_alias_table.rs +++ b/src/model/bucket_alias_table.rs @@ -1,12 +1,12 @@ -use serde::{Deserialize, Serialize}; - use garage_util::data::*; use garage_table::crdt::*; use garage_table::*; mod v08 { - use super::*; + use garage_util::crdt; + use garage_util::data::Uuid; + use serde::{Deserialize, Serialize}; /// The bucket alias table holds the names given to buckets /// in the global namespace. diff --git a/src/model/bucket_table.rs b/src/model/bucket_table.rs index 38ed88ee..ac163736 100644 --- a/src/model/bucket_table.rs +++ b/src/model/bucket_table.rs @@ -1,5 +1,3 @@ -use serde::{Deserialize, Serialize}; - use garage_table::crdt::*; use garage_table::*; use garage_util::data::*; @@ -8,7 +6,10 @@ use garage_util::time::*; use crate::permission::BucketKeyPerm; mod v08 { - use super::*; + use crate::permission::BucketKeyPerm; + use garage_util::crdt; + use garage_util::data::Uuid; + use serde::{Deserialize, Serialize}; /// A bucket is a collection of objects /// diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs index c3ed29c7..3cd3083a 100644 --- a/src/model/index_counter.rs +++ b/src/model/index_counter.rs @@ -31,7 +31,12 @@ pub trait CountedItem: Clone + PartialEq + Send + Sync + 'static { } mod v08 { - use super::*; + use super::CountedItem; + use garage_util::data::Uuid; + use serde::{Deserialize, Serialize}; + use std::collections::BTreeMap; + + // ---- Global part (the table everyone queries) ---- /// A counter entry in the global table #[derive(Clone, PartialEq, Debug, Serialize, Deserialize)] @@ -48,6 +53,17 @@ mod v08 { } impl garage_util::migrate::InitialFormat for CounterEntry {} + + // ---- Local part (the counter we maintain transactionnaly on each node) ---- + + #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] + pub(super) struct LocalCounterEntry { + pub(super) pk: T::CP, + pub(super) sk: T::CS, + pub(super) values: BTreeMap, + } + + impl garage_util::migrate::InitialFormat for LocalCounterEntry {} } pub use v08::*; @@ -358,15 +374,6 @@ impl IndexCounter { // ---- -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] -struct LocalCounterEntry { - pk: T::CP, - sk: T::CS, - values: BTreeMap, -} - -impl garage_util::migrate::InitialFormat for LocalCounterEntry {} - impl LocalCounterEntry { fn into_counter_entry(self, this_node: Uuid) -> CounterEntry { CounterEntry { diff --git a/src/model/s3/object_table.rs b/src/model/s3/object_table.rs index 616e0d35..518acc95 100644 --- a/src/model/s3/object_table.rs +++ b/src/model/s3/object_table.rs @@ -380,6 +380,3 @@ impl CountedItem for Object { ] } } - -// vvvvvvvv migration code, stupid stuff vvvvvvvvvvvv -// (we just want to change bucket into bucket_id by hashing it) diff --git a/src/table/data.rs b/src/table/data.rs index f93ed00d..5c792f1f 100644 --- a/src/table/data.rs +++ b/src/table/data.rs @@ -41,11 +41,7 @@ pub struct TableData { pub(crate) metrics: TableMetrics, } -impl TableData -where - F: TableSchema, - R: TableReplication, -{ +impl TableData { pub fn new(system: Arc, instance: F, replication: R, db: &db::Db) -> Arc { let store = db .open_tree(&format!("{}:table", F::TABLE_NAME)) diff --git a/src/table/gc.rs b/src/table/gc.rs index 90594fba..5b9124a7 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -31,7 +31,7 @@ const TABLE_GC_BATCH_SIZE: usize = 1024; // and the moment the garbage collection actually happens) const TABLE_GC_DELAY: Duration = Duration::from_secs(24 * 3600); -pub(crate) struct TableGc { +pub(crate) struct TableGc { system: Arc, data: Arc>, @@ -49,11 +49,7 @@ impl Rpc for GcRpc { type Response = Result; } -impl TableGc -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +impl TableGc { pub(crate) fn new(system: Arc, data: Arc>) -> Arc { let endpoint = system .netapp @@ -277,11 +273,7 @@ where } #[async_trait] -impl EndpointHandler for TableGc -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +impl EndpointHandler for TableGc { async fn handle(self: &Arc, message: &GcRpc, _from: NodeID) -> Result { match message { GcRpc::Update(items) => { @@ -299,20 +291,12 @@ where } } -struct GcWorker -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +struct GcWorker { gc: Arc>, wait_delay: Duration, } -impl GcWorker -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +impl GcWorker { fn new(gc: Arc>) -> Self { Self { gc, @@ -322,11 +306,7 @@ where } #[async_trait] -impl Worker for GcWorker -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +impl Worker for GcWorker { fn name(&self) -> String { format!("{} GC", F::TABLE_NAME) } diff --git a/src/table/merkle.rs b/src/table/merkle.rs index 736354fa..2d593e6d 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -65,11 +65,7 @@ pub enum MerkleNode { Leaf(Vec, Hash), } -impl MerkleUpdater -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +impl MerkleUpdater { pub(crate) fn new(data: Arc>) -> Arc { let empty_node_hash = blake2sum(&rmp_to_vec_all_named(&MerkleNode::Empty).unwrap()[..]); @@ -303,17 +299,10 @@ where } } -struct MerkleWorker(Arc>) -where - F: TableSchema + 'static, - R: TableReplication + 'static; +struct MerkleWorker(Arc>); #[async_trait] -impl Worker for MerkleWorker -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +impl Worker for MerkleWorker { fn name(&self) -> String { format!("{} Merkle", F::TABLE_NAME) } diff --git a/src/table/queue.rs b/src/table/queue.rs index 860f20d3..0857209b 100644 --- a/src/table/queue.rs +++ b/src/table/queue.rs @@ -16,15 +16,11 @@ const BATCH_SIZE: usize = 100; pub(crate) struct InsertQueueWorker(pub(crate) Arc>) where - F: TableSchema + 'static, - R: TableReplication + 'static; + F: TableSchema, + R: TableReplication; #[async_trait] -impl Worker for InsertQueueWorker -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +impl Worker for InsertQueueWorker { fn name(&self) -> String { format!("{} queue", F::TABLE_NAME) } diff --git a/src/table/replication/parameters.rs b/src/table/replication/parameters.rs index 3740d947..f00815a2 100644 --- a/src/table/replication/parameters.rs +++ b/src/table/replication/parameters.rs @@ -2,7 +2,7 @@ use garage_rpc::ring::*; use garage_util::data::*; /// Trait to describe how a table shall be replicated -pub trait TableReplication: Send + Sync { +pub trait TableReplication: Send + Sync + 'static { // See examples in table_sharded.rs and table_fullcopy.rs // To understand various replication methods diff --git a/src/table/schema.rs b/src/table/schema.rs index 6538a32f..5cbf6c95 100644 --- a/src/table/schema.rs +++ b/src/table/schema.rs @@ -7,7 +7,9 @@ use garage_util::migrate::Migrate; use crate::crdt::Crdt; /// Trait for field used to partition data -pub trait PartitionKey { +pub trait PartitionKey: + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static +{ /// Get the key used to partition fn hash(&self) -> Hash; } @@ -28,7 +30,7 @@ impl PartitionKey for FixedBytes32 { } /// Trait for field used to sort data -pub trait SortKey { +pub trait SortKey: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static { /// Get the key used to sort fn sort_key(&self) -> &[u8]; } @@ -66,16 +68,9 @@ pub trait TableSchema: Send + Sync + 'static { const TABLE_NAME: &'static str; /// The partition key used in that table - type P: PartitionKey - + Clone - + PartialEq - + Serialize - + for<'de> Deserialize<'de> - + Send - + Sync - + 'static; + type P: PartitionKey; /// The sort key used int that table - type S: SortKey + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static; + type S: SortKey; /// They type for an entry in that table type E: Entry; diff --git a/src/table/sync.rs b/src/table/sync.rs index abc034f8..29e7aa89 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -28,7 +28,7 @@ use crate::*; // Do anti-entropy every 10 minutes const ANTI_ENTROPY_INTERVAL: Duration = Duration::from_secs(10 * 60); -pub struct TableSyncer { +pub struct TableSyncer { system: Arc, data: Arc>, merkle: Arc>, @@ -61,11 +61,7 @@ struct TodoPartition { retain: bool, } -impl TableSyncer -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +impl TableSyncer { pub(crate) fn new( system: Arc, data: Arc>, @@ -459,11 +455,7 @@ where // ======= SYNCHRONIZATION PROCEDURE -- RECEIVER SIDE ====== #[async_trait] -impl EndpointHandler for TableSyncer -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +impl EndpointHandler for TableSyncer { async fn handle(self: &Arc, message: &SyncRpc, from: NodeID) -> Result { match message { SyncRpc::RootCkHash(range, h) => { @@ -497,7 +489,7 @@ where // -------- Sync Worker --------- -struct SyncWorker { +struct SyncWorker { syncer: Arc>, ring_recv: watch::Receiver>, ring: Arc, @@ -506,7 +498,7 @@ struct SyncWorker { next_full_sync: Instant, } -impl SyncWorker { +impl SyncWorker { fn add_full_sync(&mut self) { let system = &self.syncer.system; let data = &self.syncer.data; @@ -572,7 +564,7 @@ impl SyncWorker { } #[async_trait] -impl Worker for SyncWorker { +impl Worker for SyncWorker { fn name(&self) -> String { format!("{} sync", F::TABLE_NAME) } diff --git a/src/table/table.rs b/src/table/table.rs index 7f158314..7ad79677 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -33,7 +33,7 @@ use crate::schema::*; use crate::sync::*; use crate::util::*; -pub struct Table { +pub struct Table { pub system: Arc, pub data: Arc>, pub merkle_updater: Arc>, @@ -65,11 +65,7 @@ impl Rpc for TableRpc { type Response = Result, Error>; } -impl Table -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +impl Table { // =============== PUBLIC INTERFACE FUNCTIONS (new, insert, get, etc) =============== pub fn new(instance: F, replication: R, system: Arc, db: &db::Db) -> Arc { @@ -428,11 +424,7 @@ where } #[async_trait] -impl EndpointHandler> for Table -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +impl EndpointHandler> for Table { async fn handle( self: &Arc, msg: &TableRpc, -- cgit v1.2.3 From 8d5505514f950dc1ca1249a3385c9913b5b5e8e0 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 3 Jan 2023 15:27:36 +0100 Subject: Make it explicit when using nonversioned encoding --- Cargo.lock | 5 ----- Cargo.nix | 7 +------ src/block/Cargo.toml | 1 - src/garage/Cargo.toml | 1 - src/garage/repair/online.rs | 5 +++-- src/model/Cargo.toml | 1 - src/model/index_counter.rs | 9 ++++----- src/model/migrate.rs | 5 +++-- src/rpc/Cargo.toml | 1 - src/rpc/layout.rs | 11 ++++++----- src/table/Cargo.toml | 1 - src/table/merkle.rs | 7 ++++--- src/table/sync.rs | 3 ++- src/util/data.rs | 15 --------------- src/util/encode.rs | 26 ++++++++++++++++++++++++++ src/util/lib.rs | 1 + src/util/migrate.rs | 15 +++++++-------- 17 files changed, 57 insertions(+), 57 deletions(-) create mode 100644 src/util/encode.rs diff --git a/Cargo.lock b/Cargo.lock index 2a89fa1a..fb8b4f5c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1080,7 +1080,6 @@ dependencies = [ "parse_duration", "prometheus", "rand 0.8.5", - "rmp-serde", "serde", "serde_bytes", "serde_json", @@ -1156,7 +1155,6 @@ dependencies = [ "hex", "opentelemetry", "rand 0.8.5", - "rmp-serde", "serde", "serde_bytes", "tokio", @@ -1200,7 +1198,6 @@ dependencies = [ "netapp", "opentelemetry", "rand 0.8.5", - "rmp-serde", "serde", "serde_bytes", "tokio", @@ -1229,7 +1226,6 @@ dependencies = [ "pnet_datalink", "rand 0.8.5", "reqwest", - "rmp-serde", "schemars", "serde", "serde_bytes", @@ -1255,7 +1251,6 @@ dependencies = [ "hexdump", "opentelemetry", "rand 0.8.5", - "rmp-serde", "serde", "serde_bytes", "tokio", diff --git a/Cargo.nix b/Cargo.nix index e0064d08..79601cdd 100644 --- a/Cargo.nix +++ b/Cargo.nix @@ -32,7 +32,7 @@ args@{ ignoreLockHash, }: let - nixifiedLockHash = "f277e0cfb8bb74ed78c99c8e75821c5066b4b8250e9c14d34d0cca70a38c6cab"; + nixifiedLockHash = "b6aeefc112eb232904b24398f4e5da776c8ee2c13d427a26dbdf1732205d4fc9"; workspaceSrc = if args.workspaceSrc == null then ./. else args.workspaceSrc; currentLockHash = builtins.hashFile "sha256" (workspaceSrc + /Cargo.lock); lockHashIgnored = if ignoreLockHash @@ -1539,7 +1539,6 @@ in parse_duration = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".parse_duration."2.1.1" { inherit profileName; }).out; ${ if rootFeatures' ? "garage/default" || rootFeatures' ? "garage/metrics" || rootFeatures' ? "garage/prometheus" then "prometheus" else null } = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".prometheus."0.13.0" { inherit profileName; }).out; rand = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand."0.8.5" { inherit profileName; }).out; - rmp_serde = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rmp-serde."0.15.5" { inherit profileName; }).out; serde = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde."1.0.137" { inherit profileName; }).out; serde_bytes = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde_bytes."0.11.5" { inherit profileName; }).out; structopt = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".structopt."0.3.26" { inherit profileName; }).out; @@ -1639,7 +1638,6 @@ in hex = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".hex."0.4.3" { inherit profileName; }).out; opentelemetry = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".opentelemetry."0.17.0" { inherit profileName; }).out; rand = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand."0.8.5" { inherit profileName; }).out; - rmp_serde = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rmp-serde."0.15.5" { inherit profileName; }).out; serde = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde."1.0.137" { inherit profileName; }).out; serde_bytes = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde_bytes."0.11.5" { inherit profileName; }).out; tokio = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio."1.17.0" { inherit profileName; }).out; @@ -1710,7 +1708,6 @@ in netapp = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".netapp."0.5.2" { inherit profileName; }).out; opentelemetry = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".opentelemetry."0.17.0" { inherit profileName; }).out; rand = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand."0.8.5" { inherit profileName; }).out; - rmp_serde = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rmp-serde."0.15.5" { inherit profileName; }).out; serde = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde."1.0.137" { inherit profileName; }).out; serde_bytes = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde_bytes."0.11.5" { inherit profileName; }).out; tokio = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio."1.17.0" { inherit profileName; }).out; @@ -1752,7 +1749,6 @@ in pnet_datalink = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".pnet_datalink."0.28.0" { inherit profileName; }).out; rand = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand."0.8.5" { inherit profileName; }).out; ${ if rootFeatures' ? "garage/consul-discovery" || rootFeatures' ? "garage_rpc/consul-discovery" || rootFeatures' ? "garage_rpc/reqwest" then "reqwest" else null } = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".reqwest."0.11.12" { inherit profileName; }).out; - rmp_serde = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rmp-serde."0.15.5" { inherit profileName; }).out; ${ if rootFeatures' ? "garage/kubernetes-discovery" || rootFeatures' ? "garage_rpc/kubernetes-discovery" || rootFeatures' ? "garage_rpc/schemars" then "schemars" else null } = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".schemars."0.8.8" { inherit profileName; }).out; serde = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde."1.0.137" { inherit profileName; }).out; serde_bytes = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde_bytes."0.11.5" { inherit profileName; }).out; @@ -1781,7 +1777,6 @@ in hexdump = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".hexdump."0.1.1" { inherit profileName; }).out; opentelemetry = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".opentelemetry."0.17.0" { inherit profileName; }).out; rand = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand."0.8.5" { inherit profileName; }).out; - rmp_serde = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rmp-serde."0.15.5" { inherit profileName; }).out; serde = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde."1.0.137" { inherit profileName; }).out; serde_bytes = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde_bytes."0.11.5" { inherit profileName; }).out; tokio = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio."1.17.0" { inherit profileName; }).out; diff --git a/src/block/Cargo.toml b/src/block/Cargo.toml index cbd58d32..1e4eb64e 100644 --- a/src/block/Cargo.toml +++ b/src/block/Cargo.toml @@ -31,7 +31,6 @@ rand = "0.8" async-compression = { version = "0.3", features = ["tokio", "zstd"] } zstd = { version = "0.9", default-features = false } -rmp-serde = "0.15" serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } serde_bytes = "0.11" diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml index 337699ca..b43b0242 100644 --- a/src/garage/Cargo.toml +++ b/src/garage/Cargo.toml @@ -42,7 +42,6 @@ rand = "0.8" async-trait = "0.1.7" sodiumoxide = { version = "0.2.5-0", package = "kuska-sodiumoxide" } -rmp-serde = "0.15" serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } serde_bytes = "0.11" structopt = { version = "0.3", default-features = false } diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs index 7120972c..627e3bf3 100644 --- a/src/garage/repair/online.rs +++ b/src/garage/repair/online.rs @@ -12,6 +12,7 @@ use garage_model::s3::version_table::*; use garage_table::*; use garage_util::background::*; use garage_util::error::Error; +use garage_util::migrate::Migrate; use crate::*; @@ -100,7 +101,7 @@ impl Worker for RepairVersionsWorker { } }; - let version = rmp_serde::decode::from_read_ref::<_, Version>(&item_bytes)?; + let version = Version::decode(&item_bytes).ok_or_message("Cannot decode Version")?; if !version.deleted.get() { let object = self .garage @@ -180,7 +181,7 @@ impl Worker for RepairBlockrefsWorker { } }; - let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(&item_bytes)?; + let block_ref = BlockRef::decode(&item_bytes).ok_or_message("Cannot decode BlockRef")?; if !block_ref.deleted.get() { let version = self .garage diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml index 3d3fb693..323c2d64 100644 --- a/src/model/Cargo.toml +++ b/src/model/Cargo.toml @@ -30,7 +30,6 @@ tracing = "0.1.30" rand = "0.8" zstd = { version = "0.9", default-features = false } -rmp-serde = "0.15" serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } serde_bytes = "0.11" diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs index 3cd3083a..35d6596d 100644 --- a/src/model/index_counter.rs +++ b/src/model/index_counter.rs @@ -279,8 +279,8 @@ impl IndexCounter { info!("zeroing old counters... ({})", hex::encode(&batch[0].0)); for (local_counter_k, local_counter) in batch { - let mut local_counter = - rmp_serde::decode::from_read_ref::<_, LocalCounterEntry>(&local_counter)?; + let mut local_counter = LocalCounterEntry::::decode(&local_counter) + .ok_or_message("Cannot decode local counter entry")?; for (_, tv) in local_counter.values.iter_mut() { tv.0 = std::cmp::max(tv.0 + 1, now); @@ -335,9 +335,8 @@ impl IndexCounter { let local_counter_key = self.table.data.tree_key(pk, sk); let mut local_counter = match self.local_counter.get(&local_counter_key)? { Some(old_bytes) => { - let ent = rmp_serde::decode::from_read_ref::<_, LocalCounterEntry>( - &old_bytes, - )?; + let ent = LocalCounterEntry::::decode(&old_bytes) + .ok_or_message("Cannot decode local counter entry")?; assert!(ent.pk == *pk); assert!(ent.sk == *sk); ent diff --git a/src/model/migrate.rs b/src/model/migrate.rs index cd6ad26a..6b4c3eed 100644 --- a/src/model/migrate.rs +++ b/src/model/migrate.rs @@ -2,6 +2,7 @@ use std::sync::Arc; use garage_util::crdt::*; use garage_util::data::*; +use garage_util::encode::nonversioned_decode; use garage_util::error::Error as GarageError; use garage_util::time::*; @@ -28,8 +29,8 @@ impl Migrate { let mut old_buckets = vec![]; for res in tree.iter().map_err(GarageError::from)? { let (_k, v) = res.map_err(GarageError::from)?; - let bucket = rmp_serde::decode::from_read_ref::<_, old_bucket::Bucket>(&v[..]) - .map_err(GarageError::from)?; + let bucket = + nonversioned_decode::(&v[..]).map_err(GarageError::from)?; old_buckets.push(bucket); } diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml index b87374ad..e9a0929a 100644 --- a/src/rpc/Cargo.toml +++ b/src/rpc/Cargo.toml @@ -25,7 +25,6 @@ rand = "0.8" sodiumoxide = { version = "0.2.5-0", package = "kuska-sodiumoxide" } async-trait = "0.1.7" -rmp-serde = "0.15" serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } serde_bytes = "0.11" serde_json = "1.0" diff --git a/src/rpc/layout.rs b/src/rpc/layout.rs index de4117a6..1030e3a6 100644 --- a/src/rpc/layout.rs +++ b/src/rpc/layout.rs @@ -5,6 +5,7 @@ use serde::{Deserialize, Serialize}; use garage_util::crdt::{AutoCrdt, Crdt, LwwMap}; use garage_util::data::*; +use garage_util::encode::nonversioned_encode; use garage_util::error::*; use crate::ring::*; @@ -70,7 +71,7 @@ impl NodeRole { impl ClusterLayout { pub fn new(replication_factor: usize) -> Self { let empty_lwwmap = LwwMap::new(); - let empty_lwwmap_hash = blake2sum(&rmp_to_vec_all_named(&empty_lwwmap).unwrap()[..]); + let empty_lwwmap_hash = blake2sum(&nonversioned_encode(&empty_lwwmap).unwrap()[..]); ClusterLayout { version: 0, @@ -92,7 +93,7 @@ impl ClusterLayout { Ordering::Equal => { self.staging.merge(&other.staging); - let new_staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]); + let new_staging_hash = blake2sum(&nonversioned_encode(&self.staging).unwrap()[..]); let changed = new_staging_hash != self.staging_hash; self.staging_hash = new_staging_hash; @@ -127,7 +128,7 @@ To know the correct value of the new layout version, invoke `garage layout show` } self.staging.clear(); - self.staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]); + self.staging_hash = blake2sum(&nonversioned_encode(&self.staging).unwrap()[..]); self.version += 1; @@ -151,7 +152,7 @@ To know the correct value of the new layout version, invoke `garage layout show` } self.staging.clear(); - self.staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]); + self.staging_hash = blake2sum(&nonversioned_encode(&self.staging).unwrap()[..]); self.version += 1; @@ -180,7 +181,7 @@ To know the correct value of the new layout version, invoke `garage layout show` /// returns true if consistent, false if error pub fn check(&self) -> bool { // Check that the hash of the staging data is correct - let staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]); + let staging_hash = blake2sum(&nonversioned_encode(&self.staging).unwrap()[..]); if staging_hash != self.staging_hash { return false; } diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml index e1a74553..3911c945 100644 --- a/src/table/Cargo.toml +++ b/src/table/Cargo.toml @@ -28,7 +28,6 @@ hexdump = "0.1" tracing = "0.1.30" rand = "0.8" -rmp-serde = "0.15" serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } serde_bytes = "0.11" diff --git a/src/table/merkle.rs b/src/table/merkle.rs index 2d593e6d..e86d0251 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -10,6 +10,7 @@ use garage_db as db; use garage_util::background::*; use garage_util::data::*; +use garage_util::encode::{nonversioned_decode, nonversioned_encode}; use garage_util::error::Error; use garage_rpc::ring::*; @@ -67,7 +68,7 @@ pub enum MerkleNode { impl MerkleUpdater { pub(crate) fn new(data: Arc>) -> Arc { - let empty_node_hash = blake2sum(&rmp_to_vec_all_named(&MerkleNode::Empty).unwrap()[..]); + let empty_node_hash = blake2sum(&nonversioned_encode(&MerkleNode::Empty).unwrap()[..]); Arc::new(Self { data, @@ -273,7 +274,7 @@ impl MerkleUpdater { tx.remove(&self.data.merkle_tree, k.encode())?; Ok(self.empty_node_hash) } else { - let vby = rmp_to_vec_all_named(v).map_err(|e| db::TxError::Abort(e.into()))?; + let vby = nonversioned_encode(v).map_err(|e| db::TxError::Abort(e.into()))?; let rethash = blake2sum(&vby[..]); tx.insert(&self.data.merkle_tree, k.encode(), vby)?; Ok(rethash) @@ -364,7 +365,7 @@ impl MerkleNode { fn decode_opt(ent: &Option) -> Result { match ent { None => Ok(MerkleNode::Empty), - Some(v) => Ok(rmp_serde::decode::from_read_ref::<_, MerkleNode>(&v[..])?), + Some(v) => Ok(nonversioned_decode::(&v[..])?), } } diff --git a/src/table/sync.rs b/src/table/sync.rs index 29e7aa89..c66c863f 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -14,6 +14,7 @@ use tokio::sync::{mpsc, watch}; use garage_util::background::*; use garage_util::data::*; +use garage_util::encode::nonversioned_encode; use garage_util::error::{Error, OkOrMessage}; use garage_rpc::ring::*; @@ -615,7 +616,7 @@ impl Worker for SyncWorker { // ---- UTIL ---- fn hash_of_merkle_node(x: &MerkleNode) -> Result { - Ok(blake2sum(&rmp_to_vec_all_named(x)?[..])) + Ok(blake2sum(&nonversioned_encode(x)?[..])) } fn join_ordered<'a, K: Ord + Eq, V1, V2>( diff --git a/src/util/data.rs b/src/util/data.rs index 7715c2cc..b2a52e25 100644 --- a/src/util/data.rs +++ b/src/util/data.rs @@ -141,21 +141,6 @@ pub fn gen_uuid() -> Uuid { rand::thread_rng().gen::<[u8; 32]>().into() } -// RMP serialization with names of fields and variants - -/// Serialize to MessagePack -pub fn rmp_to_vec_all_named(val: &T) -> Result, rmp_serde::encode::Error> -where - T: Serialize + ?Sized, -{ - let mut wr = Vec::with_capacity(128); - let mut se = rmp_serde::Serializer::new(&mut wr) - .with_struct_map() - .with_string_variants(); - val.serialize(&mut se)?; - Ok(wr) -} - /// Serialize to JSON, truncating long result pub fn debug_serialize(x: T) -> String { match serde_json::to_string(&x) { diff --git a/src/util/encode.rs b/src/util/encode.rs new file mode 100644 index 00000000..724e482a --- /dev/null +++ b/src/util/encode.rs @@ -0,0 +1,26 @@ +use serde::{Deserialize, Serialize}; + +/// Serialize to MessagePacki, without versionning +/// (see garage_util::migrate for functions that manage versionned +/// data formats) +pub fn nonversioned_encode(val: &T) -> Result, rmp_serde::encode::Error> +where + T: Serialize + ?Sized, +{ + let mut wr = Vec::with_capacity(128); + let mut se = rmp_serde::Serializer::new(&mut wr) + .with_struct_map() + .with_string_variants(); + val.serialize(&mut se)?; + Ok(wr) +} + +/// Deserialize from MessagePacki, without versionning +/// (see garage_util::migrate for functions that manage versionned +/// data formats) +pub fn nonversioned_decode(bytes: &[u8]) -> Result +where + T: for<'de> Deserialize<'de> + ?Sized, +{ + rmp_serde::decode::from_read_ref::<_, T>(bytes) +} diff --git a/src/util/lib.rs b/src/util/lib.rs index fd3d5c7b..be82061f 100644 --- a/src/util/lib.rs +++ b/src/util/lib.rs @@ -8,6 +8,7 @@ pub mod background; pub mod config; pub mod crdt; pub mod data; +pub mod encode; pub mod error; pub mod formater; pub mod metrics; diff --git a/src/util/migrate.rs b/src/util/migrate.rs index 199c68f6..f6028bf4 100644 --- a/src/util/migrate.rs +++ b/src/util/migrate.rs @@ -3,7 +3,7 @@ use serde::{Deserialize, Serialize}; pub trait Migrate: Serialize + for<'de> Deserialize<'de> + 'static { /// A sequence of bytes to add at the beginning of the serialized /// string, to identify that the data is of this version. - const MARKER: &'static [u8] = b""; + const VERSION_MARKER: &'static [u8] = b""; /// The previous version of this data type, from which items of this version /// can be migrated. Set `type Previous = NoPrevious` to indicate that this datatype @@ -15,10 +15,9 @@ pub trait Migrate: Serialize + for<'de> Deserialize<'de> + 'static { fn migrate(previous: Self::Previous) -> Self; fn decode(bytes: &[u8]) -> Option { - if bytes.len() >= Self::MARKER.len() && &bytes[..Self::MARKER.len()] == Self::MARKER { - if let Ok(value) = - rmp_serde::decode::from_read_ref::<_, Self>(&bytes[Self::MARKER.len()..]) - { + let marker_len = Self::VERSION_MARKER.len(); + if bytes.len() >= marker_len && &bytes[..marker_len] == Self::VERSION_MARKER { + if let Ok(value) = rmp_serde::decode::from_read_ref::<_, Self>(&bytes[marker_len..]) { return Some(value); } } @@ -28,7 +27,7 @@ pub trait Migrate: Serialize + for<'de> Deserialize<'de> + 'static { fn encode(&self) -> Result, rmp_serde::encode::Error> { let mut wr = Vec::with_capacity(128); - wr.extend_from_slice(Self::MARKER); + wr.extend_from_slice(Self::VERSION_MARKER); let mut se = rmp_serde::Serializer::new(&mut wr) .with_struct_map() .with_string_variants(); @@ -40,13 +39,13 @@ pub trait Migrate: Serialize + for<'de> Deserialize<'de> + 'static { pub trait InitialFormat: Serialize + for<'de> Deserialize<'de> + 'static { /// A sequence of bytes to add at the beginning of the serialized /// string, to identify that the data is of this version. - const MARKER: &'static [u8] = b""; + const VERSION_MARKER: &'static [u8] = b""; } // ---- impl Migrate for T { - const MARKER: &'static [u8] = ::MARKER; + const VERSION_MARKER: &'static [u8] = ::VERSION_MARKER; type Previous = NoPrevious; -- cgit v1.2.3 From a54b67740d08e3fabeb1652a1bed14d78fea4b74 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 3 Jan 2023 15:29:29 +0100 Subject: move debug_serialize to garage_util::encode --- src/table/sync.rs | 2 +- src/util/data.rs | 16 ---------------- src/util/encode.rs | 16 ++++++++++++++++ src/util/error.rs | 1 + 4 files changed, 18 insertions(+), 17 deletions(-) diff --git a/src/table/sync.rs b/src/table/sync.rs index c66c863f..1f23d3a1 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -14,7 +14,7 @@ use tokio::sync::{mpsc, watch}; use garage_util::background::*; use garage_util::data::*; -use garage_util::encode::nonversioned_encode; +use garage_util::encode::{nonversioned_encode, debug_serialize}; use garage_util::error::{Error, OkOrMessage}; use garage_rpc::ring::*; diff --git a/src/util/data.rs b/src/util/data.rs index b2a52e25..3f61e301 100644 --- a/src/util/data.rs +++ b/src/util/data.rs @@ -140,19 +140,3 @@ pub fn fasthash(data: &[u8]) -> FastHash { pub fn gen_uuid() -> Uuid { rand::thread_rng().gen::<[u8; 32]>().into() } - -/// Serialize to JSON, truncating long result -pub fn debug_serialize(x: T) -> String { - match serde_json::to_string(&x) { - Ok(ss) => { - if ss.len() > 100 { - // TODO this can panic if 100 is not a codepoint boundary, but inside a 2 Bytes - // (or more) codepoint - ss[..100].to_string() - } else { - ss - } - } - Err(e) => format!("", e), - } -} diff --git a/src/util/encode.rs b/src/util/encode.rs index 724e482a..1cd3198f 100644 --- a/src/util/encode.rs +++ b/src/util/encode.rs @@ -24,3 +24,19 @@ where { rmp_serde::decode::from_read_ref::<_, T>(bytes) } + +/// Serialize to JSON, truncating long result +pub fn debug_serialize(x: T) -> String { + match serde_json::to_string(&x) { + Ok(ss) => { + if ss.len() > 100 { + // TODO this can panic if 100 is not a codepoint boundary, but inside a 2 Bytes + // (or more) codepoint + ss[..100].to_string() + } else { + ss + } + } + Err(e) => format!("", e), + } +} diff --git a/src/util/error.rs b/src/util/error.rs index 9995c746..3fcee71d 100644 --- a/src/util/error.rs +++ b/src/util/error.rs @@ -7,6 +7,7 @@ use err_derive::Error; use serde::{de::Visitor, Deserialize, Deserializer, Serialize, Serializer}; use crate::data::*; +use crate::encode::debug_serialize; /// Regroup all Garage errors #[derive(Debug, Error)] -- cgit v1.2.3 From d6d571d51216d2077a41216e067b32736fbd745a Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 3 Jan 2023 15:30:21 +0100 Subject: cargo fmt --- src/table/sync.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/table/sync.rs b/src/table/sync.rs index 1f23d3a1..92a353c6 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -14,7 +14,7 @@ use tokio::sync::{mpsc, watch}; use garage_util::background::*; use garage_util::data::*; -use garage_util::encode::{nonversioned_encode, debug_serialize}; +use garage_util::encode::{debug_serialize, nonversioned_encode}; use garage_util::error::{Error, OkOrMessage}; use garage_rpc::ring::*; -- cgit v1.2.3 From 33f25d26c7a81f7dc7ae3ab4fd2faa49fb053ceb Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 3 Jan 2023 15:53:13 +0100 Subject: fix doc and add tests for migrate.rs --- src/util/migrate.rs | 97 +++++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 91 insertions(+), 6 deletions(-) diff --git a/src/util/migrate.rs b/src/util/migrate.rs index f6028bf4..b7d6edc1 100644 --- a/src/util/migrate.rs +++ b/src/util/migrate.rs @@ -1,19 +1,21 @@ use serde::{Deserialize, Serialize}; +/// Indicates that this type has an encoding that can be migrated from +/// a previous version upon upgrades of Garage. pub trait Migrate: Serialize + for<'de> Deserialize<'de> + 'static { /// A sequence of bytes to add at the beginning of the serialized /// string, to identify that the data is of this version. const VERSION_MARKER: &'static [u8] = b""; /// The previous version of this data type, from which items of this version - /// can be migrated. Set `type Previous = NoPrevious` to indicate that this datatype - /// is the initial schema and cannot be migrated. + /// can be migrated. type Previous: Migrate; - /// This function must be filled in by implementors to migrate from a previons iteration - /// of the data format. + /// The migration function that transforms a value decoded in the old format + /// to an up-to-date value. fn migrate(previous: Self::Previous) -> Self; + /// Decode an encoded version of this type, going through a migration if necessary. fn decode(bytes: &[u8]) -> Option { let marker_len = Self::VERSION_MARKER.len(); if bytes.len() >= marker_len && &bytes[..marker_len] == Self::VERSION_MARKER { @@ -25,6 +27,7 @@ pub trait Migrate: Serialize + for<'de> Deserialize<'de> + 'static { Self::Previous::decode(bytes).map(Self::migrate) } + /// Encode this type with optionnal version marker fn encode(&self) -> Result, rmp_serde::encode::Error> { let mut wr = Vec::with_capacity(128); wr.extend_from_slice(Self::VERSION_MARKER); @@ -36,14 +39,13 @@ pub trait Migrate: Serialize + for<'de> Deserialize<'de> + 'static { } } +/// Indicates that this type has no previous encoding version to be migrated from. pub trait InitialFormat: Serialize + for<'de> Deserialize<'de> + 'static { /// A sequence of bytes to add at the beginning of the serialized /// string, to identify that the data is of this version. const VERSION_MARKER: &'static [u8] = b""; } -// ---- - impl Migrate for T { const VERSION_MARKER: &'static [u8] = ::VERSION_MARKER; @@ -54,6 +56,7 @@ impl Migrate for T { } } +/// Internal type used by InitialFormat, not meant for general use. #[derive(Serialize, Deserialize)] pub struct NoPrevious; @@ -72,3 +75,85 @@ impl Migrate for NoPrevious { unreachable!() } } + +#[cfg(test)] +mod test { + use super::*; + + #[derive(Serialize, Deserialize, PartialEq, Eq, Debug)] + struct V1 { + a: usize, + b: String, + } + impl InitialFormat for V1 {} + + #[derive(Serialize, Deserialize, PartialEq, Eq, Debug)] + struct V2 { + a: usize, + b: Vec, + c: String, + } + impl Migrate for V2 { + const VERSION_MARKER: &'static [u8] = b"GtestV2"; + type Previous = V1; + fn migrate(prev: V1) -> V2 { + V2 { + a: prev.a, + b: vec![prev.b], + c: String::new(), + } + } + } + + #[test] + fn test_v1() { + let x = V1 { + a: 12, + b: "hello".into(), + }; + let x_enc = x.encode().unwrap(); + let y = V1::decode(&x_enc).unwrap(); + assert_eq!(x, y); + } + + #[test] + fn test_v2() { + let x = V2 { + a: 12, + b: vec!["hello".into(), "world".into()], + c: "plop".into(), + }; + let x_enc = x.encode().unwrap(); + assert_eq!(&x_enc[..V2::VERSION_MARKER.len()], V2::VERSION_MARKER); + let y = V2::decode(&x_enc).unwrap(); + assert_eq!(x, y); + } + + #[test] + fn test_migrate() { + let x = V1 { + a: 12, + b: "hello".into(), + }; + let x_enc = x.encode().unwrap(); + + let xx = V1::decode(&x_enc).unwrap(); + assert_eq!(x, xx); + + let y = V2::decode(&x_enc).unwrap(); + assert_eq!( + y, + V2 { + a: 12, + b: vec!["hello".into()], + c: "".into(), + } + ); + + let y_enc = y.encode().unwrap(); + assert_eq!(&y_enc[..V2::VERSION_MARKER.len()], V2::VERSION_MARKER); + + let z = V2::decode(&y_enc).unwrap(); + assert_eq!(y, z); + } +} -- cgit v1.2.3 From c106304b9cd325238742be4366877ed7316e7e28 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 3 Jan 2023 16:00:19 +0100 Subject: more idiomatic and shorter --- src/util/migrate.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/util/migrate.rs b/src/util/migrate.rs index b7d6edc1..00d81181 100644 --- a/src/util/migrate.rs +++ b/src/util/migrate.rs @@ -18,7 +18,7 @@ pub trait Migrate: Serialize + for<'de> Deserialize<'de> + 'static { /// Decode an encoded version of this type, going through a migration if necessary. fn decode(bytes: &[u8]) -> Option { let marker_len = Self::VERSION_MARKER.len(); - if bytes.len() >= marker_len && &bytes[..marker_len] == Self::VERSION_MARKER { + if bytes.get(..marker_len) == Some(Self::VERSION_MARKER) { if let Ok(value) = rmp_serde::decode::from_read_ref::<_, Self>(&bytes[marker_len..]) { return Some(value); } -- cgit v1.2.3 From 1d5bdc17a46648eb3494ff629d0d360d0217c1e2 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 3 Jan 2023 16:04:06 +0100 Subject: use impossible enum type --- src/util/migrate.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/util/migrate.rs b/src/util/migrate.rs index 00d81181..1229fd9c 100644 --- a/src/util/migrate.rs +++ b/src/util/migrate.rs @@ -58,7 +58,7 @@ impl Migrate for T { /// Internal type used by InitialFormat, not meant for general use. #[derive(Serialize, Deserialize)] -pub struct NoPrevious; +pub enum NoPrevious {} impl Migrate for NoPrevious { type Previous = NoPrevious; -- cgit v1.2.3