diff options
author | Alex Auvolat <alex@adnab.me> | 2023-01-03 14:44:47 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2023-01-03 14:44:47 +0100 |
commit | cdb2a591e9d393d24ab5c49bb905b0589b193299 (patch) | |
tree | 10c95206d0bd7b30c1fcd14ccc188be374cb1066 /src/util | |
parent | 582b0761790b7958a3ba10c4b549b466997d2dcd (diff) | |
download | garage-cdb2a591e9d393d24ab5c49bb905b0589b193299.tar.gz garage-cdb2a591e9d393d24ab5c49bb905b0589b193299.zip |
Refactor how things are migrated
Diffstat (limited to 'src/util')
-rw-r--r-- | src/util/Cargo.toml | 1 | ||||
-rw-r--r-- | src/util/lib.rs | 1 | ||||
-rw-r--r-- | src/util/migrate.rs | 75 | ||||
-rw-r--r-- | src/util/persister.rs | 38 |
4 files changed, 103 insertions, 12 deletions
diff --git a/src/util/Cargo.toml b/src/util/Cargo.toml index 11640027..32e9c851 100644 --- a/src/util/Cargo.toml +++ b/src/util/Cargo.toml @@ -23,6 +23,7 @@ bytes = "1.0" digest = "0.10" err-derive = "0.3" git-version = "0.3.4" +hexdump = "0.1" xxhash-rust = { version = "0.8", default-features = false, features = ["xxh3"] } hex = "0.4" lazy_static = "1.4" diff --git a/src/util/lib.rs b/src/util/lib.rs index 264cc192..fd3d5c7b 100644 --- a/src/util/lib.rs +++ b/src/util/lib.rs @@ -11,6 +11,7 @@ pub mod data; pub mod error; pub mod formater; pub mod metrics; +pub mod migrate; pub mod persister; pub mod time; pub mod token_bucket; diff --git a/src/util/migrate.rs b/src/util/migrate.rs new file mode 100644 index 00000000..199c68f6 --- /dev/null +++ b/src/util/migrate.rs @@ -0,0 +1,75 @@ +use serde::{Deserialize, Serialize}; + +pub trait Migrate: Serialize + for<'de> Deserialize<'de> + 'static { + /// A sequence of bytes to add at the beginning of the serialized + /// string, to identify that the data is of this version. + const MARKER: &'static [u8] = b""; + + /// The previous version of this data type, from which items of this version + /// can be migrated. Set `type Previous = NoPrevious` to indicate that this datatype + /// is the initial schema and cannot be migrated. + type Previous: Migrate; + + /// This function must be filled in by implementors to migrate from a previons iteration + /// of the data format. + fn migrate(previous: Self::Previous) -> Self; + + fn decode(bytes: &[u8]) -> Option<Self> { + if bytes.len() >= Self::MARKER.len() && &bytes[..Self::MARKER.len()] == Self::MARKER { + if let Ok(value) = + rmp_serde::decode::from_read_ref::<_, Self>(&bytes[Self::MARKER.len()..]) + { + return Some(value); + } + } + + Self::Previous::decode(bytes).map(Self::migrate) + } + + fn encode(&self) -> Result<Vec<u8>, rmp_serde::encode::Error> { + let mut wr = Vec::with_capacity(128); + wr.extend_from_slice(Self::MARKER); + let mut se = rmp_serde::Serializer::new(&mut wr) + .with_struct_map() + .with_string_variants(); + self.serialize(&mut se)?; + Ok(wr) + } +} + +pub trait InitialFormat: Serialize + for<'de> Deserialize<'de> + 'static { + /// A sequence of bytes to add at the beginning of the serialized + /// string, to identify that the data is of this version. + const MARKER: &'static [u8] = b""; +} + +// ---- + +impl<T: InitialFormat> Migrate for T { + const MARKER: &'static [u8] = <T as InitialFormat>::MARKER; + + type Previous = NoPrevious; + + fn migrate(_previous: Self::Previous) -> Self { + unreachable!(); + } +} + +#[derive(Serialize, Deserialize)] +pub struct NoPrevious; + +impl Migrate for NoPrevious { + type Previous = NoPrevious; + + fn migrate(_previous: Self::Previous) -> Self { + unreachable!(); + } + + fn decode(_bytes: &[u8]) -> Option<Self> { + None + } + + fn encode(&self) -> Result<Vec<u8>, rmp_serde::encode::Error> { + unreachable!() + } +} diff --git a/src/util/persister.rs b/src/util/persister.rs index 9e1a1910..4b9adf51 100644 --- a/src/util/persister.rs +++ b/src/util/persister.rs @@ -3,21 +3,16 @@ use std::path::{Path, PathBuf}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; -use serde::{Deserialize, Serialize}; - -use crate::data::*; use crate::error::Error; +use crate::migrate::Migrate; -pub struct Persister<T: Serialize + for<'de> Deserialize<'de>> { +pub struct Persister<T: Migrate> { path: PathBuf, _marker: std::marker::PhantomData<T>, } -impl<T> Persister<T> -where - T: Serialize + for<'de> Deserialize<'de>, -{ +impl<T: Migrate> Persister<T> { pub fn new(base_dir: &Path, file_name: &str) -> Self { let mut path = base_dir.to_path_buf(); path.push(file_name); @@ -27,18 +22,37 @@ where } } + fn decode(&self, bytes: &[u8]) -> Result<T, Error> { + match T::decode(bytes) { + Some(v) => Ok(v), + None => { + error!( + "Unable to decode persisted data file {}", + self.path.display() + ); + for line in hexdump::hexdump_iter(bytes) { + debug!("{}", line); + } + Err(Error::Message(format!( + "Unable to decode persisted data file {}", + self.path.display() + ))) + } + } + } + pub fn load(&self) -> Result<T, Error> { let mut file = std::fs::OpenOptions::new().read(true).open(&self.path)?; let mut bytes = vec![]; file.read_to_end(&mut bytes)?; - let value = rmp_serde::decode::from_read_ref(&bytes[..])?; + let value = self.decode(&bytes[..])?; Ok(value) } pub fn save(&self, t: &T) -> Result<(), Error> { - let bytes = rmp_to_vec_all_named(t)?; + let bytes = t.encode()?; let mut file = std::fs::OpenOptions::new() .write(true) @@ -57,12 +71,12 @@ where let mut bytes = vec![]; file.read_to_end(&mut bytes).await?; - let value = rmp_serde::decode::from_read_ref(&bytes[..])?; + let value = self.decode(&bytes[..])?; Ok(value) } pub async fn save_async(&self, t: &T) -> Result<(), Error> { - let bytes = rmp_to_vec_all_named(t)?; + let bytes = t.encode()?; let mut file = tokio::fs::File::create(&self.path).await?; file.write_all(&bytes[..]).await?; |