aboutsummaryrefslogtreecommitdiff
path: root/src/block/manager.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/block/manager.rs')
-rw-r--r--src/block/manager.rs135
1 files changed, 120 insertions, 15 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs
index 7f439b96..051a9f93 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -3,8 +3,10 @@ use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
+use arc_swap::ArcSwapOption;
use async_trait::async_trait;
use bytes::Bytes;
+use rand::prelude::*;
use serde::{Deserialize, Serialize};
use futures::Stream;
@@ -22,9 +24,12 @@ use garage_rpc::rpc_helper::netapp::stream::{stream_asyncread, ByteStream};
use garage_db as db;
+use garage_util::background::{vars, BackgroundRunner};
use garage_util::data::*;
use garage_util::error::*;
use garage_util::metrics::RecordDuration;
+use garage_util::persister::PersisterShared;
+use garage_util::time::msec_to_rfc3339;
use garage_rpc::rpc_helper::OrderTag;
use garage_rpc::system::System;
@@ -87,7 +92,17 @@ pub struct BlockManager {
pub(crate) metrics: BlockManagerMetrics,
- tx_scrub_command: mpsc::Sender<ScrubWorkerCommand>,
+ pub scrub_persister: PersisterShared<ScrubWorkerPersisted>,
+ tx_scrub_command: ArcSwapOption<mpsc::Sender<ScrubWorkerCommand>>,
+}
+
+#[derive(Serialize, Deserialize, Clone, Debug)]
+pub struct BlockResyncErrorInfo {
+ pub hash: Hash,
+ pub refcount: u64,
+ pub error_count: u64,
+ pub last_try: u64,
+ pub next_try: u64,
}
// This custom struct contains functions that must only be ran
@@ -114,9 +129,14 @@ impl BlockManager {
.netapp
.endpoint("garage_block/manager.rs/Rpc".to_string());
- let metrics = BlockManagerMetrics::new(resync.queue.clone(), resync.errors.clone());
+ let metrics = BlockManagerMetrics::new(
+ compression_level,
+ rc.rc.clone(),
+ resync.queue.clone(),
+ resync.errors.clone(),
+ );
- let (scrub_tx, scrub_rx) = mpsc::channel(1);
+ let scrub_persister = PersisterShared::new(&system.metadata_dir, "scrub_info");
let block_manager = Arc::new(Self {
replication,
@@ -128,21 +148,46 @@ impl BlockManager {
system,
endpoint,
metrics,
- tx_scrub_command: scrub_tx,
+ scrub_persister,
+ tx_scrub_command: ArcSwapOption::new(None),
});
block_manager.endpoint.set_handler(block_manager.clone());
+ block_manager
+ }
+
+ pub fn spawn_workers(self: &Arc<Self>, bg: &BackgroundRunner) {
// Spawn a bunch of resync workers
for index in 0..MAX_RESYNC_WORKERS {
- let worker = ResyncWorker::new(index, block_manager.clone());
- block_manager.system.background.spawn_worker(worker);
+ let worker = ResyncWorker::new(index, self.clone());
+ bg.spawn_worker(worker);
}
// Spawn scrub worker
- let scrub_worker = ScrubWorker::new(block_manager.clone(), scrub_rx);
- block_manager.system.background.spawn_worker(scrub_worker);
+ let (scrub_tx, scrub_rx) = mpsc::channel(1);
+ self.tx_scrub_command.store(Some(Arc::new(scrub_tx)));
+ bg.spawn_worker(ScrubWorker::new(
+ self.clone(),
+ scrub_rx,
+ self.scrub_persister.clone(),
+ ));
+ }
- block_manager
+ pub fn register_bg_vars(&self, vars: &mut vars::BgVars) {
+ self.resync.register_bg_vars(vars);
+
+ vars.register_rw(
+ &self.scrub_persister,
+ "scrub-tranquility",
+ |p| p.get_with(|x| x.tranquility),
+ |p, tranquility| p.set_with(|x| x.tranquility = tranquility),
+ );
+ vars.register_ro(&self.scrub_persister, "scrub-last-completed", |p| {
+ p.get_with(|x| msec_to_rfc3339(x.time_last_complete_scrub))
+ });
+ vars.register_ro(&self.scrub_persister, "scrub-corruptions_detected", |p| {
+ p.get_with(|x| x.corruptions_detected)
+ });
}
/// Ask nodes that might have a (possibly compressed) block for it
@@ -309,9 +354,42 @@ impl BlockManager {
Ok(self.rc.rc.len()?)
}
+ /// Get number of items in the refcount table
+ pub fn rc_fast_len(&self) -> Result<Option<usize>, Error> {
+ Ok(self.rc.rc.fast_len()?)
+ }
+
/// Send command to start/stop/manager scrub worker
- pub async fn send_scrub_command(&self, cmd: ScrubWorkerCommand) {
- let _ = self.tx_scrub_command.send(cmd).await;
+ pub async fn send_scrub_command(&self, cmd: ScrubWorkerCommand) -> Result<(), Error> {
+ let tx = self.tx_scrub_command.load();
+ let tx = tx.as_ref().ok_or_message("scrub worker is not running")?;
+ tx.send(cmd).await.ok_or_message("send error")?;
+ Ok(())
+ }
+
+ /// Get the reference count of a block
+ pub fn get_block_rc(&self, hash: &Hash) -> Result<u64, Error> {
+ Ok(self.rc.get_block_rc(hash)?.as_u64())
+ }
+
+ /// List all resync errors
+ pub fn list_resync_errors(&self) -> Result<Vec<BlockResyncErrorInfo>, Error> {
+ let mut blocks = Vec::with_capacity(self.resync.errors.len());
+ for ent in self.resync.errors.iter()? {
+ let (hash, cnt) = ent?;
+ let cnt = ErrorCounter::decode(&cnt);
+ blocks.push(BlockResyncErrorInfo {
+ hash: Hash::try_from(&hash).unwrap(),
+ refcount: 0,
+ error_count: cnt.errors,
+ last_try: cnt.last_try,
+ next_try: cnt.next_try(),
+ });
+ }
+ for block in blocks.iter_mut() {
+ block.refcount = self.get_block_rc(&block.hash)?;
+ }
+ Ok(blocks)
}
//// ----- Managing the reference counter ----
@@ -603,14 +681,21 @@ impl BlockManagerLocked {
}
};
- let mut path2 = path.clone();
- path2.set_extension("tmp");
- let mut f = fs::File::create(&path2).await?;
+ let mut path_tmp = path.clone();
+ let tmp_extension = format!("tmp{}", hex::encode(thread_rng().gen::<[u8; 4]>()));
+ path_tmp.set_extension(tmp_extension);
+
+ let mut delete_on_drop = DeleteOnDrop(Some(path_tmp.clone()));
+
+ let mut f = fs::File::create(&path_tmp).await?;
f.write_all(data).await?;
f.sync_all().await?;
drop(f);
- fs::rename(path2, path).await?;
+ fs::rename(path_tmp, path).await?;
+
+ delete_on_drop.cancel();
+
if let Some(to_delete) = to_delete {
fs::remove_file(to_delete).await?;
}
@@ -676,3 +761,23 @@ async fn read_stream_to_end(mut stream: ByteStream) -> Result<Bytes, Error> {
.concat()
.into())
}
+
+struct DeleteOnDrop(Option<PathBuf>);
+
+impl DeleteOnDrop {
+ fn cancel(&mut self) {
+ drop(self.0.take());
+ }
+}
+
+impl Drop for DeleteOnDrop {
+ fn drop(&mut self) {
+ if let Some(path) = self.0.take() {
+ tokio::spawn(async move {
+ if let Err(e) = fs::remove_file(&path).await {
+ debug!("DeleteOnDrop failed for {}: {}", path.display(), e);
+ }
+ });
+ }
+ }
+}