From 71c0188055e25aa1c00d0226f0ca99ce323310a6 Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Mon, 4 Sep 2023 14:49:49 +0200
Subject: block manager: skeleton for multi-hdd support

---
 src/block/layout.rs  |  57 ++++++++++++++
 src/block/lib.rs     |   1 +
 src/block/manager.rs |  29 +++++--
 src/block/repair.rs  | 217 +++++++++++++++++++++++++++++++++------------------
 4 files changed, 221 insertions(+), 83 deletions(-)
 create mode 100644 src/block/layout.rs

(limited to 'src/block')

diff --git a/src/block/layout.rs b/src/block/layout.rs
new file mode 100644
index 00000000..cbc326d8
--- /dev/null
+++ b/src/block/layout.rs
@@ -0,0 +1,57 @@
+use std::path::PathBuf;
+
+use serde::{Deserialize, Serialize};
+
+use garage_util::config::DataDirEnum;
+use garage_util::data::Hash;
+use garage_util::migrate::*;
+
+pub const DRIVE_NPART: usize = 1024;
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub(crate) struct DataLayout {
+    pub(crate) data_dirs: Vec<DataDir>,
+    pub(crate) partitions: Vec<Partition>,
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub(crate) struct DataDir {
+    pub(crate) path: PathBuf,
+    pub(crate) state: DataDirState,
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub(crate) enum DataDirState {
+    Active { capacity: u64 },
+    ReadOnly,
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub(crate) struct Partition {
+    pub(crate) prim: usize,
+    pub(crate) sec: Vec<usize>,
+}
+
+impl DataLayout {
+    pub(crate) fn initialize(dirs: &DataDirEnum) -> Self {
+        todo!()
+    }
+
+    pub(crate) fn update(&mut self, dirs: &DataDirEnum) -> Self {
+        todo!()
+    }
+
+    pub(crate) fn data_dir(&self, hash: &Hash) -> PathBuf {
+        todo!()
+        /*
+        let mut path = self.data_dir.clone();
+        path.push(hex::encode(&hash.as_slice()[0..1]));
+        path.push(hex::encode(&hash.as_slice()[1..2]));
+        path
+        */
+    }
+}
+
+impl InitialFormat for DataLayout {
+    const VERSION_MARKER: &'static [u8] = b"G09bmdl";
+}
diff --git a/src/block/lib.rs b/src/block/lib.rs
index d2814f77..c9ff2845 100644
--- a/src/block/lib.rs
+++ b/src/block/lib.rs
@@ -6,5 +6,6 @@ pub mod repair;
 pub mod resync;
 
 mod block;
+mod layout;
 mod metrics;
 mod rc;
diff --git a/src/block/manager.rs b/src/block/manager.rs
index c7e4cd03..18a2686e 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -25,10 +25,11 @@ use garage_rpc::rpc_helper::netapp::stream::{stream_asyncread, ByteStream};
 use garage_db as db;
 
 use garage_util::background::{vars, BackgroundRunner};
+use garage_util::config::DataDirEnum;
 use garage_util::data::*;
 use garage_util::error::*;
 use garage_util::metrics::RecordDuration;
-use garage_util::persister::PersisterShared;
+use garage_util::persister::{Persister, PersisterShared};
 use garage_util::time::msec_to_rfc3339;
 
 use garage_rpc::rpc_helper::OrderTag;
@@ -38,6 +39,7 @@ use garage_rpc::*;
 use garage_table::replication::{TableReplication, TableShardedReplication};
 
 use crate::block::*;
+use crate::layout::*;
 use crate::metrics::*;
 use crate::rc::*;
 use crate::repair::*;
@@ -77,8 +79,11 @@ impl Rpc for BlockRpc {
 pub struct BlockManager {
     /// Replication strategy, allowing to find on which node blocks should be located
     pub replication: TableShardedReplication,
-    /// Directory in which block are stored
-    pub data_dir: PathBuf,
+
+    /// Directory/ies in which blocks are stored
+    pub data_dir: DataDirEnum,
+    /// Data layout
+    pub(crate) data_layout: DataLayout,
 
     data_fsync: bool,
     compression_level: Option<i32>,
@@ -114,12 +119,22 @@ struct BlockManagerLocked();
 impl BlockManager {
     pub fn new(
         db: &db::Db,
-        data_dir: PathBuf,
+        data_dir: DataDirEnum,
         data_fsync: bool,
         compression_level: Option<i32>,
         replication: TableShardedReplication,
         system: Arc<System>,
     ) -> Arc<Self> {
+        let layout_persister: Persister<DataLayout> =
+            Persister::new(&system.metadata_dir, "data_layout");
+        let data_layout = match layout_persister.load() {
+            Ok(mut layout) => {
+                layout.update(&data_dir);
+                layout
+            }
+            Err(_) => DataLayout::initialize(&data_dir),
+        };
+
         let rc = db
             .open_tree("block_local_rc")
             .expect("Unable to open block_local_rc tree");
@@ -143,6 +158,7 @@ impl BlockManager {
         let block_manager = Arc::new(Self {
             replication,
             data_dir,
+            data_layout,
             data_fsync,
             compression_level,
             mutation_lock: [(); 256].map(|_| Mutex::new(BlockManagerLocked())),
@@ -586,10 +602,7 @@ impl BlockManager {
 
     /// Utility: gives the path of the directory in which a block should be found
     fn block_dir(&self, hash: &Hash) -> PathBuf {
-        let mut path = self.data_dir.clone();
-        path.push(hex::encode(&hash.as_slice()[0..1]));
-        path.push(hex::encode(&hash.as_slice()[1..2]));
-        path
+        self.data_layout.data_dir(hash)
     }
 
     /// Utility: give the full path where a block should be found, minus extension if block is
diff --git a/src/block/repair.rs b/src/block/repair.rs
index 71093d69..d5e2e168 100644
--- a/src/block/repair.rs
+++ b/src/block/repair.rs
@@ -17,6 +17,7 @@ use garage_util::persister::PersisterShared;
 use garage_util::time::*;
 use garage_util::tranquilizer::Tranquilizer;
 
+use crate::layout::*;
 use crate::manager::*;
 
 // Full scrub every 25 days with a random element of 10 days mixed in below
@@ -136,7 +137,7 @@ impl Worker for RepairWorker {
                 // Lists all blocks on disk and adds them to the resync queue.
                 // This allows us to find blocks we are storing but don't actually need,
                 // so that we can offload them if necessary and then delete them locally.
-                if let Some(hash) = bi.next().await? {
+                if let Some((_path, hash)) = bi.next().await? {
                     self.manager
                         .resync
                         .put_to_resync(&hash, Duration::from_secs(0))?;
@@ -376,7 +377,7 @@ impl Worker for ScrubWorker {
         match &mut self.work {
             ScrubWorkerState::Running(bsi) => {
                 self.tranquilizer.reset();
-                if let Some(hash) = bsi.next().await? {
+                if let Some((_path, hash)) = bsi.next().await? {
                     match self.manager.read_block(&hash).await {
                         Err(Error::CorruptData(_)) => {
                             error!("Found corrupt data block during scrub: {:?}", hash);
@@ -447,100 +448,166 @@ impl Worker for ScrubWorker {
 // UTILITY FOR ENUMERATING THE BLOCK STORE
 // ---- ---- ----
 
+const PROGRESS_FP: u64 = 1_000_000_000;
+
 struct BlockStoreIterator {
-    path: Vec<ReadingDir>,
+    todo: Vec<BsiTodo>,
 }
 
-enum ReadingDir {
-    Pending(PathBuf),
-    Read {
-        subpaths: Vec<fs::DirEntry>,
-        pos: usize,
+enum BsiTodo {
+    Directory {
+        path: PathBuf,
+        progress_min: u64,
+        progress_max: u64,
+    },
+    File {
+        path: PathBuf,
+        filename: String,
+        progress: u64,
     },
 }
 
 impl BlockStoreIterator {
     fn new(manager: &BlockManager) -> Self {
-        let root_dir = manager.data_dir.clone();
-        Self {
-            path: vec![ReadingDir::Pending(root_dir)],
+        let min_cap = manager
+            .data_layout
+            .data_dirs
+            .iter()
+            .filter_map(|x| match x.state {
+                DataDirState::Active { capacity } => Some(capacity),
+                _ => None,
+            })
+            .min()
+            .unwrap_or(0);
+
+        let sum_cap = manager
+            .data_layout
+            .data_dirs
+            .iter()
+            .map(|x| match x.state {
+                DataDirState::Active { capacity } => capacity,
+                _ => min_cap, // approximation
+            })
+            .sum::<u64>() as u128;
+
+        let mut cum_cap = 0;
+        let mut todo = vec![];
+        for dir in manager.data_layout.data_dirs.iter() {
+            let cap = match dir.state {
+                DataDirState::Active { capacity } => capacity,
+                _ => min_cap,
+            };
+
+            let progress_min = ((cum_cap as u128 * PROGRESS_FP as u128) / (sum_cap as u128)) as u64;
+            let progress_max =
+                (((cum_cap + cap) as u128 * PROGRESS_FP as u128) / (sum_cap as u128)) as u64;
+            cum_cap += cap;
+
+            todo.push(BsiTodo::Directory {
+                path: dir.path.clone(),
+                progress_min,
+                progress_max,
+            });
         }
+        // entries are processed back-to-front (because of .pop()),
+        // so reverse entries to process them in increasing progress bounds
+        todo.reverse();
+
+        let ret = Self { todo };
+        debug_assert!(ret.progress_invariant());
+
+        ret
     }
 
     /// Returns progress done, between 0 and 1
     fn progress(&self) -> f32 {
-        if self.path.is_empty() {
-            1.0
-        } else {
-            let mut ret = 0.0;
-            let mut next_div = 1;
-            for p in self.path.iter() {
-                match p {
-                    ReadingDir::Pending(_) => break,
-                    ReadingDir::Read { subpaths, pos } => {
-                        next_div *= subpaths.len();
-                        ret += ((*pos - 1) as f32) / (next_div as f32);
-                    }
-                }
-            }
-            ret
-        }
+        self.todo
+            .last()
+            .map(|x| match x {
+                BsiTodo::Directory { progress_min, .. } => *progress_min,
+                BsiTodo::File { progress, .. } => *progress,
+            })
+            .map(|x| x as f32 / PROGRESS_FP as f32)
+            .unwrap_or(1.0)
     }
 
-    async fn next(&mut self) -> Result<Option<Hash>, Error> {
+    async fn next(&mut self) -> Result<Option<(PathBuf, Hash)>, Error> {
         loop {
-            let last_path = match self.path.last_mut() {
+            match self.todo.pop() {
                 None => return Ok(None),
-                Some(lp) => lp,
-            };
-
-            if let ReadingDir::Pending(path) = last_path {
-                let mut reader = fs::read_dir(&path).await?;
-                let mut subpaths = vec![];
-                while let Some(ent) = reader.next_entry().await? {
-                    subpaths.push(ent);
-                }
-                *last_path = ReadingDir::Read { subpaths, pos: 0 };
-            }
-
-            let (subpaths, pos) = match *last_path {
-                ReadingDir::Read {
-                    ref subpaths,
-                    ref mut pos,
-                } => (subpaths, pos),
-                ReadingDir::Pending(_) => unreachable!(),
-            };
+                Some(BsiTodo::Directory {
+                    path,
+                    progress_min,
+                    progress_max,
+                }) => {
+                    let istart = self.todo.len();
+
+                    let mut reader = fs::read_dir(&path).await?;
+                    while let Some(ent) = reader.next_entry().await? {
+                        let name = if let Ok(n) = ent.file_name().into_string() {
+                            n
+                        } else {
+                            continue;
+                        };
+                        let ft = ent.file_type().await?;
+                        if ft.is_dir() && hex::decode(&name).is_ok() {
+                            self.todo.push(BsiTodo::Directory {
+                                path: ent.path(),
+                                progress_min: 0,
+                                progress_max: 0,
+                            });
+                        } else if ft.is_file() {
+                            self.todo.push(BsiTodo::File {
+                                path: ent.path(),
+                                filename: name,
+                                progress: 0,
+                            });
+                        }
+                    }
 
-            let data_dir_ent = match subpaths.get(*pos) {
-                None => {
-                    self.path.pop();
-                    continue;
-                }
-                Some(ent) => {
-                    *pos += 1;
-                    ent
+                    let count = self.todo.len() - istart;
+                    for (i, ent) in self.todo[istart..].iter_mut().enumerate() {
+                        let p1 = progress_min
+                            + ((progress_max - progress_min) * i as u64) / count as u64;
+                        let p2 = progress_min
+                            + ((progress_max - progress_min) * (i + 1) as u64) / count as u64;
+                        match ent {
+                            BsiTodo::Directory {
+                                progress_min,
+                                progress_max,
+                                ..
+                            } => {
+                                *progress_min = p1;
+                                *progress_max = p2;
+                            }
+                            BsiTodo::File { progress, .. } => {
+                                *progress = p1;
+                            }
+                        }
+                    }
+                    self.todo[istart..].reverse();
+                    debug_assert!(self.progress_invariant());
                 }
-            };
-
-            let name = data_dir_ent.file_name();
-            let name = if let Ok(n) = name.into_string() {
-                n
-            } else {
-                continue;
-            };
-            let ent_type = data_dir_ent.file_type().await?;
-
-            let name = name.strip_suffix(".zst").unwrap_or(&name);
-            if name.len() == 2 && hex::decode(name).is_ok() && ent_type.is_dir() {
-                let path = data_dir_ent.path();
-                self.path.push(ReadingDir::Pending(path));
-            } else if name.len() == 64 {
-                if let Ok(h) = hex::decode(name) {
-                    let mut hash = [0u8; 32];
-                    hash.copy_from_slice(&h);
-                    return Ok(Some(hash.into()));
+                Some(BsiTodo::File { path, filename, .. }) => {
+                    let filename = filename.strip_suffix(".zst").unwrap_or(&filename);
+                    if filename.len() == 64 {
+                        if let Ok(h) = hex::decode(filename) {
+                            let mut hash = [0u8; 32];
+                            hash.copy_from_slice(&h);
+                            return Ok(Some((path, hash.into())));
+                        }
+                    }
                 }
             }
         }
     }
+
+    fn progress_invariant(&self) -> bool {
+        let iter = self.todo.iter().map(|x| match x {
+            BsiTodo::Directory { progress_min, .. } => progress_min,
+            BsiTodo::File { progress, .. } => progress,
+        });
+        let iter_1 = iter.clone().skip(1);
+        iter.zip(iter_1).all(|(prev, next)| prev >= next)
+    }
 }
-- 
cgit v1.2.3
From 6c420c0880de742b2b6416da1178df828fd977bf Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Tue, 5 Sep 2023 13:43:38 +0200
Subject: block manager: multi-directory layout computation

---
 src/block/Cargo.toml |   1 +
 src/block/layout.rs  | 264 +++++++++++++++++++++++++++++++++++++++++++++++----
 src/block/manager.rs |  10 +-
 src/block/repair.rs  |  10 +-
 4 files changed, 255 insertions(+), 30 deletions(-)

(limited to 'src/block')

diff --git a/src/block/Cargo.toml b/src/block/Cargo.toml
index 1057b699..b77988d6 100644
--- a/src/block/Cargo.toml
+++ b/src/block/Cargo.toml
@@ -24,6 +24,7 @@ opentelemetry = "0.17"
 arc-swap = "1.5"
 async-trait = "0.1.7"
 bytes = "1.0"
+bytesize = "1.2"
 hex = "0.4"
 tracing = "0.1"
 rand = "0.8"
diff --git a/src/block/layout.rs b/src/block/layout.rs
index cbc326d8..4a49b287 100644
--- a/src/block/layout.rs
+++ b/src/block/layout.rs
@@ -4,14 +4,23 @@ use serde::{Deserialize, Serialize};
 
 use garage_util::config::DataDirEnum;
 use garage_util::data::Hash;
+use garage_util::error::{Error, OkOrMessage};
 use garage_util::migrate::*;
 
-pub const DRIVE_NPART: usize = 1024;
+type Idx = u16;
+
+const DRIVE_NPART: usize = 1024;
+
+const DPART_BYTES: (usize, usize) = (2, 3);
 
 #[derive(Serialize, Deserialize, Debug, Clone)]
 pub(crate) struct DataLayout {
     pub(crate) data_dirs: Vec<DataDir>,
-    pub(crate) partitions: Vec<Partition>,
+
+    /// Primary storage location (index in data_dirs) for each partition
+    pub(crate) part_prim: Vec<Idx>,
+    /// Secondary storage locations for each partition
+    pub(crate) part_sec: Vec<Vec<Idx>>,
 }
 
 #[derive(Serialize, Deserialize, Debug, Clone)]
@@ -20,38 +29,255 @@ pub(crate) struct DataDir {
     pub(crate) state: DataDirState,
 }
 
-#[derive(Serialize, Deserialize, Debug, Clone)]
+#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
 pub(crate) enum DataDirState {
     Active { capacity: u64 },
     ReadOnly,
 }
 
-#[derive(Serialize, Deserialize, Debug, Clone)]
-pub(crate) struct Partition {
-    pub(crate) prim: usize,
-    pub(crate) sec: Vec<usize>,
-}
-
 impl DataLayout {
-    pub(crate) fn initialize(dirs: &DataDirEnum) -> Self {
-        todo!()
+    pub(crate) fn initialize(dirs: &DataDirEnum) -> Result<Self, Error> {
+        let data_dirs = make_data_dirs(dirs)?;
+
+        // Split partitions proportionally to capacity for all drives
+        // to assign the primary storage location
+        let total_cap = data_dirs.iter().filter_map(|x| x.capacity()).sum::<u64>();
+
+        let mut part_prim = Vec::with_capacity(DRIVE_NPART);
+        let mut cum_cap = 0;
+        for (i, dd) in data_dirs.iter().enumerate() {
+            if let DataDirState::Active { capacity } = dd.state {
+                cum_cap += capacity;
+                let n_total = (cum_cap * DRIVE_NPART as u64) / total_cap;
+                part_prim.resize(n_total as usize, i as Idx);
+            }
+        }
+        assert_eq!(cum_cap, total_cap);
+        assert_eq!(part_prim.len(), DRIVE_NPART);
+
+        // If any of the storage locations is non-empty, add it as a secondary
+        // storage location for all partitions
+        let mut part_sec = vec![vec![]; DRIVE_NPART];
+        for (i, dd) in data_dirs.iter().enumerate() {
+            if dir_not_empty(&dd.path)? {
+                for (sec, prim) in part_sec.iter_mut().zip(part_prim.iter()) {
+                    if *prim != i as Idx {
+                        sec.push(i as Idx);
+                    }
+                }
+            }
+        }
+
+        Ok(Self {
+            data_dirs,
+            part_prim,
+            part_sec,
+        })
+    }
+
+    pub(crate) fn update(&mut self, dirs: &DataDirEnum) -> Result<Self, Error> {
+        // Compute list of new data directories and mapping of old indices
+        // to new indices
+        let data_dirs = make_data_dirs(dirs)?;
+        let old2new = self
+            .data_dirs
+            .iter()
+            .map(|x| {
+                data_dirs
+                    .iter()
+                    .position(|y| y.path == x.path)
+                    .map(|x| x as Idx)
+            })
+            .collect::<Vec<_>>();
+
+        // Compute secondary location list for partitions based on existing
+        // folders, translating indices from old to new
+        let mut part_sec = self
+            .part_sec
+            .iter()
+            .map(|dl| {
+                dl.iter()
+                    .filter_map(|old| old2new.get(*old as usize).copied().flatten())
+                    .collect::<Vec<_>>()
+            })
+            .collect::<Vec<_>>();
+
+        // Compute a vector that, for each data dir,
+        // contains the list of partitions primarily stored on that drive
+        let mut dir_prim = vec![vec![]; data_dirs.len()];
+        for (ipart, prim) in self.part_prim.iter().enumerate() {
+            if let Some(new) = old2new.get(*prim as usize).copied().flatten() {
+                dir_prim[new as usize].push(ipart);
+            }
+        }
+
+        // Compute the target number of partitions per data directory
+        let total_cap = data_dirs.iter().filter_map(|x| x.capacity()).sum::<u64>();
+        let mut cum_cap = 0;
+        let mut npart_per_dir = vec![];
+        for dd in data_dirs.iter() {
+            if let DataDirState::Active { capacity } = dd.state {
+                let begin = (cum_cap * DRIVE_NPART as u64) / total_cap;
+                cum_cap += capacity;
+                let end = (cum_cap * DRIVE_NPART as u64) / total_cap;
+                npart_per_dir.push((end - begin) as usize);
+            } else {
+                npart_per_dir.push(0);
+            }
+        }
+        assert_eq!(cum_cap, total_cap);
+        assert_eq!(npart_per_dir.iter().sum::<usize>(), DRIVE_NPART);
+
+        // For all directories that have too many primary partitions,
+        // move the excess partitions to secondary
+        for (idir, (parts, tgt_npart)) in dir_prim.iter_mut().zip(npart_per_dir.iter()).enumerate()
+        {
+            while parts.len() > *tgt_npart {
+                let part = parts.pop().unwrap();
+                if !part_sec[part].contains(&(idir as Idx)) {
+                    part_sec[part].push(idir as Idx);
+                }
+            }
+        }
+
+        // Calculate the vector of primary partition dir indices
+        let mut part_prim = vec![None; DRIVE_NPART];
+        for (idir, parts) in dir_prim.iter().enumerate() {
+            for part in parts.iter() {
+                assert!(part_prim[*part].is_none());
+                part_prim[*part] = Some(idir as Idx)
+            }
+        }
+
+        // Calculate a vector of unassigned partitions
+        let mut unassigned = part_prim
+            .iter()
+            .enumerate()
+            .filter(|(_, dir)| dir.is_none())
+            .map(|(ipart, _)| ipart)
+            .collect::<Vec<_>>();
+
+        // For all directories that don't have enough primary partitions,
+        // add partitions from unassigned
+        for (idir, (parts, tgt_npart)) in dir_prim.iter_mut().zip(npart_per_dir.iter()).enumerate()
+        {
+            assert!(unassigned.len() >= *tgt_npart - parts.len());
+            for _ in parts.len()..*tgt_npart {
+                let new_part = unassigned.pop().unwrap();
+                part_prim[new_part] = Some(idir as Idx);
+                part_sec[new_part].retain(|x| *x != idir as Idx);
+            }
+        }
+
+        // Sanity checks
+        assert!(part_prim.iter().all(|x| x.is_some()));
+        assert!(unassigned.is_empty());
+
+        let part_prim = part_prim
+            .into_iter()
+            .map(|x| x.unwrap())
+            .collect::<Vec<_>>();
+        assert!(part_prim.iter().all(|p| data_dirs
+            .get(*p as usize)
+            .and_then(|x| x.capacity())
+            .unwrap_or(0)
+            > 0));
+
+        Ok(Self {
+            data_dirs,
+            part_prim,
+            part_sec,
+        })
     }
 
-    pub(crate) fn update(&mut self, dirs: &DataDirEnum) -> Self {
-        todo!()
+    pub(crate) fn primary_data_dir(&self, hash: &Hash) -> PathBuf {
+        let ipart = self.partition_from(hash);
+        let idir = self.part_prim[ipart] as usize;
+        self.data_dir_from(hash, &self.data_dirs[idir].path)
     }
 
-    pub(crate) fn data_dir(&self, hash: &Hash) -> PathBuf {
-        todo!()
-        /*
-        let mut path = self.data_dir.clone();
+    pub(crate) fn secondary_data_dirs<'a>(&'a self, hash: &'a Hash) -> impl Iterator<Item = PathBuf> + 'a {
+        let ipart = self.partition_from(hash);
+        self.part_sec[ipart]
+            .iter()
+            .map(move |idir| self.data_dir_from(hash, &self.data_dirs[*idir as usize].path))
+    }
+
+    fn partition_from(&self, hash: &Hash) -> usize {
+        u16::from_be_bytes([
+            hash.as_slice()[DPART_BYTES.0],
+            hash.as_slice()[DPART_BYTES.1]
+        ]) as usize % DRIVE_NPART
+    }
+
+    fn data_dir_from(&self, hash: &Hash, dir: &PathBuf) -> PathBuf {
+        let mut path = dir.clone();
         path.push(hex::encode(&hash.as_slice()[0..1]));
         path.push(hex::encode(&hash.as_slice()[1..2]));
         path
-        */
     }
 }
 
 impl InitialFormat for DataLayout {
     const VERSION_MARKER: &'static [u8] = b"G09bmdl";
 }
+
+impl DataDir {
+    pub fn capacity(&self) -> Option<u64> {
+        match self.state {
+            DataDirState::Active { capacity } => Some(capacity),
+            _ => None,
+        }
+    }
+}
+
+fn make_data_dirs(dirs: &DataDirEnum) -> Result<Vec<DataDir>, Error> {
+    let mut data_dirs = vec![];
+    match dirs {
+        DataDirEnum::Single(path) => data_dirs.push(DataDir {
+            path: path.clone(),
+            state: DataDirState::Active {
+                capacity: 1_000_000_000, // whatever, doesn't matter
+            },
+        }),
+        DataDirEnum::Multiple(dirs) => {
+            for dir in dirs.iter() {
+                let state = match &dir.capacity {
+                    Some(cap) if dir.read_only == false => {
+                        DataDirState::Active {
+                            capacity: cap.parse::<bytesize::ByteSize>()
+                                .ok_or_message("invalid capacity value")?.as_u64(),
+                        }
+                    }
+                    None if dir.read_only == true => {
+                        DataDirState::ReadOnly
+                    }
+                    _ => return Err(Error::Message(format!("data directories in data_dir should have a capacity value or be marked read_only, not the case for {}", dir.path.to_string_lossy()))),
+                };
+                data_dirs.push(DataDir {
+                    path: dir.path.clone(),
+                    state,
+                });
+            }
+        }
+    }
+    Ok(data_dirs)
+}
+
+fn dir_not_empty(path: &PathBuf) -> Result<bool, Error> {
+    for entry in std::fs::read_dir(&path)? {
+        let dir = entry?;
+        if dir.file_type()?.is_dir()
+            && dir
+                .file_name()
+                .into_string()
+                .ok()
+                .and_then(|hex| hex::decode(&hex).ok())
+                .map(|bytes| (2..=4).contains(&bytes.len()))
+                .unwrap_or(false)
+        {
+            return Ok(true);
+        }
+    }
+    Ok(false)
+}
diff --git a/src/block/manager.rs b/src/block/manager.rs
index 18a2686e..45729a00 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -125,15 +125,19 @@ impl BlockManager {
         replication: TableShardedReplication,
         system: Arc<System>,
     ) -> Arc<Self> {
+        // TODO don't panic, report error
         let layout_persister: Persister<DataLayout> =
             Persister::new(&system.metadata_dir, "data_layout");
         let data_layout = match layout_persister.load() {
             Ok(mut layout) => {
-                layout.update(&data_dir);
+                layout.update(&data_dir).expect("invalid data_dir config");
                 layout
             }
-            Err(_) => DataLayout::initialize(&data_dir),
+            Err(_) => DataLayout::initialize(&data_dir).expect("invalid data_dir config"),
         };
+        layout_persister
+            .save(&data_layout)
+            .expect("cannot save data_layout");
 
         let rc = db
             .open_tree("block_local_rc")
@@ -602,7 +606,7 @@ impl BlockManager {
 
     /// Utility: gives the path of the directory in which a block should be found
     fn block_dir(&self, hash: &Hash) -> PathBuf {
-        self.data_layout.data_dir(hash)
+        self.data_layout.primary_data_dir(hash)
     }
 
     /// Utility: give the full path where a block should be found, minus extension if block is
diff --git a/src/block/repair.rs b/src/block/repair.rs
index d5e2e168..0e7fe0df 100644
--- a/src/block/repair.rs
+++ b/src/block/repair.rs
@@ -473,10 +473,7 @@ impl BlockStoreIterator {
             .data_layout
             .data_dirs
             .iter()
-            .filter_map(|x| match x.state {
-                DataDirState::Active { capacity } => Some(capacity),
-                _ => None,
-            })
+            .filter_map(|x| x.capacity())
             .min()
             .unwrap_or(0);
 
@@ -484,10 +481,7 @@ impl BlockStoreIterator {
             .data_layout
             .data_dirs
             .iter()
-            .map(|x| match x.state {
-                DataDirState::Active { capacity } => capacity,
-                _ => min_cap, // approximation
-            })
+            .map(|x| x.capacity().unwrap_or(min_cap /* approximation */))
             .sum::<u64>() as u128;
 
         let mut cum_cap = 0;
-- 
cgit v1.2.3
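The hash-to-partition mapping introduced above is worth spelling out. DPART_BYTES = (2, 3) means bytes 2 and 3 of the block hash select one of DRIVE_NPART = 1024 partitions; bytes 0 and 1 are not used because they already determine the two levels of subdirectory names inside each data directory (see data_dir_from). A minimal standalone sketch, with an illustrative hash value:

    // Same arithmetic as DataLayout::partition_from above.
    const DRIVE_NPART: usize = 1024;

    fn partition_from(hash: &[u8; 32]) -> usize {
        u16::from_be_bytes([hash[2], hash[3]]) as usize % DRIVE_NPART
    }

    fn main() {
        let mut h = [0u8; 32];
        h[2] = 0x12;
        h[3] = 0x34;
        // 0x1234 = 4660, and 4660 % 1024 = 564
        assert_eq!(partition_from(&h), 564);
    }

Since 1024 is a power of two, the modulo keeps the low 2 bits of byte 2 plus all of byte 3, so blocks are spread uniformly across partitions as long as hashes are uniform.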
From 887b3233f45ade24def08b3faa2d6da5fe85a3a1 Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Tue, 5 Sep 2023 14:27:39 +0200
Subject: block manager: use data paths from layout

---
 src/block/block.rs   |   9 ++++
 src/block/layout.rs  |  37 +++++++------
 src/block/manager.rs | 147 ++++++++++++++++++++++++++------------------------
 3 files changed, 104 insertions(+), 89 deletions(-)

(limited to 'src/block')

diff --git a/src/block/block.rs b/src/block/block.rs
index 935aa900..6d79fb6c 100644
--- a/src/block/block.rs
+++ b/src/block/block.rs
@@ -1,3 +1,5 @@
+use std::path::PathBuf;
+
 use bytes::Bytes;
 use serde::{Deserialize, Serialize};
 use zstd::stream::{decode_all as zstd_decode, Encoder};
@@ -19,6 +21,13 @@ pub enum DataBlock {
     Compressed(Bytes),
 }
 
+pub enum DataBlockPath {
+    /// Uncompressed data file
+    Plain(PathBuf),
+    /// Compressed data file
+    Compressed(PathBuf),
+}
+
 impl DataBlock {
     /// Query whether this block is compressed
     pub fn is_compressed(&self) -> bool {
diff --git a/src/block/layout.rs b/src/block/layout.rs
index 4a49b287..b119281b 100644
--- a/src/block/layout.rs
+++ b/src/block/layout.rs
@@ -190,32 +190,35 @@ impl DataLayout {
         })
     }
 
-    pub(crate) fn primary_data_dir(&self, hash: &Hash) -> PathBuf {
-        let ipart = self.partition_from(hash);
-        let idir = self.part_prim[ipart] as usize;
-        self.data_dir_from(hash, &self.data_dirs[idir].path)
+    pub(crate) fn primary_block_dir(&self, hash: &Hash) -> PathBuf {
+        let ipart = self.partition_from(hash);
+        let idir = self.part_prim[ipart] as usize;
+        self.block_dir_from(hash, &self.data_dirs[idir].path)
     }
 
-    pub(crate) fn secondary_data_dirs<'a>(&'a self, hash: &'a Hash) -> impl Iterator<Item = PathBuf> + 'a {
-        let ipart = self.partition_from(hash);
-        self.part_sec[ipart]
-            .iter()
-            .map(move |idir| self.data_dir_from(hash, &self.data_dirs[*idir as usize].path))
+    pub(crate) fn secondary_block_dirs<'a>(
+        &'a self,
+        hash: &'a Hash,
+    ) -> impl Iterator<Item = PathBuf> + 'a {
+        let ipart = self.partition_from(hash);
+        self.part_sec[ipart]
+            .iter()
+            .map(move |idir| self.block_dir_from(hash, &self.data_dirs[*idir as usize].path))
     }
 
-    fn partition_from(&self, hash: &Hash) -> usize {
-        u16::from_be_bytes([
-            hash.as_slice()[DPART_BYTES.0],
-            hash.as_slice()[DPART_BYTES.1]
-        ]) as usize % DRIVE_NPART
-    }
+    fn partition_from(&self, hash: &Hash) -> usize {
+        u16::from_be_bytes([
+            hash.as_slice()[DPART_BYTES.0],
+            hash.as_slice()[DPART_BYTES.1],
+        ]) as usize % DRIVE_NPART
+    }
 
-    fn data_dir_from(&self, hash: &Hash, dir: &PathBuf) -> PathBuf {
+    fn block_dir_from(&self, hash: &Hash, dir: &PathBuf) -> PathBuf {
         let mut path = dir.clone();
         path.push(hex::encode(&hash.as_slice()[0..1]));
         path.push(hex::encode(&hash.as_slice()[1..2]));
         path
     }
 }
 
 impl InitialFormat for DataLayout {
diff --git a/src/block/manager.rs b/src/block/manager.rs
index 45729a00..73fefa0c 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -543,21 +543,25 @@ impl BlockManager {
     }
 
     async fn read_block_internal(&self, hash: &Hash) -> Result<DataBlock, Error> {
-        let mut path = self.block_path(hash);
-        let compressed = match self.is_block_compressed(hash).await {
-            Ok(c) => c,
-            Err(e) => {
+        let block_path = match self.find_block(hash).await {
+            Some(p) => p,
+            None => {
                 // Not found but maybe we should have had it ??
                 self.resync
                     .put_to_resync(hash, 2 * self.system.rpc.rpc_timeout())?;
-                return Err(Into::into(e));
+                return Err(Error::Message(format!(
+                    "block {:?} not found on node",
+                    hash
+                )));
             }
         };
-        if compressed {
-            path.set_extension("zst");
-        }
-        let mut f = fs::File::open(&path).await?;
+
+        let (path, compressed) = match &block_path {
+            DataBlockPath::Plain(p) => (p, false),
+            DataBlockPath::Compressed(p) => (p, true),
+        };
+
+        let mut f = fs::File::open(&path).await?;
         let mut data = vec![];
         f.read_to_end(&mut data).await?;
         drop(f);
@@ -571,11 +575,16 @@ impl BlockManager {
 
         if data.verify(*hash).is_err() {
             self.metrics.corruption_counter.add(1);
+            warn!(
+                "Block {:?} is corrupted. Renaming to .corrupted and resyncing.",
+                hash
+            );
             self.lock_mutate(hash)
                 .await
-                .move_block_to_corrupted(hash, self)
+                .move_block_to_corrupted(&block_path)
                 .await?;
             self.resync.put_to_resync(hash, Duration::from_millis(0))?;
+
             return Err(Error::CorruptData(*hash));
         }
 
@@ -604,56 +613,51 @@ impl BlockManager {
             .await
     }
 
-    /// Utility: gives the path of the directory in which a block should be found
-    fn block_dir(&self, hash: &Hash) -> PathBuf {
-        self.data_layout.primary_data_dir(hash)
-    }
-
-    /// Utility: give the full path where a block should be found, minus extension if block is
-    /// compressed
-    fn block_path(&self, hash: &Hash) -> PathBuf {
-        let mut path = self.block_dir(hash);
-        path.push(hex::encode(hash.as_ref()));
-        path
-    }
+    /// Utility: check if block is stored compressed.
+    async fn find_block(&self, hash: &Hash) -> Option<DataBlockPath> {
+        let dirs = Some(self.data_layout.primary_block_dir(hash))
+            .into_iter()
+            .chain(self.data_layout.secondary_block_dirs(hash));
+        let filename = hex::encode(hash.as_ref());
 
-    /// Utility: check if block is stored compressed. Error if block is not stored
-    async fn is_block_compressed(&self, hash: &Hash) -> Result<bool, Error> {
-        let mut path = self.block_path(hash);
+        for dir in dirs {
+            let mut path = dir;
+            path.push(&filename);
 
-        // If compression is disabled on node - check for the raw block
-        // first and then a compressed one (as compression may have been
-        // previously enabled).
-        match self.compression_level {
-            None => {
+            if self.compression_level.is_none() {
+                // If compression is disabled on node - check for the raw block
+                // first and then a compressed one (as compression may have been
+                // previously enabled).
                 if fs::metadata(&path).await.is_ok() {
-                    return Ok(false);
+                    return Some(DataBlockPath::Plain(path));
                 }
                 path.set_extension("zst");
-
-                fs::metadata(&path).await.map(|_| true).map_err(Into::into)
-            }
-            _ => {
+                if fs::metadata(&path).await.is_ok() {
+                    return Some(DataBlockPath::Compressed(path));
+                }
+            } else {
                 path.set_extension("zst");
-
                 if fs::metadata(&path).await.is_ok() {
-                    return Ok(true);
+                    return Some(DataBlockPath::Compressed(path));
                 }
                 path.set_extension("");
-
-                fs::metadata(&path).await.map(|_| false).map_err(Into::into)
+                if fs::metadata(&path).await.is_ok() {
+                    return Some(DataBlockPath::Plain(path));
+                }
             }
         }
+
+        None
     }
 
     async fn lock_mutate(&self, hash: &Hash) -> MutexGuard<'_, BlockManagerLocked> {
         let tracer = opentelemetry::global::tracer("garage");
-        self.mutation_lock[hash.as_slice()[0] as usize]
+        let ilock = u16::from_be_bytes([hash.as_slice()[0], hash.as_slice()[1]]) as usize
+            % self.mutation_lock.len();
+        self.mutation_lock[ilock]
             .lock()
             .with_context(Context::current_with_span(
-                tracer.start("Acquire mutation_lock"),
+                tracer.start(format!("Acquire mutation_lock #{}", ilock)),
             ))
             .await
     }
@@ -688,7 +692,7 @@ impl BlockManagerLocked {
         hash: &Hash,
         mgr: &BlockManager,
     ) -> Result<BlockStatus, Error> {
-        let exists = mgr.is_block_compressed(hash).await.is_ok();
+        let exists = mgr.find_block(hash).await.is_some();
         let needed = mgr.rc.get_block_rc(hash)?;
 
         Ok(BlockStatus { exists, needed })
@@ -703,21 +707,17 @@ impl BlockManagerLocked {
         let compressed = data.is_compressed();
         let data = data.inner_buffer();
 
-        let mut path = mgr.block_dir(hash);
+        let mut path = mgr.data_layout.primary_block_dir(hash);
         let directory = path.clone();
         path.push(hex::encode(hash));
         fs::create_dir_all(&directory).await?;
 
-        let to_delete = match (mgr.is_block_compressed(hash).await, compressed) {
-            (Ok(true), _) => return Ok(()),
-            (Ok(false), false) => return Ok(()),
-            (Ok(false), true) => {
-                let path_to_delete = path.clone();
-                path.set_extension("zst");
-                Some(path_to_delete)
-            }
-            (Err(_), compressed) => {
+        let to_delete = match (mgr.find_block(hash).await, compressed) {
+            (Some(DataBlockPath::Compressed(_)), _) => return Ok(()),
+            (Some(DataBlockPath::Plain(_)), false) => return Ok(()),
+            (Some(DataBlockPath::Plain(plain_path)), true) => Some(plain_path),
+            (None, compressed) => {
                 if compressed {
                     path.set_extension("zst");
                 }
                 None
             }
         };
@@ -766,19 +766,20 @@ impl BlockManagerLocked {
         Ok(())
     }
 
-    async fn move_block_to_corrupted(&self, hash: &Hash, mgr: &BlockManager) -> Result<(), Error> {
-        warn!(
-            "Block {:?} is corrupted. Renaming to .corrupted and resyncing.",
-            hash
-        );
-        let mut path = mgr.block_path(hash);
-        let mut path2 = path.clone();
-        if mgr.is_block_compressed(hash).await? {
-            path.set_extension("zst");
-            path2.set_extension("zst.corrupted");
-        } else {
-            path2.set_extension("corrupted");
-        }
+    async fn move_block_to_corrupted(&self, block_path: &DataBlockPath) -> Result<(), Error> {
+        let (path, path2) = match block_path {
+            DataBlockPath::Plain(p) => {
+                let mut p2 = p.clone();
+                p2.set_extension("corrupted");
+                (p, p2)
+            }
+            DataBlockPath::Compressed(p) => {
+                let mut p2 = p.clone();
+                p2.set_extension("zst.corrupted");
+                (p, p2)
+            }
+        };
+
         fs::rename(path, path2).await?;
         Ok(())
     }
@@ -787,12 +788,14 @@ impl BlockManagerLocked {
         let BlockStatus { exists, needed } = self.check_block_status(hash, mgr).await?;
 
         if exists && needed.is_deletable() {
-            let mut path = mgr.block_path(hash);
-            if mgr.is_block_compressed(hash).await? {
-                path.set_extension("zst");
+            let path_opt = match mgr.find_block(hash).await {
+                Some(DataBlockPath::Plain(p)) | Some(DataBlockPath::Compressed(p)) => Some(p),
+                None => None,
+            };
+            if let Some(path) = path_opt {
+                fs::remove_file(path).await?;
+                mgr.metrics.delete_counter.add(1);
             }
-            fs::remove_file(path).await?;
-            mgr.metrics.delete_counter.add(1);
         }
         Ok(())
     }
-- 
cgit v1.2.3
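The key behavioral change in the commit above is find_block: a block is now looked up in its primary directory first, then in each secondary directory, and within each directory the preferred variant (plain or compressed) depends on whether compression is enabled on this node. A minimal sketch of that candidate ordering, assuming hypothetical paths (this is not the patch's code, which stats each candidate as it goes):

    use std::path::PathBuf;

    enum DataBlockPath {
        Plain(PathBuf),
        Compressed(PathBuf),
    }

    // Enumerate lookup candidates in the order find_block above probes them:
    // all candidates in the primary dir come before any secondary dir.
    fn candidates(dirs: &[PathBuf], filename: &str, compression_enabled: bool) -> Vec<DataBlockPath> {
        let mut out = vec![];
        for dir in dirs {
            let plain = dir.join(filename);
            let compressed = dir.join(format!("{}.zst", filename));
            if compression_enabled {
                out.push(DataBlockPath::Compressed(compressed));
                out.push(DataBlockPath::Plain(plain));
            } else {
                // compression may have been enabled previously,
                // so the .zst variant is still checked second
                out.push(DataBlockPath::Plain(plain));
                out.push(DataBlockPath::Compressed(compressed));
            }
        }
        out
    }

This ordering is what lets reads keep working while blocks still live in their old (now secondary) location after a layout change.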
From a09f86729c1c28c6881b802b49d5574386ef1d0d Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Tue, 5 Sep 2023 14:37:10 +0200
Subject: block manager: move blocks in write_block if necessary

---
 src/block/manager.rs | 38 ++++++++++++++++++++++++++------------
 1 file changed, 26 insertions(+), 12 deletions(-)

(limited to 'src/block')

diff --git a/src/block/manager.rs b/src/block/manager.rs
index 73fefa0c..798cedf9 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -707,25 +707,39 @@ impl BlockManagerLocked {
         let compressed = data.is_compressed();
         let data = data.inner_buffer();
 
-        let mut path = mgr.data_layout.primary_block_dir(hash);
-        let directory = path.clone();
-        path.push(hex::encode(hash));
+        let mut tgt_path = mgr.data_layout.primary_block_dir(hash);
+        let directory = tgt_path.clone();
+        tgt_path.push(hex::encode(hash));
+        if compressed {
+            tgt_path.set_extension("zst");
+        }
 
         fs::create_dir_all(&directory).await?;
 
         let to_delete = match (mgr.find_block(hash).await, compressed) {
+            // If the block is stored in the wrong directory,
+            // write it again at the correct path and delete the old path
+            (Some(DataBlockPath::Plain(p)), false) if p != tgt_path => Some(p),
+            (Some(DataBlockPath::Compressed(p)), true) if p != tgt_path => Some(p),
+
+            // If the block is already stored not compressed but we have a compressed
+            // copy, write the compressed copy and delete the uncompressed one
+            (Some(DataBlockPath::Plain(plain_path)), true) => Some(plain_path),
+
+            // If the block is already stored compressed,
+            // keep the stored copy, we have nothing to do
             (Some(DataBlockPath::Compressed(_)), _) => return Ok(()),
+
+            // If the block is already stored not compressed,
+            // and we don't have a compressed copy either,
+            // keep the stored copy, we have nothing to do
             (Some(DataBlockPath::Plain(_)), false) => return Ok(()),
-            (Some(DataBlockPath::Plain(plain_path)), true) => Some(plain_path),
-            (None, compressed) => {
-                if compressed {
-                    path.set_extension("zst");
-                }
-                None
-            }
+
+            // If the block isn't stored already, just store what is given to us
+            (None, _) => None,
         };
 
-        let mut path_tmp = path.clone();
+        let mut path_tmp = tgt_path.clone();
         let tmp_extension = format!("tmp{}", hex::encode(thread_rng().gen::<[u8; 4]>()));
         path_tmp.set_extension(tmp_extension);
 
@@ -740,7 +754,7 @@ impl BlockManagerLocked {
 
         drop(f);
 
-        fs::rename(path_tmp, path).await?;
+        fs::rename(path_tmp, tgt_path).await?;
 
         delete_on_drop.cancel();
-- 
cgit v1.2.3
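The write path above relies on the classic write-to-temporary-then-rename pattern: the block is first written to a randomly named .tmpXXXX file in the same directory, then atomically renamed to its final path, so a crash mid-write never leaves a truncated block under the real name. A minimal synchronous sketch of the idea (function name and the fsync flag are illustrative, not the patch's async code):

    use std::io::Write;
    use std::path::Path;

    fn atomic_write(path: &Path, data: &[u8], fsync: bool) -> std::io::Result<()> {
        let tmp = path.with_extension("tmp");
        let mut f = std::fs::File::create(&tmp)?;
        f.write_all(data)?;
        if fsync {
            // flush file contents to disk before the rename becomes visible
            f.sync_all()?;
        }
        drop(f);
        // rename within the same filesystem is atomic on POSIX
        std::fs::rename(&tmp, path)?;
        Ok(())
    }

The DeleteOnDrop guard in the patch plays the cleanup role this sketch omits: if the write fails, the temporary file is removed instead of being leaked.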
From 1b8c265c14b2f788693aed6c15f65684c72d2d1c Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Tue, 5 Sep 2023 15:04:59 +0200
Subject: block manager: get rid of check_block_status

---
 src/block/manager.rs | 84 ++++++++++++++++++++++------------------------------
 src/block/resync.rs  | 17 ++++++-----
 2 files changed, 45 insertions(+), 56 deletions(-)

(limited to 'src/block')

diff --git a/src/block/manager.rs b/src/block/manager.rs
index 798cedf9..5bad34d4 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -543,8 +543,8 @@ impl BlockManager {
     }
 
     async fn read_block_internal(&self, hash: &Hash) -> Result<DataBlock, Error> {
-        let block_path = match self.find_block(hash).await {
-            Some(p) => p,
+        match self.find_block(hash).await {
+            Some(p) => self.read_block_from(hash, &p).await,
             None => {
                 // Not found but maybe we should have had it ??
                 self.resync
@@ -554,9 +554,15 @@ impl BlockManager {
                     hash
                 )));
             }
-        };
+        }
+    }
 
-        let (path, compressed) = match &block_path {
+    pub(crate) async fn read_block_from(
+        &self,
+        hash: &Hash,
+        block_path: &DataBlockPath,
+    ) -> Result<DataBlock, Error> {
+        let (path, compressed) = match block_path {
             DataBlockPath::Plain(p) => (p, false),
             DataBlockPath::Compressed(p) => (p, true),
         };
@@ -581,7 +587,7 @@ impl BlockManager {
             );
             self.lock_mutate(hash)
                 .await
-                .move_block_to_corrupted(&block_path)
+                .move_block_to_corrupted(block_path)
                 .await?;
             self.resync.put_to_resync(hash, Duration::from_millis(0))?;
 
@@ -591,18 +597,11 @@ impl BlockManager {
         Ok(data)
     }
 
-    /// Check if this node has a block and whether it needs it
-    pub(crate) async fn check_block_status(&self, hash: &Hash) -> Result<BlockStatus, Error> {
-        self.lock_mutate(hash)
-            .await
-            .check_block_status(hash, self)
-            .await
-    }
-
     /// Check if this node should have a block, but don't actually have it
     async fn need_block(&self, hash: &Hash) -> Result<bool, Error> {
-        let BlockStatus { exists, needed } = self.check_block_status(hash).await?;
-        Ok(needed.is_nonzero() && !exists)
+        let rc = self.rc.get_block_rc(hash)?;
+        let exists = self.find_block(hash).await.is_some();
+        Ok(rc.is_nonzero() && !exists)
     }
 
     /// Delete block if it is not needed anymore
@@ -613,8 +612,8 @@ impl BlockManager {
             .await
     }
 
-    /// Utility: check if block is stored compressed.
-    async fn find_block(&self, hash: &Hash) -> Option<DataBlockPath> {
+    /// Find the path where a block is currently stored
+    pub(crate) async fn find_block(&self, hash: &Hash) -> Option<DataBlockPath> {
         let dirs = Some(self.data_layout.primary_block_dir(hash))
             .into_iter()
             .chain(self.data_layout.secondary_block_dirs(hash));
@@ -687,17 +686,6 @@ pub(crate) struct BlockStatus {
 }
 
 impl BlockManagerLocked {
-    async fn check_block_status(
-        &self,
-        hash: &Hash,
-        mgr: &BlockManager,
-    ) -> Result<BlockStatus, Error> {
-        let exists = mgr.find_block(hash).await.is_some();
-        let needed = mgr.rc.get_block_rc(hash)?;
-
-        Ok(BlockStatus { exists, needed })
-    }
-
     async fn write_block(
         &self,
         hash: &Hash,
@@ -799,14 +787,12 @@ impl BlockManagerLocked {
     }
 
     async fn delete_if_unneeded(&self, hash: &Hash, mgr: &BlockManager) -> Result<(), Error> {
-        let BlockStatus { exists, needed } = self.check_block_status(hash, mgr).await?;
-
-        if exists && needed.is_deletable() {
-            let path_opt = match mgr.find_block(hash).await {
-                Some(DataBlockPath::Plain(p)) | Some(DataBlockPath::Compressed(p)) => Some(p),
-                None => None,
-            };
-            if let Some(path) = path_opt {
+        let rc = mgr.rc.get_block_rc(hash)?;
+        if rc.is_deletable() {
+            while let Some(path) = mgr.find_block(hash).await {
+                let path = match path {
+                    DataBlockPath::Plain(p) | DataBlockPath::Compressed(p) => p,
+                };
                 fs::remove_file(path).await?;
                 mgr.metrics.delete_counter.add(1);
             }
        }
        Ok(())
    }
diff --git a/src/block/resync.rs b/src/block/resync.rs
index ea280ad4..bb43ad7e 100644
--- a/src/block/resync.rs
+++ b/src/block/resync.rs
@@ -359,20 +359,23 @@ impl BlockResyncManager {
     }
 
     async fn resync_block(&self, manager: &BlockManager, hash: &Hash) -> Result<(), Error> {
-        let BlockStatus { exists, needed } = manager.check_block_status(hash).await?;
+        let existing_path = manager.find_block(hash).await;
+        let exists = existing_path.is_some();
+        let rc = manager.rc.get_block_rc(hash)?;
 
-        if exists != needed.is_needed() || exists != needed.is_nonzero() {
+        if exists != rc.is_needed() || exists != rc.is_nonzero() {
             debug!(
                 "Resync block {:?}: exists {}, nonzero rc {}, deletable {}",
                 hash,
                 exists,
-                needed.is_nonzero(),
-                needed.is_deletable(),
+                rc.is_nonzero(),
+                rc.is_deletable(),
             );
         }
 
-        if exists && needed.is_deletable() {
+        if exists && rc.is_deletable() {
             info!("Resync block {:?}: offloading and deleting", hash);
+            let existing_path = existing_path.unwrap();
 
             let mut who = manager.replication.write_nodes(hash);
             if who.len() < manager.replication.write_quorum() {
@@ -419,7 +422,7 @@ impl BlockResyncManager {
                     .add(1, &[KeyValue::new("to", format!("{:?}", node))]);
             }
 
-            let block = manager.read_block(hash).await?;
+            let block = manager.read_block_from(hash, &existing_path).await?;
             let (header, bytes) = block.into_parts();
             let put_block_message = Req::new(BlockRpc::PutBlock {
                 hash: *hash,
@@ -451,7 +454,7 @@ impl BlockResyncManager {
             manager.rc.clear_deleted_block_rc(hash)?;
         }
 
-        if needed.is_nonzero() && !exists {
+        if rc.is_nonzero() && !exists {
             info!(
                 "Resync block {:?}: fetching absent but needed block (refcount > 0)",
                 hash
-- 
cgit v1.2.3
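After the commit above, the resync decision no longer goes through a locked BlockStatus snapshot; it is computed directly from two observations, whether the block exists on disk and the state of its reference count. The decision table implied by resync_block can be summarized in this small sketch (the enum and function are illustrative, not part of the patch):

    // Decision logic of resync_block above, reduced to its inputs.
    enum ResyncAction {
        Nothing,
        OffloadAndDelete, // block present locally but its rc says deletable
        Fetch,            // rc > 0 but the block is absent locally
    }

    fn decide(exists: bool, rc_nonzero: bool, rc_deletable: bool) -> ResyncAction {
        if exists && rc_deletable {
            ResyncAction::OffloadAndDelete
        } else if rc_nonzero && !exists {
            ResyncAction::Fetch
        } else {
            ResyncAction::Nothing
        }
    }

Note that the offload path now reads the block from the exact path find_block returned (read_block_from), so a block sitting in a secondary directory can be offloaded without first being moved to its primary location.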
From 93114a9747cefa441ebd274f206c09699d051b39 Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Tue, 5 Sep 2023 15:39:21 +0200
Subject: block manager: refactoring

---
 src/block/manager.rs | 96 ++++++++++++++++++++++------------------------------
 1 file changed, 44 insertions(+), 52 deletions(-)

(limited to 'src/block')

diff --git a/src/block/manager.rs b/src/block/manager.rs
index 5bad34d4..d18d3f4c 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -88,7 +88,7 @@ pub struct BlockManager {
     data_fsync: bool,
     compression_level: Option<i32>,
 
-    mutation_lock: [Mutex<BlockManagerLocked>; 256],
+    mutation_lock: Vec<Mutex<BlockManagerLocked>>,
 
     pub(crate) rc: BlockRc,
     pub resync: BlockResyncManager,
@@ -111,6 +111,9 @@ pub struct BlockResyncErrorInfo {
     pub next_try: u64,
 }
 
+// The number of different mutexes used to parallelize write access to data blocks
+const MUTEX_COUNT: usize = 256;
+
 // This custom struct contains functions that must only be ran
 // when the lock is held. We ensure that it is the case by storing
 // it INSIDE a Mutex.
@@ -124,21 +127,24 @@ impl BlockManager {
         compression_level: Option<i32>,
         replication: TableShardedReplication,
         system: Arc<System>,
-    ) -> Arc<Self> {
-        // TODO don't panic, report error
+    ) -> Result<Arc<Self>, Error> {
+        // Load or compute layout, i.e. assignment of data blocks to the different data directories
        let layout_persister: Persister<DataLayout> =
            Persister::new(&system.metadata_dir, "data_layout");
        let data_layout = match layout_persister.load() {
            Ok(mut layout) => {
-                layout.update(&data_dir).expect("invalid data_dir config");
+                layout
+                    .update(&data_dir)
+                    .ok_or_message("invalid data_dir config")?;
                layout
            }
-            Err(_) => DataLayout::initialize(&data_dir).expect("invalid data_dir config"),
+            Err(_) => DataLayout::initialize(&data_dir).ok_or_message("invalid data_dir config")?,
        };
        layout_persister
            .save(&data_layout)
            .expect("cannot save data_layout");
 
+        // Open metadata tables
        let rc = db
            .open_tree("block_local_rc")
            .expect("Unable to open block_local_rc tree");
@@ -165,7 +171,10 @@ impl BlockManager {
             data_layout,
             data_fsync,
             compression_level,
-            mutation_lock: [(); 256].map(|_| Mutex::new(BlockManagerLocked())),
+            mutation_lock: vec![(); MUTEX_COUNT]
+                .iter()
+                .map(|_| Mutex::new(BlockManagerLocked()))
+                .collect::<Vec<_>>(),
             rc,
             resync,
             system,
@@ -177,7 +186,7 @@ impl BlockManager {
         block_manager.endpoint.set_handler(block_manager.clone());
         block_manager.scrub_persister.set_with(|_| ()).unwrap();
 
-        block_manager
+        Ok(block_manager)
     }
 
     pub fn spawn_workers(self: &Arc<Self>, bg: &BackgroundRunner) {
@@ -224,44 +233,10 @@ impl BlockManager {
         hash: &Hash,
         order_tag: Option<OrderTag>,
     ) -> Result<(DataBlockHeader, ByteStream), Error> {
-        let who = self.replication.read_nodes(hash);
-        let who = self.system.rpc.request_order(&who);
-
-        for node in who.iter() {
-            let node_id = NodeID::from(*node);
-            let rpc = self.endpoint.call_streaming(
-                &node_id,
-                BlockRpc::GetBlock(*hash, order_tag),
-                PRIO_NORMAL | PRIO_SECONDARY,
-            );
-            tokio::select! {
-                res = rpc => {
-                    let res = match res {
-                        Ok(res) => res,
-                        Err(e) => {
-                            debug!("Node {:?} returned error: {}", node, e);
-                            continue;
-                        }
-                    };
-                    let (header, stream) = match res.into_parts() {
-                        (Ok(BlockRpc::PutBlock { hash: _, header }), Some(stream)) => (header, stream),
-                        _ => {
-                            debug!("Node {:?} returned a malformed response", node);
-                            continue;
-                        }
-                    };
-                    return Ok((header, stream));
-                }
-                _ = tokio::time::sleep(self.system.rpc.rpc_timeout()) => {
-                    debug!("Node {:?} didn't return block in time, trying next.", node);
-                }
-            };
-        }
-
-        Err(Error::Message(format!(
-            "Unable to read block {:?}: no node returned a valid block",
-            hash
-        )))
+        self.rpc_get_raw_block_internal(hash, order_tag, |header, stream| async move {
+            Ok((header, stream))
+        })
+        .await
     }
 
     /// Ask nodes that might have a (possibly compressed) block for it
@@ -271,6 +246,24 @@ impl BlockManager {
         hash: &Hash,
         order_tag: Option<OrderTag>,
     ) -> Result<DataBlock, Error> {
+        self.rpc_get_raw_block_internal(hash, order_tag, |header, stream| async move {
+            read_stream_to_end(stream)
+                .await
+                .map(|data| DataBlock::from_parts(header, data))
+        })
+        .await
+    }
+
+    async fn rpc_get_raw_block_internal<F, Fut, T>(
+        &self,
+        hash: &Hash,
+        order_tag: Option<OrderTag>,
+        f: F,
+    ) -> Result<T, Error>
+    where
+        F: Fn(DataBlockHeader, ByteStream) -> Fut,
+        Fut: futures::Future<Output = Result<T, Error>>,
+    {
         let who = self.replication.read_nodes(hash);
         let who = self.system.rpc.request_order(&who);
 
@@ -297,13 +290,17 @@ impl BlockManager {
                         continue;
                     }
                 };
-                match read_stream_to_end(stream).await {
-                    Ok(bytes) => return Ok(DataBlock::from_parts(header, bytes)),
+                match f(header, stream).await {
+                    Ok(ret) => return Ok(ret),
                     Err(e) => {
                         debug!("Error reading stream from node {:?}: {}", node, e);
                     }
                 }
            }
+            // TODO: sleep less long (fail early), initiate a second request earlier
+            // if the first one doesn't succeed rapidly
+            // TODO: keep first request running when initiating a new one and take the
+            // one that finishes earlier
            _ = tokio::time::sleep(self.system.rpc.rpc_timeout()) => {
                debug!("Node {:?} didn't return block in time, trying next.", node);
            }
        };
    }
@@ -680,11 +677,6 @@ impl StreamingEndpointHandler<BlockRpc> for BlockManager {
     }
 }
 
-pub(crate) struct BlockStatus {
-    pub(crate) exists: bool,
-    pub(crate) needed: RcEntry,
-}
-
 impl BlockManagerLocked {
     async fn write_block(
         &self,
-- 
cgit v1.2.3
From 3a74844df02b5ecec0b96bfb8b2ff3bcdd33f7f4 Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Tue, 5 Sep 2023 15:41:36 +0200
Subject: block manager: fix dir_not_empty

---
 src/block/layout.rs | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

(limited to 'src/block')

diff --git a/src/block/layout.rs b/src/block/layout.rs
index b119281b..3b529fc0 100644
--- a/src/block/layout.rs
+++ b/src/block/layout.rs
@@ -276,8 +276,7 @@ fn dir_not_empty(path: &PathBuf) -> Result<bool, Error> {
                 .into_string()
                 .ok()
                 .and_then(|hex| hex::decode(&hex).ok())
-                .map(|bytes| (2..=4).contains(&bytes.len()))
-                .unwrap_or(false)
+                .is_some()
         {
             return Ok(true);
         }
-- 
cgit v1.2.3
From a44f4869312678e3c6eaac1a26a7beb4652f3e69 Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Tue, 5 Sep 2023 15:57:25 +0200
Subject: block manager: refactoring & increase max worker count to 8

---
 src/block/manager.rs | 26 +++++++++++++-------------
 src/block/resync.rs  |  2 +-
 2 files changed, 14 insertions(+), 14 deletions(-)

(limited to 'src/block')

diff --git a/src/block/manager.rs b/src/block/manager.rs
index d18d3f4c..b42a9aa9 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -279,21 +279,21 @@ impl BlockManager {
                 let res = match res {
                     Ok(res) => res,
                     Err(e) => {
-                        debug!("Node {:?} returned error: {}", node, e);
+                        debug!("Get block {:?}: node {:?} returned error: {}", hash, node, e);
                         continue;
                     }
                 };
                 let (header, stream) = match res.into_parts() {
                     (Ok(BlockRpc::PutBlock { hash: _, header }), Some(stream)) => (header, stream),
                     _ => {
-                        debug!("Node {:?} returned a malformed response", node);
+                        debug!("Get block {:?}: node {:?} returned a malformed response", hash, node);
                         continue;
                     }
                 };
                 match f(header, stream).await {
                     Ok(ret) => return Ok(ret),
                     Err(e) => {
-                        debug!("Error reading stream from node {:?}: {}", node, e);
+                        debug!("Get block {:?}: error reading stream from node {:?}: {}", hash, node, e);
                     }
                 }
             }
@@ -302,15 +302,14 @@ impl BlockManager {
             // TODO: keep first request running when initiating a new one and take the
             // one that finishes earlier
             _ = tokio::time::sleep(self.system.rpc.rpc_timeout()) => {
-                debug!("Node {:?} didn't return block in time, trying next.", node);
+                debug!("Get block {:?}: node {:?} didn't return block in time, trying next.", hash, node);
             }
         };
     }
 
-    Err(Error::Message(format!(
-        "Unable to read block {:?}: no node returned a valid block",
-        hash
-    )))
+    let msg = format!("Get block {:?}: no node returned a valid block", hash);
+    debug!("{}", msg);
+    Err(Error::Message(msg))
 }
 
 // ---- Public interface ----
@@ -666,7 +665,7 @@ impl StreamingEndpointHandler<BlockRpc> for BlockManager {
             BlockRpc::PutBlock { hash, header } => Resp::new(
                 self.handle_put_block(*hash, *header, message.take_stream())
                     .await
-                    .map(|_| BlockRpc::Ok),
+                    .map(|()| BlockRpc::Ok),
             ),
             BlockRpc::GetBlock(h, order_tag) => self.handle_get_block(h, *order_tag).await,
             BlockRpc::NeedBlockQuery(h) => {
@@ -687,15 +686,14 @@ impl BlockManagerLocked {
         let compressed = data.is_compressed();
         let data = data.inner_buffer();
 
-        let mut tgt_path = mgr.data_layout.primary_block_dir(hash);
-        let directory = tgt_path.clone();
+        let directory = mgr.data_layout.primary_block_dir(hash);
+
+        let mut tgt_path = directory.clone();
         tgt_path.push(hex::encode(hash));
         if compressed {
             tgt_path.set_extension("zst");
         }
 
-        fs::create_dir_all(&directory).await?;
-
         let to_delete = match (mgr.find_block(hash).await, compressed) {
             // If the block is stored in the wrong directory,
             // write it again at the correct path and delete the old path
@@ -723,6 +721,8 @@ impl BlockManagerLocked {
         let tmp_extension = format!("tmp{}", hex::encode(thread_rng().gen::<[u8; 4]>()));
         path_tmp.set_extension(tmp_extension);
 
+        fs::create_dir_all(&directory).await?;
+
         let mut delete_on_drop = DeleteOnDrop(Some(path_tmp.clone()));
 
         let mut f = fs::File::create(&path_tmp).await?;
diff --git a/src/block/resync.rs b/src/block/resync.rs
index bb43ad7e..9c1da4a7 100644
--- a/src/block/resync.rs
+++ b/src/block/resync.rs
@@ -41,7 +41,7 @@ pub(crate) const RESYNC_RETRY_DELAY: Duration = Duration::from_secs(60);
 pub(crate) const RESYNC_RETRY_DELAY_MAX_BACKOFF_POWER: u64 = 6;
 
 // No more than 4 resync workers can be running in the system
-pub(crate) const MAX_RESYNC_WORKERS: usize = 4;
+pub(crate) const MAX_RESYNC_WORKERS: usize = 8;
 
 // Resync tranquility is initially set to 2, but can be changed in the CLI
 // and the updated version is persisted over Garage restarts
 const INITIAL_RESYNC_TRANQUILITY: u32 = 2;
-- 
cgit v1.2.3
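A side note on the lock sharding that the refactoring commits above settle on: lock_mutate now derives the mutex index from the first two bytes of the block hash, modulo the number of locks. A minimal sketch of that computation:

    // Same index computation as lock_mutate above.
    const MUTEX_COUNT: usize = 256;

    fn lock_index(hash: &[u8; 32]) -> usize {
        u16::from_be_bytes([hash[0], hash[1]]) as usize % MUTEX_COUNT
    }

With MUTEX_COUNT = 256 the modulo keeps only the second hash byte, but deriving the index from a full u16 means MUTEX_COUNT can later be raised up to 65536 (the struct field is now a Vec rather than a fixed-size array) without changing how the index is computed: writes to distinct blocks rarely contend, while concurrent writes to the same block still serialize on the same lock.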
From 55c514999eef17d7764040cde1b7b38ca111d24c Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Tue, 5 Sep 2023 17:00:52 +0200
Subject: block manager: fixes in layout

---
 src/block/layout.rs | 56 +++++++++++++++++++++++++++++++----------------------
 1 file changed, 33 insertions(+), 23 deletions(-)

(limited to 'src/block')

diff --git a/src/block/layout.rs b/src/block/layout.rs
index 3b529fc0..19b6fa17 100644
--- a/src/block/layout.rs
+++ b/src/block/layout.rs
@@ -11,25 +11,28 @@ type Idx = u16;
 
 const DRIVE_NPART: usize = 1024;
 
-const DPART_BYTES: (usize, usize) = (2, 3);
+const HASH_DRIVE_BYTES: (usize, usize) = (2, 3);
 
 #[derive(Serialize, Deserialize, Debug, Clone)]
 pub(crate) struct DataLayout {
     pub(crate) data_dirs: Vec<DataDir>,
 
     /// Primary storage location (index in data_dirs) for each partition
+    /// = the location where the data is supposed to be, blocks are always
+    /// written there (copies in other dirs may be deleted if they exist)
     pub(crate) part_prim: Vec<Idx>,
-    /// Secondary storage locations for each partition
+    /// Secondary storage locations for each partition = locations
+    /// where data blocks might be, we check from these dirs when reading
     pub(crate) part_sec: Vec<Vec<Idx>>,
 }
 
-#[derive(Serialize, Deserialize, Debug, Clone)]
+#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
 pub(crate) struct DataDir {
     pub(crate) path: PathBuf,
     pub(crate) state: DataDirState,
 }
 
-#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
+#[derive(Serialize, Deserialize, Debug, Clone, Copy, Eq, PartialEq)]
 pub(crate) enum DataDirState {
     Active { capacity: u64 },
     ReadOnly,
@@ -55,8 +58,9 @@ impl DataLayout {
         assert_eq!(cum_cap, total_cap);
         assert_eq!(part_prim.len(), DRIVE_NPART);
 
-        // If any of the storage locations is non-empty, add it as a secondary
-        // storage location for all partitions
+        // If any of the storage locations is non-empty, it probably existed before
+        // this algorithm was added, so add it as a secondary storage location for all partitions
+        // to make sure existing files are not lost
         let mut part_sec = vec![vec![]; DRIVE_NPART];
         for (i, dd) in data_dirs.iter().enumerate() {
             if dir_not_empty(&dd.path)? {
@@ -75,10 +79,14 @@ impl DataLayout {
         })
     }
 
-    pub(crate) fn update(&mut self, dirs: &DataDirEnum) -> Result<Self, Error> {
-        // Compute list of new data directories and mapping of old indices
-        // to new indices
+    pub(crate) fn update(&mut self, dirs: &DataDirEnum) -> Result<(), Error> {
+        // Make list of new data directories, exit if nothing changed
         let data_dirs = make_data_dirs(dirs)?;
+        if data_dirs == self.data_dirs {
+            return Ok(());
+        }
+
+        // Compute mapping of old indices to new indices
         let old2new = self
             .data_dirs
             .iter()
@@ -114,15 +122,13 @@ impl DataLayout {
         // Compute the target number of partitions per data directory
         let total_cap = data_dirs.iter().filter_map(|x| x.capacity()).sum::<u64>();
         let mut cum_cap = 0;
-        let mut npart_per_dir = vec![];
-        for dd in data_dirs.iter() {
+        let mut npart_per_dir = vec![0; data_dirs.len()];
+        for (idir, dd) in data_dirs.iter().enumerate() {
             if let DataDirState::Active { capacity } = dd.state {
                 let begin = (cum_cap * DRIVE_NPART as u64) / total_cap;
                 cum_cap += capacity;
                 let end = (cum_cap * DRIVE_NPART as u64) / total_cap;
-                npart_per_dir.push((end - begin) as usize);
-            } else {
-                npart_per_dir.push(0);
+                npart_per_dir[idir] = (end - begin) as usize;
             }
         }
         assert_eq!(cum_cap, total_cap);
@@ -161,11 +167,14 @@ impl DataLayout {
         // add partitions from unassigned
         for (idir, (parts, tgt_npart)) in dir_prim.iter_mut().zip(npart_per_dir.iter()).enumerate()
         {
-            assert!(unassigned.len() >= *tgt_npart - parts.len());
-            for _ in parts.len()..*tgt_npart {
-                let new_part = unassigned.pop().unwrap();
-                part_prim[new_part] = Some(idir as Idx);
-                part_sec[new_part].retain(|x| *x != idir as Idx);
+            if parts.len() < *tgt_npart {
+                let required = *tgt_npart - parts.len();
+                assert!(unassigned.len() >= required);
+                for _ in 0..required {
+                    let new_part = unassigned.pop().unwrap();
+                    part_prim[new_part] = Some(idir as Idx);
+                    part_sec[new_part].retain(|x| *x != idir as Idx);
+                }
             }
         }
 
@@ -183,11 +192,12 @@ impl DataLayout {
         assert!(part_prim.iter().all(|x| x.is_some()));
         assert!(unassigned.is_empty());
 
         let part_prim = part_prim
             .into_iter()
             .map(|x| x.unwrap())
             .collect::<Vec<_>>();
         assert!(part_prim.iter().all(|p| data_dirs
             .get(*p as usize)
             .and_then(|x| x.capacity())
             .unwrap_or(0)
             > 0));
 
-        Ok(Self {
+        *self = Self {
             data_dirs,
             part_prim,
             part_sec,
-        })
+        };
+        Ok(())
     }
 
     pub(crate) fn primary_block_dir(&self, hash: &Hash) -> PathBuf {
@@ -208,8 +218,8 @@ impl DataLayout {
 
     fn partition_from(&self, hash: &Hash) -> usize {
         u16::from_be_bytes([
-            hash.as_slice()[DPART_BYTES.0],
-            hash.as_slice()[DPART_BYTES.1],
+            hash.as_slice()[HASH_DRIVE_BYTES.0],
+            hash.as_slice()[HASH_DRIVE_BYTES.1],
         ]) as usize % DRIVE_NPART
     }
-- 
cgit v1.2.3
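The core of the update() logic fixed above is the translation of directory indices between the old and the new layout: each old directory is looked up by path in the new directory list, and directories that disappeared map to nothing (their partitions become unassigned and are redistributed). A standalone sketch of that mapping, with hypothetical mount points:

    // Same idea as the old2new vector computed in DataLayout::update above.
    fn old_to_new(old: &[&str], new: &[&str]) -> Vec<Option<u16>> {
        old.iter()
            .map(|o| new.iter().position(|n| n == o).map(|i| i as u16))
            .collect()
    }

    fn main() {
        // /mnt/hdd1 was removed, /mnt/hdd2 kept (now index 0), /mnt/hdd3 added.
        let m = old_to_new(&["/mnt/hdd1", "/mnt/hdd2"], &["/mnt/hdd2", "/mnt/hdd3"]);
        assert_eq!(m, vec![None, Some(0)]);
    }

Comparing by path (rather than by position) is what allows directories to be reordered in the configuration file without triggering a full reshuffle of partitions.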
progress: u64, + }, } impl garage_util::migrate::Migrate for ScrubWorkerPersisted { @@ -200,6 +223,7 @@ mod v082 { time_last_complete_scrub: old.time_last_complete_scrub, time_next_run_scrub: randomize_next_scrub_run_time(old.time_last_complete_scrub), corruptions_detected: old.corruptions_detected, + checkpoint: None, } } } @@ -236,14 +260,23 @@ impl Default for ScrubWorkerPersisted { time_next_run_scrub: randomize_next_scrub_run_time(now_msec()), tranquility: INITIAL_SCRUB_TRANQUILITY, corruptions_detected: 0, + checkpoint: None, } } } #[derive(Default)] enum ScrubWorkerState { - Running(BlockStoreIterator), - Paused(BlockStoreIterator, u64), // u64 = time when to resume scrub + Running { + iterator: BlockStoreIterator, + // time of the last checkpoint + t_cp: u64, + }, + Paused { + iterator: BlockStoreIterator, + // time at which the scrub should be resumed + t_resume: u64, + }, #[default] Finished, } @@ -262,10 +295,17 @@ impl ScrubWorker { rx_cmd: mpsc::Receiver, persister: PersisterShared, ) -> Self { + let work = match persister.get_with(|x| x.checkpoint.clone()) { + None => ScrubWorkerState::Finished, + Some(iterator) => ScrubWorkerState::Running { + iterator, + t_cp: now_msec(), + }, + }; Self { manager, rx_cmd, - work: ScrubWorkerState::Finished, + work, tranquilizer: Tranquilizer::new(30), persister, } @@ -278,7 +318,16 @@ impl ScrubWorker { ScrubWorkerState::Finished => { info!("Scrub worker initializing, now performing datastore scrub"); let iterator = BlockStoreIterator::new(&self.manager); - ScrubWorkerState::Running(iterator) + if let Err(e) = self + .persister + .set_with(|x| x.checkpoint = Some(iterator.clone())) + { + error!("Could not save scrub checkpoint: {}", e); + } + ScrubWorkerState::Running { + iterator, + t_cp: now_msec(), + } } work => { error!("Cannot start scrub worker: already running!"); @@ -288,8 +337,18 @@ impl ScrubWorker { } ScrubWorkerCommand::Pause(dur) => { self.work = match std::mem::take(&mut self.work) { - ScrubWorkerState::Running(it) | ScrubWorkerState::Paused(it, _) => { - ScrubWorkerState::Paused(it, now_msec() + dur.as_millis() as u64) + ScrubWorkerState::Running { iterator, .. } + | ScrubWorkerState::Paused { iterator, .. } => { + if let Err(e) = self + .persister + .set_with(|x| x.checkpoint = Some(iterator.clone())) + { + error!("Could not save scrub checkpoint: {}", e); + } + ScrubWorkerState::Paused { + iterator, + t_resume: now_msec() + dur.as_millis() as u64, + } } work => { error!("Cannot pause scrub worker: not running!"); @@ -299,7 +358,10 @@ impl ScrubWorker { } ScrubWorkerCommand::Resume => { self.work = match std::mem::take(&mut self.work) { - ScrubWorkerState::Paused(it, _) => ScrubWorkerState::Running(it), + ScrubWorkerState::Paused { iterator, .. } => ScrubWorkerState::Running { + iterator, + t_cp: now_msec(), + }, work => { error!("Cannot resume scrub worker: not paused!"); work @@ -308,7 +370,7 @@ impl ScrubWorker { } ScrubWorkerCommand::Cancel => { self.work = match std::mem::take(&mut self.work) { - ScrubWorkerState::Running(_) | ScrubWorkerState::Paused(_, _) => { + ScrubWorkerState::Running { .. } | ScrubWorkerState::Paused { .. } => { ScrubWorkerState::Finished } work => { @@ -344,12 +406,15 @@ impl Worker for ScrubWorker { ..Default::default() }; match &self.work { - ScrubWorkerState::Running(bsi) => { - s.progress = Some(format!("{:.2}%", bsi.progress() * 100.)); + ScrubWorkerState::Running { iterator, .. 
From f38a31b3304726aa7c890ba1a9f7a3e67b11bc60 Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Wed, 6 Sep 2023 17:49:30 +0200
Subject: block manager: avoid incorrect data_dir configs and avoid losing files

---
 src/block/layout.rs  | 41 ++++++++++++++++++++++++++++++++++++++---
 src/block/manager.rs |  8 ++++++--
 2 files changed, 44 insertions(+), 5 deletions(-)

(limited to 'src/block')

diff --git a/src/block/layout.rs b/src/block/layout.rs
index 19b6fa17..8098654f 100644
--- a/src/block/layout.rs
+++ b/src/block/layout.rs
@@ -45,6 +45,7 @@ impl DataLayout {
 		// Split partitions proportionally to capacity for all drives
 		// to assign primary storage locations
 		let total_cap = data_dirs.iter().filter_map(|x| x.capacity()).sum::<u64>();
+		assert!(total_cap > 0);
 
 		let mut part_prim = Vec::with_capacity(DRIVE_NPART);
 		let mut cum_cap = 0;
@@ -86,6 +87,9 @@ impl DataLayout {
 			return Ok(());
 		}
 
+		let total_cap = data_dirs.iter().filter_map(|x| x.capacity()).sum::<u64>();
+		assert!(total_cap > 0);
+
 		// Compute mapping of old indices to new indices
 		let old2new = self
 			.data_dirs
 			.iter()
@@ -120,7 +124,6 @@ impl DataLayout {
 		}
 
 		// Compute the target number of partitions per data directory
-		let total_cap = data_dirs.iter().filter_map(|x| x.capacity()).sum::<u64>();
 		let mut cum_cap = 0;
 		let mut npart_per_dir = vec![0; data_dirs.len()];
 		for (idir, dd) in data_dirs.iter().enumerate() {
@@ -182,6 +185,7 @@ impl DataLayout {
 		assert!(part_prim.iter().all(|x| x.is_some()));
 		assert!(unassigned.is_empty());
 
+		// Transform part_prim from vec of Option<Idx> to vec of Idx
 		let part_prim = part_prim
 			.into_iter()
 			.map(|x| x.unwrap())
@@ -192,6 +196,25 @@ impl DataLayout {
 			.unwrap_or(0)
 			> 0));
 
+		// If any of the newly added storage locations is non-empty,
+		// it might have been removed and added again and might contain data,
+		// so add it as a secondary storage location for all partitions
+		// to make sure existing files are not lost
+		let mut part_sec = vec![vec![]; DRIVE_NPART];
+		for (i, dd) in data_dirs.iter().enumerate() {
+			if self.data_dirs.iter().any(|ed| ed.path == dd.path) {
+				continue;
+			}
+			if dir_not_empty(&dd.path)? {
+				for (sec, prim) in part_sec.iter_mut().zip(part_prim.iter()) {
+					if *prim != i as Idx && !sec.contains(&(i as Idx)) {
+						sec.push(i as Idx);
+					}
+				}
+			}
+		}
+
+		// Apply newly generated config
 		*self = Self {
 			data_dirs,
 			part_prim,
@@ -254,12 +277,18 @@ fn make_data_dirs(dirs: &DataDirEnum) -> Result<Vec<DataDir>, Error> {
 			},
 		}),
 		DataDirEnum::Multiple(dirs) => {
+			let mut ok = false;
 			for dir in dirs.iter() {
 				let state = match &dir.capacity {
 					Some(cap) if dir.read_only == false => {
+						let capacity = cap.parse::<bytesize::ByteSize>()
+							.ok_or_message("invalid capacity value")?.as_u64();
+						if capacity == 0 {
+							return Err(Error::Message(format!("data directory {} should have non-zero capacity", dir.path.to_string_lossy())));
+						}
+						ok = true;
 						DataDirState::Active {
-							capacity: cap.parse::<bytesize::ByteSize>()
-								.ok_or_message("invalid capacity value")?.as_u64(),
+							capacity,
 						}
 					}
 					None if dir.read_only == true => {
 						DataDirState::ReadOnly
 					}
 					_ => return Err(Error::Message(format!("data directories in data_dir should have a capacity value or be marked read_only, not the case for {}", dir.path.to_string_lossy()))),
 				};
 				data_dirs.push(DataDir {
 					path: dir.path.clone(),
 					state,
 				});
 			}
+			if !ok {
+				return Err(Error::Message(
+					"incorrect data_dir configuration, no primary writable directory specified"
+						.into(),
+				));
+			}
 		}
 	}
 	Ok(data_dirs)
diff --git a/src/block/manager.rs b/src/block/manager.rs
index b42a9aa9..eb498be0 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -279,16 +279,20 @@ impl BlockManager {
 			let res = match res {
 				Ok(res) => res,
 				Err(e) => {
-					debug!("Get block {:?}: node {:?} returned error: {}", hash, node, e);
+					debug!("Get block {:?}: node {:?} could not be contacted: {}", hash, node, e);
 					continue;
 				}
 			};
 			let (header, stream) = match res.into_parts() {
 				(Ok(BlockRpc::PutBlock { hash: _, header }), Some(stream)) => (header, stream),
-				_ => {
+				(Ok(_), _) => {
 					debug!("Get block {:?}: node {:?} returned a malformed response", hash, node);
 					continue;
 				}
+				(Err(e), _) => {
+					debug!("Get block {:?}: node {:?} returned error: {}", hash, node, e);
+					continue;
+				}
 			};
 			match f(header, stream).await {
 				Ok(ret) => return Ok(ret),
-- 
cgit v1.2.3
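
The configuration rules enforced above reduce to three checks: a writable directory must declare a non-zero capacity, a read-only directory must not declare one, and at least one directory must be writable, since otherwise newly written blocks would have nowhere to go. A condensed sketch of the same validation, using plain structs instead of Garage's config types (capacity is shown already parsed to bytes):

struct DirConfig {
	path: String,
	capacity: Option<u64>, // parsed from e.g. "1T" in the real config
	read_only: bool,
}

fn validate(dirs: &[DirConfig]) -> Result<(), String> {
	let mut has_writable = false;
	for d in dirs {
		match (d.capacity, d.read_only) {
			(Some(0), false) => {
				return Err(format!("data directory {} should have non-zero capacity", d.path))
			}
			(Some(_), false) => has_writable = true,
			(None, true) => (), // read-only: holds old data, receives no new blocks
			_ => return Err(format!("{}: set a capacity or mark it read_only", d.path)),
		}
	}
	if !has_writable {
		// Otherwise there is nowhere to put newly written blocks.
		return Err("no primary writable directory specified".into());
	}
	Ok(())
}

fn main() {
	let dirs = vec![
		DirConfig { path: "/mnt/hdd1".into(), capacity: Some(2_000_000_000_000), read_only: false },
		DirConfig { path: "/mnt/hdd2".into(), capacity: None, read_only: true },
	];
	assert!(validate(&dirs).is_ok());
}

The "re-added non-empty directory becomes a secondary location everywhere" rule in layout.rs serves the same safety goal from the other direction: configuration mistakes must never make existing blocks unreachable.
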
From 99ed18350f9572ebb1968107d3708d53682ee805 Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Thu, 7 Sep 2023 12:41:36 +0200
Subject: block manager: refactor and fix monitoring/statistics

---
 src/block/manager.rs | 47 ++++++++++++++++++++---------------------------
 1 file changed, 20 insertions(+), 27 deletions(-)

(limited to 'src/block')

diff --git a/src/block/manager.rs b/src/block/manager.rs
index eb498be0..0081f46c 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -491,8 +491,6 @@ impl BlockManager {
 	pub(crate) async fn write_block(&self, hash: &Hash, data: &DataBlock) -> Result<(), Error> {
 		let tracer = opentelemetry::global::tracer("garage");
 
-		let write_size = data.inner_buffer().len() as u64;
-
 		self.lock_mutate(hash)
 			.await
 			.write_block(hash, data, self)
@@ -502,8 +500,6 @@ impl BlockManager {
 			))
 			.await?;
 
-		self.metrics.bytes_written.add(write_size);
-
 		Ok(())
 	}
 
@@ -530,31 +526,26 @@ impl BlockManager {
 
 	/// Read block from disk, verifying its integrity
 	pub(crate) async fn read_block(&self, hash: &Hash) -> Result<DataBlock, Error> {
-		let data = self
-			.read_block_internal(hash)
-			.bound_record_duration(&self.metrics.block_read_duration)
-			.await?;
-
-		self.metrics
-			.bytes_read
-			.add(data.inner_buffer().len() as u64);
-
-		Ok(data)
-	}
-
-	async fn read_block_internal(&self, hash: &Hash) -> Result<DataBlock, Error> {
-		match self.find_block(hash).await {
-			Some(p) => self.read_block_from(hash, &p).await,
-			None => {
-				// Not found but maybe we should have had it ??
-				self.resync
-					.put_to_resync(hash, 2 * self.system.rpc.rpc_timeout())?;
-				return Err(Error::Message(format!(
-					"block {:?} not found on node",
-					hash
-				)));
+		let tracer = opentelemetry::global::tracer("garage");
+		async {
+			match self.find_block(hash).await {
+				Some(p) => self.read_block_from(hash, &p).await,
+				None => {
+					// Not found but maybe we should have had it ??
+					self.resync
+						.put_to_resync(hash, 2 * self.system.rpc.rpc_timeout())?;
+					return Err(Error::Message(format!(
+						"block {:?} not found on node",
+						hash
+					)));
+				}
 			}
 		}
+		.bound_record_duration(&self.metrics.block_read_duration)
+		.with_context(Context::current_with_span(
+			tracer.start("BlockManager::read_block"),
+		))
+		.await
 	}
 
 	pub(crate) async fn read_block_from(
@@ -570,6 +561,7 @@ impl BlockManager {
 		let mut f = fs::File::open(&path).await?;
 		let mut data = vec![];
 		f.read_to_end(&mut data).await?;
+		self.metrics.bytes_read.add(data.len() as u64);
 		drop(f);
 
 		let data = if compressed {
@@ -731,6 +723,7 @@ impl BlockManagerLocked {
 
 		let mut f = fs::File::create(&path_tmp).await?;
 		f.write_all(data).await?;
+		mgr.metrics.bytes_written.add(data.len() as u64);
 
 		if mgr.data_fsync {
 			f.sync_all().await?;
-- 
cgit v1.2.3
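
The point of this refactor is to record byte counters at the single place where file I/O actually happens (read_block_from and the locked write path), so that every caller is counted exactly once; counting in the public wrappers would miss paths that bypass them, such as a rebalance reading directly from a known location. A minimal sketch of that discipline, with an AtomicU64 standing in for the real metrics counter:

use std::sync::atomic::{AtomicU64, Ordering};

static BYTES_READ: AtomicU64 = AtomicU64::new(0);

// The only function that touches the filesystem: instrumenting it here
// means wrappers (caches, retries, rebalance) can neither double-count
// nor forget to count.
fn read_file(path: &str) -> std::io::Result<Vec<u8>> {
	let data = std::fs::read(path)?;
	BYTES_READ.fetch_add(data.len() as u64, Ordering::Relaxed);
	Ok(data)
}

fn main() -> std::io::Result<()> {
	let _ = read_file("/etc/hostname")?;
	println!("bytes read: {}", BYTES_READ.load(Ordering::Relaxed));
	Ok(())
}
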
From 6b008b5bd3843bb236f94a1b4472de11f5755f04 Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Thu, 7 Sep 2023 13:44:11 +0200
Subject: block manager: add rebalance operation to rebalance multi-hdd setups

---
 src/block/layout.rs  |  8 +++++
 src/block/manager.rs | 22 +++++++------
 src/block/repair.rs  | 90 +++++++++++++++++++++++++++++++++++++++++++++++++---
 3 files changed, 106 insertions(+), 14 deletions(-)

(limited to 'src/block')

diff --git a/src/block/layout.rs b/src/block/layout.rs
index 8098654f..e32ef785 100644
--- a/src/block/layout.rs
+++ b/src/block/layout.rs
@@ -252,6 +252,14 @@ impl DataLayout {
 		path.push(hex::encode(&hash.as_slice()[1..2]));
 		path
 	}
+
+	pub(crate) fn without_secondary_locations(&self) -> Self {
+		Self {
+			data_dirs: self.data_dirs.clone(),
+			part_prim: self.part_prim.clone(),
+			part_sec: self.part_sec.iter().map(|_| vec![]).collect::<Vec<_>>(),
+		}
+	}
 }
 
 impl InitialFormat for DataLayout {
diff --git a/src/block/manager.rs b/src/block/manager.rs
index 0081f46c..e0fbfe74 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -3,7 +3,7 @@ use std::pin::Pin;
 use std::sync::Arc;
 use std::time::Duration;
 
-use arc_swap::ArcSwapOption;
+use arc_swap::{ArcSwap, ArcSwapOption};
 use async_trait::async_trait;
 use bytes::Bytes;
 use rand::prelude::*;
@@ -83,7 +83,9 @@ pub struct BlockManager {
 	/// Directory/ies in which block are stored
 	pub data_dir: DataDirEnum,
 	/// Data layout
-	pub(crate) data_layout: DataLayout,
+	pub(crate) data_layout: ArcSwap<DataLayout>,
+	/// Data layout persister
+	pub(crate) data_layout_persister: Persister<DataLayout>,
 
 	data_fsync: bool,
 	compression_level: Option<i32>,
@@ -129,9 +131,9 @@ impl BlockManager {
 		system: Arc<System>,
 	) -> Result<Arc<Self>, Error> {
 		// Load or compute layout, i.e. assignment of data blocks to the different data directories
-		let layout_persister: Persister<DataLayout> =
+		let data_layout_persister: Persister<DataLayout> =
 			Persister::new(&system.metadata_dir, "data_layout");
-		let data_layout = match layout_persister.load() {
+		let data_layout = match data_layout_persister.load() {
 			Ok(mut layout) => {
 				layout
 					.update(&data_dir)
@@ -140,7 +142,7 @@ impl BlockManager {
 			}
 			Err(_) => DataLayout::initialize(&data_dir).ok_or_message("invalid data_dir config")?,
 		};
-		layout_persister
+		data_layout_persister
 			.save(&data_layout)
 			.expect("cannot save data_layout");
 
@@ -168,7 +170,8 @@ impl BlockManager {
 		let block_manager = Arc::new(Self {
 			replication,
 			data_dir,
-			data_layout,
+			data_layout: ArcSwap::new(Arc::new(data_layout)),
+			data_layout_persister,
 			data_fsync,
 			compression_level,
 			mutation_lock: vec![(); MUTEX_COUNT]
@@ -606,9 +609,10 @@ impl BlockManager {
 
 	/// Find the path where a block is currently stored
 	pub(crate) async fn find_block(&self, hash: &Hash) -> Option<PathBuf> {
-		let dirs = Some(self.data_layout.primary_block_dir(hash))
+		let data_layout = self.data_layout.load_full();
+		let dirs = Some(data_layout.primary_block_dir(hash))
 			.into_iter()
-			.chain(self.data_layout.secondary_block_dirs(hash));
+			.chain(data_layout.secondary_block_dirs(hash));
 		let filename = hex::encode(hash.as_ref());
 
 		for dir in dirs {
@@ -682,7 +686,7 @@ impl BlockManagerLocked {
 		let compressed = data.is_compressed();
 		let data = data.inner_buffer();
 
-		let directory = mgr.data_layout.primary_block_dir(hash);
+		let directory = mgr.data_layout.load().primary_block_dir(hash);
 
 		let mut tgt_path = directory.clone();
 		tgt_path.push(hex::encode(hash));
diff --git a/src/block/repair.rs b/src/block/repair.rs
index a7c90d4f..1bea9f09 100644
--- a/src/block/repair.rs
+++ b/src/block/repair.rs
@@ -518,6 +518,86 @@ impl Worker for ScrubWorker {
 	}
 }
 
+// ---- ---- ----
+// THIRD KIND OF REPAIR: REBALANCING DATA BLOCKS
+// between multiple storage locations.
+// This is a one-shot repair operation that can be launched,
+// checks everything, and then exits.
+// ---- ---- ----
+
+pub struct RebalanceWorker {
+	manager: Arc<BlockManager>,
+	block_iter: BlockStoreIterator,
+	moved: usize,
+	moved_bytes: usize,
+}
+
+impl RebalanceWorker {
+	pub fn new(manager: Arc<BlockManager>) -> Self {
+		let block_iter = BlockStoreIterator::new(&manager);
+		Self {
+			manager,
+			block_iter,
+			moved: 0,
+			moved_bytes: 0,
+		}
+	}
+}
+
+#[async_trait]
+impl Worker for RebalanceWorker {
+	fn name(&self) -> String {
+		"Block rebalance worker".into()
+	}
+
+	fn status(&self) -> WorkerStatus {
+		WorkerStatus {
+			progress: Some(format!("{:.2}%", self.block_iter.progress() * 100.)),
+			freeform: vec![
+				format!("Blocks moved: {}", self.moved),
+				format!("Bytes moved: {}", self.moved_bytes),
+			],
+			..Default::default()
+		}
+	}
+
+	async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
+		if let Some((path, hash)) = self.block_iter.next().await? {
+			let prim_loc = self.manager.data_layout.load().primary_block_dir(&hash);
+			if path.parent().expect("no parent?") != prim_loc {
+				// block is not in its primary location,
+				// move it there (reading and re-writing does the trick)
+				let data = self.manager.read_block(&hash).await?;
+				self.manager.write_block(&hash, &data).await?;
+				self.moved += 1;
+				self.moved_bytes += data.inner_buffer().len();
+			}
+			Ok(WorkerState::Busy)
+		} else {
+			// all blocks are in their primary location:
+			// - the ones we moved now are
+			// - the ones written in the meantime always were, because we only
+			//   write to primary locations
+			// so we can safely remove all secondary locations from the data layout
+			let new_layout = self
+				.manager
+				.data_layout
+				.load_full()
+				.without_secondary_locations();
+			self.manager
				.data_layout_persister
+				.save_async(&new_layout)
+				.await?;
+			self.manager.data_layout.store(Arc::new(new_layout));
+			Ok(WorkerState::Done)
+		}
+	}
+
+	async fn wait_for_work(&mut self) -> WorkerState {
+		unreachable!()
+	}
+}
+
 // ---- ---- ----
 // UTILITY FOR ENUMERATING THE BLOCK STORE
 // ---- ---- ----
@@ -526,16 +606,16 @@ const PROGRESS_FP: u64 = 1_000_000_000;
 
 impl BlockStoreIterator {
 	fn new(manager: &BlockManager) -> Self {
-		let min_cap = manager
-			.data_layout
+		let data_layout = manager.data_layout.load_full();
+
+		let min_cap = data_layout
 			.data_dirs
 			.iter()
 			.filter_map(|x| x.capacity())
 			.min()
 			.unwrap_or(0);
 
-		let sum_cap = manager
-			.data_layout
+		let sum_cap = data_layout
 			.data_dirs
 			.iter()
 			.map(|x| x.capacity().unwrap_or(min_cap /* approximation */))
 			.sum::<u64>() as u128;
 
 		let mut cum_cap = 0;
 		let mut todo = vec![];
-		for dir in manager.data_layout.data_dirs.iter() {
+		for dir in data_layout.data_dirs.iter() {
 			let cap = match dir.state {
 				DataDirState::Active { capacity } => capacity,
 				_ => min_cap,
-- 
cgit v1.2.3
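
Swapping the DataLayout behind an ArcSwap is what allows the rebalance worker to publish a new layout at runtime while readers keep cheap, lock-free access. The pattern in isolation (using the arc_swap crate, as the patch does; the persistence step is elided):

use std::sync::Arc;
use arc_swap::ArcSwap;

struct Layout {
	generation: u64,
}

fn main() {
	let layout = ArcSwap::new(Arc::new(Layout { generation: 1 }));

	// Readers take a cheap snapshot; it stays valid even if a writer
	// swaps in a new layout while the snapshot is still in use.
	let snapshot = layout.load_full();

	// Writer: build a new value, persist it (elided), then publish it.
	layout.store(Arc::new(Layout { generation: snapshot.generation + 1 }));

	println!("now at generation {}", layout.load().generation);
}

A reader that needs a consistent view across several steps takes one load_full() snapshot and reuses it, exactly as find_block() does above; load() suffices for a single call.
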
From 2657b5c1b911b7c5f2d97f8c564e60202ddf4124 Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Thu, 7 Sep 2023 15:30:56 +0200
Subject: block manager: fix bugs

---
 src/block/layout.rs |  1 -
 src/block/repair.rs | 30 ++++++++++++++++++++++++++----
 2 files changed, 26 insertions(+), 5 deletions(-)

(limited to 'src/block')

diff --git a/src/block/layout.rs b/src/block/layout.rs
index e32ef785..1d8f4cda 100644
--- a/src/block/layout.rs
+++ b/src/block/layout.rs
@@ -200,7 +200,6 @@ impl DataLayout {
 		// it might have been removed and added again and might contain data,
 		// so add it as a secondary storage location for all partitions
 		// to make sure existing files are not lost
-		let mut part_sec = vec![vec![]; DRIVE_NPART];
 		for (i, dd) in data_dirs.iter().enumerate() {
 			if self.data_dirs.iter().any(|ed| ed.path == dd.path) {
 				continue;
diff --git a/src/block/repair.rs b/src/block/repair.rs
index 1bea9f09..e18eeaeb 100644
--- a/src/block/repair.rs
+++ b/src/block/repair.rs
@@ -17,6 +17,7 @@ use garage_util::persister::PersisterShared;
 use garage_util::time::*;
 use garage_util::tranquilizer::Tranquilizer;
 
+use crate::block::*;
 use crate::layout::*;
 use crate::manager::*;
 
@@ -528,8 +529,10 @@ impl Worker for ScrubWorker {
 pub struct RebalanceWorker {
 	manager: Arc<BlockManager>,
 	block_iter: BlockStoreIterator,
+	t_started: u64,
+	t_finished: Option<u64>,
 	moved: usize,
-	moved_bytes: usize,
+	moved_bytes: u64,
 }
 
 impl RebalanceWorker {
@@ -538,6 +541,8 @@ impl RebalanceWorker {
 		Self {
 			manager,
 			block_iter,
+			t_started: now_msec(),
+			t_finished: None,
 			moved: 0,
 			moved_bytes: 0,
 		}
@@ -551,11 +556,18 @@ impl Worker for RebalanceWorker {
 	}
 
 	fn status(&self) -> WorkerStatus {
+		let t_cur = self.t_finished.unwrap_or_else(|| now_msec());
+		let rate = self.moved_bytes / std::cmp::max(1, (t_cur - self.t_started) / 1000);
 		WorkerStatus {
 			progress: Some(format!("{:.2}%", self.block_iter.progress() * 100.)),
 			freeform: vec![
 				format!("Blocks moved: {}", self.moved),
-				format!("Bytes moved: {}", self.moved_bytes),
+				format!(
+					"Bytes moved: {} ({}/s)",
+					bytesize::ByteSize::b(self.moved_bytes),
+					bytesize::ByteSize::b(rate)
+				),
+				format!("Started: {}", msec_to_rfc3339(self.t_started)),
 			],
 			..Default::default()
 		}
@@ -565,12 +577,21 @@ impl Worker for RebalanceWorker {
 		if let Some((path, hash)) = self.block_iter.next().await? {
 			let prim_loc = self.manager.data_layout.load().primary_block_dir(&hash);
 			if path.parent().expect("no parent?") != prim_loc {
+				let path = match path.extension() {
+					None => DataBlockPath::Plain(path),
+					Some(x) if x.to_str() == Some("zst") => DataBlockPath::Compressed(path),
+					_ => {
+						warn!("not rebalancing file: {}", path.to_string_lossy());
+						return Ok(WorkerState::Busy);
+					}
+				};
 				// block is not in its primary location,
 				// move it there (reading and re-writing does the trick)
-				let data = self.manager.read_block(&hash).await?;
+				debug!("rebalance: moving block {:?}", hash);
+				let data = self.manager.read_block_from(&hash, &path).await?;
 				self.manager.write_block(&hash, &data).await?;
 				self.moved += 1;
-				self.moved_bytes += data.inner_buffer().len();
+				self.moved_bytes += data.inner_buffer().len() as u64;
 			}
 			Ok(WorkerState::Busy)
 		} else {
@@ -589,6 +610,7 @@ impl Worker for RebalanceWorker {
 				.save_async(&new_layout)
 				.await?;
 			self.manager.data_layout.store(Arc::new(new_layout));
+			self.t_finished = Some(now_msec());
 			Ok(WorkerState::Done)
 		}
 	}
-- 
cgit v1.2.3
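
Two small details of this fix are worth spelling out: block files are classified purely by file extension (no extension means an uncompressed block, .zst a compressed one, and anything else, such as a leftover temporary file, is skipped rather than moved), and the throughput display guards its division during the first second of runtime. A compact sketch of both (hypothetical helper names, std types only):

use std::path::Path;

#[derive(Debug)]
enum BlockFile {
	Plain,
	Compressed,
	Other, // e.g. a stray .tmp file: skip it, do not move it
}

fn classify(path: &Path) -> BlockFile {
	match path.extension().and_then(|x| x.to_str()) {
		None => BlockFile::Plain,
		Some("zst") => BlockFile::Compressed,
		Some(_) => BlockFile::Other,
	}
}

fn rate_bytes_per_sec(moved_bytes: u64, elapsed_ms: u64) -> u64 {
	// max(1, ...) avoids dividing by zero right after the worker starts.
	moved_bytes / std::cmp::max(1, elapsed_ms / 1000)
}

fn main() {
	println!("{:?}", classify(Path::new("/data/ab/cd/hash.zst")));
	println!("{} B/s", rate_bytes_per_sec(1_000_000, 250));
}
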
From be91ef6294bcc699f075746fd3abb57a9b22e838 Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Thu, 7 Sep 2023 16:04:03 +0200
Subject: block manager: fix bug where rebalance didn't delete old copies

---
 src/block/block.rs   |  1 +
 src/block/manager.rs | 39 ++++++++++++++++++++++++++++++++++++++-
 src/block/repair.rs  | 37 ++++++++++++++++++++-----------------
 3 files changed, 59 insertions(+), 18 deletions(-)

(limited to 'src/block')

diff --git a/src/block/block.rs b/src/block/block.rs
index 6d79fb6c..20f57aa5 100644
--- a/src/block/block.rs
+++ b/src/block/block.rs
@@ -21,6 +21,7 @@ pub enum DataBlock {
 	Compressed(Bytes),
 }
 
+#[derive(Debug)]
 pub enum DataBlockPath {
 	/// Uncompressed data file
 	Plain(PathBuf),
diff --git a/src/block/manager.rs b/src/block/manager.rs
index e0fbfe74..ea70b19c 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -645,6 +645,19 @@ impl BlockManager {
 		None
 	}
 
+	/// Rewrite a block at the primary location for its path and delete the old path.
+	/// Returns the number of bytes read/written
+	pub(crate) async fn fix_block_location(
+		&self,
+		hash: &Hash,
+		wrong_path: DataBlockPath,
+	) -> Result<usize, Error> {
+		self.lock_mutate(hash)
+			.await
+			.fix_block_location(hash, wrong_path, self)
+			.await
+	}
+
 	async fn lock_mutate(&self, hash: &Hash) -> MutexGuard<'_, BlockManagerLocked> {
 		let tracer = opentelemetry::global::tracer("garage");
 		let ilock = u16::from_be_bytes([hash.as_slice()[0], hash.as_slice()[1]]) as usize
@@ -682,6 +695,17 @@ impl BlockManagerLocked {
 		hash: &Hash,
 		data: &DataBlock,
 		mgr: &BlockManager,
+	) -> Result<(), Error> {
+		let existing_path = mgr.find_block(hash).await;
+		self.write_block_inner(hash, data, mgr, existing_path).await
+	}
+
+	async fn write_block_inner(
+		&self,
+		hash: &Hash,
+		data: &DataBlock,
+		mgr: &BlockManager,
+		existing_path: Option<DataBlockPath>,
 	) -> Result<(), Error> {
 		let compressed = data.is_compressed();
 		let data = data.inner_buffer();
@@ -694,7 +718,7 @@ impl BlockManagerLocked {
 			tgt_path.set_extension("zst");
 		}
 
-		let to_delete = match (mgr.find_block(hash).await, compressed) {
+		let to_delete = match (existing_path, compressed) {
 			// If the block is stored in the wrong directory,
 			// write it again at the correct path and delete the old path
 			(Some(DataBlockPath::Plain(p)), false) if p != tgt_path => Some(p),
@@ -716,6 +740,7 @@ impl BlockManagerLocked {
 			// If the block isn't stored already, just store what is given to us
 			(None, _) => None,
 		};
+		assert!(to_delete.as_ref() != Some(&tgt_path));
 
 		let mut path_tmp = tgt_path.clone();
 		let tmp_extension = format!("tmp{}", hex::encode(thread_rng().gen::<[u8; 4]>()));
@@ -792,6 +817,18 @@ impl BlockManagerLocked {
 		}
 		Ok(())
 	}
+
+	async fn fix_block_location(
+		&self,
+		hash: &Hash,
+		wrong_path: DataBlockPath,
+		mgr: &BlockManager,
+	) -> Result<usize, Error> {
+		let data = mgr.read_block_from(hash, &wrong_path).await?;
+		self.write_block_inner(hash, &data, mgr, Some(wrong_path))
+			.await?;
+		Ok(data.inner_buffer().len())
+	}
 }
 
 async fn read_stream_to_end(mut stream: ByteStream) -> Result<Bytes, Error> {
diff --git a/src/block/repair.rs b/src/block/repair.rs
index e18eeaeb..bd14085f 100644
--- a/src/block/repair.rs
+++ b/src/block/repair.rs
@@ -558,17 +558,21 @@ impl Worker for RebalanceWorker {
 	fn status(&self) -> WorkerStatus {
 		let t_cur = self.t_finished.unwrap_or_else(|| now_msec());
 		let rate = self.moved_bytes / std::cmp::max(1, (t_cur - self.t_started) / 1000);
+		let mut freeform = vec![
+			format!("Blocks moved: {}", self.moved),
+			format!(
+				"Bytes moved: {} ({}/s)",
+				bytesize::ByteSize::b(self.moved_bytes),
+				bytesize::ByteSize::b(rate)
+			),
+			format!("Started: {}", msec_to_rfc3339(self.t_started)),
+		];
+		if let Some(t_fin) = self.t_finished {
+			freeform.push(format!("Finished: {}", msec_to_rfc3339(t_fin)))
+		}
 		WorkerStatus {
 			progress: Some(format!("{:.2}%", self.block_iter.progress() * 100.)),
-			freeform: vec![
-				format!("Blocks moved: {}", self.moved),
-				format!(
-					"Bytes moved: {} ({}/s)",
-					bytesize::ByteSize::b(self.moved_bytes),
-					bytesize::ByteSize::b(rate)
-				),
-				format!("Started: {}", msec_to_rfc3339(self.t_started)),
-			],
+			freeform,
 			..Default::default()
 		}
 	}
@@ -580,10 +584,10 @@ impl Worker for RebalanceWorker {
 		if let Some((path, hash)) = self.block_iter.next().await? {
 			let prim_loc = self.manager.data_layout.load().primary_block_dir(&hash);
-			if path.parent().expect("no parent?") != prim_loc {
-				let path = match path.extension() {
-					None => DataBlockPath::Plain(path),
-					Some(x) if x.to_str() == Some("zst") => DataBlockPath::Compressed(path),
+			if path.ancestors().all(|x| x != prim_loc) {
+				let block_path = match path.extension() {
+					None => DataBlockPath::Plain(path.clone()),
+					Some(x) if x.to_str() == Some("zst") => DataBlockPath::Compressed(path.clone()),
 					_ => {
 						warn!("not rebalancing file: {}", path.to_string_lossy());
 						return Ok(WorkerState::Busy);
 					}
 				};
 				// block is not in its primary location,
 				// move it there (reading and re-writing does the trick)
-				debug!("rebalance: moving block {:?}", hash);
-				let data = self.manager.read_block_from(&hash, &path).await?;
-				self.manager.write_block(&hash, &data).await?;
+				debug!("rebalance: moving block {:?} => {:?}", block_path, prim_loc);
+				let block_len = self.manager.fix_block_location(&hash, block_path).await?;
 				self.moved += 1;
-				self.moved_bytes += data.inner_buffer().len() as u64;
+				self.moved_bytes += block_len as u64;
 			}
 			Ok(WorkerState::Busy)
 		} else {
-- 
cgit v1.2.3
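
The parent()-versus-ancestors() change matters because blocks do not sit directly in a data directory: they live two levels down, under two hex-prefix subdirectories. The old check therefore compared the innermost subdirectory to the data directory root, never matched, and re-processed blocks that were already in place. A short demonstration (paths are illustrative):

use std::path::Path;

fn main() {
	let prim_loc = Path::new("/mnt/hdd2");
	// Blocks are stored under <data_dir>/<byte0>/<byte1>/<hash>
	let block = Path::new("/mnt/hdd2/12/34/1234abcd");

	// parent() is the immediate directory only: /mnt/hdd2/12/34
	assert!(block.parent().unwrap() != prim_loc);

	// ancestors() walks all the way up, so the data dir is found
	// and a correctly placed block is not moved again.
	assert!(!block.ancestors().all(|x| x != prim_loc));
}

Routing the move through fix_block_location (rather than a plain read and re-write) is what guarantees the copy at the wrong path is deleted: write_block_inner receives the known wrong path as existing_path and schedules it in to_delete.
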
From de5d7921813ad84038053c96004ce617bc144722 Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Mon, 11 Sep 2023 11:52:57 +0200
Subject: block manager: fix indentation (why not detected by cargo fmt?)

---
 src/block/layout.rs | 32 ++++++++++++++++----------------
 1 file changed, 16 insertions(+), 16 deletions(-)

(limited to 'src/block')

diff --git a/src/block/layout.rs b/src/block/layout.rs
index 1d8f4cda..e8339405 100644
--- a/src/block/layout.rs
+++ b/src/block/layout.rs
@@ -287,22 +287,22 @@ fn make_data_dirs(dirs: &DataDirEnum) -> Result<Vec<DataDir>, Error> {
 			let mut ok = false;
 			for dir in dirs.iter() {
 				let state = match &dir.capacity {
-				Some(cap) if dir.read_only == false => {
-					let capacity = cap.parse::<bytesize::ByteSize>()
-						.ok_or_message("invalid capacity value")?.as_u64();
-					if capacity == 0 {
-						return Err(Error::Message(format!("data directory {} should have non-zero capacity", dir.path.to_string_lossy())));
-					}
-					ok = true;
-					DataDirState::Active {
-						capacity,
-					}
-				}
-				None if dir.read_only == true => {
-					DataDirState::ReadOnly
-				}
-				_ => return Err(Error::Message(format!("data directories in data_dir should have a capacity value or be marked read_only, not the case for {}", dir.path.to_string_lossy()))),
-			};
+					Some(cap) if dir.read_only == false => {
+						let capacity = cap.parse::<bytesize::ByteSize>()
+							.ok_or_message("invalid capacity value")?.as_u64();
+						if capacity == 0 {
+							return Err(Error::Message(format!("data directory {} should have non-zero capacity", dir.path.to_string_lossy())));
+						}
+						ok = true;
+						DataDirState::Active {
+							capacity,
+						}
+					}
+					None if dir.read_only == true => {
+						DataDirState::ReadOnly
+					}
+					_ => return Err(Error::Message(format!("data directories in data_dir should have a capacity value or be marked read_only, not the case for {}", dir.path.to_string_lossy()))),
+				};
 			data_dirs.push(DataDir {
 				path: dir.path.clone(),
 				state,
-- 
cgit v1.2.3
From 7f9ba49c7151a0c3c29fbe0b0208b4a1f1dfc1e8 Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Mon, 11 Sep 2023 11:57:36 +0200
Subject: block manager: remove data_dir field

---
 src/block/manager.rs | 3 ---
 1 file changed, 3 deletions(-)

(limited to 'src/block')

diff --git a/src/block/manager.rs b/src/block/manager.rs
index ea70b19c..2d1b5c67 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -80,8 +80,6 @@ pub struct BlockManager {
 	/// Replication strategy, allowing to find on which node blocks should be located
 	pub replication: TableShardedReplication,
 
-	/// Directory/ies in which block are stored
-	pub data_dir: DataDirEnum,
 	/// Data layout
 	pub(crate) data_layout: ArcSwap<DataLayout>,
 	/// Data layout persister
@@ -169,7 +167,6 @@ impl BlockManager {
 
 		let block_manager = Arc::new(Self {
 			replication,
-			data_dir,
 			data_layout: ArcSwap::new(Arc::new(data_layout)),
 			data_layout_persister,
 			data_fsync,
-- 
cgit v1.2.3
From 9526328d386ab6261df416327c2efb0791369339 Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Mon, 11 Sep 2023 12:10:48 +0200
Subject: scrub: clear saved checkpoint when canceling scrub

---
 src/block/repair.rs | 3 +++
 1 file changed, 3 insertions(+)

(limited to 'src/block')

diff --git a/src/block/repair.rs b/src/block/repair.rs
index bd14085f..a464e2b6 100644
--- a/src/block/repair.rs
+++ b/src/block/repair.rs
@@ -372,6 +372,9 @@ impl ScrubWorker {
 			ScrubWorkerCommand::Cancel => {
 				self.work = match std::mem::take(&mut self.work) {
 					ScrubWorkerState::Running { .. } | ScrubWorkerState::Paused { .. } => {
+						if let Err(e) = self.persister.set_with(|x| x.checkpoint = None) {
+							error!("Could not save scrub checkpoint: {}", e);
+						}
 						ScrubWorkerState::Finished
 					}
 					work => {
-- 
cgit v1.2.3
From ba7ac52c196c452e0b09fef63862264e0c4582bb Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Mon, 11 Sep 2023 12:28:29 +0200
Subject: block repair: simpler/more robust iterator progress calculation

---
 src/block/repair.rs | 37 ++++++++++++++-----------------------
 1 file changed, 14 insertions(+), 23 deletions(-)

(limited to 'src/block')

diff --git a/src/block/repair.rs b/src/block/repair.rs
index a464e2b6..77ee0d14 100644
--- a/src/block/repair.rs
+++ b/src/block/repair.rs
@@ -18,7 +18,6 @@ use garage_util::time::*;
 use garage_util::tranquilizer::Tranquilizer;
 
 use crate::block::*;
-use crate::layout::*;
 use crate::manager::*;
 
 // Full scrub every 25 days with a random element of 10 days mixed in below
@@ -636,31 +635,23 @@ impl BlockStoreIterator {
 	fn new(manager: &BlockManager) -> Self {
 		let data_layout = manager.data_layout.load_full();
 
-		let min_cap = data_layout
-			.data_dirs
-			.iter()
-			.filter_map(|x| x.capacity())
-			.min()
-			.unwrap_or(0);
-
-		let sum_cap = data_layout
-			.data_dirs
-			.iter()
-			.map(|x| x.capacity().unwrap_or(min_cap /* approximation */))
-			.sum::<u64>() as u128;
+		let mut dir_cap = vec![0; data_layout.data_dirs.len()];
+		for prim in data_layout.part_prim.iter() {
+			dir_cap[*prim as usize] += 1;
+		}
+		for sec_vec in data_layout.part_sec.iter() {
+			for sec in sec_vec.iter() {
+				dir_cap[*sec as usize] += 1;
+			}
+		}
+		let sum_cap = dir_cap.iter().sum::<usize>() as u64;
 
 		let mut cum_cap = 0;
 		let mut todo = vec![];
-		for dir in data_layout.data_dirs.iter() {
-			let cap = match dir.state {
-				DataDirState::Active { capacity } => capacity,
-				_ => min_cap,
-			};
-
-			let progress_min = ((cum_cap as u128 * PROGRESS_FP as u128) / (sum_cap as u128)) as u64;
-			let progress_max =
-				(((cum_cap + cap) as u128 * PROGRESS_FP as u128) / (sum_cap as u128)) as u64;
-			cum_cap += cap;
+		for (dir, cap) in data_layout.data_dirs.iter().zip(dir_cap.into_iter()) {
+			let progress_min = (cum_cap * PROGRESS_FP) / sum_cap;
+			let progress_max = ((cum_cap + cap as u64) * PROGRESS_FP) / sum_cap;
+			cum_cap += cap as u64;
 
 			todo.push(BsiTodo::Directory {
 				path: dir.path.clone(),
-- 
cgit v1.2.3
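
As a closing note on this last commit: instead of weighting each directory's share of the progress bar by configured capacity (which read-only directories do not have), the iterator now weights it by the number of partitions that will actually be visited there, primary plus secondary, on a fixed-point scale. A standalone rendering of that arithmetic (illustrative types, not the patch's code):

const PROGRESS_FP: u64 = 1_000_000_000; // fixed-point scale for 0.0..=1.0

// part_prim: partition -> primary dir; part_sec: partition -> secondary dirs.
fn progress_ranges(n_dirs: usize, part_prim: &[u16], part_sec: &[Vec<u16>]) -> Vec<(u64, u64)> {
	// Weight each directory by how many partitions will be scanned in it.
	let mut dir_cap = vec![0usize; n_dirs];
	for prim in part_prim {
		dir_cap[*prim as usize] += 1;
	}
	for sec_vec in part_sec {
		for sec in sec_vec {
			dir_cap[*sec as usize] += 1;
		}
	}
	let sum_cap = dir_cap.iter().sum::<usize>() as u64;

	// Each directory owns a contiguous [min, max) slice of the progress scale.
	let mut ranges = Vec::with_capacity(n_dirs);
	let mut cum_cap = 0u64;
	for cap in dir_cap {
		let min = (cum_cap * PROGRESS_FP) / sum_cap;
		cum_cap += cap as u64;
		let max = (cum_cap * PROGRESS_FP) / sum_cap;
		ranges.push((min, max));
	}
	ranges
}

fn main() {
	// Two dirs, four partitions; dir 1 is also secondary for partition 0.
	let ranges = progress_ranges(2, &[0, 0, 1, 1], &[vec![1], vec![], vec![], vec![]]);
	println!("{:?}", ranges); // dir 1 gets 3/5 of the progress scale
}

With the earlier capacity-based formula, a read-only drive fell back on an approximated capacity, so the reported percentage could advance unevenly; counting partitions makes progress uniform by construction.
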