aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/block/Cargo.toml1
-rw-r--r--src/block/manager.rs37
-rw-r--r--src/block/repair.rs176
-rw-r--r--src/garage/admin.rs2
-rw-r--r--src/garage/cli/structs.rs25
-rw-r--r--src/garage/repair/online.rs22
-rw-r--r--src/util/tranquilizer.rs4
7 files changed, 239 insertions, 28 deletions
diff --git a/src/block/Cargo.toml b/src/block/Cargo.toml
index 80346aca..2555a44a 100644
--- a/src/block/Cargo.toml
+++ b/src/block/Cargo.toml
@@ -21,6 +21,7 @@ garage_table = { version = "0.7.0", path = "../table" }
opentelemetry = "0.17"
+arc-swap = "1.5"
async-trait = "0.1.7"
bytes = "1.0"
hex = "0.4"
diff --git a/src/block/manager.rs b/src/block/manager.rs
index 27f51ff8..015ac71b 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -3,6 +3,7 @@ use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
+use arc_swap::ArcSwapOption;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
@@ -10,7 +11,7 @@ use futures::future::*;
use tokio::fs;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::select;
-use tokio::sync::{watch, Mutex, Notify};
+use tokio::sync::{mpsc, watch, Mutex, Notify};
use opentelemetry::{
trace::{FutureExt as OtelFutureExt, TraceContextExt, Tracer},
@@ -35,6 +36,7 @@ use garage_table::replication::{TableReplication, TableShardedReplication};
use crate::block::*;
use crate::metrics::*;
use crate::rc::*;
+use crate::repair::*;
/// Size under which data will be stored inlined in database instead of as files
pub const INLINE_THRESHOLD: usize = 3072;
@@ -86,6 +88,8 @@ pub struct BlockManager {
pub replication: TableShardedReplication,
/// Directory in which block are stored
pub data_dir: PathBuf,
+ /// State store (only used by scrub worker to store time of last scrub)
+ pub(crate) state_variables_store: db::Tree,
compression_level: Option<i32>,
background_tranquility: u32,
@@ -102,6 +106,8 @@ pub struct BlockManager {
endpoint: Arc<Endpoint<BlockRpc, Self>>,
metrics: BlockManagerMetrics,
+
+ tx_scrub_command: ArcSwapOption<mpsc::Sender<ScrubWorkerCommand>>,
}
// This custom struct contains functions that must only be ran
@@ -141,6 +147,10 @@ impl BlockManager {
let resync_errors =
CountedTree::new(resync_errors).expect("Could not count block_local_resync_errors");
+ let state_variables_store = db
+ .open_tree("state_variables")
+ .expect("Unable to open state_variables tree");
+
let endpoint = system
.netapp
.endpoint("garage_block/manager.rs/Rpc".to_string());
@@ -159,13 +169,15 @@ impl BlockManager {
resync_queue,
resync_notify: Notify::new(),
resync_errors,
+ state_variables_store,
system,
endpoint,
metrics,
+ tx_scrub_command: ArcSwapOption::new(None),
});
block_manager.endpoint.set_handler(block_manager.clone());
- block_manager.clone().spawn_background_worker();
+ block_manager.clone().spawn_background_workers();
block_manager
}
@@ -242,6 +254,17 @@ impl BlockManager {
Ok(self.rc.rc.len()?)
}
+ /// Send command to start/stop/manager scrub worker
+ pub async fn send_scrub_command(&self, cmd: ScrubWorkerCommand) {
+ let _ = self
+ .tx_scrub_command
+ .load()
+ .as_ref()
+ .unwrap()
+ .send(cmd)
+ .await;
+ }
+
//// ----- Managing the reference counter ----
/// Increment the number of time a block is used, putting it to resynchronization if it is
@@ -475,11 +498,11 @@ impl BlockManager {
// for times that are earlier than the exponential back-off delay
// is a natural condition that is handled properly).
- fn spawn_background_worker(self: Arc<Self>) {
+ fn spawn_background_workers(self: Arc<Self>) {
// Launch a background workers for background resync loop processing
let background = self.system.background.clone();
let worker = ResyncWorker {
- manager: self,
+ manager: self.clone(),
tranquilizer: Tranquilizer::new(30),
next_delay: Duration::from_secs(10),
};
@@ -487,6 +510,12 @@ impl BlockManager {
tokio::time::sleep(Duration::from_secs(10)).await;
background.spawn_worker(worker);
});
+
+ // Launch a background worker for data store scrubs
+ let (scrub_tx, scrub_rx) = mpsc::channel(1);
+ self.tx_scrub_command.store(Some(Arc::new(scrub_tx)));
+ let scrub_worker = ScrubWorker::new(self.clone(), scrub_rx, 4);
+ self.system.background.spawn_worker(scrub_worker);
}
pub(crate) fn put_to_resync(&self, hash: &Hash, delay: Duration) -> db::Result<()> {
diff --git a/src/block/repair.rs b/src/block/repair.rs
index a2a8443e..8335de51 100644
--- a/src/block/repair.rs
+++ b/src/block/repair.rs
@@ -1,20 +1,26 @@
use core::ops::Bound;
+use std::convert::TryInto;
use std::path::PathBuf;
-
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use tokio::fs;
+use tokio::select;
+use tokio::sync::mpsc;
use tokio::sync::watch;
use garage_util::background::*;
use garage_util::data::*;
use garage_util::error::*;
+use garage_util::time::*;
use garage_util::tranquilizer::Tranquilizer;
use crate::manager::*;
+const SCRUB_INTERVAL: Duration = Duration::from_secs(3600 * 24 * 30); // full scrub every 30 days
+const TIME_LAST_COMPLETE_SCRUB: &[u8] = b"time_last_complete_scrub";
+
pub struct RepairWorker {
manager: Arc<BlockManager>,
next_start: Option<Hash>,
@@ -129,19 +135,107 @@ impl Worker for RepairWorker {
pub struct ScrubWorker {
manager: Arc<BlockManager>,
- iterator: BlockStoreIterator,
+ rx_cmd: mpsc::Receiver<ScrubWorkerCommand>,
+
+ work: ScrubWorkerState,
tranquilizer: Tranquilizer,
tranquility: u32,
+
+ time_last_complete_scrub: u64,
+}
+
+enum ScrubWorkerState {
+ Running(BlockStoreIterator),
+ Paused(BlockStoreIterator, u64), // u64 = time when to resume scrub
+ Finished,
+}
+
+impl Default for ScrubWorkerState {
+ fn default() -> Self {
+ ScrubWorkerState::Finished
+ }
+}
+
+pub enum ScrubWorkerCommand {
+ Start,
+ Pause(Duration),
+ Resume,
+ Cancel,
+ SetTranquility(u32),
}
impl ScrubWorker {
- pub fn new(manager: Arc<BlockManager>, tranquility: u32) -> Self {
- let iterator = BlockStoreIterator::new(&manager);
+ pub fn new(
+ manager: Arc<BlockManager>,
+ rx_cmd: mpsc::Receiver<ScrubWorkerCommand>,
+ tranquility: u32,
+ ) -> Self {
+ let time_last_complete_scrub = match manager
+ .state_variables_store
+ .get(TIME_LAST_COMPLETE_SCRUB)
+ .expect("DB error when initializing scrub worker")
+ {
+ Some(v) => u64::from_be_bytes(v.try_into().unwrap()),
+ None => 0,
+ };
Self {
manager,
- iterator,
+ rx_cmd,
+ work: ScrubWorkerState::Finished,
tranquilizer: Tranquilizer::new(30),
tranquility,
+ time_last_complete_scrub,
+ }
+ }
+
+ fn handle_cmd(&mut self, cmd: ScrubWorkerCommand) {
+ match cmd {
+ ScrubWorkerCommand::Start => {
+ self.work = match std::mem::take(&mut self.work) {
+ ScrubWorkerState::Finished => {
+ let iterator = BlockStoreIterator::new(&self.manager);
+ ScrubWorkerState::Running(iterator)
+ }
+ work => {
+ error!("Cannot start scrub worker: already running!");
+ work
+ }
+ };
+ }
+ ScrubWorkerCommand::Pause(dur) => {
+ self.work = match std::mem::take(&mut self.work) {
+ ScrubWorkerState::Running(it) | ScrubWorkerState::Paused(it, _) => {
+ ScrubWorkerState::Paused(it, now_msec() + dur.as_millis() as u64)
+ }
+ work => {
+ error!("Cannot pause scrub worker: not running!");
+ work
+ }
+ };
+ }
+ ScrubWorkerCommand::Resume => {
+ self.work = match std::mem::take(&mut self.work) {
+ ScrubWorkerState::Paused(it, _) => ScrubWorkerState::Running(it),
+ work => {
+ error!("Cannot resume scrub worker: not paused!");
+ work
+ }
+ };
+ }
+ ScrubWorkerCommand::Cancel => {
+ self.work = match std::mem::take(&mut self.work) {
+ ScrubWorkerState::Running(_) | ScrubWorkerState::Paused(_, _) => {
+ ScrubWorkerState::Finished
+ }
+ work => {
+ error!("Cannot cancel scrub worker: not running!");
+ work
+ }
+ }
+ }
+ ScrubWorkerCommand::SetTranquility(t) => {
+ self.tranquility = t;
+ }
}
}
}
@@ -153,24 +247,80 @@ impl Worker for ScrubWorker {
}
fn info(&self) -> Option<String> {
- Some(format!("{:.2}% done", self.iterator.progress() * 100.))
+ match &self.work {
+ ScrubWorkerState::Running(bsi) => Some(format!("{:.2}% done", bsi.progress() * 100.)),
+ ScrubWorkerState::Paused(_bsi, rt) => {
+ Some(format!("Paused, resumes at {}", msec_to_rfc3339(*rt)))
+ }
+ ScrubWorkerState::Finished => Some(format!(
+ "Last completed scrub: {}",
+ msec_to_rfc3339(self.time_last_complete_scrub)
+ )),
+ }
}
async fn work(
&mut self,
_must_exit: &mut watch::Receiver<bool>,
) -> Result<WorkerStatus, Error> {
- self.tranquilizer.reset();
- if let Some(hash) = self.iterator.next().await? {
- let _ = self.manager.read_block(&hash).await;
- Ok(self.tranquilizer.tranquilize_worker(self.tranquility))
- } else {
- Ok(WorkerStatus::Done)
+ match self.rx_cmd.try_recv() {
+ Ok(cmd) => self.handle_cmd(cmd),
+ Err(mpsc::error::TryRecvError::Disconnected) => return Ok(WorkerStatus::Done),
+ Err(mpsc::error::TryRecvError::Empty) => (),
+ };
+
+ match &mut self.work {
+ ScrubWorkerState::Running(bsi) => {
+ self.tranquilizer.reset();
+ if let Some(hash) = bsi.next().await? {
+ let _ = self.manager.read_block(&hash).await;
+ Ok(self.tranquilizer.tranquilize_worker(self.tranquility))
+ } else {
+ self.time_last_complete_scrub = now_msec(); // TODO save to file
+ self.manager.state_variables_store.insert(
+ TIME_LAST_COMPLETE_SCRUB,
+ u64::to_be_bytes(self.time_last_complete_scrub),
+ )?;
+ self.work = ScrubWorkerState::Finished;
+ self.tranquilizer.clear();
+ Ok(WorkerStatus::Idle)
+ }
+ }
+ _ => Ok(WorkerStatus::Idle),
}
}
async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerStatus {
- unreachable!()
+ match &self.work {
+ ScrubWorkerState::Running(_) => return WorkerStatus::Busy,
+ ScrubWorkerState::Paused(_, resume_time) => {
+ let delay = Duration::from_millis(resume_time - now_msec());
+ select! {
+ _ = tokio::time::sleep(delay) => self.handle_cmd(ScrubWorkerCommand::Resume),
+ cmd = self.rx_cmd.recv() => if let Some(cmd) = cmd {
+ self.handle_cmd(cmd);
+ } else {
+ return WorkerStatus::Done;
+ }
+ }
+ }
+ ScrubWorkerState::Finished => {
+ let delay = SCRUB_INTERVAL
+ - Duration::from_secs(now_msec() - self.time_last_complete_scrub);
+ select! {
+ _ = tokio::time::sleep(delay) => self.handle_cmd(ScrubWorkerCommand::Start),
+ cmd = self.rx_cmd.recv() => if let Some(cmd) = cmd {
+ self.handle_cmd(cmd);
+ } else {
+ return WorkerStatus::Done;
+ }
+ }
+ }
+ }
+ match &self.work {
+ ScrubWorkerState::Running(_) => WorkerStatus::Busy,
+ _ => WorkerStatus::Idle,
+ }
}
}
diff --git a/src/garage/admin.rs b/src/garage/admin.rs
index de49331e..71ee608c 100644
--- a/src/garage/admin.rs
+++ b/src/garage/admin.rs
@@ -698,7 +698,7 @@ impl AdminRpcHandler {
)))
}
} else {
- launch_online_repair(self.garage.clone(), opt);
+ launch_online_repair(self.garage.clone(), opt).await;
Ok(AdminRpc::Ok(format!(
"Repair launched on {:?}",
self.garage.system.id
diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs
index c1ee32ab..bc44b5ef 100644
--- a/src/garage/cli/structs.rs
+++ b/src/garage/cli/structs.rs
@@ -427,8 +427,29 @@ pub enum RepairWhat {
/// Verify integrity of all blocks on disc (extremely slow, i/o intensive)
#[structopt(name = "scrub")]
Scrub {
- /// Tranquility factor (see tranquilizer documentation)
- #[structopt(name = "tranquility", default_value = "2")]
+ #[structopt(subcommand)]
+ cmd: ScrubCmd,
+ },
+}
+
+#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
+pub enum ScrubCmd {
+ /// Start scrub
+ #[structopt(name = "start")]
+ Start,
+ /// Pause scrub (it will resume automatically after 24 hours)
+ #[structopt(name = "pause")]
+ Pause,
+ /// Resume paused scrub
+ #[structopt(name = "resume")]
+ Resume,
+ /// Cancel scrub in progress
+ #[structopt(name = "cancel")]
+ Cancel,
+ /// Set tranquility level for in-progress and future scrubs
+ #[structopt(name = "set-tranquility")]
+ SetTranquility {
+ #[structopt()]
tranquility: u32,
},
}
diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs
index b0437c5e..8207a8b4 100644
--- a/src/garage/repair/online.rs
+++ b/src/garage/repair/online.rs
@@ -1,8 +1,10 @@
use std::sync::Arc;
+use std::time::Duration;
use async_trait::async_trait;
use tokio::sync::watch;
+use garage_block::repair::ScrubWorkerCommand;
use garage_model::garage::Garage;
use garage_model::s3::block_ref_table::*;
use garage_model::s3::object_table::*;
@@ -13,7 +15,7 @@ use garage_util::error::Error;
use crate::*;
-pub fn launch_online_repair(garage: Arc<Garage>, opt: RepairOpt) {
+pub async fn launch_online_repair(garage: Arc<Garage>, opt: RepairOpt) {
match opt.what {
RepairWhat::Tables => {
info!("Launching a full sync of tables");
@@ -43,14 +45,18 @@ pub fn launch_online_repair(garage: Arc<Garage>, opt: RepairOpt) {
garage.block_manager.clone(),
));
}
- RepairWhat::Scrub { tranquility } => {
+ RepairWhat::Scrub { cmd } => {
info!("Verifying integrity of stored blocks");
- garage
- .background
- .spawn_worker(garage_block::repair::ScrubWorker::new(
- garage.block_manager.clone(),
- tranquility,
- ));
+ let cmd = match cmd {
+ ScrubCmd::Start => ScrubWorkerCommand::Start,
+ ScrubCmd::Pause => ScrubWorkerCommand::Pause(Duration::from_secs(3600 * 24)),
+ ScrubCmd::Resume => ScrubWorkerCommand::Resume,
+ ScrubCmd::Cancel => ScrubWorkerCommand::Cancel,
+ ScrubCmd::SetTranquility { tranquility } => {
+ ScrubWorkerCommand::SetTranquility(tranquility)
+ }
+ };
+ garage.block_manager.send_scrub_command(cmd).await;
}
}
}
diff --git a/src/util/tranquilizer.rs b/src/util/tranquilizer.rs
index f0c2b410..9c796f8b 100644
--- a/src/util/tranquilizer.rs
+++ b/src/util/tranquilizer.rs
@@ -71,4 +71,8 @@ impl Tranquilizer {
pub fn reset(&mut self) {
self.last_step_begin = Instant::now();
}
+
+ pub fn clear(&mut self) {
+ self.observations.clear();
+ }
}