diff options
Diffstat (limited to 'src/util')
-rw-r--r-- | src/util/Cargo.toml | 22 | ||||
-rw-r--r-- | src/util/async_hash.rs | 61 | ||||
-rw-r--r-- | src/util/background.rs | 153 | ||||
-rw-r--r-- | src/util/background/job_worker.rs | 48 | ||||
-rw-r--r-- | src/util/background/mod.rs | 117 | ||||
-rw-r--r-- | src/util/background/worker.rs | 260 | ||||
-rw-r--r-- | src/util/config.rs | 73 | ||||
-rw-r--r-- | src/util/crdt/bool.rs | 2 | ||||
-rw-r--r-- | src/util/crdt/deletable.rs | 2 | ||||
-rw-r--r-- | src/util/crdt/lww.rs | 2 | ||||
-rw-r--r-- | src/util/crdt/lww_map.rs | 7 | ||||
-rw-r--r-- | src/util/crdt/map.rs | 2 | ||||
-rw-r--r-- | src/util/error.rs | 15 | ||||
-rw-r--r-- | src/util/formater.rs | 28 | ||||
-rw-r--r-- | src/util/lib.rs | 4 | ||||
-rw-r--r-- | src/util/metrics.rs | 16 | ||||
-rw-r--r-- | src/util/sled_counter.rs | 100 | ||||
-rw-r--r-- | src/util/tranquilizer.rs | 27 | ||||
-rw-r--r-- | src/util/version.rs | 28 |
19 files changed, 650 insertions, 317 deletions
diff --git a/src/util/Cargo.toml b/src/util/Cargo.toml index f13c1589..8e978fc2 100644 --- a/src/util/Cargo.toml +++ b/src/util/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "garage_util" -version = "0.7.0" +version = "0.8.0" authors = ["Alex Auvolat <alex@adnab.me>"] edition = "2018" license = "AGPL-3.0" @@ -14,15 +14,21 @@ path = "lib.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +garage_db = { version = "0.8.0", path = "../db" } + +arc-swap = "1.0" +async-trait = "0.1" blake2 = "0.9" +bytes = "1.0" +digest = "0.10" err-derive = "0.3" +git-version = "0.3.4" xxhash-rust = { version = "0.8", default-features = false, features = ["xxh3"] } hex = "0.4" +lazy_static = "1.4" tracing = "0.1.30" rand = "0.8" -sha2 = "0.9" - -sled = "0.34" +sha2 = "0.10" chrono = "0.4" rmp-serde = "0.15" @@ -33,11 +39,13 @@ toml = "0.5" futures = "0.3" tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } -#netapp = { version = "0.3.0", git = "https://git.deuxfleurs.fr/lx/netapp" } -#netapp = { version = "0.4", path = "../../../netapp" } -netapp = "0.4" +netapp = "0.5" http = "0.2" hyper = "0.14" opentelemetry = { version = "0.17", features = [ "rt-tokio", "metrics", "trace" ] } + + +[features] +k2v = [] diff --git a/src/util/async_hash.rs b/src/util/async_hash.rs new file mode 100644 index 00000000..5631ea6b --- /dev/null +++ b/src/util/async_hash.rs @@ -0,0 +1,61 @@ +use bytes::Bytes; +use digest::Digest; + +use tokio::sync::mpsc; +use tokio::task::JoinHandle; + +use crate::data::*; + +/// Compute the sha256 of a slice, +/// spawning on a tokio thread for CPU-intensive processing +/// The argument has to be an owned Bytes, as it is moved out to a new thread. +pub async fn async_sha256sum(data: Bytes) -> Hash { + tokio::task::spawn_blocking(move || sha256sum(&data)) + .await + .unwrap() +} + +/// Compute the blake2sum of a slice, +/// spawning on a tokio thread for CPU-intensive processing. +/// The argument has to be an owned Bytes, as it is moved out to a new thread. +pub async fn async_blake2sum(data: Bytes) -> Hash { + tokio::task::spawn_blocking(move || blake2sum(&data)) + .await + .unwrap() +} + +// ---- + +pub struct AsyncHasher<D: Digest> { + sendblk: mpsc::Sender<Bytes>, + task: JoinHandle<digest::Output<D>>, +} + +impl<D: Digest> AsyncHasher<D> { + pub fn new() -> Self { + let (sendblk, mut recvblk) = mpsc::channel::<Bytes>(1); + let task = tokio::task::spawn_blocking(move || { + let mut digest = D::new(); + while let Some(blk) = recvblk.blocking_recv() { + digest.update(&blk[..]); + } + digest.finalize() + }); + Self { sendblk, task } + } + + pub async fn update(&self, b: Bytes) { + self.sendblk.send(b).await.unwrap(); + } + + pub async fn finalize(self) -> digest::Output<D> { + drop(self.sendblk); + self.task.await.unwrap() + } +} + +impl<D: Digest> Default for AsyncHasher<D> { + fn default() -> Self { + Self::new() + } +} diff --git a/src/util/background.rs b/src/util/background.rs deleted file mode 100644 index bfdaaf1e..00000000 --- a/src/util/background.rs +++ /dev/null @@ -1,153 +0,0 @@ -//! Job runner for futures and async functions -use core::future::Future; -use std::pin::Pin; -use std::sync::Arc; -use std::time::Duration; - -use futures::future::*; -use futures::select; -use tokio::sync::{mpsc, watch, Mutex}; - -use crate::error::Error; - -type JobOutput = Result<(), Error>; -type Job = Pin<Box<dyn Future<Output = JobOutput> + Send>>; - -/// Job runner for futures and async functions -pub struct BackgroundRunner { - stop_signal: watch::Receiver<bool>, - queue_in: mpsc::UnboundedSender<(Job, bool)>, - worker_in: mpsc::UnboundedSender<tokio::task::JoinHandle<()>>, -} - -impl BackgroundRunner { - /// Create a new BackgroundRunner - pub fn new( - n_runners: usize, - stop_signal: watch::Receiver<bool>, - ) -> (Arc<Self>, tokio::task::JoinHandle<()>) { - let (worker_in, mut worker_out) = mpsc::unbounded_channel(); - - let stop_signal_2 = stop_signal.clone(); - let await_all_done = tokio::spawn(async move { - loop { - let wkr = { - select! { - item = worker_out.recv().fuse() => { - match item { - Some(x) => x, - None => break, - } - } - _ = tokio::time::sleep(Duration::from_secs(5)).fuse() => { - if *stop_signal_2.borrow() { - break; - } else { - continue; - } - } - } - }; - if let Err(e) = wkr.await { - error!("Error while awaiting for worker: {}", e); - } - } - }); - - let (queue_in, queue_out) = mpsc::unbounded_channel(); - let queue_out = Arc::new(Mutex::new(queue_out)); - - for i in 0..n_runners { - let queue_out = queue_out.clone(); - let stop_signal = stop_signal.clone(); - - worker_in - .send(tokio::spawn(async move { - loop { - let (job, cancellable) = { - select! { - item = wait_job(&queue_out).fuse() => match item { - // We received a task, process it - Some(x) => x, - // We received a signal that no more tasks will ever be sent - // because the sending side was dropped. Exit now. - None => break, - }, - _ = tokio::time::sleep(Duration::from_secs(5)).fuse() => { - if *stop_signal.borrow() { - // Nothing has been going on for 5 secs, and we are shutting - // down. Exit now. - break; - } else { - // Nothing is going on but we don't want to exit. - continue; - } - } - } - }; - if cancellable && *stop_signal.borrow() { - continue; - } - if let Err(e) = job.await { - error!("Job failed: {}", e) - } - } - info!("Background worker {} exiting", i); - })) - .unwrap(); - } - - let bgrunner = Arc::new(Self { - stop_signal, - queue_in, - worker_in, - }); - (bgrunner, await_all_done) - } - - /// Spawn a task to be run in background - pub fn spawn<T>(&self, job: T) - where - T: Future<Output = JobOutput> + Send + 'static, - { - let boxed: Job = Box::pin(job); - self.queue_in - .send((boxed, false)) - .map_err(|_| "could not put job in queue") - .unwrap(); - } - - /// Spawn a task to be run in background. It may get discarded before running if spawned while - /// the runner is stopping - pub fn spawn_cancellable<T>(&self, job: T) - where - T: Future<Output = JobOutput> + Send + 'static, - { - let boxed: Job = Box::pin(job); - self.queue_in - .send((boxed, true)) - .map_err(|_| "could not put job in queue") - .unwrap(); - } - - pub fn spawn_worker<F, T>(&self, name: String, worker: F) - where - F: FnOnce(watch::Receiver<bool>) -> T + Send + 'static, - T: Future<Output = ()> + Send + 'static, - { - let stop_signal = self.stop_signal.clone(); - let task = tokio::spawn(async move { - info!("Worker started: {}", name); - worker(stop_signal).await; - info!("Worker exited: {}", name); - }); - self.worker_in - .send(task) - .map_err(|_| "could not put job in queue") - .unwrap(); - } -} - -async fn wait_job(q: &Mutex<mpsc::UnboundedReceiver<(Job, bool)>>) -> Option<(Job, bool)> { - q.lock().await.recv().await -} diff --git a/src/util/background/job_worker.rs b/src/util/background/job_worker.rs new file mode 100644 index 00000000..2568ea11 --- /dev/null +++ b/src/util/background/job_worker.rs @@ -0,0 +1,48 @@ +//! Job worker: a generic worker that just processes incoming +//! jobs one by one + +use std::sync::Arc; + +use async_trait::async_trait; +use tokio::sync::{mpsc, Mutex}; + +use crate::background::worker::*; +use crate::background::*; + +pub(crate) struct JobWorker { + pub(crate) index: usize, + pub(crate) job_chan: Arc<Mutex<mpsc::UnboundedReceiver<(Job, bool)>>>, + pub(crate) next_job: Option<Job>, +} + +#[async_trait] +impl Worker for JobWorker { + fn name(&self) -> String { + format!("Job worker #{}", self.index) + } + + async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> { + match self.next_job.take() { + None => return Ok(WorkerState::Idle), + Some(job) => { + job.await?; + Ok(WorkerState::Busy) + } + } + } + + async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState { + loop { + match self.job_chan.lock().await.recv().await { + Some((job, cancellable)) => { + if cancellable && *must_exit.borrow() { + continue; + } + self.next_job = Some(job); + return WorkerState::Busy; + } + None => return WorkerState::Done, + } + } + } +} diff --git a/src/util/background/mod.rs b/src/util/background/mod.rs new file mode 100644 index 00000000..619f5068 --- /dev/null +++ b/src/util/background/mod.rs @@ -0,0 +1,117 @@ +//! Job runner for futures and async functions + +pub mod job_worker; +pub mod worker; + +use core::future::Future; + +use std::collections::HashMap; +use std::pin::Pin; +use std::sync::Arc; + +use serde::{Deserialize, Serialize}; +use tokio::sync::{mpsc, watch, Mutex}; + +use crate::error::Error; +use worker::WorkerProcessor; +pub use worker::{Worker, WorkerState}; + +pub(crate) type JobOutput = Result<(), Error>; +pub(crate) type Job = Pin<Box<dyn Future<Output = JobOutput> + Send>>; + +/// Job runner for futures and async functions +pub struct BackgroundRunner { + send_job: mpsc::UnboundedSender<(Job, bool)>, + send_worker: mpsc::UnboundedSender<Box<dyn Worker>>, + worker_info: Arc<std::sync::Mutex<HashMap<usize, WorkerInfo>>>, +} + +#[derive(Clone, Serialize, Deserialize, Debug)] +pub struct WorkerInfo { + pub name: String, + pub info: Option<String>, + pub state: WorkerState, + pub errors: usize, + pub consecutive_errors: usize, + pub last_error: Option<(String, u64)>, +} + +impl BackgroundRunner { + /// Create a new BackgroundRunner + pub fn new( + n_runners: usize, + stop_signal: watch::Receiver<bool>, + ) -> (Arc<Self>, tokio::task::JoinHandle<()>) { + let (send_worker, worker_out) = mpsc::unbounded_channel::<Box<dyn Worker>>(); + + let worker_info = Arc::new(std::sync::Mutex::new(HashMap::new())); + let mut worker_processor = + WorkerProcessor::new(worker_out, stop_signal, worker_info.clone()); + + let await_all_done = tokio::spawn(async move { + worker_processor.run().await; + }); + + let (send_job, queue_out) = mpsc::unbounded_channel(); + let queue_out = Arc::new(Mutex::new(queue_out)); + + for i in 0..n_runners { + let queue_out = queue_out.clone(); + + send_worker + .send(Box::new(job_worker::JobWorker { + index: i, + job_chan: queue_out.clone(), + next_job: None, + })) + .ok() + .unwrap(); + } + + let bgrunner = Arc::new(Self { + send_job, + send_worker, + worker_info, + }); + (bgrunner, await_all_done) + } + + pub fn get_worker_info(&self) -> HashMap<usize, WorkerInfo> { + self.worker_info.lock().unwrap().clone() + } + + /// Spawn a task to be run in background + pub fn spawn<T>(&self, job: T) + where + T: Future<Output = JobOutput> + Send + 'static, + { + let boxed: Job = Box::pin(job); + self.send_job + .send((boxed, false)) + .ok() + .expect("Could not put job in queue"); + } + + /// Spawn a task to be run in background. It may get discarded before running if spawned while + /// the runner is stopping + pub fn spawn_cancellable<T>(&self, job: T) + where + T: Future<Output = JobOutput> + Send + 'static, + { + let boxed: Job = Box::pin(job); + self.send_job + .send((boxed, true)) + .ok() + .expect("Could not put job in queue"); + } + + pub fn spawn_worker<W>(&self, worker: W) + where + W: Worker + 'static, + { + self.send_worker + .send(Box::new(worker)) + .ok() + .expect("Could not put worker in queue"); + } +} diff --git a/src/util/background/worker.rs b/src/util/background/worker.rs new file mode 100644 index 00000000..f5e3addb --- /dev/null +++ b/src/util/background/worker.rs @@ -0,0 +1,260 @@ +use std::collections::HashMap; +use std::sync::Arc; +use std::time::{Duration, Instant}; + +use async_trait::async_trait; +use futures::future::*; +use futures::stream::FuturesUnordered; +use futures::StreamExt; +use serde::{Deserialize, Serialize}; +use tokio::select; +use tokio::sync::{mpsc, watch}; + +use crate::background::WorkerInfo; +use crate::error::Error; +use crate::time::now_msec; + +#[derive(PartialEq, Copy, Clone, Serialize, Deserialize, Debug)] +pub enum WorkerState { + Busy, + Throttled(f32), + Idle, + Done, +} + +impl std::fmt::Display for WorkerState { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + WorkerState::Busy => write!(f, "Busy"), + WorkerState::Throttled(t) => write!(f, "Thr:{:.3}", t), + WorkerState::Idle => write!(f, "Idle"), + WorkerState::Done => write!(f, "Done"), + } + } +} + +#[async_trait] +pub trait Worker: Send { + fn name(&self) -> String; + + fn info(&self) -> Option<String> { + None + } + + /// Work: do a basic unit of work, if one is available (otherwise, should return + /// WorkerState::Idle immediately). We will do our best to not interrupt this future in the + /// middle of processing, it will only be interrupted at the last minute when Garage is trying + /// to exit and this hasn't returned yet. This function may return an error to indicate that + /// its unit of work could not be processed due to an error: the error will be logged and + /// .work() will be called again after a short delay. + async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error>; + + /// Wait for work: await for some task to become available. This future can be interrupted in + /// the middle for any reason. This future doesn't have to await on must_exit.changed(), we + /// are doing it for you. Therefore it only receives a read refernce to must_exit which allows + /// it to check if we are exiting. + async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState; +} + +pub(crate) struct WorkerProcessor { + stop_signal: watch::Receiver<bool>, + worker_chan: mpsc::UnboundedReceiver<Box<dyn Worker>>, + worker_info: Arc<std::sync::Mutex<HashMap<usize, WorkerInfo>>>, +} + +impl WorkerProcessor { + pub(crate) fn new( + worker_chan: mpsc::UnboundedReceiver<Box<dyn Worker>>, + stop_signal: watch::Receiver<bool>, + worker_info: Arc<std::sync::Mutex<HashMap<usize, WorkerInfo>>>, + ) -> Self { + Self { + stop_signal, + worker_chan, + worker_info, + } + } + + pub(crate) async fn run(&mut self) { + let mut workers = FuturesUnordered::new(); + let mut next_task_id = 1; + + while !*self.stop_signal.borrow() { + let await_next_worker = async { + if workers.is_empty() { + futures::future::pending().await + } else { + workers.next().await + } + }; + select! { + new_worker_opt = self.worker_chan.recv() => { + if let Some(new_worker) = new_worker_opt { + let task_id = next_task_id; + next_task_id += 1; + let stop_signal = self.stop_signal.clone(); + let stop_signal_worker = self.stop_signal.clone(); + let mut worker = WorkerHandler { + task_id, + stop_signal, + stop_signal_worker, + worker: new_worker, + state: WorkerState::Busy, + errors: 0, + consecutive_errors: 0, + last_error: None, + }; + workers.push(async move { + worker.step().await; + worker + }.boxed()); + } + } + worker = await_next_worker => { + if let Some(mut worker) = worker { + trace!("{} (TID {}): {:?}", worker.worker.name(), worker.task_id, worker.state); + + // Save worker info + let mut wi = self.worker_info.lock().unwrap(); + match wi.get_mut(&worker.task_id) { + Some(i) => { + i.state = worker.state; + i.info = worker.worker.info(); + i.errors = worker.errors; + i.consecutive_errors = worker.consecutive_errors; + if worker.last_error.is_some() { + i.last_error = worker.last_error.take(); + } + } + None => { + wi.insert(worker.task_id, WorkerInfo { + name: worker.worker.name(), + state: worker.state, + info: worker.worker.info(), + errors: worker.errors, + consecutive_errors: worker.consecutive_errors, + last_error: worker.last_error.take(), + }); + } + } + + if worker.state == WorkerState::Done { + info!("Worker {} (TID {}) exited", worker.worker.name(), worker.task_id); + } else { + workers.push(async move { + worker.step().await; + worker + }.boxed()); + } + } + } + _ = self.stop_signal.changed() => (), + } + } + + // We are exiting, drain everything + let drain_half_time = Instant::now() + Duration::from_secs(5); + let drain_everything = async move { + while let Some(mut worker) = workers.next().await { + if worker.state == WorkerState::Done { + info!( + "Worker {} (TID {}) exited", + worker.worker.name(), + worker.task_id + ); + } else if Instant::now() > drain_half_time { + warn!("Worker {} (TID {}) interrupted between two iterations in state {:?} (this should be fine)", worker.worker.name(), worker.task_id, worker.state); + } else { + workers.push( + async move { + worker.step().await; + worker + } + .boxed(), + ); + } + } + }; + + select! { + _ = drain_everything => { + info!("All workers exited peacefully \\o/"); + } + _ = tokio::time::sleep(Duration::from_secs(9)) => { + error!("Some workers could not exit in time, we are cancelling some things in the middle"); + } + } + } +} + +struct WorkerHandler { + task_id: usize, + stop_signal: watch::Receiver<bool>, + stop_signal_worker: watch::Receiver<bool>, + worker: Box<dyn Worker>, + state: WorkerState, + errors: usize, + consecutive_errors: usize, + last_error: Option<(String, u64)>, +} + +impl WorkerHandler { + async fn step(&mut self) { + match self.state { + WorkerState::Busy => match self.worker.work(&mut self.stop_signal).await { + Ok(s) => { + self.state = s; + self.consecutive_errors = 0; + } + Err(e) => { + error!( + "Error in worker {} (TID {}): {}", + self.worker.name(), + self.task_id, + e + ); + self.errors += 1; + self.consecutive_errors += 1; + self.last_error = Some((format!("{}", e), now_msec())); + // Sleep a bit so that error won't repeat immediately, exponential backoff + // strategy (min 1sec, max ~60sec) + self.state = WorkerState::Throttled( + (1.5f32).powf(std::cmp::min(10, self.consecutive_errors - 1) as f32), + ); + } + }, + WorkerState::Throttled(delay) => { + // Sleep for given delay and go back to busy state + if !*self.stop_signal.borrow() { + select! { + _ = tokio::time::sleep(Duration::from_secs_f32(delay)) => (), + _ = self.stop_signal.changed() => (), + } + } + self.state = WorkerState::Busy; + } + WorkerState::Idle => { + if *self.stop_signal.borrow() { + select! { + new_st = self.worker.wait_for_work(&self.stop_signal_worker) => { + self.state = new_st; + } + _ = tokio::time::sleep(Duration::from_secs(1)) => { + // stay in Idle state + } + } + } else { + select! { + new_st = self.worker.wait_for_work(&self.stop_signal_worker) => { + self.state = new_st; + } + _ = self.stop_signal.changed() => { + // stay in Idle state + } + } + } + } + WorkerState::Done => unreachable!(), + } + } +} diff --git a/src/util/config.rs b/src/util/config.rs index e4d96476..2d4b4f57 100644 --- a/src/util/config.rs +++ b/src/util/config.rs @@ -3,12 +3,8 @@ use std::io::Read; use std::net::SocketAddr; use std::path::PathBuf; -use serde::de::Error as SerdeError; use serde::{de, Deserialize}; -use netapp::util::parse_and_resolve_peer_addr; -use netapp::NodeID; - use crate::error::Error; /// Represent the whole configuration @@ -23,10 +19,6 @@ pub struct Config { #[serde(default = "default_block_size")] pub block_size: usize, - /// Size of data blocks to save to disk - #[serde(default = "default_block_manager_background_tranquility")] - pub block_manager_background_tranquility: u32, - /// Replication mode. Supported values: /// - none, 1 -> no replication /// - 2 -> 2-way replication @@ -47,11 +39,16 @@ pub struct Config { /// Address to bind for RPC pub rpc_bind_addr: SocketAddr, /// Public IP address of this node - pub rpc_public_addr: Option<SocketAddr>, + pub rpc_public_addr: Option<String>, + + /// Timeout for Netapp's ping messagess + pub rpc_ping_timeout_msec: Option<u64>, + /// Timeout for Netapp RPC calls + pub rpc_timeout_msec: Option<u64>, /// Bootstrap peers RPC address - #[serde(deserialize_with = "deserialize_vec_addr", default)] - pub bootstrap_peers: Vec<(NodeID, SocketAddr)>, + #[serde(default)] + pub bootstrap_peers: Vec<String>, /// Consul host to connect to to discover more peers pub consul_host: Option<String>, /// Consul service name to use @@ -64,19 +61,27 @@ pub struct Config { #[serde(default)] pub kubernetes_skip_crd: bool, + // -- DB + /// Database engine to use for metadata (options: sled, sqlite, lmdb) + #[serde(default = "default_db_engine")] + pub db_engine: String, + /// Sled cache size, in bytes #[serde(default = "default_sled_cache_capacity")] pub sled_cache_capacity: u64, - /// Sled flush interval in milliseconds #[serde(default = "default_sled_flush_every_ms")] pub sled_flush_every_ms: u64, + // -- APIs /// Configuration for S3 api - pub s3_api: ApiConfig, + pub s3_api: S3ApiConfig, + + /// Configuration for K2V api + pub k2v_api: Option<K2VApiConfig>, /// Configuration for serving files as normal web server - pub s3_web: WebConfig, + pub s3_web: Option<WebConfig>, /// Configuration for the admin API endpoint #[serde(default = "Default::default")] @@ -85,9 +90,9 @@ pub struct Config { /// Configuration for S3 api #[derive(Deserialize, Debug, Clone)] -pub struct ApiConfig { +pub struct S3ApiConfig { /// Address and port to bind for api serving - pub api_bind_addr: SocketAddr, + pub api_bind_addr: Option<SocketAddr>, /// S3 region to use pub s3_region: String, /// Suffix to remove from domain name to find bucket. If None, @@ -95,6 +100,13 @@ pub struct ApiConfig { pub root_domain: Option<String>, } +/// Configuration for K2V api +#[derive(Deserialize, Debug, Clone)] +pub struct K2VApiConfig { + /// Address and port to bind for api serving + pub api_bind_addr: SocketAddr, +} + /// Configuration for serving files as normal web server #[derive(Deserialize, Debug, Clone)] pub struct WebConfig { @@ -109,10 +121,18 @@ pub struct WebConfig { pub struct AdminConfig { /// Address and port to bind for admin API serving pub api_bind_addr: Option<SocketAddr>, + /// Bearer token to use to scrape metrics + pub metrics_token: Option<String>, + /// Bearer token to use to access Admin API endpoints + pub admin_token: Option<String>, /// OTLP server to where to export traces pub trace_sink: Option<String>, } +fn default_db_engine() -> String { + "sled".into() +} + fn default_sled_cache_capacity() -> u64 { 128 * 1024 * 1024 } @@ -122,9 +142,6 @@ fn default_sled_flush_every_ms() -> u64 { fn default_block_size() -> usize { 1048576 } -fn default_block_manager_background_tranquility() -> u32 { - 2 -} /// Read and parse configuration pub fn read_config(config_file: PathBuf) -> Result<Config, Error> { @@ -138,24 +155,6 @@ pub fn read_config(config_file: PathBuf) -> Result<Config, Error> { Ok(toml::from_str(&config)?) } -fn deserialize_vec_addr<'de, D>(deserializer: D) -> Result<Vec<(NodeID, SocketAddr)>, D::Error> -where - D: de::Deserializer<'de>, -{ - let mut ret = vec![]; - - for peer in <Vec<&str>>::deserialize(deserializer)? { - let (pubkey, addrs) = parse_and_resolve_peer_addr(peer).ok_or_else(|| { - D::Error::custom(format!("Unable to parse or resolve peer: {}", peer)) - })?; - for ip in addrs { - ret.push((pubkey, ip)); - } - } - - Ok(ret) -} - fn default_compression() -> Option<i32> { Some(1) } diff --git a/src/util/crdt/bool.rs b/src/util/crdt/bool.rs index 53af8f82..111eb5f1 100644 --- a/src/util/crdt/bool.rs +++ b/src/util/crdt/bool.rs @@ -3,7 +3,7 @@ use serde::{Deserialize, Serialize}; use crate::crdt::crdt::*; /// Boolean, where `true` is an absorbing state -#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)] +#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq)] pub struct Bool(bool); impl Bool { diff --git a/src/util/crdt/deletable.rs b/src/util/crdt/deletable.rs index c76f5cbb..e771aceb 100644 --- a/src/util/crdt/deletable.rs +++ b/src/util/crdt/deletable.rs @@ -3,7 +3,7 @@ use serde::{Deserialize, Serialize}; use crate::crdt::crdt::*; /// Deletable object (once deleted, cannot go back) -#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)] +#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq)] pub enum Deletable<T> { Present(T), Deleted, diff --git a/src/util/crdt/lww.rs b/src/util/crdt/lww.rs index 254abe8e..958844c9 100644 --- a/src/util/crdt/lww.rs +++ b/src/util/crdt/lww.rs @@ -37,7 +37,7 @@ use crate::crdt::crdt::*; /// /// This scheme is used by AWS S3 or Soundcloud and often without knowing /// in enterprise when reconciliating databases with ad-hoc scripts. -#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] pub struct Lww<T> { ts: u64, v: T, diff --git a/src/util/crdt/lww_map.rs b/src/util/crdt/lww_map.rs index c155c3a8..88113856 100644 --- a/src/util/crdt/lww_map.rs +++ b/src/util/crdt/lww_map.rs @@ -23,7 +23,7 @@ use crate::crdt::crdt::*; /// However, note that even if we were using a more efficient data structure such as a `BTreeMap`, /// the serialization cost `O(n)` would still have to be paid at each modification, so we are /// actually not losing anything here. -#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] pub struct LwwMap<K, V> { vals: Vec<(K, u64, V)>, } @@ -140,6 +140,11 @@ where self.vals.clear(); } + /// Retain only values that match a certain predicate + pub fn retain(&mut self, pred: impl FnMut(&(K, u64, V)) -> bool) { + self.vals.retain(pred); + } + /// Get a reference to the value assigned to a key pub fn get(&self, k: &K) -> Option<&V> { match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(k)) { diff --git a/src/util/crdt/map.rs b/src/util/crdt/map.rs index f9ed19b6..5d1e1520 100644 --- a/src/util/crdt/map.rs +++ b/src/util/crdt/map.rs @@ -16,7 +16,7 @@ use crate::crdt::crdt::*; /// However, note that even if we were using a more efficient data structure such as a `BTreeMap`, /// the serialization cost `O(n)` would still have to be paid at each modification, so we are /// actually not losing anything here. -#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] pub struct Map<K, V> { vals: Vec<(K, V)>, } diff --git a/src/util/error.rs b/src/util/error.rs index bdb3a69b..9995c746 100644 --- a/src/util/error.rs +++ b/src/util/error.rs @@ -26,8 +26,8 @@ pub enum Error { #[error(display = "Netapp error: {}", _0)] Netapp(#[error(source)] netapp::error::Error), - #[error(display = "Sled error: {}", _0)] - Sled(#[error(source)] sled::Error), + #[error(display = "DB error: {}", _0)] + Db(#[error(source)] garage_db::Error), #[error(display = "Messagepack encode error: {}", _0)] RmpEncode(#[error(source)] rmp_serde::encode::Error), @@ -44,6 +44,9 @@ pub enum Error { #[error(display = "Tokio semaphore acquire error: {}", _0)] TokioSemAcquire(#[error(source)] tokio::sync::AcquireError), + #[error(display = "Tokio broadcast receive error: {}", _0)] + TokioBcastRecv(#[error(source)] tokio::sync::broadcast::error::RecvError), + #[error(display = "Remote error: {}", _0)] RemoteError(String), @@ -75,11 +78,11 @@ impl Error { } } -impl From<sled::transaction::TransactionError<Error>> for Error { - fn from(e: sled::transaction::TransactionError<Error>) -> Error { +impl From<garage_db::TxError<Error>> for Error { + fn from(e: garage_db::TxError<Error>) -> Error { match e { - sled::transaction::TransactionError::Abort(x) => x, - sled::transaction::TransactionError::Storage(x) => Error::Sled(x), + garage_db::TxError::Abort(x) => x, + garage_db::TxError::Db(x) => Error::Db(x), } } } diff --git a/src/util/formater.rs b/src/util/formater.rs new file mode 100644 index 00000000..95324f9a --- /dev/null +++ b/src/util/formater.rs @@ -0,0 +1,28 @@ +pub fn format_table(data: Vec<String>) { + let data = data + .iter() + .map(|s| s.split('\t').collect::<Vec<_>>()) + .collect::<Vec<_>>(); + + let columns = data.iter().map(|row| row.len()).fold(0, std::cmp::max); + let mut column_size = vec![0; columns]; + + let mut out = String::new(); + + for row in data.iter() { + for (i, col) in row.iter().enumerate() { + column_size[i] = std::cmp::max(column_size[i], col.chars().count()); + } + } + + for row in data.iter() { + for (col, col_len) in row[..row.len() - 1].iter().zip(column_size.iter()) { + out.push_str(col); + (0..col_len - col.chars().count() + 2).for_each(|_| out.push(' ')); + } + out.push_str(row[row.len() - 1]); + out.push('\n'); + } + + print!("{}", out); +} diff --git a/src/util/lib.rs b/src/util/lib.rs index e83fc2e6..264cc192 100644 --- a/src/util/lib.rs +++ b/src/util/lib.rs @@ -3,14 +3,16 @@ #[macro_use] extern crate tracing; +pub mod async_hash; pub mod background; pub mod config; pub mod crdt; pub mod data; pub mod error; +pub mod formater; pub mod metrics; pub mod persister; -pub mod sled_counter; pub mod time; pub mod token_bucket; pub mod tranquilizer; +pub mod version; diff --git a/src/util/metrics.rs b/src/util/metrics.rs index 1b05eabe..b882a886 100644 --- a/src/util/metrics.rs +++ b/src/util/metrics.rs @@ -1,4 +1,4 @@ -use std::time::SystemTime; +use std::time::Instant; use futures::{future::BoxFuture, Future, FutureExt}; use rand::Rng; @@ -28,10 +28,12 @@ where attributes: &'a [KeyValue], ) -> BoxFuture<'a, Self::Output> { async move { - let request_start = SystemTime::now(); + let request_start = Instant::now(); let res = self.await; r.record( - request_start.elapsed().map_or(0.0, |d| d.as_secs_f64()), + Instant::now() + .saturating_duration_since(request_start) + .as_secs_f64(), attributes, ); res @@ -41,9 +43,13 @@ where fn bound_record_duration(self, r: &'a BoundValueRecorder<f64>) -> BoxFuture<'a, Self::Output> { async move { - let request_start = SystemTime::now(); + let request_start = Instant::now(); let res = self.await; - r.record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64())); + r.record( + Instant::now() + .saturating_duration_since(request_start) + .as_secs_f64(), + ); res } .boxed() diff --git a/src/util/sled_counter.rs b/src/util/sled_counter.rs deleted file mode 100644 index bc54cea0..00000000 --- a/src/util/sled_counter.rs +++ /dev/null @@ -1,100 +0,0 @@ -use std::sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, -}; - -use sled::{CompareAndSwapError, IVec, Iter, Result, Tree}; - -#[derive(Clone)] -pub struct SledCountedTree(Arc<SledCountedTreeInternal>); - -struct SledCountedTreeInternal { - tree: Tree, - len: AtomicUsize, -} - -impl SledCountedTree { - pub fn new(tree: Tree) -> Self { - let len = tree.len(); - Self(Arc::new(SledCountedTreeInternal { - tree, - len: AtomicUsize::new(len), - })) - } - - pub fn len(&self) -> usize { - self.0.len.load(Ordering::Relaxed) - } - - pub fn is_empty(&self) -> bool { - self.0.tree.is_empty() - } - - pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<IVec>> { - self.0.tree.get(key) - } - - pub fn iter(&self) -> Iter { - self.0.tree.iter() - } - - // ---- writing functions ---- - - pub fn insert<K, V>(&self, key: K, value: V) -> Result<Option<IVec>> - where - K: AsRef<[u8]>, - V: Into<IVec>, - { - let res = self.0.tree.insert(key, value); - if res == Ok(None) { - self.0.len.fetch_add(1, Ordering::Relaxed); - } - res - } - - pub fn remove<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<IVec>> { - let res = self.0.tree.remove(key); - if matches!(res, Ok(Some(_))) { - self.0.len.fetch_sub(1, Ordering::Relaxed); - } - res - } - - pub fn pop_min(&self) -> Result<Option<(IVec, IVec)>> { - let res = self.0.tree.pop_min(); - if let Ok(Some(_)) = &res { - self.0.len.fetch_sub(1, Ordering::Relaxed); - }; - res - } - - pub fn compare_and_swap<K, OV, NV>( - &self, - key: K, - old: Option<OV>, - new: Option<NV>, - ) -> Result<std::result::Result<(), CompareAndSwapError>> - where - K: AsRef<[u8]>, - OV: AsRef<[u8]>, - NV: Into<IVec>, - { - let old_some = old.is_some(); - let new_some = new.is_some(); - - let res = self.0.tree.compare_and_swap(key, old, new); - - if res == Ok(Ok(())) { - match (old_some, new_some) { - (false, true) => { - self.0.len.fetch_add(1, Ordering::Relaxed); - } - (true, false) => { - self.0.len.fetch_sub(1, Ordering::Relaxed); - } - _ => (), - } - } - res - } -} diff --git a/src/util/tranquilizer.rs b/src/util/tranquilizer.rs index 28711387..8a96cbb3 100644 --- a/src/util/tranquilizer.rs +++ b/src/util/tranquilizer.rs @@ -3,6 +3,8 @@ use std::time::{Duration, Instant}; use tokio::time::sleep; +use crate::background::WorkerState; + /// A tranquilizer is a helper object that is used to make /// background operations not take up too much time. /// @@ -33,8 +35,8 @@ impl Tranquilizer { } } - pub async fn tranquilize(&mut self, tranquility: u32) { - let observation = Instant::now() - self.last_step_begin; + fn tranquilize_internal(&mut self, tranquility: u32) -> Option<Duration> { + let observation = Instant::now().saturating_duration_since(self.last_step_begin); self.observations.push_back(observation); self.sum_observations += observation; @@ -45,13 +47,32 @@ impl Tranquilizer { if !self.observations.is_empty() { let delay = (tranquility * self.sum_observations) / (self.observations.len() as u32); + Some(delay) + } else { + None + } + } + + pub async fn tranquilize(&mut self, tranquility: u32) { + if let Some(delay) = self.tranquilize_internal(tranquility) { sleep(delay).await; + self.reset(); } + } - self.reset(); + #[must_use] + pub fn tranquilize_worker(&mut self, tranquility: u32) -> WorkerState { + match self.tranquilize_internal(tranquility) { + Some(delay) => WorkerState::Throttled(delay.as_secs_f32()), + None => WorkerState::Busy, + } } pub fn reset(&mut self) { self.last_step_begin = Instant::now(); } + + pub fn clear(&mut self) { + self.observations.clear(); + } } diff --git a/src/util/version.rs b/src/util/version.rs new file mode 100644 index 00000000..b515dccc --- /dev/null +++ b/src/util/version.rs @@ -0,0 +1,28 @@ +use std::sync::Arc; + +use arc_swap::{ArcSwap, ArcSwapOption}; + +lazy_static::lazy_static! { + static ref VERSION: ArcSwap<&'static str> = ArcSwap::new(Arc::new(git_version::git_version!( + prefix = "git:", + cargo_prefix = "cargo:", + fallback = "unknown" + ))); + static ref FEATURES: ArcSwapOption<&'static [&'static str]> = ArcSwapOption::new(None); +} + +pub fn garage_version() -> &'static str { + &VERSION.load() +} + +pub fn garage_features() -> Option<&'static [&'static str]> { + FEATURES.load().as_ref().map(|f| &f[..]) +} + +pub fn init_version(version: &'static str) { + VERSION.store(Arc::new(version)); +} + +pub fn init_features(features: &'static [&'static str]) { + FEATURES.store(Some(Arc::new(features))); +} |