diff options
Diffstat (limited to 'src/util')
-rw-r--r-- | src/util/Cargo.toml | 1 | ||||
-rw-r--r-- | src/util/background/job_worker.rs | 48 | ||||
-rw-r--r-- | src/util/background/mod.rs | 58 | ||||
-rw-r--r-- | src/util/background/worker.rs | 73 | ||||
-rw-r--r-- | src/util/data.rs | 31 | ||||
-rw-r--r-- | src/util/encode.rs | 42 | ||||
-rw-r--r-- | src/util/error.rs | 1 | ||||
-rw-r--r-- | src/util/lib.rs | 2 | ||||
-rw-r--r-- | src/util/migrate.rs | 159 | ||||
-rw-r--r-- | src/util/persister.rs | 38 |
10 files changed, 256 insertions, 197 deletions
diff --git a/src/util/Cargo.toml b/src/util/Cargo.toml index 11640027..32e9c851 100644 --- a/src/util/Cargo.toml +++ b/src/util/Cargo.toml @@ -23,6 +23,7 @@ bytes = "1.0" digest = "0.10" err-derive = "0.3" git-version = "0.3.4" +hexdump = "0.1" xxhash-rust = { version = "0.8", default-features = false, features = ["xxh3"] } hex = "0.4" lazy_static = "1.4" diff --git a/src/util/background/job_worker.rs b/src/util/background/job_worker.rs deleted file mode 100644 index 2568ea11..00000000 --- a/src/util/background/job_worker.rs +++ /dev/null @@ -1,48 +0,0 @@ -//! Job worker: a generic worker that just processes incoming -//! jobs one by one - -use std::sync::Arc; - -use async_trait::async_trait; -use tokio::sync::{mpsc, Mutex}; - -use crate::background::worker::*; -use crate::background::*; - -pub(crate) struct JobWorker { - pub(crate) index: usize, - pub(crate) job_chan: Arc<Mutex<mpsc::UnboundedReceiver<(Job, bool)>>>, - pub(crate) next_job: Option<Job>, -} - -#[async_trait] -impl Worker for JobWorker { - fn name(&self) -> String { - format!("Job worker #{}", self.index) - } - - async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> { - match self.next_job.take() { - None => return Ok(WorkerState::Idle), - Some(job) => { - job.await?; - Ok(WorkerState::Busy) - } - } - } - - async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState { - loop { - match self.job_chan.lock().await.recv().await { - Some((job, cancellable)) => { - if cancellable && *must_exit.borrow() { - continue; - } - self.next_job = Some(job); - return WorkerState::Busy; - } - None => return WorkerState::Done, - } - } - } -} diff --git a/src/util/background/mod.rs b/src/util/background/mod.rs index fd9258b8..41b48e93 100644 --- a/src/util/background/mod.rs +++ b/src/util/background/mod.rs @@ -1,27 +1,18 @@ //! Job runner for futures and async functions -pub mod job_worker; pub mod worker; -use core::future::Future; - use std::collections::HashMap; -use std::pin::Pin; use std::sync::Arc; use serde::{Deserialize, Serialize}; -use tokio::sync::{mpsc, watch, Mutex}; +use tokio::sync::{mpsc, watch}; -use crate::error::Error; use worker::WorkerProcessor; pub use worker::{Worker, WorkerState}; -pub(crate) type JobOutput = Result<(), Error>; -pub(crate) type Job = Pin<Box<dyn Future<Output = JobOutput> + Send>>; - /// Job runner for futures and async functions pub struct BackgroundRunner { - send_job: mpsc::UnboundedSender<(Job, bool)>, send_worker: mpsc::UnboundedSender<Box<dyn Worker>>, worker_info: Arc<std::sync::Mutex<HashMap<usize, WorkerInfo>>>, } @@ -49,10 +40,7 @@ pub struct WorkerStatus { impl BackgroundRunner { /// Create a new BackgroundRunner - pub fn new( - n_runners: usize, - stop_signal: watch::Receiver<bool>, - ) -> (Arc<Self>, tokio::task::JoinHandle<()>) { + pub fn new(stop_signal: watch::Receiver<bool>) -> (Arc<Self>, tokio::task::JoinHandle<()>) { let (send_worker, worker_out) = mpsc::unbounded_channel::<Box<dyn Worker>>(); let worker_info = Arc::new(std::sync::Mutex::new(HashMap::new())); @@ -63,24 +51,7 @@ impl BackgroundRunner { worker_processor.run().await; }); - let (send_job, queue_out) = mpsc::unbounded_channel(); - let queue_out = Arc::new(Mutex::new(queue_out)); - - for i in 0..n_runners { - let queue_out = queue_out.clone(); - - send_worker - .send(Box::new(job_worker::JobWorker { - index: i, - job_chan: queue_out.clone(), - next_job: None, - })) - .ok() - .unwrap(); - } - let bgrunner = Arc::new(Self { - send_job, send_worker, worker_info, }); @@ -91,31 +62,6 @@ impl BackgroundRunner { self.worker_info.lock().unwrap().clone() } - /// Spawn a task to be run in background - pub fn spawn<T>(&self, job: T) - where - T: Future<Output = JobOutput> + Send + 'static, - { - let boxed: Job = Box::pin(job); - self.send_job - .send((boxed, false)) - .ok() - .expect("Could not put job in queue"); - } - - /// Spawn a task to be run in background. It may get discarded before running if spawned while - /// the runner is stopping - pub fn spawn_cancellable<T>(&self, job: T) - where - T: Future<Output = JobOutput> + Send + 'static, - { - let boxed: Job = Box::pin(job); - self.send_job - .send((boxed, true)) - .ok() - .expect("Could not put job in queue"); - } - pub fn spawn_worker<W>(&self, worker: W) where W: Worker + 'static, diff --git a/src/util/background/worker.rs b/src/util/background/worker.rs index 7e9da7f8..8165e2cb 100644 --- a/src/util/background/worker.rs +++ b/src/util/background/worker.rs @@ -1,6 +1,6 @@ use std::collections::HashMap; use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::time::Duration; use async_trait::async_trait; use futures::future::*; @@ -14,6 +14,10 @@ use crate::background::{WorkerInfo, WorkerStatus}; use crate::error::Error; use crate::time::now_msec; +// All workers that haven't exited for this time after an exit signal was recieved +// will be interrupted in the middle of whatever they are doing. +const EXIT_DEADLINE: Duration = Duration::from_secs(8); + #[derive(PartialEq, Copy, Clone, Serialize, Deserialize, Debug)] pub enum WorkerState { Busy, @@ -50,10 +54,8 @@ pub trait Worker: Send { async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error>; /// Wait for work: await for some task to become available. This future can be interrupted in - /// the middle for any reason. This future doesn't have to await on must_exit.changed(), we - /// are doing it for you. Therefore it only receives a read refernce to must_exit which allows - /// it to check if we are exiting. - async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState; + /// the middle for any reason, for example if an interrupt signal was recieved. + async fn wait_for_work(&mut self) -> WorkerState; } pub(crate) struct WorkerProcessor { @@ -93,11 +95,9 @@ impl WorkerProcessor { let task_id = next_task_id; next_task_id += 1; let stop_signal = self.stop_signal.clone(); - let stop_signal_worker = self.stop_signal.clone(); let mut worker = WorkerHandler { task_id, stop_signal, - stop_signal_worker, worker: new_worker, state: WorkerState::Busy, errors: 0, @@ -153,26 +153,14 @@ impl WorkerProcessor { } // We are exiting, drain everything - let drain_half_time = Instant::now() + Duration::from_secs(5); let drain_everything = async move { - while let Some(mut worker) = workers.next().await { - if worker.state == WorkerState::Done { - info!( - "Worker {} (TID {}) exited", - worker.worker.name(), - worker.task_id - ); - } else if Instant::now() > drain_half_time { - warn!("Worker {} (TID {}) interrupted between two iterations in state {:?} (this should be fine)", worker.worker.name(), worker.task_id, worker.state); - } else { - workers.push( - async move { - worker.step().await; - worker - } - .boxed(), - ); - } + while let Some(worker) = workers.next().await { + info!( + "Worker {} (TID {}) exited (last state: {:?})", + worker.worker.name(), + worker.task_id, + worker.state + ); } }; @@ -180,7 +168,7 @@ impl WorkerProcessor { _ = drain_everything => { info!("All workers exited peacefully \\o/"); } - _ = tokio::time::sleep(Duration::from_secs(9)) => { + _ = tokio::time::sleep(EXIT_DEADLINE) => { error!("Some workers could not exit in time, we are cancelling some things in the middle"); } } @@ -190,7 +178,6 @@ impl WorkerProcessor { struct WorkerHandler { task_id: usize, stop_signal: watch::Receiver<bool>, - stop_signal_worker: watch::Receiver<bool>, worker: Box<dyn Worker>, state: WorkerState, errors: usize, @@ -225,33 +212,19 @@ impl WorkerHandler { }, WorkerState::Throttled(delay) => { // Sleep for given delay and go back to busy state - if !*self.stop_signal.borrow() { - select! { - _ = tokio::time::sleep(Duration::from_secs_f32(delay)) => (), - _ = self.stop_signal.changed() => (), + select! { + _ = tokio::time::sleep(Duration::from_secs_f32(delay)) => { + self.state = WorkerState::Busy; } + _ = self.stop_signal.changed() => (), } - self.state = WorkerState::Busy; } WorkerState::Idle => { - if *self.stop_signal.borrow() { - select! { - new_st = self.worker.wait_for_work(&self.stop_signal_worker) => { - self.state = new_st; - } - _ = tokio::time::sleep(Duration::from_secs(1)) => { - // stay in Idle state - } - } - } else { - select! { - new_st = self.worker.wait_for_work(&self.stop_signal_worker) => { - self.state = new_st; - } - _ = self.stop_signal.changed() => { - // stay in Idle state - } + select! { + new_st = self.worker.wait_for_work() => { + self.state = new_st; } + _ = self.stop_signal.changed() => (), } } WorkerState::Done => unreachable!(), diff --git a/src/util/data.rs b/src/util/data.rs index 7715c2cc..3f61e301 100644 --- a/src/util/data.rs +++ b/src/util/data.rs @@ -140,34 +140,3 @@ pub fn fasthash(data: &[u8]) -> FastHash { pub fn gen_uuid() -> Uuid { rand::thread_rng().gen::<[u8; 32]>().into() } - -// RMP serialization with names of fields and variants - -/// Serialize to MessagePack -pub fn rmp_to_vec_all_named<T>(val: &T) -> Result<Vec<u8>, rmp_serde::encode::Error> -where - T: Serialize + ?Sized, -{ - let mut wr = Vec::with_capacity(128); - let mut se = rmp_serde::Serializer::new(&mut wr) - .with_struct_map() - .with_string_variants(); - val.serialize(&mut se)?; - Ok(wr) -} - -/// Serialize to JSON, truncating long result -pub fn debug_serialize<T: Serialize>(x: T) -> String { - match serde_json::to_string(&x) { - Ok(ss) => { - if ss.len() > 100 { - // TODO this can panic if 100 is not a codepoint boundary, but inside a 2 Bytes - // (or more) codepoint - ss[..100].to_string() - } else { - ss - } - } - Err(e) => format!("<JSON serialization error: {}>", e), - } -} diff --git a/src/util/encode.rs b/src/util/encode.rs new file mode 100644 index 00000000..1cd3198f --- /dev/null +++ b/src/util/encode.rs @@ -0,0 +1,42 @@ +use serde::{Deserialize, Serialize}; + +/// Serialize to MessagePacki, without versionning +/// (see garage_util::migrate for functions that manage versionned +/// data formats) +pub fn nonversioned_encode<T>(val: &T) -> Result<Vec<u8>, rmp_serde::encode::Error> +where + T: Serialize + ?Sized, +{ + let mut wr = Vec::with_capacity(128); + let mut se = rmp_serde::Serializer::new(&mut wr) + .with_struct_map() + .with_string_variants(); + val.serialize(&mut se)?; + Ok(wr) +} + +/// Deserialize from MessagePacki, without versionning +/// (see garage_util::migrate for functions that manage versionned +/// data formats) +pub fn nonversioned_decode<T>(bytes: &[u8]) -> Result<T, rmp_serde::decode::Error> +where + T: for<'de> Deserialize<'de> + ?Sized, +{ + rmp_serde::decode::from_read_ref::<_, T>(bytes) +} + +/// Serialize to JSON, truncating long result +pub fn debug_serialize<T: Serialize>(x: T) -> String { + match serde_json::to_string(&x) { + Ok(ss) => { + if ss.len() > 100 { + // TODO this can panic if 100 is not a codepoint boundary, but inside a 2 Bytes + // (or more) codepoint + ss[..100].to_string() + } else { + ss + } + } + Err(e) => format!("<JSON serialization error: {}>", e), + } +} diff --git a/src/util/error.rs b/src/util/error.rs index 9995c746..3fcee71d 100644 --- a/src/util/error.rs +++ b/src/util/error.rs @@ -7,6 +7,7 @@ use err_derive::Error; use serde::{de::Visitor, Deserialize, Deserializer, Serialize, Serializer}; use crate::data::*; +use crate::encode::debug_serialize; /// Regroup all Garage errors #[derive(Debug, Error)] diff --git a/src/util/lib.rs b/src/util/lib.rs index 264cc192..be82061f 100644 --- a/src/util/lib.rs +++ b/src/util/lib.rs @@ -8,9 +8,11 @@ pub mod background; pub mod config; pub mod crdt; pub mod data; +pub mod encode; pub mod error; pub mod formater; pub mod metrics; +pub mod migrate; pub mod persister; pub mod time; pub mod token_bucket; diff --git a/src/util/migrate.rs b/src/util/migrate.rs new file mode 100644 index 00000000..1229fd9c --- /dev/null +++ b/src/util/migrate.rs @@ -0,0 +1,159 @@ +use serde::{Deserialize, Serialize}; + +/// Indicates that this type has an encoding that can be migrated from +/// a previous version upon upgrades of Garage. +pub trait Migrate: Serialize + for<'de> Deserialize<'de> + 'static { + /// A sequence of bytes to add at the beginning of the serialized + /// string, to identify that the data is of this version. + const VERSION_MARKER: &'static [u8] = b""; + + /// The previous version of this data type, from which items of this version + /// can be migrated. + type Previous: Migrate; + + /// The migration function that transforms a value decoded in the old format + /// to an up-to-date value. + fn migrate(previous: Self::Previous) -> Self; + + /// Decode an encoded version of this type, going through a migration if necessary. + fn decode(bytes: &[u8]) -> Option<Self> { + let marker_len = Self::VERSION_MARKER.len(); + if bytes.get(..marker_len) == Some(Self::VERSION_MARKER) { + if let Ok(value) = rmp_serde::decode::from_read_ref::<_, Self>(&bytes[marker_len..]) { + return Some(value); + } + } + + Self::Previous::decode(bytes).map(Self::migrate) + } + + /// Encode this type with optionnal version marker + fn encode(&self) -> Result<Vec<u8>, rmp_serde::encode::Error> { + let mut wr = Vec::with_capacity(128); + wr.extend_from_slice(Self::VERSION_MARKER); + let mut se = rmp_serde::Serializer::new(&mut wr) + .with_struct_map() + .with_string_variants(); + self.serialize(&mut se)?; + Ok(wr) + } +} + +/// Indicates that this type has no previous encoding version to be migrated from. +pub trait InitialFormat: Serialize + for<'de> Deserialize<'de> + 'static { + /// A sequence of bytes to add at the beginning of the serialized + /// string, to identify that the data is of this version. + const VERSION_MARKER: &'static [u8] = b""; +} + +impl<T: InitialFormat> Migrate for T { + const VERSION_MARKER: &'static [u8] = <T as InitialFormat>::VERSION_MARKER; + + type Previous = NoPrevious; + + fn migrate(_previous: Self::Previous) -> Self { + unreachable!(); + } +} + +/// Internal type used by InitialFormat, not meant for general use. +#[derive(Serialize, Deserialize)] +pub enum NoPrevious {} + +impl Migrate for NoPrevious { + type Previous = NoPrevious; + + fn migrate(_previous: Self::Previous) -> Self { + unreachable!(); + } + + fn decode(_bytes: &[u8]) -> Option<Self> { + None + } + + fn encode(&self) -> Result<Vec<u8>, rmp_serde::encode::Error> { + unreachable!() + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[derive(Serialize, Deserialize, PartialEq, Eq, Debug)] + struct V1 { + a: usize, + b: String, + } + impl InitialFormat for V1 {} + + #[derive(Serialize, Deserialize, PartialEq, Eq, Debug)] + struct V2 { + a: usize, + b: Vec<String>, + c: String, + } + impl Migrate for V2 { + const VERSION_MARKER: &'static [u8] = b"GtestV2"; + type Previous = V1; + fn migrate(prev: V1) -> V2 { + V2 { + a: prev.a, + b: vec![prev.b], + c: String::new(), + } + } + } + + #[test] + fn test_v1() { + let x = V1 { + a: 12, + b: "hello".into(), + }; + let x_enc = x.encode().unwrap(); + let y = V1::decode(&x_enc).unwrap(); + assert_eq!(x, y); + } + + #[test] + fn test_v2() { + let x = V2 { + a: 12, + b: vec!["hello".into(), "world".into()], + c: "plop".into(), + }; + let x_enc = x.encode().unwrap(); + assert_eq!(&x_enc[..V2::VERSION_MARKER.len()], V2::VERSION_MARKER); + let y = V2::decode(&x_enc).unwrap(); + assert_eq!(x, y); + } + + #[test] + fn test_migrate() { + let x = V1 { + a: 12, + b: "hello".into(), + }; + let x_enc = x.encode().unwrap(); + + let xx = V1::decode(&x_enc).unwrap(); + assert_eq!(x, xx); + + let y = V2::decode(&x_enc).unwrap(); + assert_eq!( + y, + V2 { + a: 12, + b: vec!["hello".into()], + c: "".into(), + } + ); + + let y_enc = y.encode().unwrap(); + assert_eq!(&y_enc[..V2::VERSION_MARKER.len()], V2::VERSION_MARKER); + + let z = V2::decode(&y_enc).unwrap(); + assert_eq!(y, z); + } +} diff --git a/src/util/persister.rs b/src/util/persister.rs index 9e1a1910..4b9adf51 100644 --- a/src/util/persister.rs +++ b/src/util/persister.rs @@ -3,21 +3,16 @@ use std::path::{Path, PathBuf}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; -use serde::{Deserialize, Serialize}; - -use crate::data::*; use crate::error::Error; +use crate::migrate::Migrate; -pub struct Persister<T: Serialize + for<'de> Deserialize<'de>> { +pub struct Persister<T: Migrate> { path: PathBuf, _marker: std::marker::PhantomData<T>, } -impl<T> Persister<T> -where - T: Serialize + for<'de> Deserialize<'de>, -{ +impl<T: Migrate> Persister<T> { pub fn new(base_dir: &Path, file_name: &str) -> Self { let mut path = base_dir.to_path_buf(); path.push(file_name); @@ -27,18 +22,37 @@ where } } + fn decode(&self, bytes: &[u8]) -> Result<T, Error> { + match T::decode(bytes) { + Some(v) => Ok(v), + None => { + error!( + "Unable to decode persisted data file {}", + self.path.display() + ); + for line in hexdump::hexdump_iter(bytes) { + debug!("{}", line); + } + Err(Error::Message(format!( + "Unable to decode persisted data file {}", + self.path.display() + ))) + } + } + } + pub fn load(&self) -> Result<T, Error> { let mut file = std::fs::OpenOptions::new().read(true).open(&self.path)?; let mut bytes = vec![]; file.read_to_end(&mut bytes)?; - let value = rmp_serde::decode::from_read_ref(&bytes[..])?; + let value = self.decode(&bytes[..])?; Ok(value) } pub fn save(&self, t: &T) -> Result<(), Error> { - let bytes = rmp_to_vec_all_named(t)?; + let bytes = t.encode()?; let mut file = std::fs::OpenOptions::new() .write(true) @@ -57,12 +71,12 @@ where let mut bytes = vec![]; file.read_to_end(&mut bytes).await?; - let value = rmp_serde::decode::from_read_ref(&bytes[..])?; + let value = self.decode(&bytes[..])?; Ok(value) } pub async fn save_async(&self, t: &T) -> Result<(), Error> { - let bytes = rmp_to_vec_all_named(t)?; + let bytes = t.encode()?; let mut file = tokio::fs::File::create(&self.path).await?; file.write_all(&bytes[..]).await?; |