diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/block/layout.rs | 8 | ||||
-rw-r--r-- | src/block/manager.rs | 22 | ||||
-rw-r--r-- | src/block/repair.rs | 90 | ||||
-rw-r--r-- | src/garage/cli/structs.rs | 3 | ||||
-rw-r--r-- | src/garage/repair/online.rs | 6 |
5 files changed, 115 insertions, 14 deletions
diff --git a/src/block/layout.rs b/src/block/layout.rs index 8098654f..e32ef785 100644 --- a/src/block/layout.rs +++ b/src/block/layout.rs @@ -252,6 +252,14 @@ impl DataLayout { path.push(hex::encode(&hash.as_slice()[1..2])); path } + + pub(crate) fn without_secondary_locations(&self) -> Self { + Self { + data_dirs: self.data_dirs.clone(), + part_prim: self.part_prim.clone(), + part_sec: self.part_sec.iter().map(|_| vec![]).collect::<Vec<_>>(), + } + } } impl InitialFormat for DataLayout { diff --git a/src/block/manager.rs b/src/block/manager.rs index 0081f46c..e0fbfe74 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -3,7 +3,7 @@ use std::pin::Pin; use std::sync::Arc; use std::time::Duration; -use arc_swap::ArcSwapOption; +use arc_swap::{ArcSwap, ArcSwapOption}; use async_trait::async_trait; use bytes::Bytes; use rand::prelude::*; @@ -83,7 +83,9 @@ pub struct BlockManager { /// Directory/ies in which block are stored pub data_dir: DataDirEnum, /// Data layout - pub(crate) data_layout: DataLayout, + pub(crate) data_layout: ArcSwap<DataLayout>, + /// Data layout persister + pub(crate) data_layout_persister: Persister<DataLayout>, data_fsync: bool, compression_level: Option<i32>, @@ -129,9 +131,9 @@ impl BlockManager { system: Arc<System>, ) -> Result<Arc<Self>, Error> { // Load or compute layout, i.e. assignment of data blocks to the different data directories - let layout_persister: Persister<DataLayout> = + let data_layout_persister: Persister<DataLayout> = Persister::new(&system.metadata_dir, "data_layout"); - let data_layout = match layout_persister.load() { + let data_layout = match data_layout_persister.load() { Ok(mut layout) => { layout .update(&data_dir) @@ -140,7 +142,7 @@ impl BlockManager { } Err(_) => DataLayout::initialize(&data_dir).ok_or_message("invalid data_dir config")?, }; - layout_persister + data_layout_persister .save(&data_layout) .expect("cannot save data_layout"); @@ -168,7 +170,8 @@ impl BlockManager { let block_manager = Arc::new(Self { replication, data_dir, - data_layout, + data_layout: ArcSwap::new(Arc::new(data_layout)), + data_layout_persister, data_fsync, compression_level, mutation_lock: vec![(); MUTEX_COUNT] @@ -606,9 +609,10 @@ impl BlockManager { /// Find the path where a block is currently stored pub(crate) async fn find_block(&self, hash: &Hash) -> Option<DataBlockPath> { - let dirs = Some(self.data_layout.primary_block_dir(hash)) + let data_layout = self.data_layout.load_full(); + let dirs = Some(data_layout.primary_block_dir(hash)) .into_iter() - .chain(self.data_layout.secondary_block_dirs(hash)); + .chain(data_layout.secondary_block_dirs(hash)); let filename = hex::encode(hash.as_ref()); for dir in dirs { @@ -682,7 +686,7 @@ impl BlockManagerLocked { let compressed = data.is_compressed(); let data = data.inner_buffer(); - let directory = mgr.data_layout.primary_block_dir(hash); + let directory = mgr.data_layout.load().primary_block_dir(hash); let mut tgt_path = directory.clone(); tgt_path.push(hex::encode(hash)); diff --git a/src/block/repair.rs b/src/block/repair.rs index a7c90d4f..1bea9f09 100644 --- a/src/block/repair.rs +++ b/src/block/repair.rs @@ -519,6 +519,86 @@ impl Worker for ScrubWorker { } // ---- ---- ---- +// THIRD KIND OF REPAIR: REBALANCING DATA BLOCKS +// between multiple storage locations. +// This is a one-shot repair operation that can be launched, +// checks everything, and then exits. +// ---- ---- ---- + +pub struct RebalanceWorker { + manager: Arc<BlockManager>, + block_iter: BlockStoreIterator, + moved: usize, + moved_bytes: usize, +} + +impl RebalanceWorker { + pub fn new(manager: Arc<BlockManager>) -> Self { + let block_iter = BlockStoreIterator::new(&manager); + Self { + manager, + block_iter, + moved: 0, + moved_bytes: 0, + } + } +} + +#[async_trait] +impl Worker for RebalanceWorker { + fn name(&self) -> String { + "Block rebalance worker".into() + } + + fn status(&self) -> WorkerStatus { + WorkerStatus { + progress: Some(format!("{:.2}%", self.block_iter.progress() * 100.)), + freeform: vec![ + format!("Blocks moved: {}", self.moved), + format!("Bytes moved: {}", self.moved_bytes), + ], + ..Default::default() + } + } + + async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> { + if let Some((path, hash)) = self.block_iter.next().await? { + let prim_loc = self.manager.data_layout.load().primary_block_dir(&hash); + if path.parent().expect("no parent?") != prim_loc { + // block is not in its primary location, + // move it there (reading and re-writing does the trick) + let data = self.manager.read_block(&hash).await?; + self.manager.write_block(&hash, &data).await?; + self.moved += 1; + self.moved_bytes += data.inner_buffer().len(); + } + Ok(WorkerState::Busy) + } else { + // all blocks are in their primary location: + // - the ones we moved now are + // - the ones written in the meantime always were, because we only + // write to primary locations + // so we can safely remove all secondary locations from the data layout + let new_layout = self + .manager + .data_layout + .load_full() + .without_secondary_locations(); + self.manager + .data_layout_persister + .save_async(&new_layout) + .await?; + self.manager.data_layout.store(Arc::new(new_layout)); + Ok(WorkerState::Done) + } + } + + async fn wait_for_work(&mut self) -> WorkerState { + unreachable!() + } +} + +// ---- ---- ---- // UTILITY FOR ENUMERATING THE BLOCK STORE // ---- ---- ---- @@ -526,16 +606,16 @@ const PROGRESS_FP: u64 = 1_000_000_000; impl BlockStoreIterator { fn new(manager: &BlockManager) -> Self { - let min_cap = manager - .data_layout + let data_layout = manager.data_layout.load_full(); + + let min_cap = data_layout .data_dirs .iter() .filter_map(|x| x.capacity()) .min() .unwrap_or(0); - let sum_cap = manager - .data_layout + let sum_cap = data_layout .data_dirs .iter() .map(|x| x.capacity().unwrap_or(min_cap /* approximation */)) @@ -543,7 +623,7 @@ impl BlockStoreIterator { let mut cum_cap = 0; let mut todo = vec![]; - for dir in manager.data_layout.data_dirs.iter() { + for dir in data_layout.data_dirs.iter() { let cap = match dir.state { DataDirState::Active { capacity } => capacity, _ => min_cap, diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs index 9ca4a059..fd37a24e 100644 --- a/src/garage/cli/structs.rs +++ b/src/garage/cli/structs.rs @@ -471,6 +471,9 @@ pub enum RepairWhat { #[structopt(subcommand)] cmd: ScrubCmd, }, + /// Rebalance data blocks among storage locations + #[structopt(name = "rebalance", version = garage_version())] + Rebalance, } #[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)] diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs index abfaf9f9..9e4de873 100644 --- a/src/garage/repair/online.rs +++ b/src/garage/repair/online.rs @@ -70,6 +70,12 @@ pub async fn launch_online_repair( info!("Sending command to scrub worker: {:?}", cmd); garage.block_manager.send_scrub_command(cmd).await?; } + RepairWhat::Rebalance => { + info!("Rebalancing the stored blocks among storage locations"); + bg.spawn_worker(garage_block::repair::RebalanceWorker::new( + garage.block_manager.clone(), + )); + } } Ok(()) } |