aboutsummaryrefslogtreecommitdiff
path: root/src/block/manager.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/block/manager.rs')
-rw-r--r--src/block/manager.rs329
1 files changed, 118 insertions, 211 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs
index 32ba0431..017ba9da 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -1,18 +1,17 @@
-use core::ops::Bound;
-
use std::convert::TryInto;
-use std::path::{Path, PathBuf};
+use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
+use arc_swap::ArcSwapOption;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use futures::future::*;
-use futures::select;
use tokio::fs;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
-use tokio::sync::{watch, Mutex, Notify};
+use tokio::select;
+use tokio::sync::{mpsc, watch, Mutex, Notify};
use opentelemetry::{
trace::{FutureExt as OtelFutureExt, TraceContextExt, Tracer},
@@ -22,6 +21,7 @@ use opentelemetry::{
use garage_db as db;
use garage_db::counted_tree_hack::CountedTree;
+use garage_util::background::*;
use garage_util::data::*;
use garage_util::error::*;
use garage_util::metrics::RecordDuration;
@@ -36,6 +36,7 @@ use garage_table::replication::{TableReplication, TableShardedReplication};
use crate::block::*;
use crate::metrics::*;
use crate::rc::*;
+use crate::repair::*;
/// Size under which data will be stored inlined in database instead of as files
pub const INLINE_THRESHOLD: usize = 3072;
@@ -93,16 +94,18 @@ pub struct BlockManager {
mutation_lock: Mutex<BlockManagerLocked>,
- rc: BlockRc,
+ pub(crate) rc: BlockRc,
resync_queue: CountedTree,
resync_notify: Notify,
resync_errors: CountedTree,
- system: Arc<System>,
+ pub(crate) system: Arc<System>,
endpoint: Arc<Endpoint<BlockRpc, Self>>,
metrics: BlockManagerMetrics,
+
+ tx_scrub_command: ArcSwapOption<mpsc::Sender<ScrubWorkerCommand>>,
}
// This custom struct contains functions that must only be ran
@@ -110,6 +113,12 @@ pub struct BlockManager {
// it INSIDE a Mutex.
struct BlockManagerLocked();
+enum ResyncIterResult { // outcome of one `BlockManager::resync_iter` call, consumed by `ResyncWorker::work`
+ BusyDidSomething, // a block was processed; `work` answers with `tranquilizer.tranquilize_worker(..)`
+ BusyDidNothing, // no block was processed but the next iteration can start at once; `work` maps it to `WorkerState::Busy`
+ IdleFor(Duration), // nothing is due yet; `work` stores the duration in `next_delay` and reports `WorkerState::Idle`
+}
+
impl BlockManager {
pub fn new(
db: &db::Db,
@@ -157,10 +166,11 @@ impl BlockManager {
system,
endpoint,
metrics,
+ tx_scrub_command: ArcSwapOption::new(None),
});
block_manager.endpoint.set_handler(block_manager.clone());
- block_manager.clone().spawn_background_worker();
+ block_manager.clone().spawn_background_workers();
block_manager
}
@@ -218,90 +228,6 @@ impl BlockManager {
Ok(())
}
- /// Launch the repair procedure on the data store
- ///
- /// This will list all blocks locally present, as well as those
- /// that are required because of refcount > 0, and will try
- /// to fix any mismatch between the two.
- pub async fn repair_data_store(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
- // 1. Repair blocks from RC table.
- let mut next_start: Option<Hash> = None;
- loop {
- // We have to do this complicated two-step process where we first read a bunch
- // of hashes from the RC table, and then insert them in the to-resync queue,
- // because of SQLite. Basically, as long as we have an iterator on a DB table,
- // we can't do anything else on the DB. The naive approach (which we had previously)
- // of just iterating on the RC table and inserting items one to one in the resync
- // queue can't work here, it would just provoke a deadlock in the SQLite adapter code.
- // This is mostly because the Rust bindings for SQLite assume a worst-case scenario
- // where SQLite is not compiled in thread-safe mode, so we have to wrap everything
- // in a mutex (see db/sqlite_adapter.rs and discussion in PR #322).
- let mut batch_of_hashes = vec![];
- let start_bound = match next_start.as_ref() {
- None => Bound::Unbounded,
- Some(x) => Bound::Excluded(x.as_slice()),
- };
- for entry in self
- .rc
- .rc
- .range::<&[u8], _>((start_bound, Bound::Unbounded))?
- {
- let (hash, _) = entry?;
- let hash = Hash::try_from(&hash[..]).unwrap();
- batch_of_hashes.push(hash);
- if batch_of_hashes.len() >= 1000 {
- break;
- }
- }
- if batch_of_hashes.is_empty() {
- break;
- }
-
- for hash in batch_of_hashes.into_iter() {
- self.put_to_resync(&hash, Duration::from_secs(0))?;
- next_start = Some(hash)
- }
-
- if *must_exit.borrow() {
- return Ok(());
- }
- }
-
- // 2. Repair blocks actually on disk
- // Lists all blocks on disk and adds them to the resync queue.
- // This allows us to find blocks we are storing but don't actually need,
- // so that we can offload them if necessary and then delete them locally.
- self.for_each_file(
- (),
- move |_, hash| async move {
- self.put_to_resync(&hash, Duration::from_secs(0))
- .map_err(Into::into)
- },
- must_exit,
- )
- .await
- }
-
- /// Verify integrity of each block on disk. Use `speed_limit` to limit the load generated by
- /// this function.
- pub async fn scrub_data_store(
- &self,
- must_exit: &watch::Receiver<bool>,
- tranquility: u32,
- ) -> Result<(), Error> {
- let tranquilizer = Tranquilizer::new(30);
- self.for_each_file(
- tranquilizer,
- move |mut tranquilizer, hash| async move {
- let _ = self.read_block(&hash).await;
- tranquilizer.tranquilize(tranquility).await;
- Ok(tranquilizer)
- },
- must_exit,
- )
- .await
- }
-
/// Get lenght of resync queue
pub fn resync_queue_len(&self) -> Result<usize, Error> {
// This currently can't return an error because the CountedTree hack
@@ -321,6 +247,17 @@ impl BlockManager {
Ok(self.rc.rc.len()?)
}
+ /// Send command to start/stop/manage scrub worker
+ pub async fn send_scrub_command(&self, cmd: ScrubWorkerCommand) {
+ let _ = self // the result of `send` is deliberately ignored
+ .tx_scrub_command
+ .load()
+ .as_ref()
+ .unwrap() // panics if no sender is stored; `spawn_background_workers` stores one
+ .send(cmd)
+ .await; // waits until the command has been accepted by the channel
+ }
+
//// ----- Managing the reference counter ----
/// Increment the number of time a block is used, putting it to resynchronization if it is
@@ -390,7 +327,7 @@ impl BlockManager {
}
/// Read block from disk, verifying it's integrity
- async fn read_block(&self, hash: &Hash) -> Result<BlockRpc, Error> {
+ pub(crate) async fn read_block(&self, hash: &Hash) -> Result<BlockRpc, Error> {
let data = self
.read_block_internal(hash)
.bound_record_duration(&self.metrics.block_read_duration)
@@ -554,18 +491,23 @@ impl BlockManager {
// for times that are earlier than the exponential back-off delay
// is a natural condition that is handled properly).
- fn spawn_background_worker(self: Arc<Self>) {
+ fn spawn_background_workers(self: Arc<Self>) { // registers the resync worker (after a 10 s delay) and the scrub worker
// Launch a background workers for background resync loop processing
let background = self.system.background.clone();
+ let worker = ResyncWorker::new(self.clone()); // built up front so the spawned task moves the worker instead of `self`
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(10)).await;
- background.spawn_worker("block resync worker".into(), move |must_exit| {
- self.resync_loop(must_exit)
- });
+ background.spawn_worker(worker); // runs once the 10 s sleep above has elapsed
});
+
+ // Launch a background worker for data store scrubs
+ let (scrub_tx, scrub_rx) = mpsc::channel(1); // command channel with capacity 1
+ self.tx_scrub_command.store(Some(Arc::new(scrub_tx))); // publish the sender for `send_scrub_command`
+ let scrub_worker = ScrubWorker::new(self.clone(), scrub_rx); // the worker owns the receiving end
+ self.system.background.spawn_worker(scrub_worker); // registered right away, not behind the 10 s sleep used above
}
- fn put_to_resync(&self, hash: &Hash, delay: Duration) -> db::Result<()> {
+ pub(crate) fn put_to_resync(&self, hash: &Hash, delay: Duration) -> db::Result<()> {
let when = now_msec() + delay.as_millis() as u64;
self.put_to_resync_at(hash, when)
}
@@ -579,37 +521,7 @@ impl BlockManager {
Ok(())
}
- async fn resync_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) {
- let mut tranquilizer = Tranquilizer::new(30);
-
- while !*must_exit.borrow() {
- match self.resync_iter(&mut must_exit).await {
- Ok(true) => {
- tranquilizer.tranquilize(self.background_tranquility).await;
- }
- Ok(false) => {
- tranquilizer.reset();
- }
- Err(e) => {
- // The errors that we have here are only Sled errors
- // We don't really know how to handle them so just ¯\_(ツ)_/¯
- // (there is kind of an assumption that Sled won't error on us,
- // if it does there is not much we can do -- TODO should we just panic?)
- error!(
- "Could not do a resync iteration: {} (this is a very bad error)",
- e
- );
- tranquilizer.reset();
- }
- }
- }
- }
-
- // The result of resync_iter is:
- // - Ok(true) -> a block was processed (successfully or not)
- // - Ok(false) -> no block was processed, but we are ready for the next iteration
- // - Err(_) -> a Sled error occurred when reading/writing from resync_queue/resync_errors
- async fn resync_iter(&self, must_exit: &mut watch::Receiver<bool>) -> Result<bool, db::Error> {
+ async fn resync_iter(&self) -> Result<ResyncIterResult, db::Error> {
if let Some((time_bytes, hash_bytes)) = self.resync_queue.first()? {
let time_msec = u64::from_be_bytes(time_bytes[0..8].try_into().unwrap());
let now = now_msec();
@@ -629,7 +541,7 @@ impl BlockManager {
// (we want to do the remove after the insert to ensure
// that the item is not lost if we crash in-between)
self.resync_queue.remove(time_bytes)?;
- return Ok(false);
+ return Ok(ResyncIterResult::BusyDidNothing);
}
}
@@ -676,15 +588,11 @@ impl BlockManager {
self.resync_queue.remove(time_bytes)?;
}
- Ok(true)
+ Ok(ResyncIterResult::BusyDidSomething)
} else {
- let delay = tokio::time::sleep(Duration::from_millis(time_msec - now));
- select! {
- _ = delay.fuse() => {},
- _ = self.resync_notify.notified().fuse() => {},
- _ = must_exit.changed().fuse() => {},
- }
- Ok(false)
+ Ok(ResyncIterResult::IdleFor(Duration::from_millis(
+ time_msec - now,
+ )))
}
} else {
// Here we wait either for a notification that an item has been
@@ -693,13 +601,7 @@ impl BlockManager {
// between the time we checked the queue and the first poll
// to resync_notify.notified(): if that happens, we'll just loop
// back 10 seconds later, which is fine.
- let delay = tokio::time::sleep(Duration::from_secs(10));
- select! {
- _ = delay.fuse() => {},
- _ = self.resync_notify.notified().fuse() => {},
- _ = must_exit.changed().fuse() => {},
- }
- Ok(false)
+ Ok(ResyncIterResult::IdleFor(Duration::from_secs(10)))
}
}
@@ -814,72 +716,6 @@ impl BlockManager {
Ok(())
}
-
- // ---- Utility: iteration on files in the data directory ----
-
- async fn for_each_file<F, Fut, State>(
- &self,
- state: State,
- mut f: F,
- must_exit: &watch::Receiver<bool>,
- ) -> Result<(), Error>
- where
- F: FnMut(State, Hash) -> Fut + Send,
- Fut: Future<Output = Result<State, Error>> + Send,
- State: Send,
- {
- self.for_each_file_rec(&self.data_dir, state, &mut f, must_exit)
- .await
- .map(|_| ())
- }
-
- fn for_each_file_rec<'a, F, Fut, State>(
- &'a self,
- path: &'a Path,
- mut state: State,
- f: &'a mut F,
- must_exit: &'a watch::Receiver<bool>,
- ) -> BoxFuture<'a, Result<State, Error>>
- where
- F: FnMut(State, Hash) -> Fut + Send,
- Fut: Future<Output = Result<State, Error>> + Send,
- State: Send + 'a,
- {
- async move {
- let mut ls_data_dir = fs::read_dir(path).await?;
- while let Some(data_dir_ent) = ls_data_dir.next_entry().await? {
- if *must_exit.borrow() {
- break;
- }
-
- let name = data_dir_ent.file_name();
- let name = if let Ok(n) = name.into_string() {
- n
- } else {
- continue;
- };
- let ent_type = data_dir_ent.file_type().await?;
-
- let name = name.strip_suffix(".zst").unwrap_or(&name);
- if name.len() == 2 && hex::decode(&name).is_ok() && ent_type.is_dir() {
- state = self
- .for_each_file_rec(&data_dir_ent.path(), state, f, must_exit)
- .await?;
- } else if name.len() == 64 {
- let hash_bytes = if let Ok(h) = hex::decode(&name) {
- h
- } else {
- continue;
- };
- let mut hash = [0u8; 32];
- hash.copy_from_slice(&hash_bytes[..]);
- state = f(state, hash.into()).await?;
- }
- }
- Ok(state)
- }
- .boxed()
- }
}
#[async_trait]
@@ -898,6 +734,77 @@ impl EndpointHandler<BlockRpc> for BlockManager {
}
}
+struct ResyncWorker { // background worker that drives `BlockManager::resync_iter`
+ manager: Arc<BlockManager>, // block manager whose `resync_iter` and `resync_notify` this worker uses
+ tranquilizer: Tranquilizer, // reset at the start of every `work` call; supplies the state returned after a processed block
+ next_delay: Duration, // longest time `wait_for_work` sleeps; overwritten by each `ResyncIterResult::IdleFor`
+}
+
+impl ResyncWorker {
+ fn new(manager: Arc<BlockManager>) -> Self { // builds a worker bound to `manager`
+ Self {
+ manager,
+ tranquilizer: Tranquilizer::new(30), // NOTE(review): 30 is presumably the tranquilizer's observation window — confirm against Tranquilizer
+ next_delay: Duration::from_secs(10), // initial wait, same 10 s used by `resync_iter` when the queue is empty
+ }
+ }
+}
+
+#[async_trait]
+impl Worker for ResyncWorker {
+ fn name(&self) -> String { // human-readable worker name
+ "Block resync worker".into()
+ }
+
+ fn info(&self) -> Option<String> { // status text built from queue and error counts; `None` when both are zero
+ let mut ret = vec![];
+ let qlen = self.manager.resync_queue_len().unwrap_or(0); // a failed lookup is shown as 0
+ let elen = self.manager.resync_errors_len().unwrap_or(0); // a failed lookup is shown as 0
+ if qlen > 0 {
+ ret.push(format!("{} blocks in queue", qlen));
+ }
+ if elen > 0 {
+ ret.push(format!("{} blocks in error state", elen));
+ }
+ if !ret.is_empty() {
+ Some(ret.join(", "))
+ } else {
+ None
+ }
+ }
+
+ async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
+ self.tranquilizer.reset(); // NOTE(review): presumably restarts the tranquilizer's timing for this iteration — confirm against Tranquilizer
+ match self.manager.resync_iter().await { // one resync iteration, translated into a WorkerState below
+ Ok(ResyncIterResult::BusyDidSomething) => Ok(self
+ .tranquilizer
+ .tranquilize_worker(self.manager.background_tranquility)),
+ Ok(ResyncIterResult::BusyDidNothing) => Ok(WorkerState::Busy),
+ Ok(ResyncIterResult::IdleFor(delay)) => {
+ self.next_delay = delay; // remembered for the following `wait_for_work` call
+ Ok(WorkerState::Idle)
+ }
+ Err(e) => {
+ // The errors that we have here are only Sled errors
+ // We don't really know how to handle them so just ¯\_(ツ)_/¯
+ // (there is kind of an assumption that Sled won't error on us,
+ // if it does there is not much we can do -- TODO should we just panic?)
+ // Here we just give the error to the worker manager,
+ // it will print it to the logs and increment a counter
+ Err(e.into())
+ }
+ }
+ }
+
+ async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerState {
+ select! { // returns when `next_delay` elapses or `resync_notify` fires; `_must_exit` is not consulted here
+ _ = tokio::time::sleep(self.next_delay) => (),
+ _ = self.manager.resync_notify.notified() => (),
+ };
+ WorkerState::Busy // always requests another `work` pass
+ }
+}
+
struct BlockStatus {
exists: bool,
needed: RcEntry,