aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/block/manager.rs124
1 files changed, 73 insertions, 51 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs
index 32ba0431..8a131270 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -9,9 +9,9 @@ use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use futures::future::*;
-use futures::select;
use tokio::fs;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
+use tokio::select;
use tokio::sync::{watch, Mutex, Notify};
use opentelemetry::{
@@ -22,6 +22,7 @@ use opentelemetry::{
use garage_db as db;
use garage_db::counted_tree_hack::CountedTree;
+use garage_util::background::*;
use garage_util::data::*;
use garage_util::error::*;
use garage_util::metrics::RecordDuration;
@@ -110,6 +111,12 @@ pub struct BlockManager {
// it INSIDE a Mutex.
struct BlockManagerLocked();
+enum BlockIterResult {
+ BusyDidSomething,
+ BusyDidNothing,
+ IdleFor(Duration),
+}
+
impl BlockManager {
pub fn new(
db: &db::Db,
@@ -557,11 +564,14 @@ impl BlockManager {
fn spawn_background_worker(self: Arc<Self>) {
// Launch a background workers for background resync loop processing
let background = self.system.background.clone();
+ let worker = BlockResyncWorker {
+ manager: self,
+ tranquilizer: Tranquilizer::new(30),
+ next_delay: Duration::from_secs(10),
+ };
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(10)).await;
- background.spawn_worker("block resync worker".into(), move |must_exit| {
- self.resync_loop(must_exit)
- });
+ background.spawn_worker(worker);
});
}
@@ -579,37 +589,7 @@ impl BlockManager {
Ok(())
}
- async fn resync_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) {
- let mut tranquilizer = Tranquilizer::new(30);
-
- while !*must_exit.borrow() {
- match self.resync_iter(&mut must_exit).await {
- Ok(true) => {
- tranquilizer.tranquilize(self.background_tranquility).await;
- }
- Ok(false) => {
- tranquilizer.reset();
- }
- Err(e) => {
- // The errors that we have here are only Sled errors
- // We don't really know how to handle them so just ¯\_(ツ)_/¯
- // (there is kind of an assumption that Sled won't error on us,
- // if it does there is not much we can do -- TODO should we just panic?)
- error!(
- "Could not do a resync iteration: {} (this is a very bad error)",
- e
- );
- tranquilizer.reset();
- }
- }
- }
- }
-
- // The result of resync_iter is:
- // - Ok(true) -> a block was processed (successfully or not)
- // - Ok(false) -> no block was processed, but we are ready for the next iteration
- // - Err(_) -> a Sled error occurred when reading/writing from resync_queue/resync_errors
- async fn resync_iter(&self, must_exit: &mut watch::Receiver<bool>) -> Result<bool, db::Error> {
+ async fn resync_iter(&self) -> Result<BlockIterResult, db::Error> {
if let Some((time_bytes, hash_bytes)) = self.resync_queue.first()? {
let time_msec = u64::from_be_bytes(time_bytes[0..8].try_into().unwrap());
let now = now_msec();
@@ -629,7 +609,7 @@ impl BlockManager {
// (we want to do the remove after the insert to ensure
// that the item is not lost if we crash in-between)
self.resync_queue.remove(time_bytes)?;
- return Ok(false);
+ return Ok(BlockIterResult::BusyDidNothing);
}
}
@@ -676,15 +656,11 @@ impl BlockManager {
self.resync_queue.remove(time_bytes)?;
}
- Ok(true)
+ Ok(BlockIterResult::BusyDidSomething)
} else {
- let delay = tokio::time::sleep(Duration::from_millis(time_msec - now));
- select! {
- _ = delay.fuse() => {},
- _ = self.resync_notify.notified().fuse() => {},
- _ = must_exit.changed().fuse() => {},
- }
- Ok(false)
+ Ok(BlockIterResult::IdleFor(Duration::from_millis(
+ time_msec - now,
+ )))
}
} else {
// Here we wait either for a notification that an item has been
@@ -693,13 +669,7 @@ impl BlockManager {
// between the time we checked the queue and the first poll
// to resync_notify.notified(): if that happens, we'll just loop
// back 10 seconds later, which is fine.
- let delay = tokio::time::sleep(Duration::from_secs(10));
- select! {
- _ = delay.fuse() => {},
- _ = self.resync_notify.notified().fuse() => {},
- _ = must_exit.changed().fuse() => {},
- }
- Ok(false)
+ Ok(BlockIterResult::IdleFor(Duration::from_secs(10)))
}
}
@@ -898,6 +868,58 @@ impl EndpointHandler<BlockRpc> for BlockManager {
}
}
+struct BlockResyncWorker {
+ manager: Arc<BlockManager>,
+ tranquilizer: Tranquilizer,
+ next_delay: Duration,
+}
+
+#[async_trait]
+impl Worker for BlockResyncWorker {
+ fn name(&self) -> String {
+ "Block resync worker".into()
+ }
+
+ async fn work(
+ &mut self,
+ _must_exit: &mut watch::Receiver<bool>,
+ ) -> Result<WorkerStatus, Error> {
+ self.tranquilizer.reset();
+ match self.manager.resync_iter().await {
+ Ok(BlockIterResult::BusyDidSomething) => {
+ self.tranquilizer
+ .tranquilize(self.manager.background_tranquility)
+ .await;
+ Ok(WorkerStatus::Busy)
+ }
+ Ok(BlockIterResult::BusyDidNothing) => Ok(WorkerStatus::Busy),
+ Ok(BlockIterResult::IdleFor(delay)) => {
+ self.next_delay = delay;
+ Ok(WorkerStatus::Idle)
+ }
+ Err(e) => {
+ // The errors that we have here are only Sled errors
+ // We don't really know how to handle them so just ¯\_(ツ)_/¯
+ // (there is kind of an assumption that Sled won't error on us,
+ // if it does there is not much we can do -- TODO should we just panic?)
+ error!(
+ "Could not do a resync iteration: {} (this is a very bad error)",
+ e
+ );
+ Err(e.into())
+ }
+ }
+ }
+
+ async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerStatus {
+ select! {
+ _ = tokio::time::sleep(self.next_delay) => (),
+ _ = self.manager.resync_notify.notified() => (),
+ };
+ WorkerStatus::Busy
+ }
+}
+
struct BlockStatus {
exists: bool,
needed: RcEntry,