diff options
Diffstat (limited to 'src/block/manager.rs')
-rw-r--r-- | src/block/manager.rs | 135 |
1 files changed, 120 insertions, 15 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs index 7f439b96..051a9f93 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -3,8 +3,10 @@ use std::pin::Pin; use std::sync::Arc; use std::time::Duration; +use arc_swap::ArcSwapOption; use async_trait::async_trait; use bytes::Bytes; +use rand::prelude::*; use serde::{Deserialize, Serialize}; use futures::Stream; @@ -22,9 +24,12 @@ use garage_rpc::rpc_helper::netapp::stream::{stream_asyncread, ByteStream}; use garage_db as db; +use garage_util::background::{vars, BackgroundRunner}; use garage_util::data::*; use garage_util::error::*; use garage_util::metrics::RecordDuration; +use garage_util::persister::PersisterShared; +use garage_util::time::msec_to_rfc3339; use garage_rpc::rpc_helper::OrderTag; use garage_rpc::system::System; @@ -87,7 +92,17 @@ pub struct BlockManager { pub(crate) metrics: BlockManagerMetrics, - tx_scrub_command: mpsc::Sender<ScrubWorkerCommand>, + pub scrub_persister: PersisterShared<ScrubWorkerPersisted>, + tx_scrub_command: ArcSwapOption<mpsc::Sender<ScrubWorkerCommand>>, +} + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct BlockResyncErrorInfo { + pub hash: Hash, + pub refcount: u64, + pub error_count: u64, + pub last_try: u64, + pub next_try: u64, } // This custom struct contains functions that must only be ran @@ -114,9 +129,14 @@ impl BlockManager { .netapp .endpoint("garage_block/manager.rs/Rpc".to_string()); - let metrics = BlockManagerMetrics::new(resync.queue.clone(), resync.errors.clone()); + let metrics = BlockManagerMetrics::new( + compression_level, + rc.rc.clone(), + resync.queue.clone(), + resync.errors.clone(), + ); - let (scrub_tx, scrub_rx) = mpsc::channel(1); + let scrub_persister = PersisterShared::new(&system.metadata_dir, "scrub_info"); let block_manager = Arc::new(Self { replication, @@ -128,21 +148,46 @@ impl BlockManager { system, endpoint, metrics, - tx_scrub_command: scrub_tx, + scrub_persister, + tx_scrub_command: ArcSwapOption::new(None), }); block_manager.endpoint.set_handler(block_manager.clone()); + block_manager + } + + pub fn spawn_workers(self: &Arc<Self>, bg: &BackgroundRunner) { // Spawn a bunch of resync workers for index in 0..MAX_RESYNC_WORKERS { - let worker = ResyncWorker::new(index, block_manager.clone()); - block_manager.system.background.spawn_worker(worker); + let worker = ResyncWorker::new(index, self.clone()); + bg.spawn_worker(worker); } // Spawn scrub worker - let scrub_worker = ScrubWorker::new(block_manager.clone(), scrub_rx); - block_manager.system.background.spawn_worker(scrub_worker); + let (scrub_tx, scrub_rx) = mpsc::channel(1); + self.tx_scrub_command.store(Some(Arc::new(scrub_tx))); + bg.spawn_worker(ScrubWorker::new( + self.clone(), + scrub_rx, + self.scrub_persister.clone(), + )); + } - block_manager + pub fn register_bg_vars(&self, vars: &mut vars::BgVars) { + self.resync.register_bg_vars(vars); + + vars.register_rw( + &self.scrub_persister, + "scrub-tranquility", + |p| p.get_with(|x| x.tranquility), + |p, tranquility| p.set_with(|x| x.tranquility = tranquility), + ); + vars.register_ro(&self.scrub_persister, "scrub-last-completed", |p| { + p.get_with(|x| msec_to_rfc3339(x.time_last_complete_scrub)) + }); + vars.register_ro(&self.scrub_persister, "scrub-corruptions_detected", |p| { + p.get_with(|x| x.corruptions_detected) + }); } /// Ask nodes that might have a (possibly compressed) block for it @@ -309,9 +354,42 @@ impl BlockManager { Ok(self.rc.rc.len()?) } + /// Get number of items in the refcount table + pub fn rc_fast_len(&self) -> Result<Option<usize>, Error> { + Ok(self.rc.rc.fast_len()?) + } + /// Send command to start/stop/manager scrub worker - pub async fn send_scrub_command(&self, cmd: ScrubWorkerCommand) { - let _ = self.tx_scrub_command.send(cmd).await; + pub async fn send_scrub_command(&self, cmd: ScrubWorkerCommand) -> Result<(), Error> { + let tx = self.tx_scrub_command.load(); + let tx = tx.as_ref().ok_or_message("scrub worker is not running")?; + tx.send(cmd).await.ok_or_message("send error")?; + Ok(()) + } + + /// Get the reference count of a block + pub fn get_block_rc(&self, hash: &Hash) -> Result<u64, Error> { + Ok(self.rc.get_block_rc(hash)?.as_u64()) + } + + /// List all resync errors + pub fn list_resync_errors(&self) -> Result<Vec<BlockResyncErrorInfo>, Error> { + let mut blocks = Vec::with_capacity(self.resync.errors.len()); + for ent in self.resync.errors.iter()? { + let (hash, cnt) = ent?; + let cnt = ErrorCounter::decode(&cnt); + blocks.push(BlockResyncErrorInfo { + hash: Hash::try_from(&hash).unwrap(), + refcount: 0, + error_count: cnt.errors, + last_try: cnt.last_try, + next_try: cnt.next_try(), + }); + } + for block in blocks.iter_mut() { + block.refcount = self.get_block_rc(&block.hash)?; + } + Ok(blocks) } //// ----- Managing the reference counter ---- @@ -603,14 +681,21 @@ impl BlockManagerLocked { } }; - let mut path2 = path.clone(); - path2.set_extension("tmp"); - let mut f = fs::File::create(&path2).await?; + let mut path_tmp = path.clone(); + let tmp_extension = format!("tmp{}", hex::encode(thread_rng().gen::<[u8; 4]>())); + path_tmp.set_extension(tmp_extension); + + let mut delete_on_drop = DeleteOnDrop(Some(path_tmp.clone())); + + let mut f = fs::File::create(&path_tmp).await?; f.write_all(data).await?; f.sync_all().await?; drop(f); - fs::rename(path2, path).await?; + fs::rename(path_tmp, path).await?; + + delete_on_drop.cancel(); + if let Some(to_delete) = to_delete { fs::remove_file(to_delete).await?; } @@ -676,3 +761,23 @@ async fn read_stream_to_end(mut stream: ByteStream) -> Result<Bytes, Error> { .concat() .into()) } + +struct DeleteOnDrop(Option<PathBuf>); + +impl DeleteOnDrop { + fn cancel(&mut self) { + drop(self.0.take()); + } +} + +impl Drop for DeleteOnDrop { + fn drop(&mut self) { + if let Some(path) = self.0.take() { + tokio::spawn(async move { + if let Err(e) = fs::remove_file(&path).await { + debug!("DeleteOnDrop failed for {}: {}", path.display(), e); + } + }); + } + } +} |