diff options
-rw-r--r-- | Cargo.lock | 5 | ||||
-rw-r--r-- | Cargo.nix | 7 | ||||
-rw-r--r-- | src/block/Cargo.toml | 1 | ||||
-rw-r--r-- | src/garage/Cargo.toml | 1 | ||||
-rw-r--r-- | src/garage/repair/online.rs | 5 | ||||
-rw-r--r-- | src/model/Cargo.toml | 1 | ||||
-rw-r--r-- | src/model/index_counter.rs | 9 | ||||
-rw-r--r-- | src/model/migrate.rs | 5 | ||||
-rw-r--r-- | src/rpc/Cargo.toml | 1 | ||||
-rw-r--r-- | src/rpc/layout.rs | 11 | ||||
-rw-r--r-- | src/table/Cargo.toml | 1 | ||||
-rw-r--r-- | src/table/merkle.rs | 7 | ||||
-rw-r--r-- | src/table/sync.rs | 3 | ||||
-rw-r--r-- | src/util/data.rs | 15 | ||||
-rw-r--r-- | src/util/encode.rs | 26 | ||||
-rw-r--r-- | src/util/lib.rs | 1 | ||||
-rw-r--r-- | src/util/migrate.rs | 15 |
17 files changed, 57 insertions, 57 deletions
@@ -1080,7 +1080,6 @@ dependencies = [ "parse_duration", "prometheus", "rand 0.8.5", - "rmp-serde", "serde", "serde_bytes", "serde_json", @@ -1156,7 +1155,6 @@ dependencies = [ "hex", "opentelemetry", "rand 0.8.5", - "rmp-serde", "serde", "serde_bytes", "tokio", @@ -1200,7 +1198,6 @@ dependencies = [ "netapp", "opentelemetry", "rand 0.8.5", - "rmp-serde", "serde", "serde_bytes", "tokio", @@ -1229,7 +1226,6 @@ dependencies = [ "pnet_datalink", "rand 0.8.5", "reqwest", - "rmp-serde", "schemars", "serde", "serde_bytes", @@ -1255,7 +1251,6 @@ dependencies = [ "hexdump", "opentelemetry", "rand 0.8.5", - "rmp-serde", "serde", "serde_bytes", "tokio", @@ -32,7 +32,7 @@ args@{ ignoreLockHash, }: let - nixifiedLockHash = "f277e0cfb8bb74ed78c99c8e75821c5066b4b8250e9c14d34d0cca70a38c6cab"; + nixifiedLockHash = "b6aeefc112eb232904b24398f4e5da776c8ee2c13d427a26dbdf1732205d4fc9"; workspaceSrc = if args.workspaceSrc == null then ./. else args.workspaceSrc; currentLockHash = builtins.hashFile "sha256" (workspaceSrc + /Cargo.lock); lockHashIgnored = if ignoreLockHash @@ -1539,7 +1539,6 @@ in parse_duration = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".parse_duration."2.1.1" { inherit profileName; }).out; ${ if rootFeatures' ? "garage/default" || rootFeatures' ? "garage/metrics" || rootFeatures' ? "garage/prometheus" then "prometheus" else null } = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".prometheus."0.13.0" { inherit profileName; }).out; rand = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand."0.8.5" { inherit profileName; }).out; - rmp_serde = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rmp-serde."0.15.5" { inherit profileName; }).out; serde = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde."1.0.137" { inherit profileName; }).out; serde_bytes = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde_bytes."0.11.5" { inherit profileName; }).out; structopt = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".structopt."0.3.26" { inherit profileName; }).out; @@ -1639,7 +1638,6 @@ in hex = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".hex."0.4.3" { inherit profileName; }).out; opentelemetry = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".opentelemetry."0.17.0" { inherit profileName; }).out; rand = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand."0.8.5" { inherit profileName; }).out; - rmp_serde = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rmp-serde."0.15.5" { inherit profileName; }).out; serde = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde."1.0.137" { inherit profileName; }).out; serde_bytes = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde_bytes."0.11.5" { inherit profileName; }).out; tokio = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio."1.17.0" { inherit profileName; }).out; @@ -1710,7 +1708,6 @@ in netapp = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".netapp."0.5.2" { inherit profileName; }).out; opentelemetry = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".opentelemetry."0.17.0" { inherit profileName; }).out; rand = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand."0.8.5" { inherit profileName; }).out; - rmp_serde = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rmp-serde."0.15.5" { inherit profileName; }).out; serde = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde."1.0.137" { inherit profileName; }).out; serde_bytes = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde_bytes."0.11.5" { inherit profileName; }).out; tokio = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio."1.17.0" { inherit profileName; }).out; @@ -1752,7 +1749,6 @@ in pnet_datalink = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".pnet_datalink."0.28.0" { inherit profileName; }).out; rand = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand."0.8.5" { inherit profileName; }).out; ${ if rootFeatures' ? "garage/consul-discovery" || rootFeatures' ? "garage_rpc/consul-discovery" || rootFeatures' ? "garage_rpc/reqwest" then "reqwest" else null } = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".reqwest."0.11.12" { inherit profileName; }).out; - rmp_serde = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rmp-serde."0.15.5" { inherit profileName; }).out; ${ if rootFeatures' ? "garage/kubernetes-discovery" || rootFeatures' ? "garage_rpc/kubernetes-discovery" || rootFeatures' ? "garage_rpc/schemars" then "schemars" else null } = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".schemars."0.8.8" { inherit profileName; }).out; serde = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde."1.0.137" { inherit profileName; }).out; serde_bytes = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde_bytes."0.11.5" { inherit profileName; }).out; @@ -1781,7 +1777,6 @@ in hexdump = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".hexdump."0.1.1" { inherit profileName; }).out; opentelemetry = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".opentelemetry."0.17.0" { inherit profileName; }).out; rand = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand."0.8.5" { inherit profileName; }).out; - rmp_serde = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rmp-serde."0.15.5" { inherit profileName; }).out; serde = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde."1.0.137" { inherit profileName; }).out; serde_bytes = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde_bytes."0.11.5" { inherit profileName; }).out; tokio = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio."1.17.0" { inherit profileName; }).out; diff --git a/src/block/Cargo.toml b/src/block/Cargo.toml index cbd58d32..1e4eb64e 100644 --- a/src/block/Cargo.toml +++ b/src/block/Cargo.toml @@ -31,7 +31,6 @@ rand = "0.8" async-compression = { version = "0.3", features = ["tokio", "zstd"] } zstd = { version = "0.9", default-features = false } -rmp-serde = "0.15" serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } serde_bytes = "0.11" diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml index 337699ca..b43b0242 100644 --- a/src/garage/Cargo.toml +++ b/src/garage/Cargo.toml @@ -42,7 +42,6 @@ rand = "0.8" async-trait = "0.1.7" sodiumoxide = { version = "0.2.5-0", package = "kuska-sodiumoxide" } -rmp-serde = "0.15" serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } serde_bytes = "0.11" structopt = { version = "0.3", default-features = false } diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs index 7120972c..627e3bf3 100644 --- a/src/garage/repair/online.rs +++ b/src/garage/repair/online.rs @@ -12,6 +12,7 @@ use garage_model::s3::version_table::*; use garage_table::*; use garage_util::background::*; use garage_util::error::Error; +use garage_util::migrate::Migrate; use crate::*; @@ -100,7 +101,7 @@ impl Worker for RepairVersionsWorker { } }; - let version = rmp_serde::decode::from_read_ref::<_, Version>(&item_bytes)?; + let version = Version::decode(&item_bytes).ok_or_message("Cannot decode Version")?; if !version.deleted.get() { let object = self .garage @@ -180,7 +181,7 @@ impl Worker for RepairBlockrefsWorker { } }; - let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(&item_bytes)?; + let block_ref = BlockRef::decode(&item_bytes).ok_or_message("Cannot decode BlockRef")?; if !block_ref.deleted.get() { let version = self .garage diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml index 3d3fb693..323c2d64 100644 --- a/src/model/Cargo.toml +++ b/src/model/Cargo.toml @@ -30,7 +30,6 @@ tracing = "0.1.30" rand = "0.8" zstd = { version = "0.9", default-features = false } -rmp-serde = "0.15" serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } serde_bytes = "0.11" diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs index 3cd3083a..35d6596d 100644 --- a/src/model/index_counter.rs +++ b/src/model/index_counter.rs @@ -279,8 +279,8 @@ impl<T: CountedItem> IndexCounter<T> { info!("zeroing old counters... ({})", hex::encode(&batch[0].0)); for (local_counter_k, local_counter) in batch { - let mut local_counter = - rmp_serde::decode::from_read_ref::<_, LocalCounterEntry<T>>(&local_counter)?; + let mut local_counter = LocalCounterEntry::<T>::decode(&local_counter) + .ok_or_message("Cannot decode local counter entry")?; for (_, tv) in local_counter.values.iter_mut() { tv.0 = std::cmp::max(tv.0 + 1, now); @@ -335,9 +335,8 @@ impl<T: CountedItem> IndexCounter<T> { let local_counter_key = self.table.data.tree_key(pk, sk); let mut local_counter = match self.local_counter.get(&local_counter_key)? { Some(old_bytes) => { - let ent = rmp_serde::decode::from_read_ref::<_, LocalCounterEntry<T>>( - &old_bytes, - )?; + let ent = LocalCounterEntry::<T>::decode(&old_bytes) + .ok_or_message("Cannot decode local counter entry")?; assert!(ent.pk == *pk); assert!(ent.sk == *sk); ent diff --git a/src/model/migrate.rs b/src/model/migrate.rs index cd6ad26a..6b4c3eed 100644 --- a/src/model/migrate.rs +++ b/src/model/migrate.rs @@ -2,6 +2,7 @@ use std::sync::Arc; use garage_util::crdt::*; use garage_util::data::*; +use garage_util::encode::nonversioned_decode; use garage_util::error::Error as GarageError; use garage_util::time::*; @@ -28,8 +29,8 @@ impl Migrate { let mut old_buckets = vec![]; for res in tree.iter().map_err(GarageError::from)? { let (_k, v) = res.map_err(GarageError::from)?; - let bucket = rmp_serde::decode::from_read_ref::<_, old_bucket::Bucket>(&v[..]) - .map_err(GarageError::from)?; + let bucket = + nonversioned_decode::<old_bucket::Bucket>(&v[..]).map_err(GarageError::from)?; old_buckets.push(bucket); } diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml index b87374ad..e9a0929a 100644 --- a/src/rpc/Cargo.toml +++ b/src/rpc/Cargo.toml @@ -25,7 +25,6 @@ rand = "0.8" sodiumoxide = { version = "0.2.5-0", package = "kuska-sodiumoxide" } async-trait = "0.1.7" -rmp-serde = "0.15" serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } serde_bytes = "0.11" serde_json = "1.0" diff --git a/src/rpc/layout.rs b/src/rpc/layout.rs index de4117a6..1030e3a6 100644 --- a/src/rpc/layout.rs +++ b/src/rpc/layout.rs @@ -5,6 +5,7 @@ use serde::{Deserialize, Serialize}; use garage_util::crdt::{AutoCrdt, Crdt, LwwMap}; use garage_util::data::*; +use garage_util::encode::nonversioned_encode; use garage_util::error::*; use crate::ring::*; @@ -70,7 +71,7 @@ impl NodeRole { impl ClusterLayout { pub fn new(replication_factor: usize) -> Self { let empty_lwwmap = LwwMap::new(); - let empty_lwwmap_hash = blake2sum(&rmp_to_vec_all_named(&empty_lwwmap).unwrap()[..]); + let empty_lwwmap_hash = blake2sum(&nonversioned_encode(&empty_lwwmap).unwrap()[..]); ClusterLayout { version: 0, @@ -92,7 +93,7 @@ impl ClusterLayout { Ordering::Equal => { self.staging.merge(&other.staging); - let new_staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]); + let new_staging_hash = blake2sum(&nonversioned_encode(&self.staging).unwrap()[..]); let changed = new_staging_hash != self.staging_hash; self.staging_hash = new_staging_hash; @@ -127,7 +128,7 @@ To know the correct value of the new layout version, invoke `garage layout show` } self.staging.clear(); - self.staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]); + self.staging_hash = blake2sum(&nonversioned_encode(&self.staging).unwrap()[..]); self.version += 1; @@ -151,7 +152,7 @@ To know the correct value of the new layout version, invoke `garage layout show` } self.staging.clear(); - self.staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]); + self.staging_hash = blake2sum(&nonversioned_encode(&self.staging).unwrap()[..]); self.version += 1; @@ -180,7 +181,7 @@ To know the correct value of the new layout version, invoke `garage layout show` /// returns true if consistent, false if error pub fn check(&self) -> bool { // Check that the hash of the staging data is correct - let staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]); + let staging_hash = blake2sum(&nonversioned_encode(&self.staging).unwrap()[..]); if staging_hash != self.staging_hash { return false; } diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml index e1a74553..3911c945 100644 --- a/src/table/Cargo.toml +++ b/src/table/Cargo.toml @@ -28,7 +28,6 @@ hexdump = "0.1" tracing = "0.1.30" rand = "0.8" -rmp-serde = "0.15" serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } serde_bytes = "0.11" diff --git a/src/table/merkle.rs b/src/table/merkle.rs index 2d593e6d..e86d0251 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -10,6 +10,7 @@ use garage_db as db; use garage_util::background::*; use garage_util::data::*; +use garage_util::encode::{nonversioned_decode, nonversioned_encode}; use garage_util::error::Error; use garage_rpc::ring::*; @@ -67,7 +68,7 @@ pub enum MerkleNode { impl<F: TableSchema, R: TableReplication> MerkleUpdater<F, R> { pub(crate) fn new(data: Arc<TableData<F, R>>) -> Arc<Self> { - let empty_node_hash = blake2sum(&rmp_to_vec_all_named(&MerkleNode::Empty).unwrap()[..]); + let empty_node_hash = blake2sum(&nonversioned_encode(&MerkleNode::Empty).unwrap()[..]); Arc::new(Self { data, @@ -273,7 +274,7 @@ impl<F: TableSchema, R: TableReplication> MerkleUpdater<F, R> { tx.remove(&self.data.merkle_tree, k.encode())?; Ok(self.empty_node_hash) } else { - let vby = rmp_to_vec_all_named(v).map_err(|e| db::TxError::Abort(e.into()))?; + let vby = nonversioned_encode(v).map_err(|e| db::TxError::Abort(e.into()))?; let rethash = blake2sum(&vby[..]); tx.insert(&self.data.merkle_tree, k.encode(), vby)?; Ok(rethash) @@ -364,7 +365,7 @@ impl MerkleNode { fn decode_opt(ent: &Option<db::Value>) -> Result<Self, Error> { match ent { None => Ok(MerkleNode::Empty), - Some(v) => Ok(rmp_serde::decode::from_read_ref::<_, MerkleNode>(&v[..])?), + Some(v) => Ok(nonversioned_decode::<MerkleNode>(&v[..])?), } } diff --git a/src/table/sync.rs b/src/table/sync.rs index 29e7aa89..c66c863f 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -14,6 +14,7 @@ use tokio::sync::{mpsc, watch}; use garage_util::background::*; use garage_util::data::*; +use garage_util::encode::nonversioned_encode; use garage_util::error::{Error, OkOrMessage}; use garage_rpc::ring::*; @@ -615,7 +616,7 @@ impl<F: TableSchema, R: TableReplication> Worker for SyncWorker<F, R> { // ---- UTIL ---- fn hash_of_merkle_node(x: &MerkleNode) -> Result<Hash, Error> { - Ok(blake2sum(&rmp_to_vec_all_named(x)?[..])) + Ok(blake2sum(&nonversioned_encode(x)?[..])) } fn join_ordered<'a, K: Ord + Eq, V1, V2>( diff --git a/src/util/data.rs b/src/util/data.rs index 7715c2cc..b2a52e25 100644 --- a/src/util/data.rs +++ b/src/util/data.rs @@ -141,21 +141,6 @@ pub fn gen_uuid() -> Uuid { rand::thread_rng().gen::<[u8; 32]>().into() } -// RMP serialization with names of fields and variants - -/// Serialize to MessagePack -pub fn rmp_to_vec_all_named<T>(val: &T) -> Result<Vec<u8>, rmp_serde::encode::Error> -where - T: Serialize + ?Sized, -{ - let mut wr = Vec::with_capacity(128); - let mut se = rmp_serde::Serializer::new(&mut wr) - .with_struct_map() - .with_string_variants(); - val.serialize(&mut se)?; - Ok(wr) -} - /// Serialize to JSON, truncating long result pub fn debug_serialize<T: Serialize>(x: T) -> String { match serde_json::to_string(&x) { diff --git a/src/util/encode.rs b/src/util/encode.rs new file mode 100644 index 00000000..724e482a --- /dev/null +++ b/src/util/encode.rs @@ -0,0 +1,26 @@ +use serde::{Deserialize, Serialize}; + +/// Serialize to MessagePacki, without versionning +/// (see garage_util::migrate for functions that manage versionned +/// data formats) +pub fn nonversioned_encode<T>(val: &T) -> Result<Vec<u8>, rmp_serde::encode::Error> +where + T: Serialize + ?Sized, +{ + let mut wr = Vec::with_capacity(128); + let mut se = rmp_serde::Serializer::new(&mut wr) + .with_struct_map() + .with_string_variants(); + val.serialize(&mut se)?; + Ok(wr) +} + +/// Deserialize from MessagePacki, without versionning +/// (see garage_util::migrate for functions that manage versionned +/// data formats) +pub fn nonversioned_decode<T>(bytes: &[u8]) -> Result<T, rmp_serde::decode::Error> +where + T: for<'de> Deserialize<'de> + ?Sized, +{ + rmp_serde::decode::from_read_ref::<_, T>(bytes) +} diff --git a/src/util/lib.rs b/src/util/lib.rs index fd3d5c7b..be82061f 100644 --- a/src/util/lib.rs +++ b/src/util/lib.rs @@ -8,6 +8,7 @@ pub mod background; pub mod config; pub mod crdt; pub mod data; +pub mod encode; pub mod error; pub mod formater; pub mod metrics; diff --git a/src/util/migrate.rs b/src/util/migrate.rs index 199c68f6..f6028bf4 100644 --- a/src/util/migrate.rs +++ b/src/util/migrate.rs @@ -3,7 +3,7 @@ use serde::{Deserialize, Serialize}; pub trait Migrate: Serialize + for<'de> Deserialize<'de> + 'static { /// A sequence of bytes to add at the beginning of the serialized /// string, to identify that the data is of this version. - const MARKER: &'static [u8] = b""; + const VERSION_MARKER: &'static [u8] = b""; /// The previous version of this data type, from which items of this version /// can be migrated. Set `type Previous = NoPrevious` to indicate that this datatype @@ -15,10 +15,9 @@ pub trait Migrate: Serialize + for<'de> Deserialize<'de> + 'static { fn migrate(previous: Self::Previous) -> Self; fn decode(bytes: &[u8]) -> Option<Self> { - if bytes.len() >= Self::MARKER.len() && &bytes[..Self::MARKER.len()] == Self::MARKER { - if let Ok(value) = - rmp_serde::decode::from_read_ref::<_, Self>(&bytes[Self::MARKER.len()..]) - { + let marker_len = Self::VERSION_MARKER.len(); + if bytes.len() >= marker_len && &bytes[..marker_len] == Self::VERSION_MARKER { + if let Ok(value) = rmp_serde::decode::from_read_ref::<_, Self>(&bytes[marker_len..]) { return Some(value); } } @@ -28,7 +27,7 @@ pub trait Migrate: Serialize + for<'de> Deserialize<'de> + 'static { fn encode(&self) -> Result<Vec<u8>, rmp_serde::encode::Error> { let mut wr = Vec::with_capacity(128); - wr.extend_from_slice(Self::MARKER); + wr.extend_from_slice(Self::VERSION_MARKER); let mut se = rmp_serde::Serializer::new(&mut wr) .with_struct_map() .with_string_variants(); @@ -40,13 +39,13 @@ pub trait Migrate: Serialize + for<'de> Deserialize<'de> + 'static { pub trait InitialFormat: Serialize + for<'de> Deserialize<'de> + 'static { /// A sequence of bytes to add at the beginning of the serialized /// string, to identify that the data is of this version. - const MARKER: &'static [u8] = b""; + const VERSION_MARKER: &'static [u8] = b""; } // ---- impl<T: InitialFormat> Migrate for T { - const MARKER: &'static [u8] = <T as InitialFormat>::MARKER; + const VERSION_MARKER: &'static [u8] = <T as InitialFormat>::VERSION_MARKER; type Previous = NoPrevious; |