aboutsummaryrefslogtreecommitdiff
path: root/src/util
diff options
context:
space:
mode:
Diffstat (limited to 'src/util')
-rw-r--r--src/util/Cargo.toml1
-rw-r--r--src/util/background/job_worker.rs48
-rw-r--r--src/util/background/mod.rs58
-rw-r--r--src/util/background/worker.rs73
-rw-r--r--src/util/data.rs31
-rw-r--r--src/util/encode.rs42
-rw-r--r--src/util/error.rs1
-rw-r--r--src/util/lib.rs2
-rw-r--r--src/util/migrate.rs159
-rw-r--r--src/util/persister.rs38
10 files changed, 256 insertions, 197 deletions
diff --git a/src/util/Cargo.toml b/src/util/Cargo.toml
index 11640027..32e9c851 100644
--- a/src/util/Cargo.toml
+++ b/src/util/Cargo.toml
@@ -23,6 +23,7 @@ bytes = "1.0"
digest = "0.10"
err-derive = "0.3"
git-version = "0.3.4"
+hexdump = "0.1"
xxhash-rust = { version = "0.8", default-features = false, features = ["xxh3"] }
hex = "0.4"
lazy_static = "1.4"
diff --git a/src/util/background/job_worker.rs b/src/util/background/job_worker.rs
deleted file mode 100644
index 2568ea11..00000000
--- a/src/util/background/job_worker.rs
+++ /dev/null
@@ -1,48 +0,0 @@
-//! Job worker: a generic worker that just processes incoming
-//! jobs one by one
-
-use std::sync::Arc;
-
-use async_trait::async_trait;
-use tokio::sync::{mpsc, Mutex};
-
-use crate::background::worker::*;
-use crate::background::*;
-
-pub(crate) struct JobWorker {
- pub(crate) index: usize,
- pub(crate) job_chan: Arc<Mutex<mpsc::UnboundedReceiver<(Job, bool)>>>,
- pub(crate) next_job: Option<Job>,
-}
-
-#[async_trait]
-impl Worker for JobWorker {
- fn name(&self) -> String {
- format!("Job worker #{}", self.index)
- }
-
- async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
- match self.next_job.take() {
- None => return Ok(WorkerState::Idle),
- Some(job) => {
- job.await?;
- Ok(WorkerState::Busy)
- }
- }
- }
-
- async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState {
- loop {
- match self.job_chan.lock().await.recv().await {
- Some((job, cancellable)) => {
- if cancellable && *must_exit.borrow() {
- continue;
- }
- self.next_job = Some(job);
- return WorkerState::Busy;
- }
- None => return WorkerState::Done,
- }
- }
- }
-}
diff --git a/src/util/background/mod.rs b/src/util/background/mod.rs
index fd9258b8..41b48e93 100644
--- a/src/util/background/mod.rs
+++ b/src/util/background/mod.rs
@@ -1,27 +1,18 @@
//! Job runner for futures and async functions
-pub mod job_worker;
pub mod worker;
-use core::future::Future;
-
use std::collections::HashMap;
-use std::pin::Pin;
use std::sync::Arc;
use serde::{Deserialize, Serialize};
-use tokio::sync::{mpsc, watch, Mutex};
+use tokio::sync::{mpsc, watch};
-use crate::error::Error;
use worker::WorkerProcessor;
pub use worker::{Worker, WorkerState};
-pub(crate) type JobOutput = Result<(), Error>;
-pub(crate) type Job = Pin<Box<dyn Future<Output = JobOutput> + Send>>;
-
/// Job runner for futures and async functions
pub struct BackgroundRunner {
- send_job: mpsc::UnboundedSender<(Job, bool)>,
send_worker: mpsc::UnboundedSender<Box<dyn Worker>>,
worker_info: Arc<std::sync::Mutex<HashMap<usize, WorkerInfo>>>,
}
@@ -49,10 +40,7 @@ pub struct WorkerStatus {
impl BackgroundRunner {
/// Create a new BackgroundRunner
- pub fn new(
- n_runners: usize,
- stop_signal: watch::Receiver<bool>,
- ) -> (Arc<Self>, tokio::task::JoinHandle<()>) {
+ pub fn new(stop_signal: watch::Receiver<bool>) -> (Arc<Self>, tokio::task::JoinHandle<()>) {
let (send_worker, worker_out) = mpsc::unbounded_channel::<Box<dyn Worker>>();
let worker_info = Arc::new(std::sync::Mutex::new(HashMap::new()));
@@ -63,24 +51,7 @@ impl BackgroundRunner {
worker_processor.run().await;
});
- let (send_job, queue_out) = mpsc::unbounded_channel();
- let queue_out = Arc::new(Mutex::new(queue_out));
-
- for i in 0..n_runners {
- let queue_out = queue_out.clone();
-
- send_worker
- .send(Box::new(job_worker::JobWorker {
- index: i,
- job_chan: queue_out.clone(),
- next_job: None,
- }))
- .ok()
- .unwrap();
- }
-
let bgrunner = Arc::new(Self {
- send_job,
send_worker,
worker_info,
});
@@ -91,31 +62,6 @@ impl BackgroundRunner {
self.worker_info.lock().unwrap().clone()
}
- /// Spawn a task to be run in background
- pub fn spawn<T>(&self, job: T)
- where
- T: Future<Output = JobOutput> + Send + 'static,
- {
- let boxed: Job = Box::pin(job);
- self.send_job
- .send((boxed, false))
- .ok()
- .expect("Could not put job in queue");
- }
-
- /// Spawn a task to be run in background. It may get discarded before running if spawned while
- /// the runner is stopping
- pub fn spawn_cancellable<T>(&self, job: T)
- where
- T: Future<Output = JobOutput> + Send + 'static,
- {
- let boxed: Job = Box::pin(job);
- self.send_job
- .send((boxed, true))
- .ok()
- .expect("Could not put job in queue");
- }
-
pub fn spawn_worker<W>(&self, worker: W)
where
W: Worker + 'static,
diff --git a/src/util/background/worker.rs b/src/util/background/worker.rs
index 7e9da7f8..8165e2cb 100644
--- a/src/util/background/worker.rs
+++ b/src/util/background/worker.rs
@@ -1,6 +1,6 @@
use std::collections::HashMap;
use std::sync::Arc;
-use std::time::{Duration, Instant};
+use std::time::Duration;
use async_trait::async_trait;
use futures::future::*;
@@ -14,6 +14,10 @@ use crate::background::{WorkerInfo, WorkerStatus};
use crate::error::Error;
use crate::time::now_msec;
+// All workers that haven't exited for this time after an exit signal was recieved
+// will be interrupted in the middle of whatever they are doing.
+const EXIT_DEADLINE: Duration = Duration::from_secs(8);
+
#[derive(PartialEq, Copy, Clone, Serialize, Deserialize, Debug)]
pub enum WorkerState {
Busy,
@@ -50,10 +54,8 @@ pub trait Worker: Send {
async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error>;
/// Wait for work: await for some task to become available. This future can be interrupted in
- /// the middle for any reason. This future doesn't have to await on must_exit.changed(), we
- /// are doing it for you. Therefore it only receives a read refernce to must_exit which allows
- /// it to check if we are exiting.
- async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState;
+ /// the middle for any reason, for example if an interrupt signal was recieved.
+ async fn wait_for_work(&mut self) -> WorkerState;
}
pub(crate) struct WorkerProcessor {
@@ -93,11 +95,9 @@ impl WorkerProcessor {
let task_id = next_task_id;
next_task_id += 1;
let stop_signal = self.stop_signal.clone();
- let stop_signal_worker = self.stop_signal.clone();
let mut worker = WorkerHandler {
task_id,
stop_signal,
- stop_signal_worker,
worker: new_worker,
state: WorkerState::Busy,
errors: 0,
@@ -153,26 +153,14 @@ impl WorkerProcessor {
}
// We are exiting, drain everything
- let drain_half_time = Instant::now() + Duration::from_secs(5);
let drain_everything = async move {
- while let Some(mut worker) = workers.next().await {
- if worker.state == WorkerState::Done {
- info!(
- "Worker {} (TID {}) exited",
- worker.worker.name(),
- worker.task_id
- );
- } else if Instant::now() > drain_half_time {
- warn!("Worker {} (TID {}) interrupted between two iterations in state {:?} (this should be fine)", worker.worker.name(), worker.task_id, worker.state);
- } else {
- workers.push(
- async move {
- worker.step().await;
- worker
- }
- .boxed(),
- );
- }
+ while let Some(worker) = workers.next().await {
+ info!(
+ "Worker {} (TID {}) exited (last state: {:?})",
+ worker.worker.name(),
+ worker.task_id,
+ worker.state
+ );
}
};
@@ -180,7 +168,7 @@ impl WorkerProcessor {
_ = drain_everything => {
info!("All workers exited peacefully \\o/");
}
- _ = tokio::time::sleep(Duration::from_secs(9)) => {
+ _ = tokio::time::sleep(EXIT_DEADLINE) => {
error!("Some workers could not exit in time, we are cancelling some things in the middle");
}
}
@@ -190,7 +178,6 @@ impl WorkerProcessor {
struct WorkerHandler {
task_id: usize,
stop_signal: watch::Receiver<bool>,
- stop_signal_worker: watch::Receiver<bool>,
worker: Box<dyn Worker>,
state: WorkerState,
errors: usize,
@@ -225,33 +212,19 @@ impl WorkerHandler {
},
WorkerState::Throttled(delay) => {
// Sleep for given delay and go back to busy state
- if !*self.stop_signal.borrow() {
- select! {
- _ = tokio::time::sleep(Duration::from_secs_f32(delay)) => (),
- _ = self.stop_signal.changed() => (),
+ select! {
+ _ = tokio::time::sleep(Duration::from_secs_f32(delay)) => {
+ self.state = WorkerState::Busy;
}
+ _ = self.stop_signal.changed() => (),
}
- self.state = WorkerState::Busy;
}
WorkerState::Idle => {
- if *self.stop_signal.borrow() {
- select! {
- new_st = self.worker.wait_for_work(&self.stop_signal_worker) => {
- self.state = new_st;
- }
- _ = tokio::time::sleep(Duration::from_secs(1)) => {
- // stay in Idle state
- }
- }
- } else {
- select! {
- new_st = self.worker.wait_for_work(&self.stop_signal_worker) => {
- self.state = new_st;
- }
- _ = self.stop_signal.changed() => {
- // stay in Idle state
- }
+ select! {
+ new_st = self.worker.wait_for_work() => {
+ self.state = new_st;
}
+ _ = self.stop_signal.changed() => (),
}
}
WorkerState::Done => unreachable!(),
diff --git a/src/util/data.rs b/src/util/data.rs
index 7715c2cc..3f61e301 100644
--- a/src/util/data.rs
+++ b/src/util/data.rs
@@ -140,34 +140,3 @@ pub fn fasthash(data: &[u8]) -> FastHash {
pub fn gen_uuid() -> Uuid {
rand::thread_rng().gen::<[u8; 32]>().into()
}
-
-// RMP serialization with names of fields and variants
-
-/// Serialize to MessagePack
-pub fn rmp_to_vec_all_named<T>(val: &T) -> Result<Vec<u8>, rmp_serde::encode::Error>
-where
- T: Serialize + ?Sized,
-{
- let mut wr = Vec::with_capacity(128);
- let mut se = rmp_serde::Serializer::new(&mut wr)
- .with_struct_map()
- .with_string_variants();
- val.serialize(&mut se)?;
- Ok(wr)
-}
-
-/// Serialize to JSON, truncating long result
-pub fn debug_serialize<T: Serialize>(x: T) -> String {
- match serde_json::to_string(&x) {
- Ok(ss) => {
- if ss.len() > 100 {
- // TODO this can panic if 100 is not a codepoint boundary, but inside a 2 Bytes
- // (or more) codepoint
- ss[..100].to_string()
- } else {
- ss
- }
- }
- Err(e) => format!("<JSON serialization error: {}>", e),
- }
-}
diff --git a/src/util/encode.rs b/src/util/encode.rs
new file mode 100644
index 00000000..1cd3198f
--- /dev/null
+++ b/src/util/encode.rs
@@ -0,0 +1,42 @@
+use serde::{Deserialize, Serialize};
+
+/// Serialize to MessagePacki, without versionning
+/// (see garage_util::migrate for functions that manage versionned
+/// data formats)
+pub fn nonversioned_encode<T>(val: &T) -> Result<Vec<u8>, rmp_serde::encode::Error>
+where
+ T: Serialize + ?Sized,
+{
+ let mut wr = Vec::with_capacity(128);
+ let mut se = rmp_serde::Serializer::new(&mut wr)
+ .with_struct_map()
+ .with_string_variants();
+ val.serialize(&mut se)?;
+ Ok(wr)
+}
+
+/// Deserialize from MessagePacki, without versionning
+/// (see garage_util::migrate for functions that manage versionned
+/// data formats)
+pub fn nonversioned_decode<T>(bytes: &[u8]) -> Result<T, rmp_serde::decode::Error>
+where
+ T: for<'de> Deserialize<'de> + ?Sized,
+{
+ rmp_serde::decode::from_read_ref::<_, T>(bytes)
+}
+
+/// Serialize to JSON, truncating long result
+pub fn debug_serialize<T: Serialize>(x: T) -> String {
+ match serde_json::to_string(&x) {
+ Ok(ss) => {
+ if ss.len() > 100 {
+ // TODO this can panic if 100 is not a codepoint boundary, but inside a 2 Bytes
+ // (or more) codepoint
+ ss[..100].to_string()
+ } else {
+ ss
+ }
+ }
+ Err(e) => format!("<JSON serialization error: {}>", e),
+ }
+}
diff --git a/src/util/error.rs b/src/util/error.rs
index 9995c746..3fcee71d 100644
--- a/src/util/error.rs
+++ b/src/util/error.rs
@@ -7,6 +7,7 @@ use err_derive::Error;
use serde::{de::Visitor, Deserialize, Deserializer, Serialize, Serializer};
use crate::data::*;
+use crate::encode::debug_serialize;
/// Regroup all Garage errors
#[derive(Debug, Error)]
diff --git a/src/util/lib.rs b/src/util/lib.rs
index 264cc192..be82061f 100644
--- a/src/util/lib.rs
+++ b/src/util/lib.rs
@@ -8,9 +8,11 @@ pub mod background;
pub mod config;
pub mod crdt;
pub mod data;
+pub mod encode;
pub mod error;
pub mod formater;
pub mod metrics;
+pub mod migrate;
pub mod persister;
pub mod time;
pub mod token_bucket;
diff --git a/src/util/migrate.rs b/src/util/migrate.rs
new file mode 100644
index 00000000..1229fd9c
--- /dev/null
+++ b/src/util/migrate.rs
@@ -0,0 +1,159 @@
+use serde::{Deserialize, Serialize};
+
+/// Indicates that this type has an encoding that can be migrated from
+/// a previous version upon upgrades of Garage.
+pub trait Migrate: Serialize + for<'de> Deserialize<'de> + 'static {
+ /// A sequence of bytes to add at the beginning of the serialized
+ /// string, to identify that the data is of this version.
+ const VERSION_MARKER: &'static [u8] = b"";
+
+ /// The previous version of this data type, from which items of this version
+ /// can be migrated.
+ type Previous: Migrate;
+
+ /// The migration function that transforms a value decoded in the old format
+ /// to an up-to-date value.
+ fn migrate(previous: Self::Previous) -> Self;
+
+ /// Decode an encoded version of this type, going through a migration if necessary.
+ fn decode(bytes: &[u8]) -> Option<Self> {
+ let marker_len = Self::VERSION_MARKER.len();
+ if bytes.get(..marker_len) == Some(Self::VERSION_MARKER) {
+ if let Ok(value) = rmp_serde::decode::from_read_ref::<_, Self>(&bytes[marker_len..]) {
+ return Some(value);
+ }
+ }
+
+ Self::Previous::decode(bytes).map(Self::migrate)
+ }
+
+ /// Encode this type with optionnal version marker
+ fn encode(&self) -> Result<Vec<u8>, rmp_serde::encode::Error> {
+ let mut wr = Vec::with_capacity(128);
+ wr.extend_from_slice(Self::VERSION_MARKER);
+ let mut se = rmp_serde::Serializer::new(&mut wr)
+ .with_struct_map()
+ .with_string_variants();
+ self.serialize(&mut se)?;
+ Ok(wr)
+ }
+}
+
+/// Indicates that this type has no previous encoding version to be migrated from.
+pub trait InitialFormat: Serialize + for<'de> Deserialize<'de> + 'static {
+ /// A sequence of bytes to add at the beginning of the serialized
+ /// string, to identify that the data is of this version.
+ const VERSION_MARKER: &'static [u8] = b"";
+}
+
+impl<T: InitialFormat> Migrate for T {
+ const VERSION_MARKER: &'static [u8] = <T as InitialFormat>::VERSION_MARKER;
+
+ type Previous = NoPrevious;
+
+ fn migrate(_previous: Self::Previous) -> Self {
+ unreachable!();
+ }
+}
+
+/// Internal type used by InitialFormat, not meant for general use.
+#[derive(Serialize, Deserialize)]
+pub enum NoPrevious {}
+
+impl Migrate for NoPrevious {
+ type Previous = NoPrevious;
+
+ fn migrate(_previous: Self::Previous) -> Self {
+ unreachable!();
+ }
+
+ fn decode(_bytes: &[u8]) -> Option<Self> {
+ None
+ }
+
+ fn encode(&self) -> Result<Vec<u8>, rmp_serde::encode::Error> {
+ unreachable!()
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use super::*;
+
+ #[derive(Serialize, Deserialize, PartialEq, Eq, Debug)]
+ struct V1 {
+ a: usize,
+ b: String,
+ }
+ impl InitialFormat for V1 {}
+
+ #[derive(Serialize, Deserialize, PartialEq, Eq, Debug)]
+ struct V2 {
+ a: usize,
+ b: Vec<String>,
+ c: String,
+ }
+ impl Migrate for V2 {
+ const VERSION_MARKER: &'static [u8] = b"GtestV2";
+ type Previous = V1;
+ fn migrate(prev: V1) -> V2 {
+ V2 {
+ a: prev.a,
+ b: vec![prev.b],
+ c: String::new(),
+ }
+ }
+ }
+
+ #[test]
+ fn test_v1() {
+ let x = V1 {
+ a: 12,
+ b: "hello".into(),
+ };
+ let x_enc = x.encode().unwrap();
+ let y = V1::decode(&x_enc).unwrap();
+ assert_eq!(x, y);
+ }
+
+ #[test]
+ fn test_v2() {
+ let x = V2 {
+ a: 12,
+ b: vec!["hello".into(), "world".into()],
+ c: "plop".into(),
+ };
+ let x_enc = x.encode().unwrap();
+ assert_eq!(&x_enc[..V2::VERSION_MARKER.len()], V2::VERSION_MARKER);
+ let y = V2::decode(&x_enc).unwrap();
+ assert_eq!(x, y);
+ }
+
+ #[test]
+ fn test_migrate() {
+ let x = V1 {
+ a: 12,
+ b: "hello".into(),
+ };
+ let x_enc = x.encode().unwrap();
+
+ let xx = V1::decode(&x_enc).unwrap();
+ assert_eq!(x, xx);
+
+ let y = V2::decode(&x_enc).unwrap();
+ assert_eq!(
+ y,
+ V2 {
+ a: 12,
+ b: vec!["hello".into()],
+ c: "".into(),
+ }
+ );
+
+ let y_enc = y.encode().unwrap();
+ assert_eq!(&y_enc[..V2::VERSION_MARKER.len()], V2::VERSION_MARKER);
+
+ let z = V2::decode(&y_enc).unwrap();
+ assert_eq!(y, z);
+ }
+}
diff --git a/src/util/persister.rs b/src/util/persister.rs
index 9e1a1910..4b9adf51 100644
--- a/src/util/persister.rs
+++ b/src/util/persister.rs
@@ -3,21 +3,16 @@ use std::path::{Path, PathBuf};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
-use serde::{Deserialize, Serialize};
-
-use crate::data::*;
use crate::error::Error;
+use crate::migrate::Migrate;
-pub struct Persister<T: Serialize + for<'de> Deserialize<'de>> {
+pub struct Persister<T: Migrate> {
path: PathBuf,
_marker: std::marker::PhantomData<T>,
}
-impl<T> Persister<T>
-where
- T: Serialize + for<'de> Deserialize<'de>,
-{
+impl<T: Migrate> Persister<T> {
pub fn new(base_dir: &Path, file_name: &str) -> Self {
let mut path = base_dir.to_path_buf();
path.push(file_name);
@@ -27,18 +22,37 @@ where
}
}
+ fn decode(&self, bytes: &[u8]) -> Result<T, Error> {
+ match T::decode(bytes) {
+ Some(v) => Ok(v),
+ None => {
+ error!(
+ "Unable to decode persisted data file {}",
+ self.path.display()
+ );
+ for line in hexdump::hexdump_iter(bytes) {
+ debug!("{}", line);
+ }
+ Err(Error::Message(format!(
+ "Unable to decode persisted data file {}",
+ self.path.display()
+ )))
+ }
+ }
+ }
+
pub fn load(&self) -> Result<T, Error> {
let mut file = std::fs::OpenOptions::new().read(true).open(&self.path)?;
let mut bytes = vec![];
file.read_to_end(&mut bytes)?;
- let value = rmp_serde::decode::from_read_ref(&bytes[..])?;
+ let value = self.decode(&bytes[..])?;
Ok(value)
}
pub fn save(&self, t: &T) -> Result<(), Error> {
- let bytes = rmp_to_vec_all_named(t)?;
+ let bytes = t.encode()?;
let mut file = std::fs::OpenOptions::new()
.write(true)
@@ -57,12 +71,12 @@ where
let mut bytes = vec![];
file.read_to_end(&mut bytes).await?;
- let value = rmp_serde::decode::from_read_ref(&bytes[..])?;
+ let value = self.decode(&bytes[..])?;
Ok(value)
}
pub async fn save_async(&self, t: &T) -> Result<(), Error> {
- let bytes = rmp_to_vec_all_named(t)?;
+ let bytes = t.encode()?;
let mut file = tokio::fs::File::create(&self.path).await?;
file.write_all(&bytes[..]).await?;