aboutsummaryrefslogtreecommitdiff
path: root/src/block/manager.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-06-27 17:57:48 +0200
committerAlex Auvolat <alex@adnab.me>2022-06-27 17:57:48 +0200
commit247dbcd5980e6a0158fe209d85788d3167dceab0 (patch)
treeb5f9b20cb87ead9d5e8cfb70984f9795ad2e7712 /src/block/manager.rs
parent0e5175abeeb1b2d9cfe27603005b7feb3cf040de (diff)
downloadgarage-247dbcd5980e6a0158fe209d85788d3167dceab0.tar.gz
garage-247dbcd5980e6a0158fe209d85788d3167dceab0.zip
Only one scrub worker (wip)
Diffstat (limited to 'src/block/manager.rs')
-rw-r--r--src/block/manager.rs37
1 files changed, 33 insertions, 4 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs
index 27f51ff8..015ac71b 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -3,6 +3,7 @@ use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
+use arc_swap::ArcSwapOption;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
@@ -10,7 +11,7 @@ use futures::future::*;
use tokio::fs;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::select;
-use tokio::sync::{watch, Mutex, Notify};
+use tokio::sync::{mpsc, watch, Mutex, Notify};
use opentelemetry::{
trace::{FutureExt as OtelFutureExt, TraceContextExt, Tracer},
@@ -35,6 +36,7 @@ use garage_table::replication::{TableReplication, TableShardedReplication};
use crate::block::*;
use crate::metrics::*;
use crate::rc::*;
+use crate::repair::*;
/// Size under which data will be stored inlined in database instead of as files
pub const INLINE_THRESHOLD: usize = 3072;
@@ -86,6 +88,8 @@ pub struct BlockManager {
pub replication: TableShardedReplication,
/// Directory in which block are stored
pub data_dir: PathBuf,
+ /// State store (only used by scrub worker to store time of last scrub)
+ pub(crate) state_variables_store: db::Tree,
compression_level: Option<i32>,
background_tranquility: u32,
@@ -102,6 +106,8 @@ pub struct BlockManager {
endpoint: Arc<Endpoint<BlockRpc, Self>>,
metrics: BlockManagerMetrics,
+
+ tx_scrub_command: ArcSwapOption<mpsc::Sender<ScrubWorkerCommand>>,
}
// This custom struct contains functions that must only be ran
@@ -141,6 +147,10 @@ impl BlockManager {
let resync_errors =
CountedTree::new(resync_errors).expect("Could not count block_local_resync_errors");
+ let state_variables_store = db
+ .open_tree("state_variables")
+ .expect("Unable to open state_variables tree");
+
let endpoint = system
.netapp
.endpoint("garage_block/manager.rs/Rpc".to_string());
@@ -159,13 +169,15 @@ impl BlockManager {
resync_queue,
resync_notify: Notify::new(),
resync_errors,
+ state_variables_store,
system,
endpoint,
metrics,
+ tx_scrub_command: ArcSwapOption::new(None),
});
block_manager.endpoint.set_handler(block_manager.clone());
- block_manager.clone().spawn_background_worker();
+ block_manager.clone().spawn_background_workers();
block_manager
}
@@ -242,6 +254,17 @@ impl BlockManager {
Ok(self.rc.rc.len()?)
}
+ /// Send command to start/stop/manager scrub worker
+ pub async fn send_scrub_command(&self, cmd: ScrubWorkerCommand) {
+ let _ = self
+ .tx_scrub_command
+ .load()
+ .as_ref()
+ .unwrap()
+ .send(cmd)
+ .await;
+ }
+
//// ----- Managing the reference counter ----
/// Increment the number of time a block is used, putting it to resynchronization if it is
@@ -475,11 +498,11 @@ impl BlockManager {
// for times that are earlier than the exponential back-off delay
// is a natural condition that is handled properly).
- fn spawn_background_worker(self: Arc<Self>) {
+ fn spawn_background_workers(self: Arc<Self>) {
// Launch a background workers for background resync loop processing
let background = self.system.background.clone();
let worker = ResyncWorker {
- manager: self,
+ manager: self.clone(),
tranquilizer: Tranquilizer::new(30),
next_delay: Duration::from_secs(10),
};
@@ -487,6 +510,12 @@ impl BlockManager {
tokio::time::sleep(Duration::from_secs(10)).await;
background.spawn_worker(worker);
});
+
+ // Launch a background worker for data store scrubs
+ let (scrub_tx, scrub_rx) = mpsc::channel(1);
+ self.tx_scrub_command.store(Some(Arc::new(scrub_tx)));
+ let scrub_worker = ScrubWorker::new(self.clone(), scrub_rx, 4);
+ self.system.background.spawn_worker(scrub_worker);
}
pub(crate) fn put_to_resync(&self, hash: &Hash, delay: Duration) -> db::Result<()> {