diff options
author | Alex Auvolat <alex@adnab.me> | 2022-06-27 17:57:48 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2022-06-27 17:57:48 +0200 |
commit | 247dbcd5980e6a0158fe209d85788d3167dceab0 (patch) | |
tree | b5f9b20cb87ead9d5e8cfb70984f9795ad2e7712 /src/block/manager.rs | |
parent | 0e5175abeeb1b2d9cfe27603005b7feb3cf040de (diff) | |
download | garage-247dbcd5980e6a0158fe209d85788d3167dceab0.tar.gz garage-247dbcd5980e6a0158fe209d85788d3167dceab0.zip |
Only one scrub worker (wip)
Diffstat (limited to 'src/block/manager.rs')
-rw-r--r-- | src/block/manager.rs | 37 |
1 files changed, 33 insertions, 4 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs index 27f51ff8..015ac71b 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -3,6 +3,7 @@ use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; +use arc_swap::ArcSwapOption; use async_trait::async_trait; use serde::{Deserialize, Serialize}; @@ -10,7 +11,7 @@ use futures::future::*; use tokio::fs; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::select; -use tokio::sync::{watch, Mutex, Notify}; +use tokio::sync::{mpsc, watch, Mutex, Notify}; use opentelemetry::{ trace::{FutureExt as OtelFutureExt, TraceContextExt, Tracer}, @@ -35,6 +36,7 @@ use garage_table::replication::{TableReplication, TableShardedReplication}; use crate::block::*; use crate::metrics::*; use crate::rc::*; +use crate::repair::*; /// Size under which data will be stored inlined in database instead of as files pub const INLINE_THRESHOLD: usize = 3072; @@ -86,6 +88,8 @@ pub struct BlockManager { pub replication: TableShardedReplication, /// Directory in which block are stored pub data_dir: PathBuf, + /// State store (only used by scrub worker to store time of last scrub) + pub(crate) state_variables_store: db::Tree, compression_level: Option<i32>, background_tranquility: u32, @@ -102,6 +106,8 @@ pub struct BlockManager { endpoint: Arc<Endpoint<BlockRpc, Self>>, metrics: BlockManagerMetrics, + + tx_scrub_command: ArcSwapOption<mpsc::Sender<ScrubWorkerCommand>>, } // This custom struct contains functions that must only be ran @@ -141,6 +147,10 @@ impl BlockManager { let resync_errors = CountedTree::new(resync_errors).expect("Could not count block_local_resync_errors"); + let state_variables_store = db + .open_tree("state_variables") + .expect("Unable to open state_variables tree"); + let endpoint = system .netapp .endpoint("garage_block/manager.rs/Rpc".to_string()); @@ -159,13 +169,15 @@ impl BlockManager { resync_queue, resync_notify: Notify::new(), resync_errors, + state_variables_store, system, endpoint, metrics, + tx_scrub_command: ArcSwapOption::new(None), }); block_manager.endpoint.set_handler(block_manager.clone()); - block_manager.clone().spawn_background_worker(); + block_manager.clone().spawn_background_workers(); block_manager } @@ -242,6 +254,17 @@ impl BlockManager { Ok(self.rc.rc.len()?) } + /// Send command to start/stop/manager scrub worker + pub async fn send_scrub_command(&self, cmd: ScrubWorkerCommand) { + let _ = self + .tx_scrub_command + .load() + .as_ref() + .unwrap() + .send(cmd) + .await; + } + //// ----- Managing the reference counter ---- /// Increment the number of time a block is used, putting it to resynchronization if it is @@ -475,11 +498,11 @@ impl BlockManager { // for times that are earlier than the exponential back-off delay // is a natural condition that is handled properly). - fn spawn_background_worker(self: Arc<Self>) { + fn spawn_background_workers(self: Arc<Self>) { // Launch a background workers for background resync loop processing let background = self.system.background.clone(); let worker = ResyncWorker { - manager: self, + manager: self.clone(), tranquilizer: Tranquilizer::new(30), next_delay: Duration::from_secs(10), }; @@ -487,6 +510,12 @@ impl BlockManager { tokio::time::sleep(Duration::from_secs(10)).await; background.spawn_worker(worker); }); + + // Launch a background worker for data store scrubs + let (scrub_tx, scrub_rx) = mpsc::channel(1); + self.tx_scrub_command.store(Some(Arc::new(scrub_tx))); + let scrub_worker = ScrubWorker::new(self.clone(), scrub_rx, 4); + self.system.background.spawn_worker(scrub_worker); } pub(crate) fn put_to_resync(&self, hash: &Hash, delay: Duration) -> db::Result<()> { |