aboutsummaryrefslogtreecommitdiff
path: root/src/util
diff options
context:
space:
mode:
Diffstat (limited to 'src/util')
-rw-r--r--src/util/Cargo.toml1
-rw-r--r--src/util/data.rs31
-rw-r--r--src/util/encode.rs42
-rw-r--r--src/util/error.rs1
-rw-r--r--src/util/lib.rs2
-rw-r--r--src/util/migrate.rs159
-rw-r--r--src/util/persister.rs38
7 files changed, 231 insertions, 43 deletions
diff --git a/src/util/Cargo.toml b/src/util/Cargo.toml
index 11640027..32e9c851 100644
--- a/src/util/Cargo.toml
+++ b/src/util/Cargo.toml
@@ -23,6 +23,7 @@ bytes = "1.0"
digest = "0.10"
err-derive = "0.3"
git-version = "0.3.4"
+hexdump = "0.1"
xxhash-rust = { version = "0.8", default-features = false, features = ["xxh3"] }
hex = "0.4"
lazy_static = "1.4"
diff --git a/src/util/data.rs b/src/util/data.rs
index 7715c2cc..3f61e301 100644
--- a/src/util/data.rs
+++ b/src/util/data.rs
@@ -140,34 +140,3 @@ pub fn fasthash(data: &[u8]) -> FastHash {
pub fn gen_uuid() -> Uuid {
rand::thread_rng().gen::<[u8; 32]>().into()
}
-
-// RMP serialization with names of fields and variants
-
-/// Serialize to MessagePack
-pub fn rmp_to_vec_all_named<T>(val: &T) -> Result<Vec<u8>, rmp_serde::encode::Error>
-where
- T: Serialize + ?Sized,
-{
- let mut wr = Vec::with_capacity(128);
- let mut se = rmp_serde::Serializer::new(&mut wr)
- .with_struct_map()
- .with_string_variants();
- val.serialize(&mut se)?;
- Ok(wr)
-}
-
-/// Serialize to JSON, truncating long result
-pub fn debug_serialize<T: Serialize>(x: T) -> String {
- match serde_json::to_string(&x) {
- Ok(ss) => {
- if ss.len() > 100 {
- // TODO this can panic if 100 is not a codepoint boundary, but inside a 2 Bytes
- // (or more) codepoint
- ss[..100].to_string()
- } else {
- ss
- }
- }
- Err(e) => format!("<JSON serialization error: {}>", e),
- }
-}
diff --git a/src/util/encode.rs b/src/util/encode.rs
new file mode 100644
index 00000000..1cd3198f
--- /dev/null
+++ b/src/util/encode.rs
@@ -0,0 +1,42 @@
+use serde::{Deserialize, Serialize};
+
+/// Serialize to MessagePacki, without versionning
+/// (see garage_util::migrate for functions that manage versionned
+/// data formats)
+pub fn nonversioned_encode<T>(val: &T) -> Result<Vec<u8>, rmp_serde::encode::Error>
+where
+ T: Serialize + ?Sized,
+{
+ let mut wr = Vec::with_capacity(128);
+ let mut se = rmp_serde::Serializer::new(&mut wr)
+ .with_struct_map()
+ .with_string_variants();
+ val.serialize(&mut se)?;
+ Ok(wr)
+}
+
+/// Deserialize from MessagePacki, without versionning
+/// (see garage_util::migrate for functions that manage versionned
+/// data formats)
+pub fn nonversioned_decode<T>(bytes: &[u8]) -> Result<T, rmp_serde::decode::Error>
+where
+ T: for<'de> Deserialize<'de> + ?Sized,
+{
+ rmp_serde::decode::from_read_ref::<_, T>(bytes)
+}
+
+/// Serialize to JSON, truncating long result
+pub fn debug_serialize<T: Serialize>(x: T) -> String {
+ match serde_json::to_string(&x) {
+ Ok(ss) => {
+ if ss.len() > 100 {
+ // TODO this can panic if 100 is not a codepoint boundary, but inside a 2 Bytes
+ // (or more) codepoint
+ ss[..100].to_string()
+ } else {
+ ss
+ }
+ }
+ Err(e) => format!("<JSON serialization error: {}>", e),
+ }
+}
diff --git a/src/util/error.rs b/src/util/error.rs
index 9995c746..3fcee71d 100644
--- a/src/util/error.rs
+++ b/src/util/error.rs
@@ -7,6 +7,7 @@ use err_derive::Error;
use serde::{de::Visitor, Deserialize, Deserializer, Serialize, Serializer};
use crate::data::*;
+use crate::encode::debug_serialize;
/// Regroup all Garage errors
#[derive(Debug, Error)]
diff --git a/src/util/lib.rs b/src/util/lib.rs
index 264cc192..be82061f 100644
--- a/src/util/lib.rs
+++ b/src/util/lib.rs
@@ -8,9 +8,11 @@ pub mod background;
pub mod config;
pub mod crdt;
pub mod data;
+pub mod encode;
pub mod error;
pub mod formater;
pub mod metrics;
+pub mod migrate;
pub mod persister;
pub mod time;
pub mod token_bucket;
diff --git a/src/util/migrate.rs b/src/util/migrate.rs
new file mode 100644
index 00000000..1229fd9c
--- /dev/null
+++ b/src/util/migrate.rs
@@ -0,0 +1,159 @@
+use serde::{Deserialize, Serialize};
+
+/// Indicates that this type has an encoding that can be migrated from
+/// a previous version upon upgrades of Garage.
+pub trait Migrate: Serialize + for<'de> Deserialize<'de> + 'static {
+ /// A sequence of bytes to add at the beginning of the serialized
+ /// string, to identify that the data is of this version.
+ const VERSION_MARKER: &'static [u8] = b"";
+
+ /// The previous version of this data type, from which items of this version
+ /// can be migrated.
+ type Previous: Migrate;
+
+ /// The migration function that transforms a value decoded in the old format
+ /// to an up-to-date value.
+ fn migrate(previous: Self::Previous) -> Self;
+
+ /// Decode an encoded version of this type, going through a migration if necessary.
+ fn decode(bytes: &[u8]) -> Option<Self> {
+ let marker_len = Self::VERSION_MARKER.len();
+ if bytes.get(..marker_len) == Some(Self::VERSION_MARKER) {
+ if let Ok(value) = rmp_serde::decode::from_read_ref::<_, Self>(&bytes[marker_len..]) {
+ return Some(value);
+ }
+ }
+
+ Self::Previous::decode(bytes).map(Self::migrate)
+ }
+
+ /// Encode this type with optionnal version marker
+ fn encode(&self) -> Result<Vec<u8>, rmp_serde::encode::Error> {
+ let mut wr = Vec::with_capacity(128);
+ wr.extend_from_slice(Self::VERSION_MARKER);
+ let mut se = rmp_serde::Serializer::new(&mut wr)
+ .with_struct_map()
+ .with_string_variants();
+ self.serialize(&mut se)?;
+ Ok(wr)
+ }
+}
+
+/// Indicates that this type has no previous encoding version to be migrated from.
+pub trait InitialFormat: Serialize + for<'de> Deserialize<'de> + 'static {
+ /// A sequence of bytes to add at the beginning of the serialized
+ /// string, to identify that the data is of this version.
+ const VERSION_MARKER: &'static [u8] = b"";
+}
+
+impl<T: InitialFormat> Migrate for T {
+ const VERSION_MARKER: &'static [u8] = <T as InitialFormat>::VERSION_MARKER;
+
+ type Previous = NoPrevious;
+
+ fn migrate(_previous: Self::Previous) -> Self {
+ unreachable!();
+ }
+}
+
+/// Internal type used by InitialFormat, not meant for general use.
+#[derive(Serialize, Deserialize)]
+pub enum NoPrevious {}
+
+impl Migrate for NoPrevious {
+ type Previous = NoPrevious;
+
+ fn migrate(_previous: Self::Previous) -> Self {
+ unreachable!();
+ }
+
+ fn decode(_bytes: &[u8]) -> Option<Self> {
+ None
+ }
+
+ fn encode(&self) -> Result<Vec<u8>, rmp_serde::encode::Error> {
+ unreachable!()
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use super::*;
+
+ #[derive(Serialize, Deserialize, PartialEq, Eq, Debug)]
+ struct V1 {
+ a: usize,
+ b: String,
+ }
+ impl InitialFormat for V1 {}
+
+ #[derive(Serialize, Deserialize, PartialEq, Eq, Debug)]
+ struct V2 {
+ a: usize,
+ b: Vec<String>,
+ c: String,
+ }
+ impl Migrate for V2 {
+ const VERSION_MARKER: &'static [u8] = b"GtestV2";
+ type Previous = V1;
+ fn migrate(prev: V1) -> V2 {
+ V2 {
+ a: prev.a,
+ b: vec![prev.b],
+ c: String::new(),
+ }
+ }
+ }
+
+ #[test]
+ fn test_v1() {
+ let x = V1 {
+ a: 12,
+ b: "hello".into(),
+ };
+ let x_enc = x.encode().unwrap();
+ let y = V1::decode(&x_enc).unwrap();
+ assert_eq!(x, y);
+ }
+
+ #[test]
+ fn test_v2() {
+ let x = V2 {
+ a: 12,
+ b: vec!["hello".into(), "world".into()],
+ c: "plop".into(),
+ };
+ let x_enc = x.encode().unwrap();
+ assert_eq!(&x_enc[..V2::VERSION_MARKER.len()], V2::VERSION_MARKER);
+ let y = V2::decode(&x_enc).unwrap();
+ assert_eq!(x, y);
+ }
+
+ #[test]
+ fn test_migrate() {
+ let x = V1 {
+ a: 12,
+ b: "hello".into(),
+ };
+ let x_enc = x.encode().unwrap();
+
+ let xx = V1::decode(&x_enc).unwrap();
+ assert_eq!(x, xx);
+
+ let y = V2::decode(&x_enc).unwrap();
+ assert_eq!(
+ y,
+ V2 {
+ a: 12,
+ b: vec!["hello".into()],
+ c: "".into(),
+ }
+ );
+
+ let y_enc = y.encode().unwrap();
+ assert_eq!(&y_enc[..V2::VERSION_MARKER.len()], V2::VERSION_MARKER);
+
+ let z = V2::decode(&y_enc).unwrap();
+ assert_eq!(y, z);
+ }
+}
diff --git a/src/util/persister.rs b/src/util/persister.rs
index 9e1a1910..4b9adf51 100644
--- a/src/util/persister.rs
+++ b/src/util/persister.rs
@@ -3,21 +3,16 @@ use std::path::{Path, PathBuf};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
-use serde::{Deserialize, Serialize};
-
-use crate::data::*;
use crate::error::Error;
+use crate::migrate::Migrate;
-pub struct Persister<T: Serialize + for<'de> Deserialize<'de>> {
+pub struct Persister<T: Migrate> {
path: PathBuf,
_marker: std::marker::PhantomData<T>,
}
-impl<T> Persister<T>
-where
- T: Serialize + for<'de> Deserialize<'de>,
-{
+impl<T: Migrate> Persister<T> {
pub fn new(base_dir: &Path, file_name: &str) -> Self {
let mut path = base_dir.to_path_buf();
path.push(file_name);
@@ -27,18 +22,37 @@ where
}
}
+ fn decode(&self, bytes: &[u8]) -> Result<T, Error> {
+ match T::decode(bytes) {
+ Some(v) => Ok(v),
+ None => {
+ error!(
+ "Unable to decode persisted data file {}",
+ self.path.display()
+ );
+ for line in hexdump::hexdump_iter(bytes) {
+ debug!("{}", line);
+ }
+ Err(Error::Message(format!(
+ "Unable to decode persisted data file {}",
+ self.path.display()
+ )))
+ }
+ }
+ }
+
pub fn load(&self) -> Result<T, Error> {
let mut file = std::fs::OpenOptions::new().read(true).open(&self.path)?;
let mut bytes = vec![];
file.read_to_end(&mut bytes)?;
- let value = rmp_serde::decode::from_read_ref(&bytes[..])?;
+ let value = self.decode(&bytes[..])?;
Ok(value)
}
pub fn save(&self, t: &T) -> Result<(), Error> {
- let bytes = rmp_to_vec_all_named(t)?;
+ let bytes = t.encode()?;
let mut file = std::fs::OpenOptions::new()
.write(true)
@@ -57,12 +71,12 @@ where
let mut bytes = vec![];
file.read_to_end(&mut bytes).await?;
- let value = rmp_serde::decode::from_read_ref(&bytes[..])?;
+ let value = self.decode(&bytes[..])?;
Ok(value)
}
pub async fn save_async(&self, t: &T) -> Result<(), Error> {
- let bytes = rmp_to_vec_all_named(t)?;
+ let bytes = t.encode()?;
let mut file = tokio::fs::File::create(&self.path).await?;
file.write_all(&bytes[..]).await?;