aboutsummaryrefslogtreecommitdiff
path: root/src/block
diff options
context:
space:
mode:
Diffstat (limited to 'src/block')
-rw-r--r--src/block/manager.rs30
-rw-r--r--src/block/repair.rs4
-rw-r--r--src/block/resync.rs2
3 files changed, 21 insertions, 15 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs
index 28523a93..1b5a5df0 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -3,6 +3,7 @@ use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
+use arc_swap::ArcSwapOption;
use async_trait::async_trait;
use bytes::Bytes;
use serde::{Deserialize, Serialize};
@@ -22,6 +23,7 @@ use garage_rpc::rpc_helper::netapp::stream::{stream_asyncread, ByteStream};
use garage_db as db;
+use garage_util::background::BackgroundRunner;
use garage_util::data::*;
use garage_util::error::*;
use garage_util::metrics::RecordDuration;
@@ -87,7 +89,7 @@ pub struct BlockManager {
pub(crate) metrics: BlockManagerMetrics,
- tx_scrub_command: mpsc::Sender<ScrubWorkerCommand>,
+ tx_scrub_command: ArcSwapOption<mpsc::Sender<ScrubWorkerCommand>>,
}
#[derive(Serialize, Deserialize, Clone, Debug)]
@@ -126,8 +128,6 @@ impl BlockManager {
let metrics =
BlockManagerMetrics::new(rc.rc.clone(), resync.queue.clone(), resync.errors.clone());
- let (scrub_tx, scrub_rx) = mpsc::channel(1);
-
let block_manager = Arc::new(Self {
replication,
data_dir,
@@ -138,21 +138,24 @@ impl BlockManager {
system,
endpoint,
metrics,
- tx_scrub_command: scrub_tx,
+ tx_scrub_command: ArcSwapOption::new(None),
});
block_manager.endpoint.set_handler(block_manager.clone());
+ block_manager
+ }
+
+ pub fn spawn_workers(self: &Arc<Self>, bg: &BackgroundRunner) {
// Spawn a bunch of resync workers
for index in 0..MAX_RESYNC_WORKERS {
- let worker = ResyncWorker::new(index, block_manager.clone());
- block_manager.system.background.spawn_worker(worker);
+ let worker = ResyncWorker::new(index, self.clone());
+ bg.spawn_worker(worker);
}
// Spawn scrub worker
- let scrub_worker = ScrubWorker::new(block_manager.clone(), scrub_rx);
- block_manager.system.background.spawn_worker(scrub_worker);
-
- block_manager
+ let (scrub_tx, scrub_rx) = mpsc::channel(1);
+ self.tx_scrub_command.store(Some(Arc::new(scrub_tx)));
+ bg.spawn_worker(ScrubWorker::new(self.clone(), scrub_rx));
}
/// Ask nodes that might have a (possibly compressed) block for it
@@ -325,8 +328,11 @@ impl BlockManager {
}
/// Send command to start/stop/manager scrub worker
- pub async fn send_scrub_command(&self, cmd: ScrubWorkerCommand) {
- let _ = self.tx_scrub_command.send(cmd).await;
+ pub async fn send_scrub_command(&self, cmd: ScrubWorkerCommand) -> Result<(), Error> {
+ let tx = self.tx_scrub_command.load();
+ let tx = tx.as_ref().ok_or_message("scrub worker is not running")?;
+ tx.send(cmd).await.ok_or_message("send error")?;
+ Ok(())
}
/// Get the reference count of a block
diff --git a/src/block/repair.rs b/src/block/repair.rs
index 1878027e..f5515d4e 100644
--- a/src/block/repair.rs
+++ b/src/block/repair.rs
@@ -148,7 +148,7 @@ impl Worker for RepairWorker {
}
}
- async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerState {
+ async fn wait_for_work(&mut self) -> WorkerState {
unreachable!()
}
}
@@ -341,7 +341,7 @@ impl Worker for ScrubWorker {
}
}
- async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerState {
+ async fn wait_for_work(&mut self) -> WorkerState {
let (wait_until, command) = match &self.work {
ScrubWorkerState::Running(_) => return WorkerState::Busy,
ScrubWorkerState::Paused(_, resume_time) => (*resume_time, ScrubWorkerCommand::Resume),
diff --git a/src/block/resync.rs b/src/block/resync.rs
index 8231b55d..51bb9846 100644
--- a/src/block/resync.rs
+++ b/src/block/resync.rs
@@ -540,7 +540,7 @@ impl Worker for ResyncWorker {
}
}
- async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerState {
+ async fn wait_for_work(&mut self) -> WorkerState {
while self.index >= self.manager.resync.persisted.load().n_workers {
self.manager.resync.notify.notified().await
}