aboutsummaryrefslogtreecommitdiff
path: root/src/util
diff options
context:
space:
mode:
Diffstat (limited to 'src/util')
-rw-r--r--src/util/Cargo.toml13
-rw-r--r--src/util/background/job_worker.rs48
-rw-r--r--src/util/background/mod.rs72
-rw-r--r--src/util/background/vars.rs113
-rw-r--r--src/util/background/worker.rs85
-rw-r--r--src/util/config.rs152
-rw-r--r--src/util/data.rs35
-rw-r--r--src/util/encode.rs42
-rw-r--r--src/util/error.rs1
-rw-r--r--src/util/formater.rs8
-rw-r--r--src/util/lib.rs3
-rw-r--r--src/util/migrate.rs159
-rw-r--r--src/util/persister.rs72
-rw-r--r--src/util/time.rs2
-rw-r--r--src/util/token_bucket.rs40
15 files changed, 588 insertions, 257 deletions
diff --git a/src/util/Cargo.toml b/src/util/Cargo.toml
index 8e978fc2..abeccbbd 100644
--- a/src/util/Cargo.toml
+++ b/src/util/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "garage_util"
-version = "0.8.0"
+version = "0.8.1"
authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license = "AGPL-3.0"
@@ -14,19 +14,20 @@ path = "lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
-garage_db = { version = "0.8.0", path = "../db" }
+garage_db = { version = "0.8.1", path = "../db" }
arc-swap = "1.0"
async-trait = "0.1"
-blake2 = "0.9"
+blake2 = "0.10"
bytes = "1.0"
digest = "0.10"
err-derive = "0.3"
git-version = "0.3.4"
+hexdump = "0.1"
xxhash-rust = { version = "0.8", default-features = false, features = ["xxh3"] }
hex = "0.4"
lazy_static = "1.4"
-tracing = "0.1.30"
+tracing = "0.1"
rand = "0.8"
sha2 = "0.10"
@@ -34,7 +35,7 @@ chrono = "0.4"
rmp-serde = "0.15"
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
serde_json = "1.0"
-toml = "0.5"
+toml = "0.6"
futures = "0.3"
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
@@ -46,6 +47,8 @@ hyper = "0.14"
opentelemetry = { version = "0.17", features = [ "rt-tokio", "metrics", "trace" ] }
+[dev-dependencies]
+mktemp = "0.5"
[features]
k2v = []
diff --git a/src/util/background/job_worker.rs b/src/util/background/job_worker.rs
deleted file mode 100644
index 2568ea11..00000000
--- a/src/util/background/job_worker.rs
+++ /dev/null
@@ -1,48 +0,0 @@
-//! Job worker: a generic worker that just processes incoming
-//! jobs one by one
-
-use std::sync::Arc;
-
-use async_trait::async_trait;
-use tokio::sync::{mpsc, Mutex};
-
-use crate::background::worker::*;
-use crate::background::*;
-
-pub(crate) struct JobWorker {
- pub(crate) index: usize,
- pub(crate) job_chan: Arc<Mutex<mpsc::UnboundedReceiver<(Job, bool)>>>,
- pub(crate) next_job: Option<Job>,
-}
-
-#[async_trait]
-impl Worker for JobWorker {
- fn name(&self) -> String {
- format!("Job worker #{}", self.index)
- }
-
- async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
- match self.next_job.take() {
- None => return Ok(WorkerState::Idle),
- Some(job) => {
- job.await?;
- Ok(WorkerState::Busy)
- }
- }
- }
-
- async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState {
- loop {
- match self.job_chan.lock().await.recv().await {
- Some((job, cancellable)) => {
- if cancellable && *must_exit.borrow() {
- continue;
- }
- self.next_job = Some(job);
- return WorkerState::Busy;
- }
- None => return WorkerState::Done,
- }
- }
- }
-}
diff --git a/src/util/background/mod.rs b/src/util/background/mod.rs
index 619f5068..607cd7a3 100644
--- a/src/util/background/mod.rs
+++ b/src/util/background/mod.rs
@@ -1,27 +1,19 @@
//! Job runner for futures and async functions
-pub mod job_worker;
+pub mod vars;
pub mod worker;
-use core::future::Future;
-
use std::collections::HashMap;
-use std::pin::Pin;
use std::sync::Arc;
use serde::{Deserialize, Serialize};
-use tokio::sync::{mpsc, watch, Mutex};
+use tokio::sync::{mpsc, watch};
-use crate::error::Error;
use worker::WorkerProcessor;
pub use worker::{Worker, WorkerState};
-pub(crate) type JobOutput = Result<(), Error>;
-pub(crate) type Job = Pin<Box<dyn Future<Output = JobOutput> + Send>>;
-
/// Job runner for futures and async functions
pub struct BackgroundRunner {
- send_job: mpsc::UnboundedSender<(Job, bool)>,
send_worker: mpsc::UnboundedSender<Box<dyn Worker>>,
worker_info: Arc<std::sync::Mutex<HashMap<usize, WorkerInfo>>>,
}
@@ -29,19 +21,27 @@ pub struct BackgroundRunner {
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct WorkerInfo {
pub name: String,
- pub info: Option<String>,
+ pub status: WorkerStatus,
pub state: WorkerState,
pub errors: usize,
pub consecutive_errors: usize,
pub last_error: Option<(String, u64)>,
}
+/// WorkerStatus is a struct returned by the worker with a bunch of canonical
+/// fields to indicate their status to CLI users. All fields are optional.
+#[derive(Clone, Serialize, Deserialize, Debug, Default)]
+pub struct WorkerStatus {
+ pub tranquility: Option<u32>,
+ pub progress: Option<String>,
+ pub queue_length: Option<u64>,
+ pub persistent_errors: Option<u64>,
+ pub freeform: Vec<String>,
+}
+
impl BackgroundRunner {
/// Create a new BackgroundRunner
- pub fn new(
- n_runners: usize,
- stop_signal: watch::Receiver<bool>,
- ) -> (Arc<Self>, tokio::task::JoinHandle<()>) {
+ pub fn new(stop_signal: watch::Receiver<bool>) -> (Arc<Self>, tokio::task::JoinHandle<()>) {
let (send_worker, worker_out) = mpsc::unbounded_channel::<Box<dyn Worker>>();
let worker_info = Arc::new(std::sync::Mutex::new(HashMap::new()));
@@ -52,24 +52,7 @@ impl BackgroundRunner {
worker_processor.run().await;
});
- let (send_job, queue_out) = mpsc::unbounded_channel();
- let queue_out = Arc::new(Mutex::new(queue_out));
-
- for i in 0..n_runners {
- let queue_out = queue_out.clone();
-
- send_worker
- .send(Box::new(job_worker::JobWorker {
- index: i,
- job_chan: queue_out.clone(),
- next_job: None,
- }))
- .ok()
- .unwrap();
- }
-
let bgrunner = Arc::new(Self {
- send_job,
send_worker,
worker_info,
});
@@ -80,31 +63,6 @@ impl BackgroundRunner {
self.worker_info.lock().unwrap().clone()
}
- /// Spawn a task to be run in background
- pub fn spawn<T>(&self, job: T)
- where
- T: Future<Output = JobOutput> + Send + 'static,
- {
- let boxed: Job = Box::pin(job);
- self.send_job
- .send((boxed, false))
- .ok()
- .expect("Could not put job in queue");
- }
-
- /// Spawn a task to be run in background. It may get discarded before running if spawned while
- /// the runner is stopping
- pub fn spawn_cancellable<T>(&self, job: T)
- where
- T: Future<Output = JobOutput> + Send + 'static,
- {
- let boxed: Job = Box::pin(job);
- self.send_job
- .send((boxed, true))
- .ok()
- .expect("Could not put job in queue");
- }
-
pub fn spawn_worker<W>(&self, worker: W)
where
W: Worker + 'static,
diff --git a/src/util/background/vars.rs b/src/util/background/vars.rs
new file mode 100644
index 00000000..7a449c95
--- /dev/null
+++ b/src/util/background/vars.rs
@@ -0,0 +1,113 @@
+use std::collections::HashMap;
+use std::str::FromStr;
+
+use crate::error::{Error, OkOrMessage};
+use crate::migrate::Migrate;
+use crate::persister::PersisterShared;
+
+pub struct BgVars {
+ vars: HashMap<&'static str, Box<dyn BgVarTrait>>,
+}
+
+impl BgVars {
+ pub fn new() -> Self {
+ Self {
+ vars: HashMap::new(),
+ }
+ }
+
+ pub fn register_rw<V, T, GF, SF>(
+ &mut self,
+ p: &PersisterShared<V>,
+ name: &'static str,
+ get_fn: GF,
+ set_fn: SF,
+ ) where
+ V: Migrate + Default + Send + Sync,
+ T: FromStr + ToString + Send + Sync + 'static,
+ GF: Fn(&PersisterShared<V>) -> T + Send + Sync + 'static,
+ SF: Fn(&PersisterShared<V>, T) -> Result<(), Error> + Send + Sync + 'static,
+ {
+ let p1 = p.clone();
+ let get_fn = move || get_fn(&p1);
+
+ let p2 = p.clone();
+ let set_fn = move |v| set_fn(&p2, v);
+
+ self.vars.insert(name, Box::new(BgVar { get_fn, set_fn }));
+ }
+
+ pub fn register_ro<V, T, GF>(&mut self, p: &PersisterShared<V>, name: &'static str, get_fn: GF)
+ where
+ V: Migrate + Default + Send + Sync,
+ T: FromStr + ToString + Send + Sync + 'static,
+ GF: Fn(&PersisterShared<V>) -> T + Send + Sync + 'static,
+ {
+ let p1 = p.clone();
+ let get_fn = move || get_fn(&p1);
+
+ let set_fn = move |_| Err(Error::Message(format!("Cannot set value of {}", name)));
+
+ self.vars.insert(name, Box::new(BgVar { get_fn, set_fn }));
+ }
+
+ pub fn get(&self, var: &str) -> Result<String, Error> {
+ Ok(self
+ .vars
+ .get(var)
+ .ok_or_message("variable does not exist")?
+ .get())
+ }
+
+ pub fn get_all(&self) -> Vec<(&'static str, String)> {
+ self.vars.iter().map(|(k, v)| (*k, v.get())).collect()
+ }
+
+ pub fn set(&self, var: &str, val: &str) -> Result<(), Error> {
+ self.vars
+ .get(var)
+ .ok_or_message("variable does not exist")?
+ .set(val)
+ }
+}
+
+impl Default for BgVars {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+// ----
+
+trait BgVarTrait: Send + Sync + 'static {
+ fn get(&self) -> String;
+ fn set(&self, v: &str) -> Result<(), Error>;
+}
+
+struct BgVar<T, GF, SF>
+where
+ T: FromStr + ToString + Send + Sync + 'static,
+ GF: Fn() -> T + Send + Sync + 'static,
+ SF: Fn(T) -> Result<(), Error> + Sync + Send + 'static,
+{
+ get_fn: GF,
+ set_fn: SF,
+}
+
+impl<T, GF, SF> BgVarTrait for BgVar<T, GF, SF>
+where
+ T: FromStr + ToString + Sync + Send + 'static,
+ GF: Fn() -> T + Sync + Send + 'static,
+ SF: Fn(T) -> Result<(), Error> + Sync + Send + 'static,
+{
+ fn get(&self) -> String {
+ (self.get_fn)().to_string()
+ }
+
+ fn set(&self, vstr: &str) -> Result<(), Error> {
+ let value = vstr
+ .parse()
+ .map_err(|_| Error::Message(format!("invalid value: {}", vstr)))?;
+ (self.set_fn)(value)
+ }
+}
diff --git a/src/util/background/worker.rs b/src/util/background/worker.rs
index f5e3addb..8165e2cb 100644
--- a/src/util/background/worker.rs
+++ b/src/util/background/worker.rs
@@ -1,6 +1,6 @@
use std::collections::HashMap;
use std::sync::Arc;
-use std::time::{Duration, Instant};
+use std::time::Duration;
use async_trait::async_trait;
use futures::future::*;
@@ -10,10 +10,14 @@ use serde::{Deserialize, Serialize};
use tokio::select;
use tokio::sync::{mpsc, watch};
-use crate::background::WorkerInfo;
+use crate::background::{WorkerInfo, WorkerStatus};
use crate::error::Error;
use crate::time::now_msec;
+// All workers that haven't exited for this time after an exit signal was received
+// will be interrupted in the middle of whatever they are doing.
+const EXIT_DEADLINE: Duration = Duration::from_secs(8);
+
#[derive(PartialEq, Copy, Clone, Serialize, Deserialize, Debug)]
pub enum WorkerState {
Busy,
@@ -26,7 +30,7 @@ impl std::fmt::Display for WorkerState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
WorkerState::Busy => write!(f, "Busy"),
- WorkerState::Throttled(t) => write!(f, "Thr:{:.3}", t),
+ WorkerState::Throttled(_) => write!(f, "Busy*"),
WorkerState::Idle => write!(f, "Idle"),
WorkerState::Done => write!(f, "Done"),
}
@@ -37,8 +41,8 @@ impl std::fmt::Display for WorkerState {
pub trait Worker: Send {
fn name(&self) -> String;
- fn info(&self) -> Option<String> {
- None
+ fn status(&self) -> WorkerStatus {
+ Default::default()
}
/// Work: do a basic unit of work, if one is available (otherwise, should return
@@ -50,10 +54,8 @@ pub trait Worker: Send {
async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error>;
/// Wait for work: await for some task to become available. This future can be interrupted in
- /// the middle for any reason. This future doesn't have to await on must_exit.changed(), we
- /// are doing it for you. Therefore it only receives a read refernce to must_exit which allows
- /// it to check if we are exiting.
- async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState;
+ /// the middle for any reason, for example if an interrupt signal was received.
+ async fn wait_for_work(&mut self) -> WorkerState;
}
pub(crate) struct WorkerProcessor {
@@ -93,11 +95,9 @@ impl WorkerProcessor {
let task_id = next_task_id;
next_task_id += 1;
let stop_signal = self.stop_signal.clone();
- let stop_signal_worker = self.stop_signal.clone();
let mut worker = WorkerHandler {
task_id,
stop_signal,
- stop_signal_worker,
worker: new_worker,
state: WorkerState::Busy,
errors: 0,
@@ -119,7 +119,7 @@ impl WorkerProcessor {
match wi.get_mut(&worker.task_id) {
Some(i) => {
i.state = worker.state;
- i.info = worker.worker.info();
+ i.status = worker.worker.status();
i.errors = worker.errors;
i.consecutive_errors = worker.consecutive_errors;
if worker.last_error.is_some() {
@@ -130,7 +130,7 @@ impl WorkerProcessor {
wi.insert(worker.task_id, WorkerInfo {
name: worker.worker.name(),
state: worker.state,
- info: worker.worker.info(),
+ status: worker.worker.status(),
errors: worker.errors,
consecutive_errors: worker.consecutive_errors,
last_error: worker.last_error.take(),
@@ -153,26 +153,14 @@ impl WorkerProcessor {
}
// We are exiting, drain everything
- let drain_half_time = Instant::now() + Duration::from_secs(5);
let drain_everything = async move {
- while let Some(mut worker) = workers.next().await {
- if worker.state == WorkerState::Done {
- info!(
- "Worker {} (TID {}) exited",
- worker.worker.name(),
- worker.task_id
- );
- } else if Instant::now() > drain_half_time {
- warn!("Worker {} (TID {}) interrupted between two iterations in state {:?} (this should be fine)", worker.worker.name(), worker.task_id, worker.state);
- } else {
- workers.push(
- async move {
- worker.step().await;
- worker
- }
- .boxed(),
- );
- }
+ while let Some(worker) = workers.next().await {
+ info!(
+ "Worker {} (TID {}) exited (last state: {:?})",
+ worker.worker.name(),
+ worker.task_id,
+ worker.state
+ );
}
};
@@ -180,7 +168,7 @@ impl WorkerProcessor {
_ = drain_everything => {
info!("All workers exited peacefully \\o/");
}
- _ = tokio::time::sleep(Duration::from_secs(9)) => {
+ _ = tokio::time::sleep(EXIT_DEADLINE) => {
error!("Some workers could not exit in time, we are cancelling some things in the middle");
}
}
@@ -190,7 +178,6 @@ impl WorkerProcessor {
struct WorkerHandler {
task_id: usize,
stop_signal: watch::Receiver<bool>,
- stop_signal_worker: watch::Receiver<bool>,
worker: Box<dyn Worker>,
state: WorkerState,
errors: usize,
@@ -225,33 +212,19 @@ impl WorkerHandler {
},
WorkerState::Throttled(delay) => {
// Sleep for given delay and go back to busy state
- if !*self.stop_signal.borrow() {
- select! {
- _ = tokio::time::sleep(Duration::from_secs_f32(delay)) => (),
- _ = self.stop_signal.changed() => (),
+ select! {
+ _ = tokio::time::sleep(Duration::from_secs_f32(delay)) => {
+ self.state = WorkerState::Busy;
}
+ _ = self.stop_signal.changed() => (),
}
- self.state = WorkerState::Busy;
}
WorkerState::Idle => {
- if *self.stop_signal.borrow() {
- select! {
- new_st = self.worker.wait_for_work(&self.stop_signal_worker) => {
- self.state = new_st;
- }
- _ = tokio::time::sleep(Duration::from_secs(1)) => {
- // stay in Idle state
- }
- }
- } else {
- select! {
- new_st = self.worker.wait_for_work(&self.stop_signal_worker) => {
- self.state = new_st;
- }
- _ = self.stop_signal.changed() => {
- // stay in Idle state
- }
+ select! {
+ new_st = self.worker.wait_for_work() => {
+ self.state = new_st;
}
+ _ = self.stop_signal.changed() => (),
}
}
WorkerState::Done => unreachable!(),
diff --git a/src/util/config.rs b/src/util/config.rs
index 04f8375a..f0a881aa 100644
--- a/src/util/config.rs
+++ b/src/util/config.rs
@@ -34,7 +34,11 @@ pub struct Config {
pub compression_level: Option<i32>,
/// RPC secret key: 32 bytes hex encoded
- pub rpc_secret: String,
+ /// Note: When using `read_config` this should never be `None`
+ pub rpc_secret: Option<String>,
+
+ /// Optional file where RPC secret key is read from
+ pub rpc_secret_file: Option<String>,
/// Address to bind for RPC
pub rpc_bind_addr: SocketAddr,
@@ -177,7 +181,31 @@ pub fn read_config(config_file: PathBuf) -> Result<Config, Error> {
let mut config = String::new();
file.read_to_string(&mut config)?;
- Ok(toml::from_str(&config)?)
+ let mut parsed_config: Config = toml::from_str(&config)?;
+
+ match (&parsed_config.rpc_secret, &parsed_config.rpc_secret_file) {
+ (Some(_), None) => {
+ // no-op
+ }
+ (Some(_), Some(_)) => {
+ return Err("only one of `rpc_secret` and `rpc_secret_file` can be set".into())
+ }
+ (None, Some(rpc_secret_file_path_string)) => {
+ let mut rpc_secret_file = std::fs::OpenOptions::new()
+ .read(true)
+ .open(rpc_secret_file_path_string)?;
+ let mut rpc_secret_from_file = String::new();
+ rpc_secret_file.read_to_string(&mut rpc_secret_from_file)?;
+ // trim_end: allows for use case such as `echo "$(openssl rand -hex 32)" > somefile`.
+ // also editors sometimes add a trailing newline
+ parsed_config.rpc_secret = Some(String::from(rpc_secret_from_file.trim_end()));
+ }
+ (None, None) => {
+ return Err("either `rpc_secret` or `rpc_secret_file` needs to be set".into())
+ }
+ };
+
+ Ok(parsed_config)
}
fn default_compression() -> Option<i32> {
@@ -233,3 +261,123 @@ where
deserializer.deserialize_any(OptionVisitor)
}
+
+#[cfg(test)]
+mod tests {
+ use crate::error::Error;
+ use std::fs::File;
+ use std::io::Write;
+
+ #[test]
+ fn test_rpc_secret_is_required() -> Result<(), Error> {
+ let path1 = mktemp::Temp::new_file()?;
+ let mut file1 = File::create(path1.as_path())?;
+ writeln!(
+ file1,
+ r#"
+ metadata_dir = "/tmp/garage/meta"
+ data_dir = "/tmp/garage/data"
+ replication_mode = "3"
+ rpc_bind_addr = "[::]:3901"
+
+ [s3_api]
+ s3_region = "garage"
+ api_bind_addr = "[::]:3900"
+ "#
+ )?;
+ assert_eq!(
+ "either `rpc_secret` or `rpc_secret_file` needs to be set",
+ super::read_config(path1.to_path_buf())
+ .unwrap_err()
+ .to_string()
+ );
+ drop(path1);
+ drop(file1);
+
+ let path2 = mktemp::Temp::new_file()?;
+ let mut file2 = File::create(path2.as_path())?;
+ writeln!(
+ file2,
+ r#"
+ metadata_dir = "/tmp/garage/meta"
+ data_dir = "/tmp/garage/data"
+ replication_mode = "3"
+ rpc_bind_addr = "[::]:3901"
+ rpc_secret = "foo"
+
+ [s3_api]
+ s3_region = "garage"
+ api_bind_addr = "[::]:3900"
+ "#
+ )?;
+
+ let config = super::read_config(path2.to_path_buf())?;
+ assert_eq!("foo", config.rpc_secret.unwrap());
+ drop(path2);
+ drop(file2);
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_rpc_secret_file_works() -> Result<(), Error> {
+ let path_secret = mktemp::Temp::new_file()?;
+ let mut file_secret = File::create(path_secret.as_path())?;
+ writeln!(file_secret, "foo")?;
+ drop(file_secret);
+
+ let path_config = mktemp::Temp::new_file()?;
+ let mut file_config = File::create(path_config.as_path())?;
+ let path_secret_path = path_secret.as_path().display();
+ writeln!(
+ file_config,
+ r#"
+ metadata_dir = "/tmp/garage/meta"
+ data_dir = "/tmp/garage/data"
+ replication_mode = "3"
+ rpc_bind_addr = "[::]:3901"
+ rpc_secret_file = "{path_secret_path}"
+
+ [s3_api]
+ s3_region = "garage"
+ api_bind_addr = "[::]:3900"
+ "#
+ )?;
+ let config = super::read_config(path_config.to_path_buf())?;
+ assert_eq!("foo", config.rpc_secret.unwrap());
+ drop(path_config);
+ drop(path_secret);
+ drop(file_config);
+ Ok(())
+ }
+
+ #[test]
+ fn test_rcp_secret_and_rpc_secret_file_cannot_be_set_both() -> Result<(), Error> {
+ let path_config = mktemp::Temp::new_file()?;
+ let mut file_config = File::create(path_config.as_path())?;
+ writeln!(
+ file_config,
+ r#"
+ metadata_dir = "/tmp/garage/meta"
+ data_dir = "/tmp/garage/data"
+ replication_mode = "3"
+ rpc_bind_addr = "[::]:3901"
+ rpc_secret= "dummy"
+ rpc_secret_file = "dummy"
+
+ [s3_api]
+ s3_region = "garage"
+ api_bind_addr = "[::]:3900"
+ "#
+ )?;
+ assert_eq!(
+ "only one of `rpc_secret` and `rpc_secret_file` can be set",
+ super::read_config(path_config.to_path_buf())
+ .unwrap_err()
+ .to_string()
+ );
+ drop(path_config);
+ drop(file_config);
+ Ok(())
+ }
+}
diff --git a/src/util/data.rs b/src/util/data.rs
index 7715c2cc..bdd8daee 100644
--- a/src/util/data.rs
+++ b/src/util/data.rs
@@ -115,9 +115,9 @@ pub fn sha256sum(data: &[u8]) -> Hash {
/// Compute the blake2 of a slice
pub fn blake2sum(data: &[u8]) -> Hash {
- use blake2::{Blake2b, Digest};
+ use blake2::{Blake2b512, Digest};
- let mut hasher = Blake2b::new();
+ let mut hasher = Blake2b512::new();
hasher.update(data);
let mut hash = [0u8; 32];
hash.copy_from_slice(&hasher.finalize()[..32]);
@@ -140,34 +140,3 @@ pub fn fasthash(data: &[u8]) -> FastHash {
pub fn gen_uuid() -> Uuid {
rand::thread_rng().gen::<[u8; 32]>().into()
}
-
-// RMP serialization with names of fields and variants
-
-/// Serialize to MessagePack
-pub fn rmp_to_vec_all_named<T>(val: &T) -> Result<Vec<u8>, rmp_serde::encode::Error>
-where
- T: Serialize + ?Sized,
-{
- let mut wr = Vec::with_capacity(128);
- let mut se = rmp_serde::Serializer::new(&mut wr)
- .with_struct_map()
- .with_string_variants();
- val.serialize(&mut se)?;
- Ok(wr)
-}
-
-/// Serialize to JSON, truncating long result
-pub fn debug_serialize<T: Serialize>(x: T) -> String {
- match serde_json::to_string(&x) {
- Ok(ss) => {
- if ss.len() > 100 {
- // TODO this can panic if 100 is not a codepoint boundary, but inside a 2 Bytes
- // (or more) codepoint
- ss[..100].to_string()
- } else {
- ss
- }
- }
- Err(e) => format!("<JSON serialization error: {}>", e),
- }
-}
diff --git a/src/util/encode.rs b/src/util/encode.rs
new file mode 100644
index 00000000..1cd3198f
--- /dev/null
+++ b/src/util/encode.rs
@@ -0,0 +1,42 @@
+use serde::{Deserialize, Serialize};
+
+/// Serialize to MessagePack, without versioning
+/// (see garage_util::migrate for functions that manage versioned
+/// data formats)
+pub fn nonversioned_encode<T>(val: &T) -> Result<Vec<u8>, rmp_serde::encode::Error>
+where
+ T: Serialize + ?Sized,
+{
+ let mut wr = Vec::with_capacity(128);
+ let mut se = rmp_serde::Serializer::new(&mut wr)
+ .with_struct_map()
+ .with_string_variants();
+ val.serialize(&mut se)?;
+ Ok(wr)
+}
+
+/// Deserialize from MessagePack, without versioning
+/// (see garage_util::migrate for functions that manage versioned
+/// data formats)
+pub fn nonversioned_decode<T>(bytes: &[u8]) -> Result<T, rmp_serde::decode::Error>
+where
+ T: for<'de> Deserialize<'de> + ?Sized,
+{
+ rmp_serde::decode::from_read_ref::<_, T>(bytes)
+}
+
+/// Serialize to JSON, truncating long result
+pub fn debug_serialize<T: Serialize>(x: T) -> String {
+ match serde_json::to_string(&x) {
+ Ok(ss) => {
+ if ss.len() > 100 {
+ // TODO this can panic if 100 is not a codepoint boundary, but inside a 2 Bytes
+ // (or more) codepoint
+ ss[..100].to_string()
+ } else {
+ ss
+ }
+ }
+ Err(e) => format!("<JSON serialization error: {}>", e),
+ }
+}
diff --git a/src/util/error.rs b/src/util/error.rs
index 9995c746..3fcee71d 100644
--- a/src/util/error.rs
+++ b/src/util/error.rs
@@ -7,6 +7,7 @@ use err_derive::Error;
use serde::{de::Visitor, Deserialize, Deserializer, Serialize, Serializer};
use crate::data::*;
+use crate::encode::debug_serialize;
/// Regroup all Garage errors
#[derive(Debug, Error)]
diff --git a/src/util/formater.rs b/src/util/formater.rs
index 95324f9a..2ea53ebb 100644
--- a/src/util/formater.rs
+++ b/src/util/formater.rs
@@ -1,4 +1,4 @@
-pub fn format_table(data: Vec<String>) {
+pub fn format_table_to_string(data: Vec<String>) -> String {
let data = data
.iter()
.map(|s| s.split('\t').collect::<Vec<_>>())
@@ -24,5 +24,9 @@ pub fn format_table(data: Vec<String>) {
out.push('\n');
}
- print!("{}", out);
+ out
+}
+
+pub fn format_table(data: Vec<String>) {
+ print!("{}", format_table_to_string(data));
}
diff --git a/src/util/lib.rs b/src/util/lib.rs
index 264cc192..d35ca72f 100644
--- a/src/util/lib.rs
+++ b/src/util/lib.rs
@@ -8,11 +8,12 @@ pub mod background;
pub mod config;
pub mod crdt;
pub mod data;
+pub mod encode;
pub mod error;
pub mod formater;
pub mod metrics;
+pub mod migrate;
pub mod persister;
pub mod time;
-pub mod token_bucket;
pub mod tranquilizer;
pub mod version;
diff --git a/src/util/migrate.rs b/src/util/migrate.rs
new file mode 100644
index 00000000..1229fd9c
--- /dev/null
+++ b/src/util/migrate.rs
@@ -0,0 +1,159 @@
+use serde::{Deserialize, Serialize};
+
+/// Indicates that this type has an encoding that can be migrated from
+/// a previous version upon upgrades of Garage.
+pub trait Migrate: Serialize + for<'de> Deserialize<'de> + 'static {
+ /// A sequence of bytes to add at the beginning of the serialized
+ /// string, to identify that the data is of this version.
+ const VERSION_MARKER: &'static [u8] = b"";
+
+ /// The previous version of this data type, from which items of this version
+ /// can be migrated.
+ type Previous: Migrate;
+
+ /// The migration function that transforms a value decoded in the old format
+ /// to an up-to-date value.
+ fn migrate(previous: Self::Previous) -> Self;
+
+ /// Decode an encoded version of this type, going through a migration if necessary.
+ fn decode(bytes: &[u8]) -> Option<Self> {
+ let marker_len = Self::VERSION_MARKER.len();
+ if bytes.get(..marker_len) == Some(Self::VERSION_MARKER) {
+ if let Ok(value) = rmp_serde::decode::from_read_ref::<_, Self>(&bytes[marker_len..]) {
+ return Some(value);
+ }
+ }
+
+ Self::Previous::decode(bytes).map(Self::migrate)
+ }
+
+ /// Encode this type with optional version marker
+ fn encode(&self) -> Result<Vec<u8>, rmp_serde::encode::Error> {
+ let mut wr = Vec::with_capacity(128);
+ wr.extend_from_slice(Self::VERSION_MARKER);
+ let mut se = rmp_serde::Serializer::new(&mut wr)
+ .with_struct_map()
+ .with_string_variants();
+ self.serialize(&mut se)?;
+ Ok(wr)
+ }
+}
+
+/// Indicates that this type has no previous encoding version to be migrated from.
+pub trait InitialFormat: Serialize + for<'de> Deserialize<'de> + 'static {
+ /// A sequence of bytes to add at the beginning of the serialized
+ /// string, to identify that the data is of this version.
+ const VERSION_MARKER: &'static [u8] = b"";
+}
+
+impl<T: InitialFormat> Migrate for T {
+ const VERSION_MARKER: &'static [u8] = <T as InitialFormat>::VERSION_MARKER;
+
+ type Previous = NoPrevious;
+
+ fn migrate(_previous: Self::Previous) -> Self {
+ unreachable!();
+ }
+}
+
+/// Internal type used by InitialFormat, not meant for general use.
+#[derive(Serialize, Deserialize)]
+pub enum NoPrevious {}
+
+impl Migrate for NoPrevious {
+ type Previous = NoPrevious;
+
+ fn migrate(_previous: Self::Previous) -> Self {
+ unreachable!();
+ }
+
+ fn decode(_bytes: &[u8]) -> Option<Self> {
+ None
+ }
+
+ fn encode(&self) -> Result<Vec<u8>, rmp_serde::encode::Error> {
+ unreachable!()
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use super::*;
+
+ #[derive(Serialize, Deserialize, PartialEq, Eq, Debug)]
+ struct V1 {
+ a: usize,
+ b: String,
+ }
+ impl InitialFormat for V1 {}
+
+ #[derive(Serialize, Deserialize, PartialEq, Eq, Debug)]
+ struct V2 {
+ a: usize,
+ b: Vec<String>,
+ c: String,
+ }
+ impl Migrate for V2 {
+ const VERSION_MARKER: &'static [u8] = b"GtestV2";
+ type Previous = V1;
+ fn migrate(prev: V1) -> V2 {
+ V2 {
+ a: prev.a,
+ b: vec![prev.b],
+ c: String::new(),
+ }
+ }
+ }
+
+ #[test]
+ fn test_v1() {
+ let x = V1 {
+ a: 12,
+ b: "hello".into(),
+ };
+ let x_enc = x.encode().unwrap();
+ let y = V1::decode(&x_enc).unwrap();
+ assert_eq!(x, y);
+ }
+
+ #[test]
+ fn test_v2() {
+ let x = V2 {
+ a: 12,
+ b: vec!["hello".into(), "world".into()],
+ c: "plop".into(),
+ };
+ let x_enc = x.encode().unwrap();
+ assert_eq!(&x_enc[..V2::VERSION_MARKER.len()], V2::VERSION_MARKER);
+ let y = V2::decode(&x_enc).unwrap();
+ assert_eq!(x, y);
+ }
+
+ #[test]
+ fn test_migrate() {
+ let x = V1 {
+ a: 12,
+ b: "hello".into(),
+ };
+ let x_enc = x.encode().unwrap();
+
+ let xx = V1::decode(&x_enc).unwrap();
+ assert_eq!(x, xx);
+
+ let y = V2::decode(&x_enc).unwrap();
+ assert_eq!(
+ y,
+ V2 {
+ a: 12,
+ b: vec!["hello".into()],
+ c: "".into(),
+ }
+ );
+
+ let y_enc = y.encode().unwrap();
+ assert_eq!(&y_enc[..V2::VERSION_MARKER.len()], V2::VERSION_MARKER);
+
+ let z = V2::decode(&y_enc).unwrap();
+ assert_eq!(y, z);
+ }
+}
diff --git a/src/util/persister.rs b/src/util/persister.rs
index 9e1a1910..5c66bbed 100644
--- a/src/util/persister.rs
+++ b/src/util/persister.rs
@@ -1,23 +1,19 @@
use std::io::{Read, Write};
use std::path::{Path, PathBuf};
+use std::sync::{Arc, RwLock};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
-use serde::{Deserialize, Serialize};
-
-use crate::data::*;
use crate::error::Error;
+use crate::migrate::Migrate;
-pub struct Persister<T: Serialize + for<'de> Deserialize<'de>> {
+pub struct Persister<T: Migrate> {
path: PathBuf,
_marker: std::marker::PhantomData<T>,
}
-impl<T> Persister<T>
-where
- T: Serialize + for<'de> Deserialize<'de>,
-{
+impl<T: Migrate> Persister<T> {
pub fn new(base_dir: &Path, file_name: &str) -> Self {
let mut path = base_dir.to_path_buf();
path.push(file_name);
@@ -27,18 +23,37 @@ where
}
}
+ fn decode(&self, bytes: &[u8]) -> Result<T, Error> {
+ match T::decode(bytes) {
+ Some(v) => Ok(v),
+ None => {
+ error!(
+ "Unable to decode persisted data file {}",
+ self.path.display()
+ );
+ for line in hexdump::hexdump_iter(bytes) {
+ debug!("{}", line);
+ }
+ Err(Error::Message(format!(
+ "Unable to decode persisted data file {}",
+ self.path.display()
+ )))
+ }
+ }
+ }
+
pub fn load(&self) -> Result<T, Error> {
let mut file = std::fs::OpenOptions::new().read(true).open(&self.path)?;
let mut bytes = vec![];
file.read_to_end(&mut bytes)?;
- let value = rmp_serde::decode::from_read_ref(&bytes[..])?;
+ let value = self.decode(&bytes[..])?;
Ok(value)
}
pub fn save(&self, t: &T) -> Result<(), Error> {
- let bytes = rmp_to_vec_all_named(t)?;
+ let bytes = t.encode()?;
let mut file = std::fs::OpenOptions::new()
.write(true)
@@ -57,12 +72,12 @@ where
let mut bytes = vec![];
file.read_to_end(&mut bytes).await?;
- let value = rmp_serde::decode::from_read_ref(&bytes[..])?;
+ let value = self.decode(&bytes[..])?;
Ok(value)
}
pub async fn save_async(&self, t: &T) -> Result<(), Error> {
- let bytes = rmp_to_vec_all_named(t)?;
+ let bytes = t.encode()?;
let mut file = tokio::fs::File::create(&self.path).await?;
file.write_all(&bytes[..]).await?;
@@ -70,3 +85,36 @@ where
Ok(())
}
}
+
+pub struct PersisterShared<V: Migrate + Default>(Arc<(Persister<V>, RwLock<V>)>);
+
+impl<V: Migrate + Default> Clone for PersisterShared<V> {
+ fn clone(&self) -> PersisterShared<V> {
+ PersisterShared(self.0.clone())
+ }
+}
+
+impl<V: Migrate + Default> PersisterShared<V> {
+ pub fn new(base_dir: &Path, file_name: &str) -> Self {
+ let persister = Persister::new(base_dir, file_name);
+ let value = persister.load().unwrap_or_default();
+ Self(Arc::new((persister, RwLock::new(value))))
+ }
+
+ pub fn get_with<F, R>(&self, f: F) -> R
+ where
+ F: FnOnce(&V) -> R,
+ {
+ let value = self.0 .1.read().unwrap();
+ f(&value)
+ }
+
+ pub fn set_with<F>(&self, f: F) -> Result<(), Error>
+ where
+ F: FnOnce(&mut V),
+ {
+ let mut value = self.0 .1.write().unwrap();
+ f(&mut value);
+ self.0 .0.save(&value)
+ }
+}
diff --git a/src/util/time.rs b/src/util/time.rs
index 257b4d2a..42f41a44 100644
--- a/src/util/time.rs
+++ b/src/util/time.rs
@@ -25,6 +25,6 @@ pub fn increment_logical_clock_2(prev: u64, prev2: u64) -> u64 {
pub fn msec_to_rfc3339(msecs: u64) -> String {
let secs = msecs as i64 / 1000;
let nanos = (msecs as i64 % 1000) as u32 * 1_000_000;
- let timestamp = Utc.timestamp(secs, nanos);
+ let timestamp = Utc.timestamp_opt(secs, nanos).unwrap();
timestamp.to_rfc3339_opts(SecondsFormat::Millis, true)
}
diff --git a/src/util/token_bucket.rs b/src/util/token_bucket.rs
deleted file mode 100644
index cc0dfa1f..00000000
--- a/src/util/token_bucket.rs
+++ /dev/null
@@ -1,40 +0,0 @@
-use std::time::{Duration, Instant};
-
-use tokio::time::sleep;
-
-pub struct TokenBucket {
- // Replenish rate: number of tokens per second
- replenish_rate: u64,
- // Current number of tokens
- tokens: u64,
- // Last replenish time
- last_replenish: Instant,
-}
-
-impl TokenBucket {
- pub fn new(replenish_rate: u64) -> Self {
- Self {
- replenish_rate,
- tokens: 0,
- last_replenish: Instant::now(),
- }
- }
-
- pub async fn take(&mut self, tokens: u64) {
- while self.tokens < tokens {
- let needed = tokens - self.tokens;
- let delay = (needed as f64) / (self.replenish_rate as f64);
- sleep(Duration::from_secs_f64(delay)).await;
- self.replenish();
- }
- self.tokens -= tokens;
- }
-
- pub fn replenish(&mut self) {
- let now = Instant::now();
- let new_tokens =
- ((now - self.last_replenish).as_secs_f64() * (self.replenish_rate as f64)) as u64;
- self.tokens += new_tokens;
- self.last_replenish = now;
- }
-}