aboutsummaryrefslogtreecommitdiff
path: root/src/util
diff options
context:
space:
mode:
authorMendes <mendes.oulamara@pm.me>2022-10-04 18:14:49 +0200
committerMendes <mendes.oulamara@pm.me>2022-10-04 18:14:49 +0200
commit829f815a897b04986559910bbcbf53625adcdf20 (patch)
tree6db3c27cff2aded754a641d1f2b05c83be701267 /src/util
parent99f96b9564c9c841dc6c56f1255a6e70ff884d46 (diff)
parenta096ced35562bd0a8877a1ee2f755be1edafe343 (diff)
downloadgarage-829f815a897b04986559910bbcbf53625adcdf20.tar.gz
garage-829f815a897b04986559910bbcbf53625adcdf20.zip
Merge remote-tracking branch 'origin/main' into optimal-layout
Diffstat (limited to 'src/util')
-rw-r--r--src/util/Cargo.toml22
-rw-r--r--src/util/async_hash.rs61
-rw-r--r--src/util/background.rs153
-rw-r--r--src/util/background/job_worker.rs48
-rw-r--r--src/util/background/mod.rs117
-rw-r--r--src/util/background/worker.rs260
-rw-r--r--src/util/config.rs73
-rw-r--r--src/util/crdt/bool.rs2
-rw-r--r--src/util/crdt/deletable.rs2
-rw-r--r--src/util/crdt/lww.rs2
-rw-r--r--src/util/crdt/lww_map.rs7
-rw-r--r--src/util/crdt/map.rs2
-rw-r--r--src/util/error.rs15
-rw-r--r--src/util/formater.rs28
-rw-r--r--src/util/lib.rs4
-rw-r--r--src/util/metrics.rs16
-rw-r--r--src/util/sled_counter.rs100
-rw-r--r--src/util/tranquilizer.rs27
-rw-r--r--src/util/version.rs28
19 files changed, 650 insertions, 317 deletions
diff --git a/src/util/Cargo.toml b/src/util/Cargo.toml
index f13c1589..8e978fc2 100644
--- a/src/util/Cargo.toml
+++ b/src/util/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "garage_util"
-version = "0.7.0"
+version = "0.8.0"
authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license = "AGPL-3.0"
@@ -14,15 +14,21 @@ path = "lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
+garage_db = { version = "0.8.0", path = "../db" }
+
+arc-swap = "1.0"
+async-trait = "0.1"
blake2 = "0.9"
+bytes = "1.0"
+digest = "0.10"
err-derive = "0.3"
+git-version = "0.3.4"
xxhash-rust = { version = "0.8", default-features = false, features = ["xxh3"] }
hex = "0.4"
+lazy_static = "1.4"
tracing = "0.1.30"
rand = "0.8"
-sha2 = "0.9"
-
-sled = "0.34"
+sha2 = "0.10"
chrono = "0.4"
rmp-serde = "0.15"
@@ -33,11 +39,13 @@ toml = "0.5"
futures = "0.3"
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
-#netapp = { version = "0.3.0", git = "https://git.deuxfleurs.fr/lx/netapp" }
-#netapp = { version = "0.4", path = "../../../netapp" }
-netapp = "0.4"
+netapp = "0.5"
http = "0.2"
hyper = "0.14"
opentelemetry = { version = "0.17", features = [ "rt-tokio", "metrics", "trace" ] }
+
+
+[features]
+k2v = []
diff --git a/src/util/async_hash.rs b/src/util/async_hash.rs
new file mode 100644
index 00000000..5631ea6b
--- /dev/null
+++ b/src/util/async_hash.rs
@@ -0,0 +1,61 @@
+use bytes::Bytes;
+use digest::Digest;
+
+use tokio::sync::mpsc;
+use tokio::task::JoinHandle;
+
+use crate::data::*;
+
+/// Compute the sha256 of a slice,
+/// spawning on a tokio thread for CPU-intensive processing
+/// The argument has to be an owned Bytes, as it is moved out to a new thread.
+pub async fn async_sha256sum(data: Bytes) -> Hash {
+ tokio::task::spawn_blocking(move || sha256sum(&data))
+ .await
+ .unwrap()
+}
+
+/// Compute the blake2sum of a slice,
+/// spawning on a tokio thread for CPU-intensive processing.
+/// The argument has to be an owned Bytes, as it is moved out to a new thread.
+pub async fn async_blake2sum(data: Bytes) -> Hash {
+ tokio::task::spawn_blocking(move || blake2sum(&data))
+ .await
+ .unwrap()
+}
+
+// ----
+
+pub struct AsyncHasher<D: Digest> {
+ sendblk: mpsc::Sender<Bytes>,
+ task: JoinHandle<digest::Output<D>>,
+}
+
+impl<D: Digest> AsyncHasher<D> {
+ pub fn new() -> Self {
+ let (sendblk, mut recvblk) = mpsc::channel::<Bytes>(1);
+ let task = tokio::task::spawn_blocking(move || {
+ let mut digest = D::new();
+ while let Some(blk) = recvblk.blocking_recv() {
+ digest.update(&blk[..]);
+ }
+ digest.finalize()
+ });
+ Self { sendblk, task }
+ }
+
+ pub async fn update(&self, b: Bytes) {
+ self.sendblk.send(b).await.unwrap();
+ }
+
+ pub async fn finalize(self) -> digest::Output<D> {
+ drop(self.sendblk);
+ self.task.await.unwrap()
+ }
+}
+
+impl<D: Digest> Default for AsyncHasher<D> {
+ fn default() -> Self {
+ Self::new()
+ }
+}
diff --git a/src/util/background.rs b/src/util/background.rs
deleted file mode 100644
index bfdaaf1e..00000000
--- a/src/util/background.rs
+++ /dev/null
@@ -1,153 +0,0 @@
-//! Job runner for futures and async functions
-use core::future::Future;
-use std::pin::Pin;
-use std::sync::Arc;
-use std::time::Duration;
-
-use futures::future::*;
-use futures::select;
-use tokio::sync::{mpsc, watch, Mutex};
-
-use crate::error::Error;
-
-type JobOutput = Result<(), Error>;
-type Job = Pin<Box<dyn Future<Output = JobOutput> + Send>>;
-
-/// Job runner for futures and async functions
-pub struct BackgroundRunner {
- stop_signal: watch::Receiver<bool>,
- queue_in: mpsc::UnboundedSender<(Job, bool)>,
- worker_in: mpsc::UnboundedSender<tokio::task::JoinHandle<()>>,
-}
-
-impl BackgroundRunner {
- /// Create a new BackgroundRunner
- pub fn new(
- n_runners: usize,
- stop_signal: watch::Receiver<bool>,
- ) -> (Arc<Self>, tokio::task::JoinHandle<()>) {
- let (worker_in, mut worker_out) = mpsc::unbounded_channel();
-
- let stop_signal_2 = stop_signal.clone();
- let await_all_done = tokio::spawn(async move {
- loop {
- let wkr = {
- select! {
- item = worker_out.recv().fuse() => {
- match item {
- Some(x) => x,
- None => break,
- }
- }
- _ = tokio::time::sleep(Duration::from_secs(5)).fuse() => {
- if *stop_signal_2.borrow() {
- break;
- } else {
- continue;
- }
- }
- }
- };
- if let Err(e) = wkr.await {
- error!("Error while awaiting for worker: {}", e);
- }
- }
- });
-
- let (queue_in, queue_out) = mpsc::unbounded_channel();
- let queue_out = Arc::new(Mutex::new(queue_out));
-
- for i in 0..n_runners {
- let queue_out = queue_out.clone();
- let stop_signal = stop_signal.clone();
-
- worker_in
- .send(tokio::spawn(async move {
- loop {
- let (job, cancellable) = {
- select! {
- item = wait_job(&queue_out).fuse() => match item {
- // We received a task, process it
- Some(x) => x,
- // We received a signal that no more tasks will ever be sent
- // because the sending side was dropped. Exit now.
- None => break,
- },
- _ = tokio::time::sleep(Duration::from_secs(5)).fuse() => {
- if *stop_signal.borrow() {
- // Nothing has been going on for 5 secs, and we are shutting
- // down. Exit now.
- break;
- } else {
- // Nothing is going on but we don't want to exit.
- continue;
- }
- }
- }
- };
- if cancellable && *stop_signal.borrow() {
- continue;
- }
- if let Err(e) = job.await {
- error!("Job failed: {}", e)
- }
- }
- info!("Background worker {} exiting", i);
- }))
- .unwrap();
- }
-
- let bgrunner = Arc::new(Self {
- stop_signal,
- queue_in,
- worker_in,
- });
- (bgrunner, await_all_done)
- }
-
- /// Spawn a task to be run in background
- pub fn spawn<T>(&self, job: T)
- where
- T: Future<Output = JobOutput> + Send + 'static,
- {
- let boxed: Job = Box::pin(job);
- self.queue_in
- .send((boxed, false))
- .map_err(|_| "could not put job in queue")
- .unwrap();
- }
-
- /// Spawn a task to be run in background. It may get discarded before running if spawned while
- /// the runner is stopping
- pub fn spawn_cancellable<T>(&self, job: T)
- where
- T: Future<Output = JobOutput> + Send + 'static,
- {
- let boxed: Job = Box::pin(job);
- self.queue_in
- .send((boxed, true))
- .map_err(|_| "could not put job in queue")
- .unwrap();
- }
-
- pub fn spawn_worker<F, T>(&self, name: String, worker: F)
- where
- F: FnOnce(watch::Receiver<bool>) -> T + Send + 'static,
- T: Future<Output = ()> + Send + 'static,
- {
- let stop_signal = self.stop_signal.clone();
- let task = tokio::spawn(async move {
- info!("Worker started: {}", name);
- worker(stop_signal).await;
- info!("Worker exited: {}", name);
- });
- self.worker_in
- .send(task)
- .map_err(|_| "could not put job in queue")
- .unwrap();
- }
-}
-
-async fn wait_job(q: &Mutex<mpsc::UnboundedReceiver<(Job, bool)>>) -> Option<(Job, bool)> {
- q.lock().await.recv().await
-}
diff --git a/src/util/background/job_worker.rs b/src/util/background/job_worker.rs
new file mode 100644
index 00000000..2568ea11
--- /dev/null
+++ b/src/util/background/job_worker.rs
@@ -0,0 +1,48 @@
+//! Job worker: a generic worker that just processes incoming
+//! jobs one by one
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use tokio::sync::{mpsc, Mutex};
+
+use crate::background::worker::*;
+use crate::background::*;
+
+pub(crate) struct JobWorker {
+ pub(crate) index: usize,
+ pub(crate) job_chan: Arc<Mutex<mpsc::UnboundedReceiver<(Job, bool)>>>,
+ pub(crate) next_job: Option<Job>,
+}
+
+#[async_trait]
+impl Worker for JobWorker {
+ fn name(&self) -> String {
+ format!("Job worker #{}", self.index)
+ }
+
+ async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
+ match self.next_job.take() {
+ None => return Ok(WorkerState::Idle),
+ Some(job) => {
+ job.await?;
+ Ok(WorkerState::Busy)
+ }
+ }
+ }
+
+ async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState {
+ loop {
+ match self.job_chan.lock().await.recv().await {
+ Some((job, cancellable)) => {
+ if cancellable && *must_exit.borrow() {
+ continue;
+ }
+ self.next_job = Some(job);
+ return WorkerState::Busy;
+ }
+ None => return WorkerState::Done,
+ }
+ }
+ }
+}
diff --git a/src/util/background/mod.rs b/src/util/background/mod.rs
new file mode 100644
index 00000000..619f5068
--- /dev/null
+++ b/src/util/background/mod.rs
@@ -0,0 +1,117 @@
+//! Job runner for futures and async functions
+
+pub mod job_worker;
+pub mod worker;
+
+use core::future::Future;
+
+use std::collections::HashMap;
+use std::pin::Pin;
+use std::sync::Arc;
+
+use serde::{Deserialize, Serialize};
+use tokio::sync::{mpsc, watch, Mutex};
+
+use crate::error::Error;
+use worker::WorkerProcessor;
+pub use worker::{Worker, WorkerState};
+
+pub(crate) type JobOutput = Result<(), Error>;
+pub(crate) type Job = Pin<Box<dyn Future<Output = JobOutput> + Send>>;
+
+/// Job runner for futures and async functions
+pub struct BackgroundRunner {
+ send_job: mpsc::UnboundedSender<(Job, bool)>,
+ send_worker: mpsc::UnboundedSender<Box<dyn Worker>>,
+ worker_info: Arc<std::sync::Mutex<HashMap<usize, WorkerInfo>>>,
+}
+
+#[derive(Clone, Serialize, Deserialize, Debug)]
+pub struct WorkerInfo {
+ pub name: String,
+ pub info: Option<String>,
+ pub state: WorkerState,
+ pub errors: usize,
+ pub consecutive_errors: usize,
+ pub last_error: Option<(String, u64)>,
+}
+
+impl BackgroundRunner {
+ /// Create a new BackgroundRunner
+ pub fn new(
+ n_runners: usize,
+ stop_signal: watch::Receiver<bool>,
+ ) -> (Arc<Self>, tokio::task::JoinHandle<()>) {
+ let (send_worker, worker_out) = mpsc::unbounded_channel::<Box<dyn Worker>>();
+
+ let worker_info = Arc::new(std::sync::Mutex::new(HashMap::new()));
+ let mut worker_processor =
+ WorkerProcessor::new(worker_out, stop_signal, worker_info.clone());
+
+ let await_all_done = tokio::spawn(async move {
+ worker_processor.run().await;
+ });
+
+ let (send_job, queue_out) = mpsc::unbounded_channel();
+ let queue_out = Arc::new(Mutex::new(queue_out));
+
+ for i in 0..n_runners {
+ let queue_out = queue_out.clone();
+
+ send_worker
+ .send(Box::new(job_worker::JobWorker {
+ index: i,
+ job_chan: queue_out.clone(),
+ next_job: None,
+ }))
+ .ok()
+ .unwrap();
+ }
+
+ let bgrunner = Arc::new(Self {
+ send_job,
+ send_worker,
+ worker_info,
+ });
+ (bgrunner, await_all_done)
+ }
+
+ pub fn get_worker_info(&self) -> HashMap<usize, WorkerInfo> {
+ self.worker_info.lock().unwrap().clone()
+ }
+
+ /// Spawn a task to be run in background
+ pub fn spawn<T>(&self, job: T)
+ where
+ T: Future<Output = JobOutput> + Send + 'static,
+ {
+ let boxed: Job = Box::pin(job);
+ self.send_job
+ .send((boxed, false))
+ .ok()
+ .expect("Could not put job in queue");
+ }
+
+ /// Spawn a task to be run in background. It may get discarded before running if spawned while
+ /// the runner is stopping
+ pub fn spawn_cancellable<T>(&self, job: T)
+ where
+ T: Future<Output = JobOutput> + Send + 'static,
+ {
+ let boxed: Job = Box::pin(job);
+ self.send_job
+ .send((boxed, true))
+ .ok()
+ .expect("Could not put job in queue");
+ }
+
+ pub fn spawn_worker<W>(&self, worker: W)
+ where
+ W: Worker + 'static,
+ {
+ self.send_worker
+ .send(Box::new(worker))
+ .ok()
+ .expect("Could not put worker in queue");
+ }
+}
diff --git a/src/util/background/worker.rs b/src/util/background/worker.rs
new file mode 100644
index 00000000..f5e3addb
--- /dev/null
+++ b/src/util/background/worker.rs
@@ -0,0 +1,260 @@
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::time::{Duration, Instant};
+
+use async_trait::async_trait;
+use futures::future::*;
+use futures::stream::FuturesUnordered;
+use futures::StreamExt;
+use serde::{Deserialize, Serialize};
+use tokio::select;
+use tokio::sync::{mpsc, watch};
+
+use crate::background::WorkerInfo;
+use crate::error::Error;
+use crate::time::now_msec;
+
+#[derive(PartialEq, Copy, Clone, Serialize, Deserialize, Debug)]
+pub enum WorkerState {
+ Busy,
+ Throttled(f32),
+ Idle,
+ Done,
+}
+
+impl std::fmt::Display for WorkerState {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ match self {
+ WorkerState::Busy => write!(f, "Busy"),
+ WorkerState::Throttled(t) => write!(f, "Thr:{:.3}", t),
+ WorkerState::Idle => write!(f, "Idle"),
+ WorkerState::Done => write!(f, "Done"),
+ }
+ }
+}
+
+#[async_trait]
+pub trait Worker: Send {
+ fn name(&self) -> String;
+
+ fn info(&self) -> Option<String> {
+ None
+ }
+
+ /// Work: do a basic unit of work, if one is available (otherwise, should return
+ /// WorkerState::Idle immediately). We will do our best to not interrupt this future in the
+ /// middle of processing, it will only be interrupted at the last minute when Garage is trying
+ /// to exit and this hasn't returned yet. This function may return an error to indicate that
+ /// its unit of work could not be processed due to an error: the error will be logged and
+ /// .work() will be called again after a short delay.
+ async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error>;
+
+ /// Wait for work: await for some task to become available. This future can be interrupted in
+ /// the middle for any reason. This future doesn't have to await on must_exit.changed(), we
+ /// are doing it for you. Therefore it only receives a read refernce to must_exit which allows
+ /// it to check if we are exiting.
+ async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState;
+}
+
+pub(crate) struct WorkerProcessor {
+ stop_signal: watch::Receiver<bool>,
+ worker_chan: mpsc::UnboundedReceiver<Box<dyn Worker>>,
+ worker_info: Arc<std::sync::Mutex<HashMap<usize, WorkerInfo>>>,
+}
+
+impl WorkerProcessor {
+ pub(crate) fn new(
+ worker_chan: mpsc::UnboundedReceiver<Box<dyn Worker>>,
+ stop_signal: watch::Receiver<bool>,
+ worker_info: Arc<std::sync::Mutex<HashMap<usize, WorkerInfo>>>,
+ ) -> Self {
+ Self {
+ stop_signal,
+ worker_chan,
+ worker_info,
+ }
+ }
+
+ pub(crate) async fn run(&mut self) {
+ let mut workers = FuturesUnordered::new();
+ let mut next_task_id = 1;
+
+ while !*self.stop_signal.borrow() {
+ let await_next_worker = async {
+ if workers.is_empty() {
+ futures::future::pending().await
+ } else {
+ workers.next().await
+ }
+ };
+ select! {
+ new_worker_opt = self.worker_chan.recv() => {
+ if let Some(new_worker) = new_worker_opt {
+ let task_id = next_task_id;
+ next_task_id += 1;
+ let stop_signal = self.stop_signal.clone();
+ let stop_signal_worker = self.stop_signal.clone();
+ let mut worker = WorkerHandler {
+ task_id,
+ stop_signal,
+ stop_signal_worker,
+ worker: new_worker,
+ state: WorkerState::Busy,
+ errors: 0,
+ consecutive_errors: 0,
+ last_error: None,
+ };
+ workers.push(async move {
+ worker.step().await;
+ worker
+ }.boxed());
+ }
+ }
+ worker = await_next_worker => {
+ if let Some(mut worker) = worker {
+ trace!("{} (TID {}): {:?}", worker.worker.name(), worker.task_id, worker.state);
+
+ // Save worker info
+ let mut wi = self.worker_info.lock().unwrap();
+ match wi.get_mut(&worker.task_id) {
+ Some(i) => {
+ i.state = worker.state;
+ i.info = worker.worker.info();
+ i.errors = worker.errors;
+ i.consecutive_errors = worker.consecutive_errors;
+ if worker.last_error.is_some() {
+ i.last_error = worker.last_error.take();
+ }
+ }
+ None => {
+ wi.insert(worker.task_id, WorkerInfo {
+ name: worker.worker.name(),
+ state: worker.state,
+ info: worker.worker.info(),
+ errors: worker.errors,
+ consecutive_errors: worker.consecutive_errors,
+ last_error: worker.last_error.take(),
+ });
+ }
+ }
+
+ if worker.state == WorkerState::Done {
+ info!("Worker {} (TID {}) exited", worker.worker.name(), worker.task_id);
+ } else {
+ workers.push(async move {
+ worker.step().await;
+ worker
+ }.boxed());
+ }
+ }
+ }
+ _ = self.stop_signal.changed() => (),
+ }
+ }
+
+ // We are exiting, drain everything
+ let drain_half_time = Instant::now() + Duration::from_secs(5);
+ let drain_everything = async move {
+ while let Some(mut worker) = workers.next().await {
+ if worker.state == WorkerState::Done {
+ info!(
+ "Worker {} (TID {}) exited",
+ worker.worker.name(),
+ worker.task_id
+ );
+ } else if Instant::now() > drain_half_time {
+ warn!("Worker {} (TID {}) interrupted between two iterations in state {:?} (this should be fine)", worker.worker.name(), worker.task_id, worker.state);
+ } else {
+ workers.push(
+ async move {
+ worker.step().await;
+ worker
+ }
+ .boxed(),
+ );
+ }
+ }
+ };
+
+ select! {
+ _ = drain_everything => {
+ info!("All workers exited peacefully \\o/");
+ }
+ _ = tokio::time::sleep(Duration::from_secs(9)) => {
+ error!("Some workers could not exit in time, we are cancelling some things in the middle");
+ }
+ }
+ }
+}
+
+struct WorkerHandler {
+ task_id: usize,
+ stop_signal: watch::Receiver<bool>,
+ stop_signal_worker: watch::Receiver<bool>,
+ worker: Box<dyn Worker>,
+ state: WorkerState,
+ errors: usize,
+ consecutive_errors: usize,
+ last_error: Option<(String, u64)>,
+}
+
+impl WorkerHandler {
+ async fn step(&mut self) {
+ match self.state {
+ WorkerState::Busy => match self.worker.work(&mut self.stop_signal).await {
+ Ok(s) => {
+ self.state = s;
+ self.consecutive_errors = 0;
+ }
+ Err(e) => {
+ error!(
+ "Error in worker {} (TID {}): {}",
+ self.worker.name(),
+ self.task_id,
+ e
+ );
+ self.errors += 1;
+ self.consecutive_errors += 1;
+ self.last_error = Some((format!("{}", e), now_msec()));
+ // Sleep a bit so that error won't repeat immediately, exponential backoff
+ // strategy (min 1sec, max ~60sec)
+ self.state = WorkerState::Throttled(
+ (1.5f32).powf(std::cmp::min(10, self.consecutive_errors - 1) as f32),
+ );
+ }
+ },
+ WorkerState::Throttled(delay) => {
+ // Sleep for given delay and go back to busy state
+ if !*self.stop_signal.borrow() {
+ select! {
+ _ = tokio::time::sleep(Duration::from_secs_f32(delay)) => (),
+ _ = self.stop_signal.changed() => (),
+ }
+ }
+ self.state = WorkerState::Busy;
+ }
+ WorkerState::Idle => {
+ if *self.stop_signal.borrow() {
+ select! {
+ new_st = self.worker.wait_for_work(&self.stop_signal_worker) => {
+ self.state = new_st;
+ }
+ _ = tokio::time::sleep(Duration::from_secs(1)) => {
+ // stay in Idle state
+ }
+ }
+ } else {
+ select! {
+ new_st = self.worker.wait_for_work(&self.stop_signal_worker) => {
+ self.state = new_st;
+ }
+ _ = self.stop_signal.changed() => {
+ // stay in Idle state
+ }
+ }
+ }
+ }
+ WorkerState::Done => unreachable!(),
+ }
+ }
+}
diff --git a/src/util/config.rs b/src/util/config.rs
index e4d96476..2d4b4f57 100644
--- a/src/util/config.rs
+++ b/src/util/config.rs
@@ -3,12 +3,8 @@ use std::io::Read;
use std::net::SocketAddr;
use std::path::PathBuf;
-use serde::de::Error as SerdeError;
use serde::{de, Deserialize};
-use netapp::util::parse_and_resolve_peer_addr;
-use netapp::NodeID;
-
use crate::error::Error;
/// Represent the whole configuration
@@ -23,10 +19,6 @@ pub struct Config {
#[serde(default = "default_block_size")]
pub block_size: usize,
- /// Size of data blocks to save to disk
- #[serde(default = "default_block_manager_background_tranquility")]
- pub block_manager_background_tranquility: u32,
-
/// Replication mode. Supported values:
/// - none, 1 -> no replication
/// - 2 -> 2-way replication
@@ -47,11 +39,16 @@ pub struct Config {
/// Address to bind for RPC
pub rpc_bind_addr: SocketAddr,
/// Public IP address of this node
- pub rpc_public_addr: Option<SocketAddr>,
+ pub rpc_public_addr: Option<String>,
+
+ /// Timeout for Netapp's ping messagess
+ pub rpc_ping_timeout_msec: Option<u64>,
+ /// Timeout for Netapp RPC calls
+ pub rpc_timeout_msec: Option<u64>,
/// Bootstrap peers RPC address
- #[serde(deserialize_with = "deserialize_vec_addr", default)]
- pub bootstrap_peers: Vec<(NodeID, SocketAddr)>,
+ #[serde(default)]
+ pub bootstrap_peers: Vec<String>,
/// Consul host to connect to to discover more peers
pub consul_host: Option<String>,
/// Consul service name to use
@@ -64,19 +61,27 @@ pub struct Config {
#[serde(default)]
pub kubernetes_skip_crd: bool,
+ // -- DB
+ /// Database engine to use for metadata (options: sled, sqlite, lmdb)
+ #[serde(default = "default_db_engine")]
+ pub db_engine: String,
+
/// Sled cache size, in bytes
#[serde(default = "default_sled_cache_capacity")]
pub sled_cache_capacity: u64,
-
/// Sled flush interval in milliseconds
#[serde(default = "default_sled_flush_every_ms")]
pub sled_flush_every_ms: u64,
+ // -- APIs
/// Configuration for S3 api
- pub s3_api: ApiConfig,
+ pub s3_api: S3ApiConfig,
+
+ /// Configuration for K2V api
+ pub k2v_api: Option<K2VApiConfig>,
/// Configuration for serving files as normal web server
- pub s3_web: WebConfig,
+ pub s3_web: Option<WebConfig>,
/// Configuration for the admin API endpoint
#[serde(default = "Default::default")]
@@ -85,9 +90,9 @@ pub struct Config {
/// Configuration for S3 api
#[derive(Deserialize, Debug, Clone)]
-pub struct ApiConfig {
+pub struct S3ApiConfig {
/// Address and port to bind for api serving
- pub api_bind_addr: SocketAddr,
+ pub api_bind_addr: Option<SocketAddr>,
/// S3 region to use
pub s3_region: String,
/// Suffix to remove from domain name to find bucket. If None,
@@ -95,6 +100,13 @@ pub struct ApiConfig {
pub root_domain: Option<String>,
}
+/// Configuration for K2V api
+#[derive(Deserialize, Debug, Clone)]
+pub struct K2VApiConfig {
+ /// Address and port to bind for api serving
+ pub api_bind_addr: SocketAddr,
+}
+
/// Configuration for serving files as normal web server
#[derive(Deserialize, Debug, Clone)]
pub struct WebConfig {
@@ -109,10 +121,18 @@ pub struct WebConfig {
pub struct AdminConfig {
/// Address and port to bind for admin API serving
pub api_bind_addr: Option<SocketAddr>,
+ /// Bearer token to use to scrape metrics
+ pub metrics_token: Option<String>,
+ /// Bearer token to use to access Admin API endpoints
+ pub admin_token: Option<String>,
/// OTLP server to where to export traces
pub trace_sink: Option<String>,
}
+fn default_db_engine() -> String {
+ "sled".into()
+}
+
fn default_sled_cache_capacity() -> u64 {
128 * 1024 * 1024
}
@@ -122,9 +142,6 @@ fn default_sled_flush_every_ms() -> u64 {
fn default_block_size() -> usize {
1048576
}
-fn default_block_manager_background_tranquility() -> u32 {
- 2
-}
/// Read and parse configuration
pub fn read_config(config_file: PathBuf) -> Result<Config, Error> {
@@ -138,24 +155,6 @@ pub fn read_config(config_file: PathBuf) -> Result<Config, Error> {
Ok(toml::from_str(&config)?)
}
-fn deserialize_vec_addr<'de, D>(deserializer: D) -> Result<Vec<(NodeID, SocketAddr)>, D::Error>
-where
- D: de::Deserializer<'de>,
-{
- let mut ret = vec![];
-
- for peer in <Vec<&str>>::deserialize(deserializer)? {
- let (pubkey, addrs) = parse_and_resolve_peer_addr(peer).ok_or_else(|| {
- D::Error::custom(format!("Unable to parse or resolve peer: {}", peer))
- })?;
- for ip in addrs {
- ret.push((pubkey, ip));
- }
- }
-
- Ok(ret)
-}
-
fn default_compression() -> Option<i32> {
Some(1)
}
diff --git a/src/util/crdt/bool.rs b/src/util/crdt/bool.rs
index 53af8f82..111eb5f1 100644
--- a/src/util/crdt/bool.rs
+++ b/src/util/crdt/bool.rs
@@ -3,7 +3,7 @@ use serde::{Deserialize, Serialize};
use crate::crdt::crdt::*;
/// Boolean, where `true` is an absorbing state
-#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)]
+#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct Bool(bool);
impl Bool {
diff --git a/src/util/crdt/deletable.rs b/src/util/crdt/deletable.rs
index c76f5cbb..e771aceb 100644
--- a/src/util/crdt/deletable.rs
+++ b/src/util/crdt/deletable.rs
@@ -3,7 +3,7 @@ use serde::{Deserialize, Serialize};
use crate::crdt::crdt::*;
/// Deletable object (once deleted, cannot go back)
-#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)]
+#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum Deletable<T> {
Present(T),
Deleted,
diff --git a/src/util/crdt/lww.rs b/src/util/crdt/lww.rs
index 254abe8e..958844c9 100644
--- a/src/util/crdt/lww.rs
+++ b/src/util/crdt/lww.rs
@@ -37,7 +37,7 @@ use crate::crdt::crdt::*;
///
/// This scheme is used by AWS S3 or Soundcloud and often without knowing
/// in enterprise when reconciliating databases with ad-hoc scripts.
-#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
+#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct Lww<T> {
ts: u64,
v: T,
diff --git a/src/util/crdt/lww_map.rs b/src/util/crdt/lww_map.rs
index c155c3a8..88113856 100644
--- a/src/util/crdt/lww_map.rs
+++ b/src/util/crdt/lww_map.rs
@@ -23,7 +23,7 @@ use crate::crdt::crdt::*;
/// However, note that even if we were using a more efficient data structure such as a `BTreeMap`,
/// the serialization cost `O(n)` would still have to be paid at each modification, so we are
/// actually not losing anything here.
-#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
+#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct LwwMap<K, V> {
vals: Vec<(K, u64, V)>,
}
@@ -140,6 +140,11 @@ where
self.vals.clear();
}
+ /// Retain only values that match a certain predicate
+ pub fn retain(&mut self, pred: impl FnMut(&(K, u64, V)) -> bool) {
+ self.vals.retain(pred);
+ }
+
/// Get a reference to the value assigned to a key
pub fn get(&self, k: &K) -> Option<&V> {
match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(k)) {
diff --git a/src/util/crdt/map.rs b/src/util/crdt/map.rs
index f9ed19b6..5d1e1520 100644
--- a/src/util/crdt/map.rs
+++ b/src/util/crdt/map.rs
@@ -16,7 +16,7 @@ use crate::crdt::crdt::*;
/// However, note that even if we were using a more efficient data structure such as a `BTreeMap`,
/// the serialization cost `O(n)` would still have to be paid at each modification, so we are
/// actually not losing anything here.
-#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
+#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct Map<K, V> {
vals: Vec<(K, V)>,
}
diff --git a/src/util/error.rs b/src/util/error.rs
index bdb3a69b..9995c746 100644
--- a/src/util/error.rs
+++ b/src/util/error.rs
@@ -26,8 +26,8 @@ pub enum Error {
#[error(display = "Netapp error: {}", _0)]
Netapp(#[error(source)] netapp::error::Error),
- #[error(display = "Sled error: {}", _0)]
- Sled(#[error(source)] sled::Error),
+ #[error(display = "DB error: {}", _0)]
+ Db(#[error(source)] garage_db::Error),
#[error(display = "Messagepack encode error: {}", _0)]
RmpEncode(#[error(source)] rmp_serde::encode::Error),
@@ -44,6 +44,9 @@ pub enum Error {
#[error(display = "Tokio semaphore acquire error: {}", _0)]
TokioSemAcquire(#[error(source)] tokio::sync::AcquireError),
+ #[error(display = "Tokio broadcast receive error: {}", _0)]
+ TokioBcastRecv(#[error(source)] tokio::sync::broadcast::error::RecvError),
+
#[error(display = "Remote error: {}", _0)]
RemoteError(String),
@@ -75,11 +78,11 @@ impl Error {
}
}
-impl From<sled::transaction::TransactionError<Error>> for Error {
- fn from(e: sled::transaction::TransactionError<Error>) -> Error {
+impl From<garage_db::TxError<Error>> for Error {
+ fn from(e: garage_db::TxError<Error>) -> Error {
match e {
- sled::transaction::TransactionError::Abort(x) => x,
- sled::transaction::TransactionError::Storage(x) => Error::Sled(x),
+ garage_db::TxError::Abort(x) => x,
+ garage_db::TxError::Db(x) => Error::Db(x),
}
}
}
diff --git a/src/util/formater.rs b/src/util/formater.rs
new file mode 100644
index 00000000..95324f9a
--- /dev/null
+++ b/src/util/formater.rs
@@ -0,0 +1,28 @@
+pub fn format_table(data: Vec<String>) {
+ let data = data
+ .iter()
+ .map(|s| s.split('\t').collect::<Vec<_>>())
+ .collect::<Vec<_>>();
+
+ let columns = data.iter().map(|row| row.len()).fold(0, std::cmp::max);
+ let mut column_size = vec![0; columns];
+
+ let mut out = String::new();
+
+ for row in data.iter() {
+ for (i, col) in row.iter().enumerate() {
+ column_size[i] = std::cmp::max(column_size[i], col.chars().count());
+ }
+ }
+
+ for row in data.iter() {
+ for (col, col_len) in row[..row.len() - 1].iter().zip(column_size.iter()) {
+ out.push_str(col);
+ (0..col_len - col.chars().count() + 2).for_each(|_| out.push(' '));
+ }
+ out.push_str(row[row.len() - 1]);
+ out.push('\n');
+ }
+
+ print!("{}", out);
+}
diff --git a/src/util/lib.rs b/src/util/lib.rs
index e83fc2e6..264cc192 100644
--- a/src/util/lib.rs
+++ b/src/util/lib.rs
@@ -3,14 +3,16 @@
#[macro_use]
extern crate tracing;
+pub mod async_hash;
pub mod background;
pub mod config;
pub mod crdt;
pub mod data;
pub mod error;
+pub mod formater;
pub mod metrics;
pub mod persister;
-pub mod sled_counter;
pub mod time;
pub mod token_bucket;
pub mod tranquilizer;
+pub mod version;
diff --git a/src/util/metrics.rs b/src/util/metrics.rs
index 1b05eabe..b882a886 100644
--- a/src/util/metrics.rs
+++ b/src/util/metrics.rs
@@ -1,4 +1,4 @@
-use std::time::SystemTime;
+use std::time::Instant;
use futures::{future::BoxFuture, Future, FutureExt};
use rand::Rng;
@@ -28,10 +28,12 @@ where
attributes: &'a [KeyValue],
) -> BoxFuture<'a, Self::Output> {
async move {
- let request_start = SystemTime::now();
+ let request_start = Instant::now();
let res = self.await;
r.record(
- request_start.elapsed().map_or(0.0, |d| d.as_secs_f64()),
+ Instant::now()
+ .saturating_duration_since(request_start)
+ .as_secs_f64(),
attributes,
);
res
@@ -41,9 +43,13 @@ where
fn bound_record_duration(self, r: &'a BoundValueRecorder<f64>) -> BoxFuture<'a, Self::Output> {
async move {
- let request_start = SystemTime::now();
+ let request_start = Instant::now();
let res = self.await;
- r.record(request_start.elapsed().map_or(0.0, |d| d.as_secs_f64()));
+ r.record(
+ Instant::now()
+ .saturating_duration_since(request_start)
+ .as_secs_f64(),
+ );
res
}
.boxed()
diff --git a/src/util/sled_counter.rs b/src/util/sled_counter.rs
deleted file mode 100644
index bc54cea0..00000000
--- a/src/util/sled_counter.rs
+++ /dev/null
@@ -1,100 +0,0 @@
-use std::sync::{
- atomic::{AtomicUsize, Ordering},
- Arc,
-};
-
-use sled::{CompareAndSwapError, IVec, Iter, Result, Tree};
-
-#[derive(Clone)]
-pub struct SledCountedTree(Arc<SledCountedTreeInternal>);
-
-struct SledCountedTreeInternal {
- tree: Tree,
- len: AtomicUsize,
-}
-
-impl SledCountedTree {
- pub fn new(tree: Tree) -> Self {
- let len = tree.len();
- Self(Arc::new(SledCountedTreeInternal {
- tree,
- len: AtomicUsize::new(len),
- }))
- }
-
- pub fn len(&self) -> usize {
- self.0.len.load(Ordering::Relaxed)
- }
-
- pub fn is_empty(&self) -> bool {
- self.0.tree.is_empty()
- }
-
- pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<IVec>> {
- self.0.tree.get(key)
- }
-
- pub fn iter(&self) -> Iter {
- self.0.tree.iter()
- }
-
- // ---- writing functions ----
-
- pub fn insert<K, V>(&self, key: K, value: V) -> Result<Option<IVec>>
- where
- K: AsRef<[u8]>,
- V: Into<IVec>,
- {
- let res = self.0.tree.insert(key, value);
- if res == Ok(None) {
- self.0.len.fetch_add(1, Ordering::Relaxed);
- }
- res
- }
-
- pub fn remove<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<IVec>> {
- let res = self.0.tree.remove(key);
- if matches!(res, Ok(Some(_))) {
- self.0.len.fetch_sub(1, Ordering::Relaxed);
- }
- res
- }
-
- pub fn pop_min(&self) -> Result<Option<(IVec, IVec)>> {
- let res = self.0.tree.pop_min();
- if let Ok(Some(_)) = &res {
- self.0.len.fetch_sub(1, Ordering::Relaxed);
- };
- res
- }
-
- pub fn compare_and_swap<K, OV, NV>(
- &self,
- key: K,
- old: Option<OV>,
- new: Option<NV>,
- ) -> Result<std::result::Result<(), CompareAndSwapError>>
- where
- K: AsRef<[u8]>,
- OV: AsRef<[u8]>,
- NV: Into<IVec>,
- {
- let old_some = old.is_some();
- let new_some = new.is_some();
-
- let res = self.0.tree.compare_and_swap(key, old, new);
-
- if res == Ok(Ok(())) {
- match (old_some, new_some) {
- (false, true) => {
- self.0.len.fetch_add(1, Ordering::Relaxed);
- }
- (true, false) => {
- self.0.len.fetch_sub(1, Ordering::Relaxed);
- }
- _ => (),
- }
- }
- res
- }
-}
diff --git a/src/util/tranquilizer.rs b/src/util/tranquilizer.rs
index 28711387..8a96cbb3 100644
--- a/src/util/tranquilizer.rs
+++ b/src/util/tranquilizer.rs
@@ -3,6 +3,8 @@ use std::time::{Duration, Instant};
use tokio::time::sleep;
+use crate::background::WorkerState;
+
/// A tranquilizer is a helper object that is used to make
/// background operations not take up too much time.
///
@@ -33,8 +35,8 @@ impl Tranquilizer {
}
}
- pub async fn tranquilize(&mut self, tranquility: u32) {
- let observation = Instant::now() - self.last_step_begin;
+ fn tranquilize_internal(&mut self, tranquility: u32) -> Option<Duration> {
+ let observation = Instant::now().saturating_duration_since(self.last_step_begin);
self.observations.push_back(observation);
self.sum_observations += observation;
@@ -45,13 +47,32 @@ impl Tranquilizer {
if !self.observations.is_empty() {
let delay = (tranquility * self.sum_observations) / (self.observations.len() as u32);
+ Some(delay)
+ } else {
+ None
+ }
+ }
+
+ pub async fn tranquilize(&mut self, tranquility: u32) {
+ if let Some(delay) = self.tranquilize_internal(tranquility) {
sleep(delay).await;
+ self.reset();
}
+ }
- self.reset();
+ #[must_use]
+ pub fn tranquilize_worker(&mut self, tranquility: u32) -> WorkerState {
+ match self.tranquilize_internal(tranquility) {
+ Some(delay) => WorkerState::Throttled(delay.as_secs_f32()),
+ None => WorkerState::Busy,
+ }
}
pub fn reset(&mut self) {
self.last_step_begin = Instant::now();
}
+
+ pub fn clear(&mut self) {
+ self.observations.clear();
+ }
}
diff --git a/src/util/version.rs b/src/util/version.rs
new file mode 100644
index 00000000..b515dccc
--- /dev/null
+++ b/src/util/version.rs
@@ -0,0 +1,28 @@
+use std::sync::Arc;
+
+use arc_swap::{ArcSwap, ArcSwapOption};
+
+lazy_static::lazy_static! {
+ static ref VERSION: ArcSwap<&'static str> = ArcSwap::new(Arc::new(git_version::git_version!(
+ prefix = "git:",
+ cargo_prefix = "cargo:",
+ fallback = "unknown"
+ )));
+ static ref FEATURES: ArcSwapOption<&'static [&'static str]> = ArcSwapOption::new(None);
+}
+
+pub fn garage_version() -> &'static str {
+ &VERSION.load()
+}
+
+pub fn garage_features() -> Option<&'static [&'static str]> {
+ FEATURES.load().as_ref().map(|f| &f[..])
+}
+
+pub fn init_version(version: &'static str) {
+ VERSION.store(Arc::new(version));
+}
+
+pub fn init_features(features: &'static [&'static str]) {
+ FEATURES.store(Some(Arc::new(features)));
+}