Diffstat (limited to 'src')
-rw-r--r--  src/block/Cargo.toml                  1
-rw-r--r--  src/block/lib.rs                      1
-rw-r--r--  src/block/manager.rs                329
-rw-r--r--  src/block/repair.rs                 444
-rw-r--r--  src/db/Cargo.toml                     2
-rw-r--r--  src/db/sqlite_adapter.rs              2
-rw-r--r--  src/garage/Cargo.toml                 2
-rw-r--r--  src/garage/admin.rs                  29
-rw-r--r--  src/garage/cli/cmd.rs                 8
-rw-r--r--  src/garage/cli/structs.rs            55
-rw-r--r--  src/garage/cli/util.rs               58
-rw-r--r--  src/garage/repair/online.rs         222
-rw-r--r--  src/model/index_counter.rs          169
-rw-r--r--  src/rpc/system.rs                     6
-rw-r--r--  src/table/gc.rs                      89
-rw-r--r--  src/table/merkle.rs                  87
-rw-r--r--  src/table/sync.rs                   198
-rw-r--r--  src/util/Cargo.toml                   1
-rw-r--r--  src/util/background.rs              160
-rw-r--r--  src/util/background/job_worker.rs    48
-rw-r--r--  src/util/background/mod.rs          117
-rw-r--r--  src/util/background/worker.rs       261
-rw-r--r--  src/util/lib.rs                       1
-rw-r--r--  src/util/tranquilizer.rs             25
24 files changed, 1606 insertions, 709 deletions
diff --git a/src/block/Cargo.toml b/src/block/Cargo.toml
index 80346aca..2555a44a 100644
--- a/src/block/Cargo.toml
+++ b/src/block/Cargo.toml
@@ -21,6 +21,7 @@ garage_table = { version = "0.7.0", path = "../table" }
opentelemetry = "0.17"
+arc-swap = "1.5"
async-trait = "0.1.7"
bytes = "1.0"
hex = "0.4"
diff --git a/src/block/lib.rs b/src/block/lib.rs
index dc685657..ebdb95d8 100644
--- a/src/block/lib.rs
+++ b/src/block/lib.rs
@@ -2,6 +2,7 @@
extern crate tracing;
pub mod manager;
+pub mod repair;
mod block;
mod metrics;
diff --git a/src/block/manager.rs b/src/block/manager.rs
index 32ba0431..017ba9da 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -1,18 +1,17 @@
-use core::ops::Bound;
-
use std::convert::TryInto;
-use std::path::{Path, PathBuf};
+use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
+use arc_swap::ArcSwapOption;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use futures::future::*;
-use futures::select;
use tokio::fs;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
-use tokio::sync::{watch, Mutex, Notify};
+use tokio::select;
+use tokio::sync::{mpsc, watch, Mutex, Notify};
use opentelemetry::{
trace::{FutureExt as OtelFutureExt, TraceContextExt, Tracer},
@@ -22,6 +21,7 @@ use opentelemetry::{
use garage_db as db;
use garage_db::counted_tree_hack::CountedTree;
+use garage_util::background::*;
use garage_util::data::*;
use garage_util::error::*;
use garage_util::metrics::RecordDuration;
@@ -36,6 +36,7 @@ use garage_table::replication::{TableReplication, TableShardedReplication};
use crate::block::*;
use crate::metrics::*;
use crate::rc::*;
+use crate::repair::*;
/// Size under which data will be stored inline in the database instead of as files
pub const INLINE_THRESHOLD: usize = 3072;
@@ -93,16 +94,18 @@ pub struct BlockManager {
mutation_lock: Mutex<BlockManagerLocked>,
- rc: BlockRc,
+ pub(crate) rc: BlockRc,
resync_queue: CountedTree,
resync_notify: Notify,
resync_errors: CountedTree,
- system: Arc<System>,
+ pub(crate) system: Arc<System>,
endpoint: Arc<Endpoint<BlockRpc, Self>>,
metrics: BlockManagerMetrics,
+
+ tx_scrub_command: ArcSwapOption<mpsc::Sender<ScrubWorkerCommand>>,
}
// This custom struct contains functions that must only be run
@@ -110,6 +113,12 @@ pub struct BlockManager {
// it INSIDE a Mutex.
struct BlockManagerLocked();
+enum ResyncIterResult {
+ BusyDidSomething,
+ BusyDidNothing,
+ IdleFor(Duration),
+}
+
impl BlockManager {
pub fn new(
db: &db::Db,
@@ -157,10 +166,11 @@ impl BlockManager {
system,
endpoint,
metrics,
+ tx_scrub_command: ArcSwapOption::new(None),
});
block_manager.endpoint.set_handler(block_manager.clone());
- block_manager.clone().spawn_background_worker();
+ block_manager.clone().spawn_background_workers();
block_manager
}
@@ -218,90 +228,6 @@ impl BlockManager {
Ok(())
}
- /// Launch the repair procedure on the data store
- ///
- /// This will list all blocks locally present, as well as those
- /// that are required because of refcount > 0, and will try
- /// to fix any mismatch between the two.
- pub async fn repair_data_store(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
- // 1. Repair blocks from RC table.
- let mut next_start: Option<Hash> = None;
- loop {
- // We have to do this complicated two-step process where we first read a bunch
- // of hashes from the RC table, and then insert them in the to-resync queue,
- // because of SQLite. Basically, as long as we have an iterator on a DB table,
- // we can't do anything else on the DB. The naive approach (which we had previously)
- // of just iterating on the RC table and inserting items one to one in the resync
- // queue can't work here, it would just provoke a deadlock in the SQLite adapter code.
- // This is mostly because the Rust bindings for SQLite assume a worst-case scenario
- // where SQLite is not compiled in thread-safe mode, so we have to wrap everything
- // in a mutex (see db/sqlite_adapter.rs and discussion in PR #322).
- let mut batch_of_hashes = vec![];
- let start_bound = match next_start.as_ref() {
- None => Bound::Unbounded,
- Some(x) => Bound::Excluded(x.as_slice()),
- };
- for entry in self
- .rc
- .rc
- .range::<&[u8], _>((start_bound, Bound::Unbounded))?
- {
- let (hash, _) = entry?;
- let hash = Hash::try_from(&hash[..]).unwrap();
- batch_of_hashes.push(hash);
- if batch_of_hashes.len() >= 1000 {
- break;
- }
- }
- if batch_of_hashes.is_empty() {
- break;
- }
-
- for hash in batch_of_hashes.into_iter() {
- self.put_to_resync(&hash, Duration::from_secs(0))?;
- next_start = Some(hash)
- }
-
- if *must_exit.borrow() {
- return Ok(());
- }
- }
-
- // 2. Repair blocks actually on disk
- // Lists all blocks on disk and adds them to the resync queue.
- // This allows us to find blocks we are storing but don't actually need,
- // so that we can offload them if necessary and then delete them locally.
- self.for_each_file(
- (),
- move |_, hash| async move {
- self.put_to_resync(&hash, Duration::from_secs(0))
- .map_err(Into::into)
- },
- must_exit,
- )
- .await
- }
-
- /// Verify integrity of each block on disk. Use `speed_limit` to limit the load generated by
- /// this function.
- pub async fn scrub_data_store(
- &self,
- must_exit: &watch::Receiver<bool>,
- tranquility: u32,
- ) -> Result<(), Error> {
- let tranquilizer = Tranquilizer::new(30);
- self.for_each_file(
- tranquilizer,
- move |mut tranquilizer, hash| async move {
- let _ = self.read_block(&hash).await;
- tranquilizer.tranquilize(tranquility).await;
- Ok(tranquilizer)
- },
- must_exit,
- )
- .await
- }
-
/// Get length of resync queue
pub fn resync_queue_len(&self) -> Result<usize, Error> {
// This currently can't return an error because the CountedTree hack
@@ -321,6 +247,17 @@ impl BlockManager {
Ok(self.rc.rc.len()?)
}
+ /// Send command to start/stop/manage scrub worker
+ pub async fn send_scrub_command(&self, cmd: ScrubWorkerCommand) {
+ let _ = self
+ .tx_scrub_command
+ .load()
+ .as_ref()
+ .unwrap()
+ .send(cmd)
+ .await;
+ }
+
//// ----- Managing the reference counter ----
/// Increment the number of time a block is used, putting it to resynchronization if it is
@@ -390,7 +327,7 @@ impl BlockManager {
}
/// Read block from disk, verifying its integrity
- async fn read_block(&self, hash: &Hash) -> Result<BlockRpc, Error> {
+ pub(crate) async fn read_block(&self, hash: &Hash) -> Result<BlockRpc, Error> {
let data = self
.read_block_internal(hash)
.bound_record_duration(&self.metrics.block_read_duration)
@@ -554,18 +491,23 @@ impl BlockManager {
// for times that are earlier than the exponential back-off delay
// is a natural condition that is handled properly).
- fn spawn_background_worker(self: Arc<Self>) {
+ fn spawn_background_workers(self: Arc<Self>) {
// Launch a background worker for background resync loop processing
let background = self.system.background.clone();
+ let worker = ResyncWorker::new(self.clone());
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(10)).await;
- background.spawn_worker("block resync worker".into(), move |must_exit| {
- self.resync_loop(must_exit)
- });
+ background.spawn_worker(worker);
});
+
+ // Launch a background worker for data store scrubs
+ let (scrub_tx, scrub_rx) = mpsc::channel(1);
+ self.tx_scrub_command.store(Some(Arc::new(scrub_tx)));
+ let scrub_worker = ScrubWorker::new(self.clone(), scrub_rx);
+ self.system.background.spawn_worker(scrub_worker);
}
- fn put_to_resync(&self, hash: &Hash, delay: Duration) -> db::Result<()> {
+ pub(crate) fn put_to_resync(&self, hash: &Hash, delay: Duration) -> db::Result<()> {
let when = now_msec() + delay.as_millis() as u64;
self.put_to_resync_at(hash, when)
}
@@ -579,37 +521,7 @@ impl BlockManager {
Ok(())
}
- async fn resync_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) {
- let mut tranquilizer = Tranquilizer::new(30);
-
- while !*must_exit.borrow() {
- match self.resync_iter(&mut must_exit).await {
- Ok(true) => {
- tranquilizer.tranquilize(self.background_tranquility).await;
- }
- Ok(false) => {
- tranquilizer.reset();
- }
- Err(e) => {
- // The errors that we have here are only Sled errors
- // We don't really know how to handle them so just ¯\_(ツ)_/¯
- // (there is kind of an assumption that Sled won't error on us,
- // if it does there is not much we can do -- TODO should we just panic?)
- error!(
- "Could not do a resync iteration: {} (this is a very bad error)",
- e
- );
- tranquilizer.reset();
- }
- }
- }
- }
-
- // The result of resync_iter is:
- // - Ok(true) -> a block was processed (successfully or not)
- // - Ok(false) -> no block was processed, but we are ready for the next iteration
- // - Err(_) -> a Sled error occurred when reading/writing from resync_queue/resync_errors
- async fn resync_iter(&self, must_exit: &mut watch::Receiver<bool>) -> Result<bool, db::Error> {
+ async fn resync_iter(&self) -> Result<ResyncIterResult, db::Error> {
if let Some((time_bytes, hash_bytes)) = self.resync_queue.first()? {
let time_msec = u64::from_be_bytes(time_bytes[0..8].try_into().unwrap());
let now = now_msec();
@@ -629,7 +541,7 @@ impl BlockManager {
// (we want to do the remove after the insert to ensure
// that the item is not lost if we crash in-between)
self.resync_queue.remove(time_bytes)?;
- return Ok(false);
+ return Ok(ResyncIterResult::BusyDidNothing);
}
}
@@ -676,15 +588,11 @@ impl BlockManager {
self.resync_queue.remove(time_bytes)?;
}
- Ok(true)
+ Ok(ResyncIterResult::BusyDidSomething)
} else {
- let delay = tokio::time::sleep(Duration::from_millis(time_msec - now));
- select! {
- _ = delay.fuse() => {},
- _ = self.resync_notify.notified().fuse() => {},
- _ = must_exit.changed().fuse() => {},
- }
- Ok(false)
+ Ok(ResyncIterResult::IdleFor(Duration::from_millis(
+ time_msec - now,
+ )))
}
} else {
// Here we wait either for a notification that an item has been
@@ -693,13 +601,7 @@ impl BlockManager {
// between the time we checked the queue and the first poll
// to resync_notify.notified(): if that happens, we'll just loop
// back 10 seconds later, which is fine.
- let delay = tokio::time::sleep(Duration::from_secs(10));
- select! {
- _ = delay.fuse() => {},
- _ = self.resync_notify.notified().fuse() => {},
- _ = must_exit.changed().fuse() => {},
- }
- Ok(false)
+ Ok(ResyncIterResult::IdleFor(Duration::from_secs(10)))
}
}
@@ -814,72 +716,6 @@ impl BlockManager {
Ok(())
}
-
- // ---- Utility: iteration on files in the data directory ----
-
- async fn for_each_file<F, Fut, State>(
- &self,
- state: State,
- mut f: F,
- must_exit: &watch::Receiver<bool>,
- ) -> Result<(), Error>
- where
- F: FnMut(State, Hash) -> Fut + Send,
- Fut: Future<Output = Result<State, Error>> + Send,
- State: Send,
- {
- self.for_each_file_rec(&self.data_dir, state, &mut f, must_exit)
- .await
- .map(|_| ())
- }
-
- fn for_each_file_rec<'a, F, Fut, State>(
- &'a self,
- path: &'a Path,
- mut state: State,
- f: &'a mut F,
- must_exit: &'a watch::Receiver<bool>,
- ) -> BoxFuture<'a, Result<State, Error>>
- where
- F: FnMut(State, Hash) -> Fut + Send,
- Fut: Future<Output = Result<State, Error>> + Send,
- State: Send + 'a,
- {
- async move {
- let mut ls_data_dir = fs::read_dir(path).await?;
- while let Some(data_dir_ent) = ls_data_dir.next_entry().await? {
- if *must_exit.borrow() {
- break;
- }
-
- let name = data_dir_ent.file_name();
- let name = if let Ok(n) = name.into_string() {
- n
- } else {
- continue;
- };
- let ent_type = data_dir_ent.file_type().await?;
-
- let name = name.strip_suffix(".zst").unwrap_or(&name);
- if name.len() == 2 && hex::decode(&name).is_ok() && ent_type.is_dir() {
- state = self
- .for_each_file_rec(&data_dir_ent.path(), state, f, must_exit)
- .await?;
- } else if name.len() == 64 {
- let hash_bytes = if let Ok(h) = hex::decode(&name) {
- h
- } else {
- continue;
- };
- let mut hash = [0u8; 32];
- hash.copy_from_slice(&hash_bytes[..]);
- state = f(state, hash.into()).await?;
- }
- }
- Ok(state)
- }
- .boxed()
- }
}
#[async_trait]
@@ -898,6 +734,77 @@ impl EndpointHandler<BlockRpc> for BlockManager {
}
}
+struct ResyncWorker {
+ manager: Arc<BlockManager>,
+ tranquilizer: Tranquilizer,
+ next_delay: Duration,
+}
+
+impl ResyncWorker {
+ fn new(manager: Arc<BlockManager>) -> Self {
+ Self {
+ manager,
+ tranquilizer: Tranquilizer::new(30),
+ next_delay: Duration::from_secs(10),
+ }
+ }
+}
+
+#[async_trait]
+impl Worker for ResyncWorker {
+ fn name(&self) -> String {
+ "Block resync worker".into()
+ }
+
+ fn info(&self) -> Option<String> {
+ let mut ret = vec![];
+ let qlen = self.manager.resync_queue_len().unwrap_or(0);
+ let elen = self.manager.resync_errors_len().unwrap_or(0);
+ if qlen > 0 {
+ ret.push(format!("{} blocks in queue", qlen));
+ }
+ if elen > 0 {
+ ret.push(format!("{} blocks in error state", elen));
+ }
+ if !ret.is_empty() {
+ Some(ret.join(", "))
+ } else {
+ None
+ }
+ }
+
+ async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
+ self.tranquilizer.reset();
+ match self.manager.resync_iter().await {
+ Ok(ResyncIterResult::BusyDidSomething) => Ok(self
+ .tranquilizer
+ .tranquilize_worker(self.manager.background_tranquility)),
+ Ok(ResyncIterResult::BusyDidNothing) => Ok(WorkerState::Busy),
+ Ok(ResyncIterResult::IdleFor(delay)) => {
+ self.next_delay = delay;
+ Ok(WorkerState::Idle)
+ }
+ Err(e) => {
+ // The errors that we have here are only Sled errors
+ // We don't really know how to handle them so just ¯\_(ツ)_/¯
+ // (there is kind of an assumption that Sled won't error on us,
+ // if it does there is not much we can do -- TODO should we just panic?)
+ // Here we just give the error to the worker manager,
+ // it will print it to the logs and increment a counter
+ Err(e.into())
+ }
+ }
+ }
+
+ async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerState {
+ select! {
+ _ = tokio::time::sleep(self.next_delay) => (),
+ _ = self.manager.resync_notify.notified() => (),
+ };
+ WorkerState::Busy
+ }
+}
+
struct BlockStatus {
exists: bool,
needed: RcEntry,
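
Note: the ResyncWorker impl above is the template every other worker in this patch follows: name() and info() feed the new `garage worker list` output, work() performs one bounded step and reports Busy/Idle/Done, and wait_for_work() decides how to wait when idle. The trait itself is defined in src/util/background/worker.rs, which is not shown in this excerpt, so the sketch below reconstructs an assumed minimal shape from these call sites (the real WorkerState also has a Throttled variant, omitted here), together with a toy driver loop standing in for BackgroundRunner. Assumed deps: tokio = { version = "1", features = ["full"] }, async-trait = "0.1".

use std::time::Duration;

use async_trait::async_trait;
use tokio::sync::watch;

#[derive(Debug)]
enum WorkerState {
    Busy, // call work() again immediately
    Idle, // nothing to do right now, go through wait_for_work()
    Done, // the worker is finished, stop scheduling it
}

#[async_trait]
trait Worker: Send {
    fn name(&self) -> String;
    fn info(&self) -> Option<String> {
        None
    }
    async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, String>;
    async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState;
}

struct CountdownWorker {
    remaining: u32,
}

#[async_trait]
impl Worker for CountdownWorker {
    fn name(&self) -> String {
        "countdown".into()
    }

    async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, String> {
        if self.remaining == 0 {
            return Ok(WorkerState::Done);
        }
        self.remaining -= 1;
        // Pretend every other step leaves the queue empty for a while.
        if self.remaining % 2 == 0 {
            Ok(WorkerState::Busy)
        } else {
            Ok(WorkerState::Idle)
        }
    }

    async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerState {
        tokio::time::sleep(Duration::from_millis(10)).await;
        WorkerState::Busy
    }
}

#[tokio::main]
async fn main() {
    // Stripped-down stand-in for what the background runner does per worker.
    let (_exit_tx, mut exit_rx) = watch::channel(false);
    let mut worker = CountdownWorker { remaining: 5 };
    loop {
        match worker.work(&mut exit_rx).await {
            Ok(WorkerState::Busy) => continue,
            Ok(WorkerState::Idle) => {
                let _ = worker.wait_for_work(&exit_rx).await;
            }
            Ok(WorkerState::Done) => break,
            Err(e) => {
                eprintln!("{}: error: {}", worker.name(), e);
                break;
            }
        }
    }
    println!("{} finished, info = {:?}", worker.name(), worker.info());
}
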
diff --git a/src/block/repair.rs b/src/block/repair.rs
new file mode 100644
index 00000000..07ff6772
--- /dev/null
+++ b/src/block/repair.rs
@@ -0,0 +1,444 @@
+use core::ops::Bound;
+use std::path::PathBuf;
+use std::sync::Arc;
+use std::time::Duration;
+
+use async_trait::async_trait;
+use serde::{Deserialize, Serialize};
+use tokio::fs;
+use tokio::select;
+use tokio::sync::mpsc;
+use tokio::sync::watch;
+
+use garage_util::background::*;
+use garage_util::data::*;
+use garage_util::error::*;
+use garage_util::persister::Persister;
+use garage_util::time::*;
+use garage_util::tranquilizer::Tranquilizer;
+
+use crate::manager::*;
+
+const SCRUB_INTERVAL: Duration = Duration::from_secs(3600 * 24 * 30); // full scrub every 30 days
+
+pub struct RepairWorker {
+ manager: Arc<BlockManager>,
+ next_start: Option<Hash>,
+ block_iter: Option<BlockStoreIterator>,
+}
+
+impl RepairWorker {
+ pub fn new(manager: Arc<BlockManager>) -> Self {
+ Self {
+ manager,
+ next_start: None,
+ block_iter: None,
+ }
+ }
+}
+
+#[async_trait]
+impl Worker for RepairWorker {
+ fn name(&self) -> String {
+ "Block repair worker".into()
+ }
+
+ fn info(&self) -> Option<String> {
+ match self.block_iter.as_ref() {
+ None => {
+ let idx_bytes = self
+ .next_start
+ .as_ref()
+ .map(|x| x.as_slice())
+ .unwrap_or(&[]);
+ let idx_bytes = if idx_bytes.len() > 4 {
+ &idx_bytes[..4]
+ } else {
+ idx_bytes
+ };
+ Some(format!("Phase 1: {}", hex::encode(idx_bytes)))
+ }
+ Some(bi) => Some(format!("Phase 2: {:.2}% done", bi.progress() * 100.)),
+ }
+ }
+
+ async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
+ match self.block_iter.as_mut() {
+ None => {
+ // Phase 1: Repair blocks from RC table.
+
+ // We have to do this complicated two-step process where we first read a bunch
+ // of hashes from the RC table, and then insert them in the to-resync queue,
+ // because of SQLite. Basically, as long as we have an iterator on a DB table,
+ // we can't do anything else on the DB. The naive approach (which we had previously)
+ // of just iterating on the RC table and inserting items one to one in the resync
+ // queue can't work here, it would just provoke a deadlock in the SQLite adapter code.
+ // This is mostly because the Rust bindings for SQLite assume a worst-case scenario
+ // where SQLite is not compiled in thread-safe mode, so we have to wrap everything
+ // in a mutex (see db/sqlite_adapter.rs and discussion in PR #322).
+ // TODO: maybe do this with tokio::task::spawn_blocking ?
+ let mut batch_of_hashes = vec![];
+ let start_bound = match self.next_start.as_ref() {
+ None => Bound::Unbounded,
+ Some(x) => Bound::Excluded(x.as_slice()),
+ };
+ for entry in self
+ .manager
+ .rc
+ .rc
+ .range::<&[u8], _>((start_bound, Bound::Unbounded))?
+ {
+ let (hash, _) = entry?;
+ let hash = Hash::try_from(&hash[..]).unwrap();
+ batch_of_hashes.push(hash);
+ if batch_of_hashes.len() >= 1000 {
+ break;
+ }
+ }
+ if batch_of_hashes.is_empty() {
+ // move on to phase 2
+ self.block_iter = Some(BlockStoreIterator::new(&self.manager));
+ return Ok(WorkerState::Busy);
+ }
+
+ for hash in batch_of_hashes.into_iter() {
+ self.manager.put_to_resync(&hash, Duration::from_secs(0))?;
+ self.next_start = Some(hash)
+ }
+
+ Ok(WorkerState::Busy)
+ }
+ Some(bi) => {
+ // Phase 2: Repair blocks actually on disk
+ // Lists all blocks on disk and adds them to the resync queue.
+ // This allows us to find blocks we are storing but don't actually need,
+ // so that we can offload them if necessary and then delete them locally.
+ if let Some(hash) = bi.next().await? {
+ self.manager.put_to_resync(&hash, Duration::from_secs(0))?;
+ Ok(WorkerState::Busy)
+ } else {
+ Ok(WorkerState::Done)
+ }
+ }
+ }
+ }
+
+ async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerState {
+ unreachable!()
+ }
+}
+
+// ----
+
+pub struct ScrubWorker {
+ manager: Arc<BlockManager>,
+ rx_cmd: mpsc::Receiver<ScrubWorkerCommand>,
+
+ work: ScrubWorkerState,
+ tranquilizer: Tranquilizer,
+
+ persister: Persister<ScrubWorkerPersisted>,
+ persisted: ScrubWorkerPersisted,
+}
+
+#[derive(Serialize, Deserialize)]
+struct ScrubWorkerPersisted {
+ tranquility: u32,
+ time_last_complete_scrub: u64,
+ corruptions_detected: u64,
+}
+
+enum ScrubWorkerState {
+ Running(BlockStoreIterator),
+ Paused(BlockStoreIterator, u64), // u64 = time when to resume scrub
+ Finished,
+}
+
+impl Default for ScrubWorkerState {
+ fn default() -> Self {
+ ScrubWorkerState::Finished
+ }
+}
+
+#[derive(Debug)]
+pub enum ScrubWorkerCommand {
+ Start,
+ Pause(Duration),
+ Resume,
+ Cancel,
+ SetTranquility(u32),
+}
+
+impl ScrubWorker {
+ pub fn new(manager: Arc<BlockManager>, rx_cmd: mpsc::Receiver<ScrubWorkerCommand>) -> Self {
+ let persister = Persister::new(&manager.system.metadata_dir, "scrub_info");
+ let persisted = match persister.load() {
+ Ok(v) => v,
+ Err(_) => ScrubWorkerPersisted {
+ time_last_complete_scrub: 0,
+ tranquility: 4,
+ corruptions_detected: 0,
+ },
+ };
+ Self {
+ manager,
+ rx_cmd,
+ work: ScrubWorkerState::Finished,
+ tranquilizer: Tranquilizer::new(30),
+ persister,
+ persisted,
+ }
+ }
+
+ async fn handle_cmd(&mut self, cmd: ScrubWorkerCommand) {
+ match cmd {
+ ScrubWorkerCommand::Start => {
+ self.work = match std::mem::take(&mut self.work) {
+ ScrubWorkerState::Finished => {
+ let iterator = BlockStoreIterator::new(&self.manager);
+ ScrubWorkerState::Running(iterator)
+ }
+ work => {
+ error!("Cannot start scrub worker: already running!");
+ work
+ }
+ };
+ }
+ ScrubWorkerCommand::Pause(dur) => {
+ self.work = match std::mem::take(&mut self.work) {
+ ScrubWorkerState::Running(it) | ScrubWorkerState::Paused(it, _) => {
+ ScrubWorkerState::Paused(it, now_msec() + dur.as_millis() as u64)
+ }
+ work => {
+ error!("Cannot pause scrub worker: not running!");
+ work
+ }
+ };
+ }
+ ScrubWorkerCommand::Resume => {
+ self.work = match std::mem::take(&mut self.work) {
+ ScrubWorkerState::Paused(it, _) => ScrubWorkerState::Running(it),
+ work => {
+ error!("Cannot resume scrub worker: not paused!");
+ work
+ }
+ };
+ }
+ ScrubWorkerCommand::Cancel => {
+ self.work = match std::mem::take(&mut self.work) {
+ ScrubWorkerState::Running(_) | ScrubWorkerState::Paused(_, _) => {
+ ScrubWorkerState::Finished
+ }
+ work => {
+ error!("Cannot cancel scrub worker: not running!");
+ work
+ }
+ }
+ }
+ ScrubWorkerCommand::SetTranquility(t) => {
+ self.persisted.tranquility = t;
+ if let Err(e) = self.persister.save_async(&self.persisted).await {
+ error!("Could not save new tranquilitiy value: {}", e);
+ }
+ }
+ }
+ }
+}
+
+#[async_trait]
+impl Worker for ScrubWorker {
+ fn name(&self) -> String {
+ "Block scrub worker".into()
+ }
+
+ fn info(&self) -> Option<String> {
+ let s = match &self.work {
+ ScrubWorkerState::Running(bsi) => format!(
+ "{:.2}% done (tranquility = {})",
+ bsi.progress() * 100.,
+ self.persisted.tranquility
+ ),
+ ScrubWorkerState::Paused(bsi, rt) => {
+ format!(
+ "Paused, {:.2}% done, resumes at {}",
+ bsi.progress() * 100.,
+ msec_to_rfc3339(*rt)
+ )
+ }
+ ScrubWorkerState::Finished => format!(
+ "Last completed scrub: {}",
+ msec_to_rfc3339(self.persisted.time_last_complete_scrub)
+ ),
+ };
+ Some(format!(
+ "{} ; corruptions detected: {}",
+ s, self.persisted.corruptions_detected
+ ))
+ }
+
+ async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
+ match self.rx_cmd.try_recv() {
+ Ok(cmd) => self.handle_cmd(cmd).await,
+ Err(mpsc::error::TryRecvError::Disconnected) => return Ok(WorkerState::Done),
+ Err(mpsc::error::TryRecvError::Empty) => (),
+ };
+
+ match &mut self.work {
+ ScrubWorkerState::Running(bsi) => {
+ self.tranquilizer.reset();
+ if let Some(hash) = bsi.next().await? {
+ match self.manager.read_block(&hash).await {
+ Err(Error::CorruptData(_)) => {
+ error!("Found corrupt data block during scrub: {:?}", hash);
+ self.persisted.corruptions_detected += 1;
+ self.persister.save_async(&self.persisted).await?;
+ }
+ Err(e) => return Err(e),
+ _ => (),
+ };
+ Ok(self
+ .tranquilizer
+ .tranquilize_worker(self.persisted.tranquility))
+ } else {
+ self.persisted.time_last_complete_scrub = now_msec();
+ self.persister.save_async(&self.persisted).await?;
+ self.work = ScrubWorkerState::Finished;
+ self.tranquilizer.clear();
+ Ok(WorkerState::Idle)
+ }
+ }
+ _ => Ok(WorkerState::Idle),
+ }
+ }
+
+ async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerState {
+ let (wait_until, command) = match &self.work {
+ ScrubWorkerState::Running(_) => return WorkerState::Busy,
+ ScrubWorkerState::Paused(_, resume_time) => (*resume_time, ScrubWorkerCommand::Resume),
+ ScrubWorkerState::Finished => (
+ self.persisted.time_last_complete_scrub + SCRUB_INTERVAL.as_millis() as u64,
+ ScrubWorkerCommand::Start,
+ ),
+ };
+
+ let now = now_msec();
+ if now >= wait_until {
+ self.handle_cmd(command).await;
+ return WorkerState::Busy;
+ }
+ let delay = Duration::from_millis(wait_until - now);
+ select! {
+ _ = tokio::time::sleep(delay) => self.handle_cmd(command).await,
+ cmd = self.rx_cmd.recv() => if let Some(cmd) = cmd {
+ self.handle_cmd(cmd).await;
+ } else {
+ return WorkerState::Done;
+ }
+ }
+
+ match &self.work {
+ ScrubWorkerState::Running(_) => WorkerState::Busy,
+ _ => WorkerState::Idle,
+ }
+ }
+}
+
+// ----
+
+struct BlockStoreIterator {
+ path: Vec<ReadingDir>,
+}
+
+enum ReadingDir {
+ Pending(PathBuf),
+ Read {
+ subpaths: Vec<fs::DirEntry>,
+ pos: usize,
+ },
+}
+
+impl BlockStoreIterator {
+ fn new(manager: &BlockManager) -> Self {
+ let root_dir = manager.data_dir.clone();
+ Self {
+ path: vec![ReadingDir::Pending(root_dir)],
+ }
+ }
+
+ /// Returns progress done, between 0 and 1
+ fn progress(&self) -> f32 {
+ if self.path.is_empty() {
+ 1.0
+ } else {
+ let mut ret = 0.0;
+ let mut next_div = 1;
+ for p in self.path.iter() {
+ match p {
+ ReadingDir::Pending(_) => break,
+ ReadingDir::Read { subpaths, pos } => {
+ next_div *= subpaths.len();
+ ret += ((*pos - 1) as f32) / (next_div as f32);
+ }
+ }
+ }
+ ret
+ }
+ }
+
+ async fn next(&mut self) -> Result<Option<Hash>, Error> {
+ loop {
+ let last_path = match self.path.last_mut() {
+ None => return Ok(None),
+ Some(lp) => lp,
+ };
+
+ if let ReadingDir::Pending(path) = last_path {
+ let mut reader = fs::read_dir(&path).await?;
+ let mut subpaths = vec![];
+ while let Some(ent) = reader.next_entry().await? {
+ subpaths.push(ent);
+ }
+ *last_path = ReadingDir::Read { subpaths, pos: 0 };
+ }
+
+ let (subpaths, pos) = match *last_path {
+ ReadingDir::Read {
+ ref subpaths,
+ ref mut pos,
+ } => (subpaths, pos),
+ ReadingDir::Pending(_) => unreachable!(),
+ };
+
+ let data_dir_ent = match subpaths.get(*pos) {
+ None => {
+ self.path.pop();
+ continue;
+ }
+ Some(ent) => {
+ *pos += 1;
+ ent
+ }
+ };
+
+ let name = data_dir_ent.file_name();
+ let name = if let Ok(n) = name.into_string() {
+ n
+ } else {
+ continue;
+ };
+ let ent_type = data_dir_ent.file_type().await?;
+
+ let name = name.strip_suffix(".zst").unwrap_or(&name);
+ if name.len() == 2 && hex::decode(&name).is_ok() && ent_type.is_dir() {
+ let path = data_dir_ent.path();
+ self.path.push(ReadingDir::Pending(path));
+ } else if name.len() == 64 {
+ if let Ok(h) = hex::decode(&name) {
+ let mut hash = [0u8; 32];
+ hash.copy_from_slice(&h);
+ return Ok(Some(hash.into()));
+ }
+ }
+ }
+ }
+}
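
Note: ScrubWorker::handle_cmd drives the Start/Pause/Resume/Cancel transitions by swapping the current ScrubWorkerState out with std::mem::take, so the match can move the BlockStoreIterator into the next state by value, or hand the old state back unchanged when a command does not apply. The standalone toy below shows the same idiom with made-up Phase/Command types; it illustrates the pattern and is not Garage code.

#[derive(Debug)]
enum Phase {
    Running(u32), // payload: position reached so far
    Paused(u32),
    Finished,
}

impl Default for Phase {
    fn default() -> Self {
        Phase::Finished
    }
}

enum Command {
    Start,
    Pause,
    Resume,
    Cancel,
}

fn handle(phase: &mut Phase, cmd: Command) {
    // Take the old state out (leaving the Default, i.e. Finished, behind),
    // decide on the new one, and write it back.
    *phase = match (std::mem::take(phase), cmd) {
        (Phase::Finished, Command::Start) => Phase::Running(0),
        (Phase::Running(pos), Command::Pause) | (Phase::Paused(pos), Command::Pause) => {
            Phase::Paused(pos)
        }
        (Phase::Paused(pos), Command::Resume) => Phase::Running(pos),
        (Phase::Running(_), Command::Cancel) | (Phase::Paused(_), Command::Cancel) => {
            Phase::Finished
        }
        (other, _) => {
            eprintln!("command does not apply in this state, keeping it");
            other
        }
    };
}

fn main() {
    let mut phase = Phase::default();
    handle(&mut phase, Command::Start);
    handle(&mut phase, Command::Pause);
    handle(&mut phase, Command::Resume);
    println!("{:?}", phase); // Running(0)
    handle(&mut phase, Command::Cancel);
    println!("{:?}", phase); // Finished
}
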
diff --git a/src/db/Cargo.toml b/src/db/Cargo.toml
index 6d8f64be..f697054b 100644
--- a/src/db/Cargo.toml
+++ b/src/db/Cargo.toml
@@ -19,7 +19,7 @@ required-features = ["cli"]
[dependencies]
err-derive = "0.3"
hexdump = "0.1"
-log = "0.4"
+tracing = "0.1.30"
heed = "0.11"
rusqlite = { version = "0.27", features = ["bundled"] }
diff --git a/src/db/sqlite_adapter.rs b/src/db/sqlite_adapter.rs
index 68d96ca0..97a78b07 100644
--- a/src/db/sqlite_adapter.rs
+++ b/src/db/sqlite_adapter.rs
@@ -6,7 +6,7 @@ use std::pin::Pin;
use std::ptr::NonNull;
use std::sync::{Arc, Mutex, MutexGuard};
-use log::trace;
+use tracing::trace;
use rusqlite::{params, Connection, Rows, Statement, Transaction};
diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml
index 640e6975..8948e750 100644
--- a/src/garage/Cargo.toml
+++ b/src/garage/Cargo.toml
@@ -23,6 +23,7 @@ path = "tests/lib.rs"
[dependencies]
garage_db = { version = "0.8.0", path = "../db" }
garage_api = { version = "0.7.0", path = "../api" }
+garage_block = { version = "0.7.0", path = "../block" }
garage_model = { version = "0.7.0", path = "../model" }
garage_rpc = { version = "0.7.0", path = "../rpc" }
garage_table = { version = "0.7.0", path = "../table" }
@@ -31,6 +32,7 @@ garage_web = { version = "0.7.0", path = "../web" }
bytes = "1.0"
bytesize = "1.1"
+timeago = "0.3"
hex = "0.4"
tracing = { version = "0.1.30", features = ["log-always"] }
pretty_env_logger = "0.4"
diff --git a/src/garage/admin.rs b/src/garage/admin.rs
index 48914655..71ee608c 100644
--- a/src/garage/admin.rs
+++ b/src/garage/admin.rs
@@ -24,7 +24,7 @@ use garage_model::migrate::Migrate;
use garage_model::permission::*;
use crate::cli::*;
-use crate::repair::online::OnlineRepair;
+use crate::repair::online::launch_online_repair;
pub const ADMIN_RPC_PATH: &str = "garage/admin_rpc.rs/Rpc";
@@ -36,6 +36,7 @@ pub enum AdminRpc {
LaunchRepair(RepairOpt),
Migrate(MigrateOpt),
Stats(StatsOpt),
+ Worker(WorkerOpt),
// Replies
Ok(String),
@@ -47,6 +48,10 @@ pub enum AdminRpc {
},
KeyList(Vec<(String, String)>),
KeyInfo(Key, HashMap<Uuid, Bucket>),
+ WorkerList(
+ HashMap<usize, garage_util::background::WorkerInfo>,
+ WorkerListOpt,
+ ),
}
impl Rpc for AdminRpc {
@@ -693,15 +698,7 @@ impl AdminRpcHandler {
)))
}
} else {
- let repair = OnlineRepair {
- garage: self.garage.clone(),
- };
- self.garage
- .system
- .background
- .spawn_worker("Repair worker".into(), move |must_exit| async move {
- repair.repair_worker(opt, must_exit).await
- });
+ launch_online_repair(self.garage.clone(), opt).await;
Ok(AdminRpc::Ok(format!(
"Repair launched on {:?}",
self.garage.system.id
@@ -830,6 +827,17 @@ impl AdminRpcHandler {
Ok(())
}
+
+ // ----
+
+ async fn handle_worker_cmd(&self, opt: WorkerOpt) -> Result<AdminRpc, Error> {
+ match opt.cmd {
+ WorkerCmd::List { opt } => {
+ let workers = self.garage.background.get_worker_info();
+ Ok(AdminRpc::WorkerList(workers, opt))
+ }
+ }
+ }
}
#[async_trait]
@@ -845,6 +853,7 @@ impl EndpointHandler<AdminRpc> for AdminRpcHandler {
AdminRpc::Migrate(opt) => self.handle_migrate(opt.clone()).await,
AdminRpc::LaunchRepair(opt) => self.handle_launch_repair(opt.clone()).await,
AdminRpc::Stats(opt) => self.handle_stats(opt.clone()).await,
+ AdminRpc::Worker(opt) => self.handle_worker_cmd(opt.clone()).await,
m => Err(GarageError::unexpected_rpc_message(m).into()),
}
}
diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs
index 3a0bd956..1aa2c2ff 100644
--- a/src/garage/cli/cmd.rs
+++ b/src/garage/cli/cmd.rs
@@ -1,4 +1,5 @@
use std::collections::HashSet;
+use std::time::Duration;
use garage_util::error::*;
use garage_util::formater::format_table;
@@ -39,6 +40,7 @@ pub async fn cli_command_dispatch(
cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::LaunchRepair(ro)).await
}
Command::Stats(so) => cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::Stats(so)).await,
+ Command::Worker(wo) => cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::Worker(wo)).await,
_ => unreachable!(),
}
}
@@ -100,6 +102,7 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) ->
vec!["ID\tHostname\tAddress\tTags\tZone\tCapacity\tLast seen".to_string()];
for adv in status.iter().filter(|adv| !adv.is_up) {
if let Some(NodeRoleV(Some(cfg))) = layout.roles.get(&adv.id) {
+ let tf = timeago::Formatter::new();
failed_nodes.push(format!(
"{id:?}\t{host}\t{addr}\t[{tags}]\t{zone}\t{capacity}\t{last_seen}",
id = adv.id,
@@ -110,7 +113,7 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) ->
capacity = cfg.capacity_string(),
last_seen = adv
.last_seen_secs_ago
- .map(|s| format!("{}s ago", s))
+ .map(|s| tf.convert(Duration::from_secs(s)))
.unwrap_or_else(|| "never seen".into()),
));
}
@@ -182,6 +185,9 @@ pub async fn cmd_admin(
AdminRpc::KeyInfo(key, rb) => {
print_key_info(&key, &rb);
}
+ AdminRpc::WorkerList(wi, wlo) => {
+ print_worker_info(wi, wlo);
+ }
r => {
error!("Unexpected response: {:?}", r);
}
diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs
index 4f2efe19..bc44b5ef 100644
--- a/src/garage/cli/structs.rs
+++ b/src/garage/cli/structs.rs
@@ -45,6 +45,10 @@ pub enum Command {
/// Gather node statistics
#[structopt(name = "stats")]
Stats(StatsOpt),
+
+ /// Manage background workers
+ #[structopt(name = "worker")]
+ Worker(WorkerOpt),
}
#[derive(StructOpt, Debug)]
@@ -423,8 +427,29 @@ pub enum RepairWhat {
/// Verify integrity of all blocks on disc (extremely slow, i/o intensive)
#[structopt(name = "scrub")]
Scrub {
- /// Tranquility factor (see tranquilizer documentation)
- #[structopt(name = "tranquility", default_value = "2")]
+ #[structopt(subcommand)]
+ cmd: ScrubCmd,
+ },
+}
+
+#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
+pub enum ScrubCmd {
+ /// Start scrub
+ #[structopt(name = "start")]
+ Start,
+ /// Pause scrub (it will resume automatically after 24 hours)
+ #[structopt(name = "pause")]
+ Pause,
+ /// Resume paused scrub
+ #[structopt(name = "resume")]
+ Resume,
+ /// Cancel scrub in progress
+ #[structopt(name = "cancel")]
+ Cancel,
+ /// Set tranquility level for in-progress and future scrubs
+ #[structopt(name = "set-tranquility")]
+ SetTranquility {
+ #[structopt()]
tranquility: u32,
},
}
@@ -460,3 +485,29 @@ pub struct StatsOpt {
#[structopt(short = "d", long = "detailed")]
pub detailed: bool,
}
+
+#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
+pub struct WorkerOpt {
+ #[structopt(subcommand)]
+ pub cmd: WorkerCmd,
+}
+
+#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
+pub enum WorkerCmd {
+ /// List all workers on Garage node
+ #[structopt(name = "list")]
+ List {
+ #[structopt(flatten)]
+ opt: WorkerListOpt,
+ },
+}
+
+#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone, Copy)]
+pub struct WorkerListOpt {
+ /// Show only busy workers
+ #[structopt(short = "b", long = "busy")]
+ pub busy: bool,
+ /// Show only workers with errors
+ #[structopt(short = "e", long = "errors")]
+ pub errors: bool,
+}
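
Note: the new WorkerOpt/WorkerCmd/WorkerListOpt types plug into the existing structopt-based CLI: the subcommand attribute nests `worker list` under `garage worker`, and flatten folds the list filters into that subcommand. Below is a trimmed, self-contained sketch of the same wiring (structopt = "0.3" assumed, Serialize/Deserialize derives dropped) showing what a parse of `garage worker list --busy` produces.

use structopt::StructOpt;

#[derive(StructOpt, Debug)]
struct WorkerOpt {
    #[structopt(subcommand)]
    cmd: WorkerCmd,
}

#[derive(StructOpt, Debug)]
enum WorkerCmd {
    /// List all workers on this node
    #[structopt(name = "list")]
    List {
        #[structopt(flatten)]
        opt: WorkerListOpt,
    },
}

#[derive(StructOpt, Debug)]
struct WorkerListOpt {
    /// Show only busy workers
    #[structopt(short = "b", long = "busy")]
    busy: bool,
    /// Show only workers with errors
    #[structopt(short = "e", long = "errors")]
    errors: bool,
}

fn main() {
    // Equivalent of running: garage worker list --busy
    let opt = WorkerOpt::from_iter(vec!["garage-worker", "list", "--busy"]);
    match opt.cmd {
        WorkerCmd::List { opt } => {
            println!("busy only: {}, errors only: {}", opt.busy, opt.errors);
        }
    }
}
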
diff --git a/src/garage/cli/util.rs b/src/garage/cli/util.rs
index 329e8a3e..396938ae 100644
--- a/src/garage/cli/util.rs
+++ b/src/garage/cli/util.rs
@@ -1,14 +1,19 @@
use std::collections::HashMap;
+use std::time::Duration;
+use garage_util::background::*;
use garage_util::crdt::*;
use garage_util::data::Uuid;
use garage_util::error::*;
use garage_util::formater::format_table;
+use garage_util::time::*;
use garage_model::bucket_table::*;
use garage_model::key_table::*;
use garage_model::s3::object_table::{BYTES, OBJECTS, UNFINISHED_UPLOADS};
+use crate::cli::structs::WorkerListOpt;
+
pub fn print_bucket_list(bl: Vec<Bucket>) {
println!("List of buckets:");
@@ -235,3 +240,56 @@ pub fn find_matching_node(
Ok(candidates[0])
}
}
+
+pub fn print_worker_info(wi: HashMap<usize, WorkerInfo>, wlo: WorkerListOpt) {
+ let mut wi = wi.into_iter().collect::<Vec<_>>();
+ wi.sort_by_key(|(tid, info)| {
+ (
+ match info.state {
+ WorkerState::Busy | WorkerState::Throttled(_) => 0,
+ WorkerState::Idle => 1,
+ WorkerState::Done => 2,
+ },
+ *tid,
+ )
+ });
+
+ let mut table = vec![];
+ for (tid, info) in wi.iter() {
+ if wlo.busy && !matches!(info.state, WorkerState::Busy | WorkerState::Throttled(_)) {
+ continue;
+ }
+ if wlo.errors && info.errors == 0 {
+ continue;
+ }
+
+ table.push(format!("{}\t{}\t{}", tid, info.state, info.name));
+ if let Some(i) = &info.info {
+ table.push(format!("\t\t {}", i));
+ }
+ let tf = timeago::Formatter::new();
+ let (err_ago, err_msg) = info
+ .last_error
+ .as_ref()
+ .map(|(m, t)| {
+ (
+ tf.convert(Duration::from_millis(now_msec() - t)),
+ m.as_str(),
+ )
+ })
+ .unwrap_or(("(?) ago".into(), "(?)"));
+ if info.consecutive_errors > 0 {
+ table.push(format!(
+ "\t\t {} consecutive errors ({} total), last {}",
+ info.consecutive_errors, info.errors, err_ago,
+ ));
+ table.push(format!("\t\t {}", err_msg));
+ } else if info.errors > 0 {
+ table.push(format!("\t\t ({} errors, last {})", info.errors, err_ago,));
+ if wlo.errors {
+ table.push(format!("\t\t {}", err_msg));
+ }
+ }
+ }
+ format_table(table);
+}
diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs
index d6a71742..e33cf097 100644
--- a/src/garage/repair/online.rs
+++ b/src/garage/repair/online.rs
@@ -1,89 +1,110 @@
use std::sync::Arc;
+use std::time::Duration;
+use async_trait::async_trait;
use tokio::sync::watch;
+use garage_block::repair::ScrubWorkerCommand;
use garage_model::garage::Garage;
use garage_model::s3::block_ref_table::*;
use garage_model::s3::object_table::*;
use garage_model::s3::version_table::*;
use garage_table::*;
+use garage_util::background::*;
use garage_util::error::Error;
use crate::*;
-pub struct OnlineRepair {
- pub garage: Arc<Garage>,
-}
-
-impl OnlineRepair {
- pub async fn repair_worker(&self, opt: RepairOpt, must_exit: watch::Receiver<bool>) {
- if let Err(e) = self.repair_worker_aux(opt, must_exit).await {
- warn!("Repair worker failed with error: {}", e);
+pub async fn launch_online_repair(garage: Arc<Garage>, opt: RepairOpt) {
+ match opt.what {
+ RepairWhat::Tables => {
+ info!("Launching a full sync of tables");
+ garage.bucket_table.syncer.add_full_sync();
+ garage.object_table.syncer.add_full_sync();
+ garage.version_table.syncer.add_full_sync();
+ garage.block_ref_table.syncer.add_full_sync();
+ garage.key_table.syncer.add_full_sync();
+ }
+ RepairWhat::Versions => {
+ info!("Repairing the versions table");
+ garage
+ .background
+ .spawn_worker(RepairVersionsWorker::new(garage.clone()));
+ }
+ RepairWhat::BlockRefs => {
+ info!("Repairing the block refs table");
+ garage
+ .background
+ .spawn_worker(RepairBlockrefsWorker::new(garage.clone()));
+ }
+ RepairWhat::Blocks => {
+ info!("Repairing the stored blocks");
+ garage
+ .background
+ .spawn_worker(garage_block::repair::RepairWorker::new(
+ garage.block_manager.clone(),
+ ));
+ }
+ RepairWhat::Scrub { cmd } => {
+ let cmd = match cmd {
+ ScrubCmd::Start => ScrubWorkerCommand::Start,
+ ScrubCmd::Pause => ScrubWorkerCommand::Pause(Duration::from_secs(3600 * 24)),
+ ScrubCmd::Resume => ScrubWorkerCommand::Resume,
+ ScrubCmd::Cancel => ScrubWorkerCommand::Cancel,
+ ScrubCmd::SetTranquility { tranquility } => {
+ ScrubWorkerCommand::SetTranquility(tranquility)
+ }
+ };
+ info!("Sending command to scrub worker: {:?}", cmd);
+ garage.block_manager.send_scrub_command(cmd).await;
}
}
+}
- async fn repair_worker_aux(
- &self,
- opt: RepairOpt,
- must_exit: watch::Receiver<bool>,
- ) -> Result<(), Error> {
- match opt.what {
- RepairWhat::Tables => {
- info!("Launching a full sync of tables");
- self.garage.bucket_table.syncer.add_full_sync();
- self.garage.object_table.syncer.add_full_sync();
- self.garage.version_table.syncer.add_full_sync();
- self.garage.block_ref_table.syncer.add_full_sync();
- self.garage.key_table.syncer.add_full_sync();
- }
- RepairWhat::Versions => {
- info!("Repairing the versions table");
- self.repair_versions(&must_exit).await?;
- }
- RepairWhat::BlockRefs => {
- info!("Repairing the block refs table");
- self.repair_block_ref(&must_exit).await?;
- }
- RepairWhat::Blocks => {
- info!("Repairing the stored blocks");
- self.garage
- .block_manager
- .repair_data_store(&must_exit)
- .await?;
- }
- RepairWhat::Scrub { tranquility } => {
- info!("Verifying integrity of stored blocks");
- self.garage
- .block_manager
- .scrub_data_store(&must_exit, tranquility)
- .await?;
- }
+// ----
+
+struct RepairVersionsWorker {
+ garage: Arc<Garage>,
+ pos: Vec<u8>,
+ counter: usize,
+}
+
+impl RepairVersionsWorker {
+ fn new(garage: Arc<Garage>) -> Self {
+ Self {
+ garage,
+ pos: vec![],
+ counter: 0,
}
- Ok(())
}
+}
- async fn repair_versions(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
- let mut pos = vec![];
- let mut i = 0;
+#[async_trait]
+impl Worker for RepairVersionsWorker {
+ fn name(&self) -> String {
+ "Version repair worker".into()
+ }
- while !*must_exit.borrow() {
- let item_bytes = match self.garage.version_table.data.store.get_gt(pos)? {
- Some((k, v)) => {
- pos = k;
- v
- }
- None => break,
- };
+ fn info(&self) -> Option<String> {
+ Some(format!("{} items done", self.counter))
+ }
- i += 1;
- if i % 1000 == 0 {
- info!("repair_versions: {}", i);
+ async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
+ let item_bytes = match self.garage.version_table.data.store.get_gt(&self.pos)? {
+ Some((k, v)) => {
+ self.pos = k;
+ v
}
-
- let version = rmp_serde::decode::from_read_ref::<_, Version>(&item_bytes)?;
- if version.deleted.get() {
- continue;
+ None => {
+ info!("repair_versions: finished, done {}", self.counter);
+ return Ok(WorkerState::Done);
}
+ };
+
+ self.counter += 1;
+
+ let version = rmp_serde::decode::from_read_ref::<_, Version>(&item_bytes)?;
+ if !version.deleted.get() {
let object = self
.garage
.object_table
@@ -109,32 +130,59 @@ impl OnlineRepair {
.await?;
}
}
- info!("repair_versions: finished, done {}", i);
- Ok(())
+
+ Ok(WorkerState::Busy)
}
- async fn repair_block_ref(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
- let mut pos = vec![];
- let mut i = 0;
+ async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerState {
+ unreachable!()
+ }
+}
- while !*must_exit.borrow() {
- let item_bytes = match self.garage.block_ref_table.data.store.get_gt(pos)? {
- Some((k, v)) => {
- pos = k;
- v
- }
- None => break,
- };
+// ----
- i += 1;
- if i % 1000 == 0 {
- info!("repair_block_ref: {}", i);
- }
+struct RepairBlockrefsWorker {
+ garage: Arc<Garage>,
+ pos: Vec<u8>,
+ counter: usize,
+}
- let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(&item_bytes)?;
- if block_ref.deleted.get() {
- continue;
+impl RepairBlockrefsWorker {
+ fn new(garage: Arc<Garage>) -> Self {
+ Self {
+ garage,
+ pos: vec![],
+ counter: 0,
+ }
+ }
+}
+
+#[async_trait]
+impl Worker for RepairBlockrefsWorker {
+ fn name(&self) -> String {
+ "Block refs repair worker".into()
+ }
+
+ fn info(&self) -> Option<String> {
+ Some(format!("{} items done", self.counter))
+ }
+
+ async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
+ let item_bytes = match self.garage.block_ref_table.data.store.get_gt(&self.pos)? {
+ Some((k, v)) => {
+ self.pos = k;
+ v
}
+ None => {
+ info!("repair_block_ref: finished, done {}", self.counter);
+ return Ok(WorkerState::Done);
+ }
+ };
+
+ self.counter += 1;
+
+ let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(&item_bytes)?;
+ if !block_ref.deleted.get() {
let version = self
.garage
.version_table
@@ -157,7 +205,11 @@ impl OnlineRepair {
.await?;
}
}
- info!("repair_block_ref: finished, done {}", i);
- Ok(())
+
+ Ok(WorkerState::Busy)
+ }
+
+ async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerState {
+ unreachable!()
}
}
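
Note: both repair workers above walk their table with a resumable cursor rather than a long-lived iterator: each work() call asks the store for the first key strictly greater than the last position seen (store.get_gt(&self.pos)) and records that key, so the scan survives being interleaved with await points and restarts. The sketch below shows the same idea against a plain std BTreeMap; the get_gt helper is hypothetical and only mimics the behaviour the code above relies on.

use std::collections::BTreeMap;
use std::ops::Bound;

// Return the first entry whose key is strictly greater than `pos`.
fn get_gt<'a>(
    store: &'a BTreeMap<Vec<u8>, String>,
    pos: &[u8],
) -> Option<(&'a Vec<u8>, &'a String)> {
    let bounds: (Bound<&[u8]>, Bound<&[u8]>) = (Bound::Excluded(pos), Bound::Unbounded);
    store.range::<[u8], _>(bounds).next()
}

fn main() {
    let mut store = BTreeMap::new();
    store.insert(b"a".to_vec(), "first".to_string());
    store.insert(b"b".to_vec(), "second".to_string());
    store.insert(b"c".to_vec(), "third".to_string());

    // The cursor is just the last key we handled; the loop below could stop
    // after any iteration and pick up again from `pos` later.
    let mut pos: Vec<u8> = vec![];
    let mut counter = 0usize;
    while let Some((k, v)) = get_gt(&store, &pos) {
        pos = k.clone();
        counter += 1;
        println!("item {}: {}", counter, v);
    }
    println!("done, {} items", counter);
}
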
diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs
index 36e8172b..26833390 100644
--- a/src/model/index_counter.rs
+++ b/src/model/index_counter.rs
@@ -2,8 +2,8 @@ use core::ops::Bound;
use std::collections::{hash_map, BTreeMap, HashMap};
use std::marker::PhantomData;
use std::sync::Arc;
-use std::time::Duration;
+use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use tokio::sync::{mpsc, watch};
@@ -11,6 +11,7 @@ use garage_db as db;
use garage_rpc::ring::Ring;
use garage_rpc::system::System;
+use garage_util::background::*;
use garage_util::data::*;
use garage_util::error::*;
use garage_util::time::*;
@@ -171,11 +172,13 @@ impl<T: CountedItem> IndexCounter<T> {
),
});
- let this2 = this.clone();
- background.spawn_worker(
- format!("{} index counter propagator", T::COUNTER_TABLE_NAME),
- move |must_exit| this2.clone().propagate_loop(propagate_rx, must_exit),
- );
+ background.spawn_worker(IndexPropagatorWorker {
+ index_counter: this.clone(),
+ propagate_rx,
+ buf: HashMap::new(),
+ errors: 0,
+ });
+
this
}
@@ -239,68 +242,6 @@ impl<T: CountedItem> IndexCounter<T> {
Ok(())
}
- async fn propagate_loop(
- self: Arc<Self>,
- mut propagate_rx: mpsc::UnboundedReceiver<(T::CP, T::CS, LocalCounterEntry<T>)>,
- must_exit: watch::Receiver<bool>,
- ) {
- // This loop batches updates to counters to be sent all at once.
- // They are sent once the propagate_rx channel has been emptied (or is closed).
- let mut buf = HashMap::new();
- let mut errors = 0;
-
- loop {
- let (ent, closed) = match propagate_rx.try_recv() {
- Ok(ent) => (Some(ent), false),
- Err(mpsc::error::TryRecvError::Empty) if buf.is_empty() => {
- match propagate_rx.recv().await {
- Some(ent) => (Some(ent), false),
- None => (None, true),
- }
- }
- Err(mpsc::error::TryRecvError::Empty) => (None, false),
- Err(mpsc::error::TryRecvError::Disconnected) => (None, true),
- };
-
- if let Some((pk, sk, counters)) = ent {
- let tree_key = self.table.data.tree_key(&pk, &sk);
- let dist_entry = counters.into_counter_entry(self.this_node);
- match buf.entry(tree_key) {
- hash_map::Entry::Vacant(e) => {
- e.insert(dist_entry);
- }
- hash_map::Entry::Occupied(mut e) => {
- e.get_mut().merge(&dist_entry);
- }
- }
- // As long as we can add entries, loop back and add them to batch
- // before sending batch to other nodes
- continue;
- }
-
- if !buf.is_empty() {
- let entries = buf.iter().map(|(_k, v)| v);
- if let Err(e) = self.table.insert_many(entries).await {
- errors += 1;
- if errors >= 2 && *must_exit.borrow() {
- error!("({}) Could not propagate {} counter values: {}, these counters will not be updated correctly.", T::COUNTER_TABLE_NAME, buf.len(), e);
- break;
- }
- warn!("({}) Could not propagate {} counter values: {}, retrying in 5 seconds (retry #{})", T::COUNTER_TABLE_NAME, buf.len(), e, errors);
- tokio::time::sleep(Duration::from_secs(5)).await;
- continue;
- }
-
- buf.clear();
- errors = 0;
- }
-
- if closed || *must_exit.borrow() {
- break;
- }
- }
- }
-
pub fn offline_recount_all<TS, TR>(
&self,
counted_table: &Arc<Table<TS, TR>>,
@@ -437,6 +378,98 @@ impl<T: CountedItem> IndexCounter<T> {
}
}
+struct IndexPropagatorWorker<T: CountedItem> {
+ index_counter: Arc<IndexCounter<T>>,
+ propagate_rx: mpsc::UnboundedReceiver<(T::CP, T::CS, LocalCounterEntry<T>)>,
+
+ buf: HashMap<Vec<u8>, CounterEntry<T>>,
+ errors: usize,
+}
+
+impl<T: CountedItem> IndexPropagatorWorker<T> {
+ fn add_ent(&mut self, pk: T::CP, sk: T::CS, counters: LocalCounterEntry<T>) {
+ let tree_key = self.index_counter.table.data.tree_key(&pk, &sk);
+ let dist_entry = counters.into_counter_entry(self.index_counter.this_node);
+ match self.buf.entry(tree_key) {
+ hash_map::Entry::Vacant(e) => {
+ e.insert(dist_entry);
+ }
+ hash_map::Entry::Occupied(mut e) => {
+ e.get_mut().merge(&dist_entry);
+ }
+ }
+ }
+}
+
+#[async_trait]
+impl<T: CountedItem> Worker for IndexPropagatorWorker<T> {
+ fn name(&self) -> String {
+ format!("{} index counter propagator", T::COUNTER_TABLE_NAME)
+ }
+
+ fn info(&self) -> Option<String> {
+ if !self.buf.is_empty() {
+ Some(format!("{} items in queue", self.buf.len()))
+ } else {
+ None
+ }
+ }
+
+ async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
+ // This loop batches updates to counters to be sent all at once.
+ // They are sent once the propagate_rx channel has been emptied (or is closed).
+ let closed = loop {
+ match self.propagate_rx.try_recv() {
+ Ok((pk, sk, counters)) => {
+ self.add_ent(pk, sk, counters);
+ }
+ Err(mpsc::error::TryRecvError::Empty) => break false,
+ Err(mpsc::error::TryRecvError::Disconnected) => break true,
+ }
+ };
+
+ if !self.buf.is_empty() {
+ let entries_k = self.buf.keys().take(100).cloned().collect::<Vec<_>>();
+ let entries = entries_k.iter().map(|k| self.buf.get(k).unwrap());
+ if let Err(e) = self.index_counter.table.insert_many(entries).await {
+ self.errors += 1;
+ if self.errors >= 2 && *must_exit.borrow() {
+ error!("({}) Could not propagate {} counter values: {}, these counters will not be updated correctly.", T::COUNTER_TABLE_NAME, self.buf.len(), e);
+ return Ok(WorkerState::Done);
+ }
+ // Propagate error up to worker manager, it will log it, increment a counter,
+ // and sleep for a certain delay (with exponential backoff), waiting for
+ // things to go back to normal
+ return Err(e);
+ } else {
+ for k in entries_k {
+ self.buf.remove(&k);
+ }
+ self.errors = 0;
+ }
+
+ return Ok(WorkerState::Busy);
+ } else if closed {
+ return Ok(WorkerState::Done);
+ } else {
+ return Ok(WorkerState::Idle);
+ }
+ }
+
+ async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerState {
+ match self.propagate_rx.recv().await {
+ Some((pk, sk, counters)) => {
+ self.add_ent(pk, sk, counters);
+ WorkerState::Busy
+ }
+ None => match self.buf.is_empty() {
+ false => WorkerState::Busy,
+ true => WorkerState::Done,
+ },
+ }
+ }
+}
+
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
struct LocalCounterEntry<T: CountedItem> {
pk: T::CP,
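
Note: IndexPropagatorWorker replaces the old propagate_loop with the same batching strategy split across the two trait methods: work() drains whatever is already queued using try_recv(), merges the entries into self.buf, and flushes up to 100 of them in a single insert_many() call, while wait_for_work() parks on recv() until the next update arrives. The self-contained sketch below isolates that drain-then-flush pattern with a tokio unbounded channel and a plain HashMap; the flush is just a print here.

use std::collections::HashMap;

use tokio::sync::mpsc;

// Drain everything already queued into `buf`, merging duplicate keys.
// Returns true if the sending side is gone.
fn drain_into(
    buf: &mut HashMap<String, u64>,
    rx: &mut mpsc::UnboundedReceiver<(String, u64)>,
) -> bool {
    loop {
        match rx.try_recv() {
            Ok((key, delta)) => *buf.entry(key).or_insert(0) += delta,
            Err(mpsc::error::TryRecvError::Empty) => return false,
            Err(mpsc::error::TryRecvError::Disconnected) => return true,
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::unbounded_channel();
    for i in 0..10u64 {
        tx.send((format!("counter{}", i % 3), 1)).unwrap();
    }
    drop(tx);

    let mut buf = HashMap::new();
    let closed = drain_into(&mut buf, &mut rx);
    // "Flush": the real worker would call table.insert_many() here and only
    // remove the flushed keys from the buffer on success.
    println!("flushing {} merged entries (channel closed: {})", buf.len(), closed);
}
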
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index 1d7c3ea4..f9f2970b 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -2,7 +2,7 @@
use std::collections::HashMap;
use std::io::{Read, Write};
use std::net::{IpAddr, SocketAddr};
-use std::path::Path;
+use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
@@ -104,6 +104,9 @@ pub struct System {
/// The job runner of this node
pub background: Arc<BackgroundRunner>,
+
+ /// Path to metadata directory
+ pub metadata_dir: PathBuf,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -295,6 +298,7 @@ impl System {
ring,
update_ring: Mutex::new(update_ring),
background,
+ metadata_dir: config.metadata_dir.clone(),
});
sys.system_endpoint.set_handler(sys.clone());
sys
diff --git a/src/table/gc.rs b/src/table/gc.rs
index e7fbbcb0..12218d97 100644
--- a/src/table/gc.rs
+++ b/src/table/gc.rs
@@ -8,12 +8,11 @@ use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
use futures::future::join_all;
-use futures::select;
-use futures_util::future::*;
use tokio::sync::watch;
use garage_db::counted_tree_hack::CountedTree;
+use garage_util::background::*;
use garage_util::data::*;
use garage_util::error::*;
use garage_util::time::*;
@@ -69,35 +68,11 @@ where
gc.endpoint.set_handler(gc.clone());
- let gc1 = gc.clone();
- system.background.spawn_worker(
- format!("GC loop for {}", F::TABLE_NAME),
- move |must_exit: watch::Receiver<bool>| gc1.gc_loop(must_exit),
- );
+ system.background.spawn_worker(GcWorker::new(gc.clone()));
gc
}
- async fn gc_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) {
- while !*must_exit.borrow() {
- match self.gc_loop_iter().await {
- Ok(None) => {
- // Stuff was done, loop immediately
- }
- Ok(Some(wait_delay)) => {
- // Nothing was done, wait specified delay.
- select! {
- _ = tokio::time::sleep(wait_delay).fuse() => {},
- _ = must_exit.changed().fuse() => {},
- }
- }
- Err(e) => {
- warn!("({}) Error doing GC: {}", F::TABLE_NAME, e);
- }
- }
- }
- }
-
async fn gc_loop_iter(&self) -> Result<Option<Duration>, Error> {
let now = now_msec();
@@ -328,6 +303,66 @@ where
}
}
+struct GcWorker<F, R>
+where
+ F: TableSchema + 'static,
+ R: TableReplication + 'static,
+{
+ gc: Arc<TableGc<F, R>>,
+ wait_delay: Duration,
+}
+
+impl<F, R> GcWorker<F, R>
+where
+ F: TableSchema + 'static,
+ R: TableReplication + 'static,
+{
+ fn new(gc: Arc<TableGc<F, R>>) -> Self {
+ Self {
+ gc,
+ wait_delay: Duration::from_secs(0),
+ }
+ }
+}
+
+#[async_trait]
+impl<F, R> Worker for GcWorker<F, R>
+where
+ F: TableSchema + 'static,
+ R: TableReplication + 'static,
+{
+ fn name(&self) -> String {
+ format!("{} GC", F::TABLE_NAME)
+ }
+
+ fn info(&self) -> Option<String> {
+ let l = self.gc.data.gc_todo_len().unwrap_or(0);
+ if l > 0 {
+ Some(format!("{} items in queue", l))
+ } else {
+ None
+ }
+ }
+
+ async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
+ match self.gc.gc_loop_iter().await? {
+ None => Ok(WorkerState::Busy),
+ Some(delay) => {
+ self.wait_delay = delay;
+ Ok(WorkerState::Idle)
+ }
+ }
+ }
+
+ async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState {
+ if *must_exit.borrow() {
+ return WorkerState::Done;
+ }
+ tokio::time::sleep(self.wait_delay).await;
+ WorkerState::Busy
+ }
+}
+
/// An entry stored in the gc_todo Sled tree associated with the table
/// Contains helper function for parsing, saving, and removing
/// such entry in Sled
diff --git a/src/table/merkle.rs b/src/table/merkle.rs
index 7685b193..a5c29723 100644
--- a/src/table/merkle.rs
+++ b/src/table/merkle.rs
@@ -1,14 +1,13 @@
use std::sync::Arc;
use std::time::Duration;
-use futures::select;
-use futures_util::future::*;
+use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use tokio::sync::watch;
use garage_db as db;
-use garage_util::background::BackgroundRunner;
+use garage_util::background::*;
use garage_util::data::*;
use garage_util::error::Error;
@@ -78,43 +77,17 @@ where
empty_node_hash,
});
- let ret2 = ret.clone();
- background.spawn_worker(
- format!("Merkle tree updater for {}", F::TABLE_NAME),
- |must_exit: watch::Receiver<bool>| ret2.updater_loop(must_exit),
- );
+ background.spawn_worker(MerkleWorker(ret.clone()));
ret
}
- async fn updater_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) {
- while !*must_exit.borrow() {
- match self.updater_loop_iter() {
- Ok(true) => (),
- Ok(false) => {
- select! {
- _ = self.data.merkle_todo_notify.notified().fuse() => {},
- _ = must_exit.changed().fuse() => {},
- }
- }
- Err(e) => {
- warn!(
- "({}) Error while updating Merkle tree item: {}",
- F::TABLE_NAME,
- e
- );
- tokio::time::sleep(Duration::from_secs(10)).await;
- }
- }
- }
- }
-
- fn updater_loop_iter(&self) -> Result<bool, Error> {
+ fn updater_loop_iter(&self) -> Result<WorkerState, Error> {
if let Some((key, valhash)) = self.data.merkle_todo.first()? {
self.update_item(&key, &valhash)?;
- Ok(true)
+ Ok(WorkerState::Busy)
} else {
- Ok(false)
+ Ok(WorkerState::Idle)
}
}
@@ -325,6 +298,54 @@ where
}
}
+struct MerkleWorker<F, R>(Arc<MerkleUpdater<F, R>>)
+where
+ F: TableSchema + 'static,
+ R: TableReplication + 'static;
+
+#[async_trait]
+impl<F, R> Worker for MerkleWorker<F, R>
+where
+ F: TableSchema + 'static,
+ R: TableReplication + 'static,
+{
+ fn name(&self) -> String {
+ format!("{} Merkle tree updater", F::TABLE_NAME)
+ }
+
+ fn info(&self) -> Option<String> {
+ let l = self.0.todo_len().unwrap_or(0);
+ if l > 0 {
+ Some(format!("{} items in queue", l))
+ } else {
+ None
+ }
+ }
+
+ async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
+ let updater = self.0.clone();
+ tokio::task::spawn_blocking(move || {
+ for _i in 0..100 {
+ let s = updater.updater_loop_iter();
+ if !matches!(s, Ok(WorkerState::Busy)) {
+ return s;
+ }
+ }
+ Ok(WorkerState::Busy)
+ })
+ .await
+ .unwrap()
+ }
+
+ async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState {
+ if *must_exit.borrow() {
+ return WorkerState::Done;
+ }
+ tokio::time::sleep(Duration::from_secs(10)).await;
+ WorkerState::Busy
+ }
+}
+
impl MerkleNodeKey {
fn encode(&self) -> Vec<u8> {
let mut ret = Vec::with_capacity(2 + self.prefix.len());
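
Note: MerkleWorker::work keeps the async runtime responsive by moving the synchronous updater_loop_iter() calls onto tokio's blocking thread pool and capping each batch at 100 iterations, so the worker yields back to the scheduler regularly even though the underlying DB operations are blocking. Below is a minimal sketch of that spawn_blocking batching, with a placeholder step function standing in for the Merkle update.

use tokio::task;

// Placeholder for one synchronous unit of work (e.g. one Merkle tree update).
// Returns Ok(true) while there is more queued work.
fn one_blocking_step(i: u32) -> Result<bool, String> {
    Ok(i < 42)
}

// Run up to 100 blocking steps off the async executor, then report whether
// the caller should call again (busy) or can go idle.
async fn work_batch() -> Result<bool, String> {
    task::spawn_blocking(|| {
        for i in 0..100 {
            if !one_blocking_step(i)? {
                return Ok(false); // queue drained
            }
        }
        Ok(true) // batch limit reached, still busy
    })
    .await
    .expect("blocking task panicked")
}

#[tokio::main]
async fn main() {
    let busy = work_batch().await.unwrap();
    println!("more work pending: {}", busy);
}
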
diff --git a/src/table/sync.rs b/src/table/sync.rs
index 4c83e991..b3756a5e 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -1,17 +1,17 @@
use std::collections::VecDeque;
-use std::sync::{Arc, Mutex};
+use std::sync::Arc;
use std::time::{Duration, Instant};
use async_trait::async_trait;
-use futures::select;
-use futures_util::future::*;
use futures_util::stream::*;
use opentelemetry::KeyValue;
use rand::Rng;
use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
+use tokio::select;
use tokio::sync::{mpsc, watch};
+use garage_util::background::*;
use garage_util::data::*;
use garage_util::error::Error;
@@ -34,7 +34,7 @@ pub struct TableSyncer<F: TableSchema + 'static, R: TableReplication + 'static>
data: Arc<TableData<F, R>>,
merkle: Arc<MerkleUpdater<F, R>>,
- todo: Mutex<SyncTodo>,
+ add_full_sync_tx: mpsc::UnboundedSender<()>,
endpoint: Arc<Endpoint<SyncRpc, Self>>,
}
@@ -52,10 +52,6 @@ impl Rpc for SyncRpc {
type Response = Result<SyncRpc, Error>;
}
-struct SyncTodo {
- todo: Vec<TodoPartition>,
-}
-
#[derive(Debug, Clone)]
struct TodoPartition {
partition: Partition,
@@ -80,118 +76,40 @@ where
.netapp
.endpoint(format!("garage_table/sync.rs/Rpc:{}", F::TABLE_NAME));
- let todo = SyncTodo { todo: vec![] };
+ let (add_full_sync_tx, add_full_sync_rx) = mpsc::unbounded_channel();
let syncer = Arc::new(Self {
system: system.clone(),
data,
merkle,
- todo: Mutex::new(todo),
+ add_full_sync_tx,
endpoint,
});
syncer.endpoint.set_handler(syncer.clone());
- let (busy_tx, busy_rx) = mpsc::unbounded_channel();
-
- let s1 = syncer.clone();
- system.background.spawn_worker(
- format!("table sync watcher for {}", F::TABLE_NAME),
- move |must_exit: watch::Receiver<bool>| s1.watcher_task(must_exit, busy_rx),
- );
-
- let s2 = syncer.clone();
- system.background.spawn_worker(
- format!("table syncer for {}", F::TABLE_NAME),
- move |must_exit: watch::Receiver<bool>| s2.syncer_task(must_exit, busy_tx),
- );
-
- let s3 = syncer.clone();
- tokio::spawn(async move {
- tokio::time::sleep(Duration::from_secs(20)).await;
- s3.add_full_sync();
+ system.background.spawn_worker(SyncWorker {
+ syncer: syncer.clone(),
+ ring_recv: system.ring.clone(),
+ ring: system.ring.borrow().clone(),
+ add_full_sync_rx,
+ todo: vec![],
+ next_full_sync: Instant::now() + Duration::from_secs(20),
});
syncer
}
- async fn watcher_task(
- self: Arc<Self>,
- mut must_exit: watch::Receiver<bool>,
- mut busy_rx: mpsc::UnboundedReceiver<bool>,
- ) {
- let mut prev_ring: Arc<Ring> = self.system.ring.borrow().clone();
- let mut ring_recv: watch::Receiver<Arc<Ring>> = self.system.ring.clone();
- let mut nothing_to_do_since = Some(Instant::now());
-
- while !*must_exit.borrow() {
- select! {
- _ = ring_recv.changed().fuse() => {
- let new_ring = ring_recv.borrow();
- if !Arc::ptr_eq(&new_ring, &prev_ring) {
- debug!("({}) Ring changed, adding full sync to syncer todo list", F::TABLE_NAME);
- self.add_full_sync();
- prev_ring = new_ring.clone();
- }
- }
- busy_opt = busy_rx.recv().fuse() => {
- if let Some(busy) = busy_opt {
- if busy {
- nothing_to_do_since = None;
- } else if nothing_to_do_since.is_none() {
- nothing_to_do_since = Some(Instant::now());
- }
- }
- }
- _ = must_exit.changed().fuse() => {},
- _ = tokio::time::sleep(Duration::from_secs(1)).fuse() => {
- if nothing_to_do_since.map(|t| Instant::now() - t >= ANTI_ENTROPY_INTERVAL).unwrap_or(false) {
- nothing_to_do_since = None;
- debug!("({}) Interval passed, adding full sync to syncer todo list", F::TABLE_NAME);
- self.add_full_sync();
- }
- }
- }
- }
- }
-
pub fn add_full_sync(&self) {
- self.todo
- .lock()
- .unwrap()
- .add_full_sync(&self.data, &self.system);
- }
-
- async fn syncer_task(
- self: Arc<Self>,
- mut must_exit: watch::Receiver<bool>,
- busy_tx: mpsc::UnboundedSender<bool>,
- ) {
- while !*must_exit.borrow() {
- let task = self.todo.lock().unwrap().pop_task();
- if let Some(partition) = task {
- busy_tx.send(true).unwrap();
- let res = self
- .clone()
- .sync_partition(&partition, &mut must_exit)
- .await;
- if let Err(e) = res {
- warn!(
- "({}) Error while syncing {:?}: {}",
- F::TABLE_NAME,
- partition,
- e
- );
- }
- } else {
- busy_tx.send(false).unwrap();
- tokio::time::sleep(Duration::from_secs(1)).await;
- }
+ if self.add_full_sync_tx.send(()).is_err() {
+ error!("({}) Could not add full sync", F::TABLE_NAME);
}
}
+ // ----
+
async fn sync_partition(
- self: Arc<Self>,
+ self: &Arc<Self>,
partition: &TodoPartition,
must_exit: &mut watch::Receiver<bool>,
) -> Result<(), Error> {
@@ -577,12 +495,22 @@ where
}
}
-impl SyncTodo {
- fn add_full_sync<F: TableSchema, R: TableReplication>(
- &mut self,
- data: &TableData<F, R>,
- system: &System,
- ) {
+// -------- Sync Worker ---------
+
+struct SyncWorker<F: TableSchema + 'static, R: TableReplication + 'static> {
+ syncer: Arc<TableSyncer<F, R>>,
+ ring_recv: watch::Receiver<Arc<Ring>>,
+ ring: Arc<Ring>,
+ add_full_sync_rx: mpsc::UnboundedReceiver<()>,
+ todo: Vec<TodoPartition>,
+ next_full_sync: Instant,
+}
+
+impl<F: TableSchema + 'static, R: TableReplication + 'static> SyncWorker<F, R> {
+ fn add_full_sync(&mut self) {
+ let system = &self.syncer.system;
+ let data = &self.syncer.data;
+
let my_id = system.id;
self.todo.clear();
@@ -623,6 +551,8 @@ impl SyncTodo {
retain,
});
}
+
+ self.next_full_sync = Instant::now() + ANTI_ENTROPY_INTERVAL;
}
fn pop_task(&mut self) -> Option<TodoPartition> {
@@ -641,6 +571,62 @@ impl SyncTodo {
}
}
+#[async_trait]
+impl<F: TableSchema + 'static, R: TableReplication + 'static> Worker for SyncWorker<F, R> {
+ fn name(&self) -> String {
+ format!("{} sync", F::TABLE_NAME)
+ }
+
+ fn info(&self) -> Option<String> {
+ let l = self.todo.len();
+ if l > 0 {
+ Some(format!("{} partitions remaining", l))
+ } else {
+ None
+ }
+ }
+
+ async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
+ if let Some(partition) = self.pop_task() {
+ self.syncer.sync_partition(&partition, must_exit).await?;
+ Ok(WorkerState::Busy)
+ } else {
+ Ok(WorkerState::Idle)
+ }
+ }
+
+ async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState {
+ if *must_exit.borrow() {
+ return WorkerState::Done;
+ }
+ select! {
+ s = self.add_full_sync_rx.recv() => {
+ if let Some(()) = s {
+ self.add_full_sync();
+ }
+ },
+ _ = self.ring_recv.changed() => {
+ let new_ring = self.ring_recv.borrow();
+ if !Arc::ptr_eq(&new_ring, &self.ring) {
+ self.ring = new_ring.clone();
+ drop(new_ring);
+ debug!("({}) Ring changed, adding full sync to syncer todo list", F::TABLE_NAME);
+ self.add_full_sync();
+ }
+ },
+ _ = tokio::time::sleep(self.next_full_sync - Instant::now()) => {
+ self.add_full_sync();
+ }
+ }
+ match self.todo.is_empty() {
+ false => WorkerState::Busy,
+ true => WorkerState::Idle,
+ }
+ }
+}
+
+// ---- UTIL ----
+
fn hash_of<T: Serialize>(x: &T) -> Result<Hash, Error> {
Ok(blake2sum(&rmp_to_vec_all_named(x)?[..]))
}
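
The watcher_task/syncer_task pair is thus replaced by a single SyncWorker whose wait_for_work() multiplexes all three wake-up sources (explicit full-sync requests, ring changes, and the anti-entropy deadline) in one tokio::select!. A stripped-down sketch of the same shape, written against the Worker trait introduced below in src/util/background/worker.rs; PeriodicWorker and its fields are illustrative placeholders, not code from this commit:

use std::time::{Duration, Instant};

use async_trait::async_trait;
use tokio::select;
use tokio::sync::{mpsc, watch};

use garage_util::background::*;
use garage_util::error::Error;

// Hypothetical worker with two wake-up sources: an explicit command channel
// and a periodic deadline.
struct PeriodicWorker {
    cmd_rx: mpsc::UnboundedReceiver<()>,
    next_run: Instant,
    pending: usize,
}

#[async_trait]
impl Worker for PeriodicWorker {
    fn name(&self) -> String {
        "Periodic worker".to_string()
    }

    fn info(&self) -> Option<String> {
        (self.pending > 0).then(|| format!("{} runs pending", self.pending))
    }

    async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
        if self.pending > 0 {
            // ... do one queued run here ...
            self.pending -= 1;
            Ok(WorkerState::Busy)
        } else {
            Ok(WorkerState::Idle)
        }
    }

    async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState {
        if *must_exit.borrow() {
            return WorkerState::Done;
        }
        select! {
            cmd = self.cmd_rx.recv() => {
                if cmd.is_some() {
                    // Explicit request received on the command channel
                    self.pending += 1;
                }
            }
            _ = tokio::time::sleep(self.next_run - Instant::now()) => {
                // Periodic deadline reached
                self.pending += 1;
                self.next_run = Instant::now() + Duration::from_secs(3600);
            }
        }
        if self.pending > 0 { WorkerState::Busy } else { WorkerState::Idle }
    }
}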
diff --git a/src/util/Cargo.toml b/src/util/Cargo.toml
index 5d073436..57c70ffb 100644
--- a/src/util/Cargo.toml
+++ b/src/util/Cargo.toml
@@ -16,6 +16,7 @@ path = "lib.rs"
[dependencies]
garage_db = { version = "0.8.0", path = "../db" }
+async-trait = "0.1"
blake2 = "0.9"
err-derive = "0.3"
xxhash-rust = { version = "0.8", default-features = false, features = ["xxh3"] }
diff --git a/src/util/background.rs b/src/util/background.rs
deleted file mode 100644
index d35425f5..00000000
--- a/src/util/background.rs
+++ /dev/null
@@ -1,160 +0,0 @@
-//! Job runner for futures and async functions
-use core::future::Future;
-use std::pin::Pin;
-use std::sync::Arc;
-use std::time::Duration;
-
-use futures::future::*;
-use futures::select;
-use futures::stream::FuturesUnordered;
-use futures::StreamExt;
-use tokio::sync::{mpsc, mpsc::error::TryRecvError, watch, Mutex};
-
-use crate::error::Error;
-
-type JobOutput = Result<(), Error>;
-type Job = Pin<Box<dyn Future<Output = JobOutput> + Send>>;
-
-/// Job runner for futures and async functions
-pub struct BackgroundRunner {
- stop_signal: watch::Receiver<bool>,
- queue_in: mpsc::UnboundedSender<(Job, bool)>,
- worker_in: mpsc::UnboundedSender<tokio::task::JoinHandle<()>>,
-}
-
-impl BackgroundRunner {
- /// Create a new BackgroundRunner
- pub fn new(
- n_runners: usize,
- stop_signal: watch::Receiver<bool>,
- ) -> (Arc<Self>, tokio::task::JoinHandle<()>) {
- let (worker_in, mut worker_out) = mpsc::unbounded_channel();
-
- let stop_signal_2 = stop_signal.clone();
- let await_all_done = tokio::spawn(async move {
- let mut workers = FuturesUnordered::new();
- let mut shutdown_timer = 0;
- loop {
- let closed = match worker_out.try_recv() {
- Ok(wkr) => {
- workers.push(wkr);
- false
- }
- Err(TryRecvError::Empty) => false,
- Err(TryRecvError::Disconnected) => true,
- };
- select! {
- res = workers.next() => {
- if let Some(Err(e)) = res {
- error!("Worker exited with error: {}", e);
- }
- }
- _ = tokio::time::sleep(Duration::from_secs(1)).fuse() => {
- if closed || *stop_signal_2.borrow() {
- shutdown_timer += 1;
- if shutdown_timer >= 10 {
- break;
- }
- }
- }
- }
- }
- });
-
- let (queue_in, queue_out) = mpsc::unbounded_channel();
- let queue_out = Arc::new(Mutex::new(queue_out));
-
- for i in 0..n_runners {
- let queue_out = queue_out.clone();
- let stop_signal = stop_signal.clone();
-
- worker_in
- .send(tokio::spawn(async move {
- loop {
- let (job, cancellable) = {
- select! {
- item = wait_job(&queue_out).fuse() => match item {
- // We received a task, process it
- Some(x) => x,
- // We received a signal that no more tasks will ever be sent
- // because the sending side was dropped. Exit now.
- None => break,
- },
- _ = tokio::time::sleep(Duration::from_secs(5)).fuse() => {
- if *stop_signal.borrow() {
- // Nothing has been going on for 5 secs, and we are shutting
- // down. Exit now.
- break;
- } else {
- // Nothing is going on but we don't want to exit.
- continue;
- }
- }
- }
- };
- if cancellable && *stop_signal.borrow() {
- continue;
- }
- if let Err(e) = job.await {
- error!("Job failed: {}", e)
- }
- }
- info!("Background worker {} exiting", i);
- }))
- .unwrap();
- }
-
- let bgrunner = Arc::new(Self {
- stop_signal,
- queue_in,
- worker_in,
- });
- (bgrunner, await_all_done)
- }
-
- /// Spawn a task to be run in background
- pub fn spawn<T>(&self, job: T)
- where
- T: Future<Output = JobOutput> + Send + 'static,
- {
- let boxed: Job = Box::pin(job);
- self.queue_in
- .send((boxed, false))
- .map_err(|_| "could not put job in queue")
- .unwrap();
- }
-
- /// Spawn a task to be run in background. It may get discarded before running if spawned while
- /// the runner is stopping
- pub fn spawn_cancellable<T>(&self, job: T)
- where
- T: Future<Output = JobOutput> + Send + 'static,
- {
- let boxed: Job = Box::pin(job);
- self.queue_in
- .send((boxed, true))
- .map_err(|_| "could not put job in queue")
- .unwrap();
- }
-
- pub fn spawn_worker<F, T>(&self, name: String, worker: F)
- where
- F: FnOnce(watch::Receiver<bool>) -> T + Send + 'static,
- T: Future<Output = ()> + Send + 'static,
- {
- let stop_signal = self.stop_signal.clone();
- let task = tokio::spawn(async move {
- info!("Worker started: {}", name);
- worker(stop_signal).await;
- info!("Worker exited: {}", name);
- });
- self.worker_in
- .send(task)
- .map_err(|_| "could not put job in queue")
- .unwrap();
- }
-}
-
-async fn wait_job(q: &Mutex<mpsc::UnboundedReceiver<(Job, bool)>>) -> Option<(Job, bool)> {
- q.lock().await.recv().await
-}
diff --git a/src/util/background/job_worker.rs b/src/util/background/job_worker.rs
new file mode 100644
index 00000000..2568ea11
--- /dev/null
+++ b/src/util/background/job_worker.rs
@@ -0,0 +1,48 @@
+//! Job worker: a generic worker that just processes incoming
+//! jobs one by one
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use tokio::sync::{mpsc, Mutex};
+
+use crate::background::worker::*;
+use crate::background::*;
+
+pub(crate) struct JobWorker {
+ pub(crate) index: usize,
+ pub(crate) job_chan: Arc<Mutex<mpsc::UnboundedReceiver<(Job, bool)>>>,
+ pub(crate) next_job: Option<Job>,
+}
+
+#[async_trait]
+impl Worker for JobWorker {
+ fn name(&self) -> String {
+ format!("Job worker #{}", self.index)
+ }
+
+ async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
+ match self.next_job.take() {
+ None => return Ok(WorkerState::Idle),
+ Some(job) => {
+ job.await?;
+ Ok(WorkerState::Busy)
+ }
+ }
+ }
+
+ async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState {
+ loop {
+ match self.job_chan.lock().await.recv().await {
+ Some((job, cancellable)) => {
+ if cancellable && *must_exit.borrow() {
+ continue;
+ }
+ self.next_job = Some(job);
+ return WorkerState::Busy;
+ }
+ None => return WorkerState::Done,
+ }
+ }
+ }
+}
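
JobWorker relies on all job workers sharing a single receiver wrapped in Arc<Mutex<...>>: whichever idle worker acquires the lock first gets the next job, and a closed channel tells every worker it is Done. A standalone toy illustrating that shared-receiver pattern (not code from this commit):

use std::sync::Arc;

use tokio::sync::{mpsc, Mutex};

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::unbounded_channel::<u32>();
    // A single receiver shared by all consumers, as job_chan is shared
    // by all JobWorkers in this patch.
    let rx = Arc::new(Mutex::new(rx));

    let mut handles = vec![];
    for i in 0..4 {
        let rx = rx.clone();
        handles.push(tokio::spawn(async move {
            loop {
                // Hold the lock only while waiting for the next item
                let job = {
                    let mut chan = rx.lock().await;
                    chan.recv().await
                };
                match job {
                    Some(job) => println!("consumer {} got job {}", i, job),
                    // Channel closed: no more jobs will ever arrive
                    None => break,
                }
            }
        }));
    }

    for j in 0..10u32 {
        tx.send(j).unwrap();
    }
    drop(tx); // closing the sending side lets every consumer exit

    for h in handles {
        h.await.unwrap();
    }
}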
diff --git a/src/util/background/mod.rs b/src/util/background/mod.rs
new file mode 100644
index 00000000..619f5068
--- /dev/null
+++ b/src/util/background/mod.rs
@@ -0,0 +1,117 @@
+//! Job runner for futures and async functions
+
+pub mod job_worker;
+pub mod worker;
+
+use core::future::Future;
+
+use std::collections::HashMap;
+use std::pin::Pin;
+use std::sync::Arc;
+
+use serde::{Deserialize, Serialize};
+use tokio::sync::{mpsc, watch, Mutex};
+
+use crate::error::Error;
+use worker::WorkerProcessor;
+pub use worker::{Worker, WorkerState};
+
+pub(crate) type JobOutput = Result<(), Error>;
+pub(crate) type Job = Pin<Box<dyn Future<Output = JobOutput> + Send>>;
+
+/// Job runner for futures and async functions
+pub struct BackgroundRunner {
+ send_job: mpsc::UnboundedSender<(Job, bool)>,
+ send_worker: mpsc::UnboundedSender<Box<dyn Worker>>,
+ worker_info: Arc<std::sync::Mutex<HashMap<usize, WorkerInfo>>>,
+}
+
+#[derive(Clone, Serialize, Deserialize, Debug)]
+pub struct WorkerInfo {
+ pub name: String,
+ pub info: Option<String>,
+ pub state: WorkerState,
+ pub errors: usize,
+ pub consecutive_errors: usize,
+ pub last_error: Option<(String, u64)>,
+}
+
+impl BackgroundRunner {
+ /// Create a new BackgroundRunner
+ pub fn new(
+ n_runners: usize,
+ stop_signal: watch::Receiver<bool>,
+ ) -> (Arc<Self>, tokio::task::JoinHandle<()>) {
+ let (send_worker, worker_out) = mpsc::unbounded_channel::<Box<dyn Worker>>();
+
+ let worker_info = Arc::new(std::sync::Mutex::new(HashMap::new()));
+ let mut worker_processor =
+ WorkerProcessor::new(worker_out, stop_signal, worker_info.clone());
+
+ let await_all_done = tokio::spawn(async move {
+ worker_processor.run().await;
+ });
+
+ let (send_job, queue_out) = mpsc::unbounded_channel();
+ let queue_out = Arc::new(Mutex::new(queue_out));
+
+ for i in 0..n_runners {
+ let queue_out = queue_out.clone();
+
+ send_worker
+ .send(Box::new(job_worker::JobWorker {
+ index: i,
+ job_chan: queue_out.clone(),
+ next_job: None,
+ }))
+ .ok()
+ .unwrap();
+ }
+
+ let bgrunner = Arc::new(Self {
+ send_job,
+ send_worker,
+ worker_info,
+ });
+ (bgrunner, await_all_done)
+ }
+
+ pub fn get_worker_info(&self) -> HashMap<usize, WorkerInfo> {
+ self.worker_info.lock().unwrap().clone()
+ }
+
+ /// Spawn a task to be run in background
+ pub fn spawn<T>(&self, job: T)
+ where
+ T: Future<Output = JobOutput> + Send + 'static,
+ {
+ let boxed: Job = Box::pin(job);
+ self.send_job
+ .send((boxed, false))
+ .ok()
+ .expect("Could not put job in queue");
+ }
+
+ /// Spawn a task to be run in background. It may get discarded before running if spawned while
+ /// the runner is stopping
+ pub fn spawn_cancellable<T>(&self, job: T)
+ where
+ T: Future<Output = JobOutput> + Send + 'static,
+ {
+ let boxed: Job = Box::pin(job);
+ self.send_job
+ .send((boxed, true))
+ .ok()
+ .expect("Could not put job in queue");
+ }
+
+ pub fn spawn_worker<W>(&self, worker: W)
+ where
+ W: Worker + 'static,
+ {
+ self.send_worker
+ .send(Box::new(worker))
+ .ok()
+ .expect("Could not put worker in queue");
+ }
+}
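
For context, a rough usage sketch of the reworked BackgroundRunner API; the surrounding function and the my_worker argument are illustrative, not taken from this commit:

use tokio::sync::watch;

use garage_util::background::*;
use garage_util::error::Error;

async fn background_example(my_worker: impl Worker + 'static) {
    // Stop signal: flip it to true when the process wants to shut down
    let (stop_tx, stop_rx) = watch::channel(false);

    // One BackgroundRunner with 8 generic job workers
    let (background, await_all_done) = BackgroundRunner::new(8, stop_rx);

    // One-shot async job, picked up by one of the JobWorkers
    background.spawn(async {
        // ... some asynchronous maintenance task ...
        Ok::<(), Error>(())
    });

    // Same, but may be dropped without running if we are already shutting down
    background.spawn_cancellable(async { Ok::<(), Error>(()) });

    // Long-lived worker implementing the Worker trait
    background.spawn_worker(my_worker);

    // Worker states can be inspected at any time
    for (tid, info) in background.get_worker_info() {
        println!("#{} {}: {}", tid, info.name, info.state);
    }

    // Shutdown: signal every worker, then wait for the processor to drain them
    let _ = stop_tx.send(true);
    let _ = await_all_done.await;
}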
diff --git a/src/util/background/worker.rs b/src/util/background/worker.rs
new file mode 100644
index 00000000..7f573a07
--- /dev/null
+++ b/src/util/background/worker.rs
@@ -0,0 +1,261 @@
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::time::{Duration, Instant};
+
+use async_trait::async_trait;
+use futures::future::*;
+use futures::stream::FuturesUnordered;
+use futures::StreamExt;
+use serde::{Deserialize, Serialize};
+use tokio::select;
+use tokio::sync::{mpsc, watch};
+use tracing::*;
+
+use crate::background::WorkerInfo;
+use crate::error::Error;
+use crate::time::now_msec;
+
+#[derive(PartialEq, Copy, Clone, Serialize, Deserialize, Debug)]
+pub enum WorkerState {
+ Busy,
+ Throttled(f32),
+ Idle,
+ Done,
+}
+
+impl std::fmt::Display for WorkerState {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ match self {
+ WorkerState::Busy => write!(f, "Busy"),
+ WorkerState::Throttled(t) => write!(f, "Thr:{:.3}", t),
+ WorkerState::Idle => write!(f, "Idle"),
+ WorkerState::Done => write!(f, "Done"),
+ }
+ }
+}
+
+#[async_trait]
+pub trait Worker: Send {
+ fn name(&self) -> String;
+
+ fn info(&self) -> Option<String> {
+ None
+ }
+
+ /// Work: do a basic unit of work, if one is available (otherwise, should return
+ /// WorkerState::Idle immediately). We will do our best not to interrupt this future in the
+ /// middle of processing; it will only be interrupted at the last moment, when Garage is trying
+ /// to exit and this hasn't returned yet. This function may return an error to indicate that
+ /// its unit of work could not be processed: the error will be logged and
+ /// .work() will be called again after a short delay.
+ async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error>;
+
+ /// Wait for work: wait for some task to become available. This future can be interrupted in
+ /// the middle for any reason. This future doesn't have to await on must_exit.changed(); we
+ /// are doing that for you. Therefore it only receives a read reference to must_exit, which allows
+ /// it to check if we are exiting.
+ async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState;
+}
+
+pub(crate) struct WorkerProcessor {
+ stop_signal: watch::Receiver<bool>,
+ worker_chan: mpsc::UnboundedReceiver<Box<dyn Worker>>,
+ worker_info: Arc<std::sync::Mutex<HashMap<usize, WorkerInfo>>>,
+}
+
+impl WorkerProcessor {
+ pub(crate) fn new(
+ worker_chan: mpsc::UnboundedReceiver<Box<dyn Worker>>,
+ stop_signal: watch::Receiver<bool>,
+ worker_info: Arc<std::sync::Mutex<HashMap<usize, WorkerInfo>>>,
+ ) -> Self {
+ Self {
+ stop_signal,
+ worker_chan,
+ worker_info,
+ }
+ }
+
+ pub(crate) async fn run(&mut self) {
+ let mut workers = FuturesUnordered::new();
+ let mut next_task_id = 1;
+
+ while !*self.stop_signal.borrow() {
+ let await_next_worker = async {
+ if workers.is_empty() {
+ futures::future::pending().await
+ } else {
+ workers.next().await
+ }
+ };
+ select! {
+ new_worker_opt = self.worker_chan.recv() => {
+ if let Some(new_worker) = new_worker_opt {
+ let task_id = next_task_id;
+ next_task_id += 1;
+ let stop_signal = self.stop_signal.clone();
+ let stop_signal_worker = self.stop_signal.clone();
+ let mut worker = WorkerHandler {
+ task_id,
+ stop_signal,
+ stop_signal_worker,
+ worker: new_worker,
+ state: WorkerState::Busy,
+ errors: 0,
+ consecutive_errors: 0,
+ last_error: None,
+ };
+ workers.push(async move {
+ worker.step().await;
+ worker
+ }.boxed());
+ }
+ }
+ worker = await_next_worker => {
+ if let Some(mut worker) = worker {
+ trace!("{} (TID {}): {:?}", worker.worker.name(), worker.task_id, worker.state);
+
+ // Save worker info
+ let mut wi = self.worker_info.lock().unwrap();
+ match wi.get_mut(&worker.task_id) {
+ Some(i) => {
+ i.state = worker.state;
+ i.info = worker.worker.info();
+ i.errors = worker.errors;
+ i.consecutive_errors = worker.consecutive_errors;
+ if worker.last_error.is_some() {
+ i.last_error = worker.last_error.take();
+ }
+ }
+ None => {
+ wi.insert(worker.task_id, WorkerInfo {
+ name: worker.worker.name(),
+ state: worker.state,
+ info: worker.worker.info(),
+ errors: worker.errors,
+ consecutive_errors: worker.consecutive_errors,
+ last_error: worker.last_error.take(),
+ });
+ }
+ }
+
+ if worker.state == WorkerState::Done {
+ info!("Worker {} (TID {}) exited", worker.worker.name(), worker.task_id);
+ } else {
+ workers.push(async move {
+ worker.step().await;
+ worker
+ }.boxed());
+ }
+ }
+ }
+ _ = self.stop_signal.changed() => (),
+ }
+ }
+
+ // We are exiting, drain everything
+ let drain_half_time = Instant::now() + Duration::from_secs(5);
+ let drain_everything = async move {
+ while let Some(mut worker) = workers.next().await {
+ if worker.state == WorkerState::Done {
+ info!(
+ "Worker {} (TID {}) exited",
+ worker.worker.name(),
+ worker.task_id
+ );
+ } else if Instant::now() > drain_half_time {
+ warn!("Worker {} (TID {}) interrupted between two iterations in state {:?} (this should be fine)", worker.worker.name(), worker.task_id, worker.state);
+ } else {
+ workers.push(
+ async move {
+ worker.step().await;
+ worker
+ }
+ .boxed(),
+ );
+ }
+ }
+ };
+
+ select! {
+ _ = drain_everything => {
+ info!("All workers exited peacefully \\o/");
+ }
+ _ = tokio::time::sleep(Duration::from_secs(9)) => {
+ error!("Some workers could not exit in time, we are cancelling some things in the middle");
+ }
+ }
+ }
+}
+
+struct WorkerHandler {
+ task_id: usize,
+ stop_signal: watch::Receiver<bool>,
+ stop_signal_worker: watch::Receiver<bool>,
+ worker: Box<dyn Worker>,
+ state: WorkerState,
+ errors: usize,
+ consecutive_errors: usize,
+ last_error: Option<(String, u64)>,
+}
+
+impl WorkerHandler {
+ async fn step(&mut self) {
+ match self.state {
+ WorkerState::Busy => match self.worker.work(&mut self.stop_signal).await {
+ Ok(s) => {
+ self.state = s;
+ self.consecutive_errors = 0;
+ }
+ Err(e) => {
+ error!(
+ "Error in worker {} (TID {}): {}",
+ self.worker.name(),
+ self.task_id,
+ e
+ );
+ self.errors += 1;
+ self.consecutive_errors += 1;
+ self.last_error = Some((format!("{}", e), now_msec()));
+ // Sleep a bit so that the error won't repeat immediately; exponential backoff
+ // strategy (min 1 sec, max ~60 sec)
+ self.state = WorkerState::Throttled(
+ (1.5f32).powf(std::cmp::min(10, self.consecutive_errors - 1) as f32),
+ );
+ }
+ },
+ WorkerState::Throttled(delay) => {
+ // Sleep for the given delay, then go back to the Busy state
+ if !*self.stop_signal.borrow() {
+ select! {
+ _ = tokio::time::sleep(Duration::from_secs_f32(delay)) => (),
+ _ = self.stop_signal.changed() => (),
+ }
+ }
+ self.state = WorkerState::Busy;
+ }
+ WorkerState::Idle => {
+ if *self.stop_signal.borrow() {
+ select! {
+ new_st = self.worker.wait_for_work(&self.stop_signal_worker) => {
+ self.state = new_st;
+ }
+ _ = tokio::time::sleep(Duration::from_secs(1)) => {
+ // stay in Idle state
+ }
+ }
+ } else {
+ select! {
+ new_st = self.worker.wait_for_work(&self.stop_signal_worker) => {
+ self.state = new_st;
+ }
+ _ = self.stop_signal.changed() => {
+ // stay in Idle state
+ }
+ }
+ }
+ }
+ WorkerState::Done => unreachable!(),
+ }
+ }
+}
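
A minimal example of the Worker contract documented above: work() performs at most one unit of work and reports its state, while wait_for_work() parks the worker until something can be done. ChannelDrainWorker and its channel are hypothetical, shown only to illustrate how the trait is meant to be implemented:

use async_trait::async_trait;
use tokio::sync::{mpsc, watch};

use garage_util::background::*;
use garage_util::error::Error;

// Hypothetical worker that processes strings received on a channel,
// one item per call to work().
struct ChannelDrainWorker {
    rx: mpsc::UnboundedReceiver<String>,
    pending: Option<String>,
    processed: usize,
}

#[async_trait]
impl Worker for ChannelDrainWorker {
    fn name(&self) -> String {
        "Channel drain worker".to_string()
    }

    fn info(&self) -> Option<String> {
        Some(format!("{} items processed", self.processed))
    }

    async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
        match self.pending.take() {
            // Nothing was fetched by wait_for_work: report Idle so the
            // scheduler calls wait_for_work() next.
            None => Ok(WorkerState::Idle),
            Some(_item) => {
                // ... process one unit of work here ...
                self.processed += 1;
                Ok(WorkerState::Busy)
            }
        }
    }

    async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState {
        if *must_exit.borrow() {
            return WorkerState::Done;
        }
        match self.rx.recv().await {
            Some(item) => {
                self.pending = Some(item);
                WorkerState::Busy
            }
            // Sending side dropped: nothing left to do, ever.
            None => WorkerState::Done,
        }
    }
}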
diff --git a/src/util/lib.rs b/src/util/lib.rs
index 8ca6e310..fce151af 100644
--- a/src/util/lib.rs
+++ b/src/util/lib.rs
@@ -11,7 +11,6 @@ pub mod error;
pub mod formater;
pub mod metrics;
pub mod persister;
-//pub mod sled_counter;
pub mod time;
pub mod token_bucket;
pub mod tranquilizer;
diff --git a/src/util/tranquilizer.rs b/src/util/tranquilizer.rs
index 28711387..fdb2918b 100644
--- a/src/util/tranquilizer.rs
+++ b/src/util/tranquilizer.rs
@@ -3,6 +3,8 @@ use std::time::{Duration, Instant};
use tokio::time::sleep;
+use crate::background::WorkerState;
+
/// A tranquilizer is a helper object that is used to make
/// background operations not take up too much time.
///
@@ -33,7 +35,7 @@ impl Tranquilizer {
}
}
- pub async fn tranquilize(&mut self, tranquility: u32) {
+ fn tranquilize_internal(&mut self, tranquility: u32) -> Option<Duration> {
let observation = Instant::now() - self.last_step_begin;
self.observations.push_back(observation);
@@ -45,13 +47,32 @@ impl Tranquilizer {
if !self.observations.is_empty() {
let delay = (tranquility * self.sum_observations) / (self.observations.len() as u32);
+ Some(delay)
+ } else {
+ None
+ }
+ }
+
+ pub async fn tranquilize(&mut self, tranquility: u32) {
+ if let Some(delay) = self.tranquilize_internal(tranquility) {
sleep(delay).await;
+ self.reset();
}
+ }
- self.reset();
+ #[must_use]
+ pub fn tranquilize_worker(&mut self, tranquility: u32) -> WorkerState {
+ match self.tranquilize_internal(tranquility) {
+ Some(delay) => WorkerState::Throttled(delay.as_secs_f32()),
+ None => WorkerState::Busy,
+ }
}
pub fn reset(&mut self) {
self.last_step_begin = Instant::now();
}
+
+ pub fn clear(&mut self) {
+ self.observations.clear();
+ }
}
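
tranquilize_worker() lets a Worker reuse the tranquilizer without sleeping itself: instead of awaiting the computed delay, the delay is handed back to the scheduler as WorkerState::Throttled. A hedged sketch of how a worker's work() might use it; ThrottledWorker and do_one_item() are assumptions, not code from this diff:

use async_trait::async_trait;
use tokio::sync::watch;

use garage_util::background::*;
use garage_util::error::Error;
use garage_util::tranquilizer::Tranquilizer;

struct ThrottledWorker {
    tranquilizer: Tranquilizer,
    tranquility: u32,
}

impl ThrottledWorker {
    async fn do_one_item(&mut self) -> Result<bool, Error> {
        // ... one unit of real work; returns false when there is nothing to do ...
        Ok(false)
    }
}

#[async_trait]
impl Worker for ThrottledWorker {
    fn name(&self) -> String {
        "Throttled worker".to_string()
    }

    async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
        // Mark the beginning of this step so its duration can be measured
        self.tranquilizer.reset();
        if self.do_one_item().await? {
            // Ask the scheduler to throttle us in proportion to the average
            // duration of recent steps, scaled by `tranquility`
            Ok(self.tranquilizer.tranquilize_worker(self.tranquility))
        } else {
            // Drop stale observations while idle (one plausible use of clear())
            self.tranquilizer.clear();
            Ok(WorkerState::Idle)
        }
    }

    async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState {
        if *must_exit.borrow() {
            return WorkerState::Done;
        }
        tokio::time::sleep(std::time::Duration::from_secs(60)).await;
        WorkerState::Busy
    }
}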