From 71c0188055e25aa1c00d0226f0ca99ce323310a6 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 4 Sep 2023 14:49:49 +0200 Subject: block manager: skeleton for multi-hdd support --- src/block/repair.rs | 217 ++++++++++++++++++++++++++++++++++------------------ 1 file changed, 142 insertions(+), 75 deletions(-) (limited to 'src/block/repair.rs') diff --git a/src/block/repair.rs b/src/block/repair.rs index 71093d69..d5e2e168 100644 --- a/src/block/repair.rs +++ b/src/block/repair.rs @@ -17,6 +17,7 @@ use garage_util::persister::PersisterShared; use garage_util::time::*; use garage_util::tranquilizer::Tranquilizer; +use crate::layout::*; use crate::manager::*; // Full scrub every 25 days with a random element of 10 days mixed in below @@ -136,7 +137,7 @@ impl Worker for RepairWorker { // Lists all blocks on disk and adds them to the resync queue. // This allows us to find blocks we are storing but don't actually need, // so that we can offload them if necessary and then delete them locally. - if let Some(hash) = bi.next().await? { + if let Some((_path, hash)) = bi.next().await? { self.manager .resync .put_to_resync(&hash, Duration::from_secs(0))?; @@ -376,7 +377,7 @@ impl Worker for ScrubWorker { match &mut self.work { ScrubWorkerState::Running(bsi) => { self.tranquilizer.reset(); - if let Some(hash) = bsi.next().await? { + if let Some((_path, hash)) = bsi.next().await? { match self.manager.read_block(&hash).await { Err(Error::CorruptData(_)) => { error!("Found corrupt data block during scrub: {:?}", hash); @@ -447,100 +448,166 @@ impl Worker for ScrubWorker { // UTILITY FOR ENUMERATING THE BLOCK STORE // ---- ---- ---- +const PROGRESS_FP: u64 = 1_000_000_000; + struct BlockStoreIterator { - path: Vec, + todo: Vec, } -enum ReadingDir { - Pending(PathBuf), - Read { - subpaths: Vec, - pos: usize, +enum BsiTodo { + Directory { + path: PathBuf, + progress_min: u64, + progress_max: u64, + }, + File { + path: PathBuf, + filename: String, + progress: u64, }, } impl BlockStoreIterator { fn new(manager: &BlockManager) -> Self { - let root_dir = manager.data_dir.clone(); - Self { - path: vec![ReadingDir::Pending(root_dir)], + let min_cap = manager + .data_layout + .data_dirs + .iter() + .filter_map(|x| match x.state { + DataDirState::Active { capacity } => Some(capacity), + _ => None, + }) + .min() + .unwrap_or(0); + + let sum_cap = manager + .data_layout + .data_dirs + .iter() + .map(|x| match x.state { + DataDirState::Active { capacity } => capacity, + _ => min_cap, // approximation + }) + .sum::() as u128; + + let mut cum_cap = 0; + let mut todo = vec![]; + for dir in manager.data_layout.data_dirs.iter() { + let cap = match dir.state { + DataDirState::Active { capacity } => capacity, + _ => min_cap, + }; + + let progress_min = ((cum_cap as u128 * PROGRESS_FP as u128) / (sum_cap as u128)) as u64; + let progress_max = + (((cum_cap + cap) as u128 * PROGRESS_FP as u128) / (sum_cap as u128)) as u64; + cum_cap += cap; + + todo.push(BsiTodo::Directory { + path: dir.path.clone(), + progress_min, + progress_max, + }); } + // entries are processed back-to-front (because of .pop()), + // so reverse entries to process them in increasing progress bounds + todo.reverse(); + + let ret = Self { todo }; + debug_assert!(ret.progress_invariant()); + + ret } /// Returns progress done, between 0 and 1 fn progress(&self) -> f32 { - if self.path.is_empty() { - 1.0 - } else { - let mut ret = 0.0; - let mut next_div = 1; - for p in self.path.iter() { - match p { - ReadingDir::Pending(_) => break, - ReadingDir::Read { subpaths, pos } => { - next_div *= subpaths.len(); - ret += ((*pos - 1) as f32) / (next_div as f32); - } - } - } - ret - } + self.todo + .last() + .map(|x| match x { + BsiTodo::Directory { progress_min, .. } => *progress_min, + BsiTodo::File { progress, .. } => *progress, + }) + .map(|x| x as f32 / PROGRESS_FP as f32) + .unwrap_or(1.0) } - async fn next(&mut self) -> Result, Error> { + async fn next(&mut self) -> Result, Error> { loop { - let last_path = match self.path.last_mut() { + match self.todo.pop() { None => return Ok(None), - Some(lp) => lp, - }; - - if let ReadingDir::Pending(path) = last_path { - let mut reader = fs::read_dir(&path).await?; - let mut subpaths = vec![]; - while let Some(ent) = reader.next_entry().await? { - subpaths.push(ent); - } - *last_path = ReadingDir::Read { subpaths, pos: 0 }; - } - - let (subpaths, pos) = match *last_path { - ReadingDir::Read { - ref subpaths, - ref mut pos, - } => (subpaths, pos), - ReadingDir::Pending(_) => unreachable!(), - }; + Some(BsiTodo::Directory { + path, + progress_min, + progress_max, + }) => { + let istart = self.todo.len(); + + let mut reader = fs::read_dir(&path).await?; + while let Some(ent) = reader.next_entry().await? { + let name = if let Ok(n) = ent.file_name().into_string() { + n + } else { + continue; + }; + let ft = ent.file_type().await?; + if ft.is_dir() && hex::decode(&name).is_ok() { + self.todo.push(BsiTodo::Directory { + path: ent.path(), + progress_min: 0, + progress_max: 0, + }); + } else if ft.is_file() { + self.todo.push(BsiTodo::File { + path: ent.path(), + filename: name, + progress: 0, + }); + } + } - let data_dir_ent = match subpaths.get(*pos) { - None => { - self.path.pop(); - continue; - } - Some(ent) => { - *pos += 1; - ent + let count = self.todo.len() - istart; + for (i, ent) in self.todo[istart..].iter_mut().enumerate() { + let p1 = progress_min + + ((progress_max - progress_min) * i as u64) / count as u64; + let p2 = progress_min + + ((progress_max - progress_min) * (i + 1) as u64) / count as u64; + match ent { + BsiTodo::Directory { + progress_min, + progress_max, + .. + } => { + *progress_min = p1; + *progress_max = p2; + } + BsiTodo::File { progress, .. } => { + *progress = p1; + } + } + } + self.todo[istart..].reverse(); + debug_assert!(self.progress_invariant()); } - }; - - let name = data_dir_ent.file_name(); - let name = if let Ok(n) = name.into_string() { - n - } else { - continue; - }; - let ent_type = data_dir_ent.file_type().await?; - - let name = name.strip_suffix(".zst").unwrap_or(&name); - if name.len() == 2 && hex::decode(name).is_ok() && ent_type.is_dir() { - let path = data_dir_ent.path(); - self.path.push(ReadingDir::Pending(path)); - } else if name.len() == 64 { - if let Ok(h) = hex::decode(name) { - let mut hash = [0u8; 32]; - hash.copy_from_slice(&h); - return Ok(Some(hash.into())); + Some(BsiTodo::File { path, filename, .. }) => { + let filename = filename.strip_suffix(".zst").unwrap_or(&filename); + if filename.len() == 64 { + if let Ok(h) = hex::decode(filename) { + let mut hash = [0u8; 32]; + hash.copy_from_slice(&h); + return Ok(Some((path, hash.into()))); + } + } } } } } + + fn progress_invariant(&self) -> bool { + let iter = self.todo.iter().map(|x| match x { + BsiTodo::Directory { progress_min, .. } => progress_min, + BsiTodo::File { progress, .. } => progress, + }); + let iter_1 = iter.clone().skip(1); + iter.zip(iter_1).all(|(prev, next)| prev >= next) + } } -- cgit v1.2.3 From 6c420c0880de742b2b6416da1178df828fd977bf Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 5 Sep 2023 13:43:38 +0200 Subject: block manager: multi-directory layout computation --- src/block/repair.rs | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) (limited to 'src/block/repair.rs') diff --git a/src/block/repair.rs b/src/block/repair.rs index d5e2e168..0e7fe0df 100644 --- a/src/block/repair.rs +++ b/src/block/repair.rs @@ -473,10 +473,7 @@ impl BlockStoreIterator { .data_layout .data_dirs .iter() - .filter_map(|x| match x.state { - DataDirState::Active { capacity } => Some(capacity), - _ => None, - }) + .filter_map(|x| x.capacity()) .min() .unwrap_or(0); @@ -484,10 +481,7 @@ impl BlockStoreIterator { .data_layout .data_dirs .iter() - .map(|x| match x.state { - DataDirState::Active { capacity } => capacity, - _ => min_cap, // approximation - }) + .map(|x| x.capacity().unwrap_or(min_cap /* approximation */)) .sum::() as u128; let mut cum_cap = 0; -- cgit v1.2.3 From e30865984a5f23f046396ca192c1930314b50115 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 5 Sep 2023 17:37:45 +0200 Subject: block manager: scrub checkpointing --- src/block/repair.rs | 158 +++++++++++++++++++++++++++++++++++----------------- 1 file changed, 108 insertions(+), 50 deletions(-) (limited to 'src/block/repair.rs') diff --git a/src/block/repair.rs b/src/block/repair.rs index 0e7fe0df..a7c90d4f 100644 --- a/src/block/repair.rs +++ b/src/block/repair.rs @@ -176,7 +176,9 @@ mod v081 { } mod v082 { + use garage_util::data::Hash; use serde::{Deserialize, Serialize}; + use std::path::PathBuf; use super::v081; @@ -186,6 +188,27 @@ mod v082 { pub(crate) time_last_complete_scrub: u64, pub(crate) time_next_run_scrub: u64, pub(crate) corruptions_detected: u64, + #[serde(default)] + pub(crate) checkpoint: Option, + } + + #[derive(Serialize, Deserialize, Clone)] + pub struct BlockStoreIterator { + pub todo: Vec, + } + + #[derive(Serialize, Deserialize, Clone)] + pub enum BsiTodo { + Directory { + path: PathBuf, + progress_min: u64, + progress_max: u64, + }, + File { + path: PathBuf, + hash: Hash, + progress: u64, + }, } impl garage_util::migrate::Migrate for ScrubWorkerPersisted { @@ -200,6 +223,7 @@ mod v082 { time_last_complete_scrub: old.time_last_complete_scrub, time_next_run_scrub: randomize_next_scrub_run_time(old.time_last_complete_scrub), corruptions_detected: old.corruptions_detected, + checkpoint: None, } } } @@ -236,14 +260,23 @@ impl Default for ScrubWorkerPersisted { time_next_run_scrub: randomize_next_scrub_run_time(now_msec()), tranquility: INITIAL_SCRUB_TRANQUILITY, corruptions_detected: 0, + checkpoint: None, } } } #[derive(Default)] enum ScrubWorkerState { - Running(BlockStoreIterator), - Paused(BlockStoreIterator, u64), // u64 = time when to resume scrub + Running { + iterator: BlockStoreIterator, + // time of the last checkpoint + t_cp: u64, + }, + Paused { + iterator: BlockStoreIterator, + // time at which the scrub should be resumed + t_resume: u64, + }, #[default] Finished, } @@ -262,10 +295,17 @@ impl ScrubWorker { rx_cmd: mpsc::Receiver, persister: PersisterShared, ) -> Self { + let work = match persister.get_with(|x| x.checkpoint.clone()) { + None => ScrubWorkerState::Finished, + Some(iterator) => ScrubWorkerState::Running { + iterator, + t_cp: now_msec(), + }, + }; Self { manager, rx_cmd, - work: ScrubWorkerState::Finished, + work, tranquilizer: Tranquilizer::new(30), persister, } @@ -278,7 +318,16 @@ impl ScrubWorker { ScrubWorkerState::Finished => { info!("Scrub worker initializing, now performing datastore scrub"); let iterator = BlockStoreIterator::new(&self.manager); - ScrubWorkerState::Running(iterator) + if let Err(e) = self + .persister + .set_with(|x| x.checkpoint = Some(iterator.clone())) + { + error!("Could not save scrub checkpoint: {}", e); + } + ScrubWorkerState::Running { + iterator, + t_cp: now_msec(), + } } work => { error!("Cannot start scrub worker: already running!"); @@ -288,8 +337,18 @@ impl ScrubWorker { } ScrubWorkerCommand::Pause(dur) => { self.work = match std::mem::take(&mut self.work) { - ScrubWorkerState::Running(it) | ScrubWorkerState::Paused(it, _) => { - ScrubWorkerState::Paused(it, now_msec() + dur.as_millis() as u64) + ScrubWorkerState::Running { iterator, .. } + | ScrubWorkerState::Paused { iterator, .. } => { + if let Err(e) = self + .persister + .set_with(|x| x.checkpoint = Some(iterator.clone())) + { + error!("Could not save scrub checkpoint: {}", e); + } + ScrubWorkerState::Paused { + iterator, + t_resume: now_msec() + dur.as_millis() as u64, + } } work => { error!("Cannot pause scrub worker: not running!"); @@ -299,7 +358,10 @@ impl ScrubWorker { } ScrubWorkerCommand::Resume => { self.work = match std::mem::take(&mut self.work) { - ScrubWorkerState::Paused(it, _) => ScrubWorkerState::Running(it), + ScrubWorkerState::Paused { iterator, .. } => ScrubWorkerState::Running { + iterator, + t_cp: now_msec(), + }, work => { error!("Cannot resume scrub worker: not paused!"); work @@ -308,7 +370,7 @@ impl ScrubWorker { } ScrubWorkerCommand::Cancel => { self.work = match std::mem::take(&mut self.work) { - ScrubWorkerState::Running(_) | ScrubWorkerState::Paused(_, _) => { + ScrubWorkerState::Running { .. } | ScrubWorkerState::Paused { .. } => { ScrubWorkerState::Finished } work => { @@ -344,12 +406,15 @@ impl Worker for ScrubWorker { ..Default::default() }; match &self.work { - ScrubWorkerState::Running(bsi) => { - s.progress = Some(format!("{:.2}%", bsi.progress() * 100.)); + ScrubWorkerState::Running { iterator, .. } => { + s.progress = Some(format!("{:.2}%", iterator.progress() * 100.)); } - ScrubWorkerState::Paused(bsi, rt) => { - s.progress = Some(format!("{:.2}%", bsi.progress() * 100.)); - s.freeform = vec![format!("Scrub paused, resumes at {}", msec_to_rfc3339(*rt))]; + ScrubWorkerState::Paused { iterator, t_resume } => { + s.progress = Some(format!("{:.2}%", iterator.progress() * 100.)); + s.freeform = vec![format!( + "Scrub paused, resumes at {}", + msec_to_rfc3339(*t_resume) + )]; } ScrubWorkerState::Finished => { s.freeform = vec![ @@ -375,9 +440,11 @@ impl Worker for ScrubWorker { }; match &mut self.work { - ScrubWorkerState::Running(bsi) => { + ScrubWorkerState::Running { iterator, t_cp } => { self.tranquilizer.reset(); - if let Some((_path, hash)) = bsi.next().await? { + let now = now_msec(); + + if let Some((_path, hash)) = iterator.next().await? { match self.manager.read_block(&hash).await { Err(Error::CorruptData(_)) => { error!("Found corrupt data block during scrub: {:?}", hash); @@ -386,16 +453,23 @@ impl Worker for ScrubWorker { Err(e) => return Err(e), _ => (), }; + + if now - *t_cp > 60 * 1000 { + self.persister + .set_with(|p| p.checkpoint = Some(iterator.clone()))?; + *t_cp = now; + } + Ok(self .tranquilizer .tranquilize_worker(self.persister.get_with(|p| p.tranquility))) } else { - let now = now_msec(); let next_scrub_timestamp = randomize_next_scrub_run_time(now); self.persister.set_with(|p| { p.time_last_complete_scrub = now; p.time_next_run_scrub = next_scrub_timestamp; + p.checkpoint = None; })?; self.work = ScrubWorkerState::Finished; self.tranquilizer.clear(); @@ -414,8 +488,8 @@ impl Worker for ScrubWorker { async fn wait_for_work(&mut self) -> WorkerState { let (wait_until, command) = match &self.work { - ScrubWorkerState::Running(_) => return WorkerState::Busy, - ScrubWorkerState::Paused(_, resume_time) => (*resume_time, ScrubWorkerCommand::Resume), + ScrubWorkerState::Running { .. } => return WorkerState::Busy, + ScrubWorkerState::Paused { t_resume, .. } => (*t_resume, ScrubWorkerCommand::Resume), ScrubWorkerState::Finished => ( self.persister.get_with(|p| p.time_next_run_scrub), ScrubWorkerCommand::Start, @@ -438,7 +512,7 @@ impl Worker for ScrubWorker { } match &self.work { - ScrubWorkerState::Running(_) => WorkerState::Busy, + ScrubWorkerState::Running { .. } => WorkerState::Busy, _ => WorkerState::Idle, } } @@ -450,23 +524,6 @@ impl Worker for ScrubWorker { const PROGRESS_FP: u64 = 1_000_000_000; -struct BlockStoreIterator { - todo: Vec, -} - -enum BsiTodo { - Directory { - path: PathBuf, - progress_min: u64, - progress_max: u64, - }, - File { - path: PathBuf, - filename: String, - progress: u64, - }, -} - impl BlockStoreIterator { fn new(manager: &BlockManager) -> Self { let min_cap = manager @@ -551,11 +608,18 @@ impl BlockStoreIterator { progress_max: 0, }); } else if ft.is_file() { - self.todo.push(BsiTodo::File { - path: ent.path(), - filename: name, - progress: 0, - }); + let filename = name.split_once('.').map(|(f, _)| f).unwrap_or(&name); + if filename.len() == 64 { + if let Ok(h) = hex::decode(filename) { + let mut hash = [0u8; 32]; + hash.copy_from_slice(&h); + self.todo.push(BsiTodo::File { + path: ent.path(), + hash: hash.into(), + progress: 0, + }); + } + } } } @@ -582,20 +646,14 @@ impl BlockStoreIterator { self.todo[istart..].reverse(); debug_assert!(self.progress_invariant()); } - Some(BsiTodo::File { path, filename, .. }) => { - let filename = filename.strip_suffix(".zst").unwrap_or(&filename); - if filename.len() == 64 { - if let Ok(h) = hex::decode(filename) { - let mut hash = [0u8; 32]; - hash.copy_from_slice(&h); - return Ok(Some((path, hash.into()))); - } - } + Some(BsiTodo::File { path, hash, .. }) => { + return Ok(Some((path, hash))); } } } } + // for debug_assert! fn progress_invariant(&self) -> bool { let iter = self.todo.iter().map(|x| match x { BsiTodo::Directory { progress_min, .. } => progress_min, -- cgit v1.2.3 From 6b008b5bd3843bb236f94a1b4472de11f5755f04 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 7 Sep 2023 13:44:11 +0200 Subject: block manager: add rebalance operation to rebalance multi-hdd setups --- src/block/repair.rs | 90 ++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 85 insertions(+), 5 deletions(-) (limited to 'src/block/repair.rs') diff --git a/src/block/repair.rs b/src/block/repair.rs index a7c90d4f..1bea9f09 100644 --- a/src/block/repair.rs +++ b/src/block/repair.rs @@ -518,6 +518,86 @@ impl Worker for ScrubWorker { } } +// ---- ---- ---- +// THIRD KIND OF REPAIR: REBALANCING DATA BLOCKS +// between multiple storage locations. +// This is a one-shot repair operation that can be launched, +// checks everything, and then exits. +// ---- ---- ---- + +pub struct RebalanceWorker { + manager: Arc, + block_iter: BlockStoreIterator, + moved: usize, + moved_bytes: usize, +} + +impl RebalanceWorker { + pub fn new(manager: Arc) -> Self { + let block_iter = BlockStoreIterator::new(&manager); + Self { + manager, + block_iter, + moved: 0, + moved_bytes: 0, + } + } +} + +#[async_trait] +impl Worker for RebalanceWorker { + fn name(&self) -> String { + "Block rebalance worker".into() + } + + fn status(&self) -> WorkerStatus { + WorkerStatus { + progress: Some(format!("{:.2}%", self.block_iter.progress() * 100.)), + freeform: vec![ + format!("Blocks moved: {}", self.moved), + format!("Bytes moved: {}", self.moved_bytes), + ], + ..Default::default() + } + } + + async fn work(&mut self, _must_exit: &mut watch::Receiver) -> Result { + if let Some((path, hash)) = self.block_iter.next().await? { + let prim_loc = self.manager.data_layout.load().primary_block_dir(&hash); + if path.parent().expect("no parent?") != prim_loc { + // block is not in its primary location, + // move it there (reading and re-writing does the trick) + let data = self.manager.read_block(&hash).await?; + self.manager.write_block(&hash, &data).await?; + self.moved += 1; + self.moved_bytes += data.inner_buffer().len(); + } + Ok(WorkerState::Busy) + } else { + // all blocks are in their primary location: + // - the ones we moved now are + // - the ones written in the meantime always were, because we only + // write to primary locations + // so we can safely remove all secondary locations from the data layout + let new_layout = self + .manager + .data_layout + .load_full() + .without_secondary_locations(); + self.manager + .data_layout_persister + .save_async(&new_layout) + .await?; + self.manager.data_layout.store(Arc::new(new_layout)); + Ok(WorkerState::Done) + } + } + + async fn wait_for_work(&mut self) -> WorkerState { + unreachable!() + } +} + // ---- ---- ---- // UTILITY FOR ENUMERATING THE BLOCK STORE // ---- ---- ---- @@ -526,16 +606,16 @@ const PROGRESS_FP: u64 = 1_000_000_000; impl BlockStoreIterator { fn new(manager: &BlockManager) -> Self { - let min_cap = manager - .data_layout + let data_layout = manager.data_layout.load_full(); + + let min_cap = data_layout .data_dirs .iter() .filter_map(|x| x.capacity()) .min() .unwrap_or(0); - let sum_cap = manager - .data_layout + let sum_cap = data_layout .data_dirs .iter() .map(|x| x.capacity().unwrap_or(min_cap /* approximation */)) @@ -543,7 +623,7 @@ impl BlockStoreIterator { let mut cum_cap = 0; let mut todo = vec![]; - for dir in manager.data_layout.data_dirs.iter() { + for dir in data_layout.data_dirs.iter() { let cap = match dir.state { DataDirState::Active { capacity } => capacity, _ => min_cap, -- cgit v1.2.3 From 2657b5c1b911b7c5f2d97f8c564e60202ddf4124 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 7 Sep 2023 15:30:56 +0200 Subject: block manager: fix bugs --- src/block/repair.rs | 30 ++++++++++++++++++++++++++---- 1 file changed, 26 insertions(+), 4 deletions(-) (limited to 'src/block/repair.rs') diff --git a/src/block/repair.rs b/src/block/repair.rs index 1bea9f09..e18eeaeb 100644 --- a/src/block/repair.rs +++ b/src/block/repair.rs @@ -17,6 +17,7 @@ use garage_util::persister::PersisterShared; use garage_util::time::*; use garage_util::tranquilizer::Tranquilizer; +use crate::block::*; use crate::layout::*; use crate::manager::*; @@ -528,8 +529,10 @@ impl Worker for ScrubWorker { pub struct RebalanceWorker { manager: Arc, block_iter: BlockStoreIterator, + t_started: u64, + t_finished: Option, moved: usize, - moved_bytes: usize, + moved_bytes: u64, } impl RebalanceWorker { @@ -538,6 +541,8 @@ impl RebalanceWorker { Self { manager, block_iter, + t_started: now_msec(), + t_finished: None, moved: 0, moved_bytes: 0, } @@ -551,11 +556,18 @@ impl Worker for RebalanceWorker { } fn status(&self) -> WorkerStatus { + let t_cur = self.t_finished.unwrap_or_else(|| now_msec()); + let rate = self.moved_bytes / std::cmp::max(1, (t_cur - self.t_started) / 1000); WorkerStatus { progress: Some(format!("{:.2}%", self.block_iter.progress() * 100.)), freeform: vec![ format!("Blocks moved: {}", self.moved), - format!("Bytes moved: {}", self.moved_bytes), + format!( + "Bytes moved: {} ({}/s)", + bytesize::ByteSize::b(self.moved_bytes), + bytesize::ByteSize::b(rate) + ), + format!("Started: {}", msec_to_rfc3339(self.t_started)), ], ..Default::default() } @@ -565,12 +577,21 @@ impl Worker for RebalanceWorker { if let Some((path, hash)) = self.block_iter.next().await? { let prim_loc = self.manager.data_layout.load().primary_block_dir(&hash); if path.parent().expect("no parent?") != prim_loc { + let path = match path.extension() { + None => DataBlockPath::Plain(path), + Some(x) if x.to_str() == Some("zst") => DataBlockPath::Compressed(path), + _ => { + warn!("not rebalancing file: {}", path.to_string_lossy()); + return Ok(WorkerState::Busy); + } + }; // block is not in its primary location, // move it there (reading and re-writing does the trick) - let data = self.manager.read_block(&hash).await?; + debug!("rebalance: moving block {:?}", hash); + let data = self.manager.read_block_from(&hash, &path).await?; self.manager.write_block(&hash, &data).await?; self.moved += 1; - self.moved_bytes += data.inner_buffer().len(); + self.moved_bytes += data.inner_buffer().len() as u64; } Ok(WorkerState::Busy) } else { @@ -589,6 +610,7 @@ impl Worker for RebalanceWorker { .save_async(&new_layout) .await?; self.manager.data_layout.store(Arc::new(new_layout)); + self.t_finished = Some(now_msec()); Ok(WorkerState::Done) } } -- cgit v1.2.3 From be91ef6294bcc699f075746fd3abb57a9b22e838 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 7 Sep 2023 16:04:03 +0200 Subject: block manager: fix bug where rebalance didn't delete old copies --- src/block/repair.rs | 37 ++++++++++++++++++++----------------- 1 file changed, 20 insertions(+), 17 deletions(-) (limited to 'src/block/repair.rs') diff --git a/src/block/repair.rs b/src/block/repair.rs index e18eeaeb..bd14085f 100644 --- a/src/block/repair.rs +++ b/src/block/repair.rs @@ -558,17 +558,21 @@ impl Worker for RebalanceWorker { fn status(&self) -> WorkerStatus { let t_cur = self.t_finished.unwrap_or_else(|| now_msec()); let rate = self.moved_bytes / std::cmp::max(1, (t_cur - self.t_started) / 1000); + let mut freeform = vec![ + format!("Blocks moved: {}", self.moved), + format!( + "Bytes moved: {} ({}/s)", + bytesize::ByteSize::b(self.moved_bytes), + bytesize::ByteSize::b(rate) + ), + format!("Started: {}", msec_to_rfc3339(self.t_started)), + ]; + if let Some(t_fin) = self.t_finished { + freeform.push(format!("Finished: {}", msec_to_rfc3339(t_fin))) + } WorkerStatus { progress: Some(format!("{:.2}%", self.block_iter.progress() * 100.)), - freeform: vec![ - format!("Blocks moved: {}", self.moved), - format!( - "Bytes moved: {} ({}/s)", - bytesize::ByteSize::b(self.moved_bytes), - bytesize::ByteSize::b(rate) - ), - format!("Started: {}", msec_to_rfc3339(self.t_started)), - ], + freeform, ..Default::default() } } @@ -576,10 +580,10 @@ impl Worker for RebalanceWorker { async fn work(&mut self, _must_exit: &mut watch::Receiver) -> Result { if let Some((path, hash)) = self.block_iter.next().await? { let prim_loc = self.manager.data_layout.load().primary_block_dir(&hash); - if path.parent().expect("no parent?") != prim_loc { - let path = match path.extension() { - None => DataBlockPath::Plain(path), - Some(x) if x.to_str() == Some("zst") => DataBlockPath::Compressed(path), + if path.ancestors().all(|x| x != prim_loc) { + let block_path = match path.extension() { + None => DataBlockPath::Plain(path.clone()), + Some(x) if x.to_str() == Some("zst") => DataBlockPath::Compressed(path.clone()), _ => { warn!("not rebalancing file: {}", path.to_string_lossy()); return Ok(WorkerState::Busy); @@ -587,11 +591,10 @@ impl Worker for RebalanceWorker { }; // block is not in its primary location, // move it there (reading and re-writing does the trick) - debug!("rebalance: moving block {:?}", hash); - let data = self.manager.read_block_from(&hash, &path).await?; - self.manager.write_block(&hash, &data).await?; + debug!("rebalance: moving block {:?} => {:?}", block_path, prim_loc); + let block_len = self.manager.fix_block_location(&hash, block_path).await?; self.moved += 1; - self.moved_bytes += data.inner_buffer().len() as u64; + self.moved_bytes += block_len as u64; } Ok(WorkerState::Busy) } else { -- cgit v1.2.3 From 9526328d386ab6261df416327c2efb0791369339 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 11 Sep 2023 12:10:48 +0200 Subject: scrub: clear saved checkpoint when canceling scrub --- src/block/repair.rs | 3 +++ 1 file changed, 3 insertions(+) (limited to 'src/block/repair.rs') diff --git a/src/block/repair.rs b/src/block/repair.rs index bd14085f..a464e2b6 100644 --- a/src/block/repair.rs +++ b/src/block/repair.rs @@ -372,6 +372,9 @@ impl ScrubWorker { ScrubWorkerCommand::Cancel => { self.work = match std::mem::take(&mut self.work) { ScrubWorkerState::Running { .. } | ScrubWorkerState::Paused { .. } => { + if let Err(e) = self.persister.set_with(|x| x.checkpoint = None) { + error!("Could not save scrub checkpoint: {}", e); + } ScrubWorkerState::Finished } work => { -- cgit v1.2.3 From ba7ac52c196c452e0b09fef63862264e0c4582bb Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 11 Sep 2023 12:28:29 +0200 Subject: block repair: simpler/more robust iterator progress calculation --- src/block/repair.rs | 37 ++++++++++++++----------------------- 1 file changed, 14 insertions(+), 23 deletions(-) (limited to 'src/block/repair.rs') diff --git a/src/block/repair.rs b/src/block/repair.rs index a464e2b6..77ee0d14 100644 --- a/src/block/repair.rs +++ b/src/block/repair.rs @@ -18,7 +18,6 @@ use garage_util::time::*; use garage_util::tranquilizer::Tranquilizer; use crate::block::*; -use crate::layout::*; use crate::manager::*; // Full scrub every 25 days with a random element of 10 days mixed in below @@ -636,31 +635,23 @@ impl BlockStoreIterator { fn new(manager: &BlockManager) -> Self { let data_layout = manager.data_layout.load_full(); - let min_cap = data_layout - .data_dirs - .iter() - .filter_map(|x| x.capacity()) - .min() - .unwrap_or(0); - - let sum_cap = data_layout - .data_dirs - .iter() - .map(|x| x.capacity().unwrap_or(min_cap /* approximation */)) - .sum::() as u128; + let mut dir_cap = vec![0; data_layout.data_dirs.len()]; + for prim in data_layout.part_prim.iter() { + dir_cap[*prim as usize] += 1; + } + for sec_vec in data_layout.part_sec.iter() { + for sec in sec_vec.iter() { + dir_cap[*sec as usize] += 1; + } + } + let sum_cap = dir_cap.iter().sum::() as u64; let mut cum_cap = 0; let mut todo = vec![]; - for dir in data_layout.data_dirs.iter() { - let cap = match dir.state { - DataDirState::Active { capacity } => capacity, - _ => min_cap, - }; - - let progress_min = ((cum_cap as u128 * PROGRESS_FP as u128) / (sum_cap as u128)) as u64; - let progress_max = - (((cum_cap + cap) as u128 * PROGRESS_FP as u128) / (sum_cap as u128)) as u64; - cum_cap += cap; + for (dir, cap) in data_layout.data_dirs.iter().zip(dir_cap.into_iter()) { + let progress_min = (cum_cap * PROGRESS_FP) / sum_cap; + let progress_max = ((cum_cap + cap as u64) * PROGRESS_FP) / sum_cap; + cum_cap += cap as u64; todo.push(BsiTodo::Directory { path: dir.path.clone(), -- cgit v1.2.3