aboutsummaryrefslogtreecommitdiff
path: root/src/block
diff options
context:
space:
mode:
Diffstat (limited to 'src/block')
-rw-r--r--src/block/Cargo.toml1
-rw-r--r--src/block/block.rs10
-rw-r--r--src/block/layout.rs337
-rw-r--r--src/block/lib.rs1
-rw-r--r--src/block/manager.rs458
-rw-r--r--src/block/rc.rs4
-rw-r--r--src/block/repair.rs410
-rw-r--r--src/block/resync.rs19
8 files changed, 933 insertions, 307 deletions
diff --git a/src/block/Cargo.toml b/src/block/Cargo.toml
index 963086ac..6a8f1af4 100644
--- a/src/block/Cargo.toml
+++ b/src/block/Cargo.toml
@@ -24,6 +24,7 @@ opentelemetry = "0.17"
arc-swap = "1.5"
async-trait = "0.1.7"
bytes = "1.0"
+bytesize = "1.2"
hex = "0.4"
tracing = "0.1"
rand = "0.8"
diff --git a/src/block/block.rs b/src/block/block.rs
index 935aa900..20f57aa5 100644
--- a/src/block/block.rs
+++ b/src/block/block.rs
@@ -1,3 +1,5 @@
+use std::path::PathBuf;
+
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use zstd::stream::{decode_all as zstd_decode, Encoder};
@@ -19,6 +21,14 @@ pub enum DataBlock {
Compressed(Bytes),
}
+#[derive(Debug)]
+pub enum DataBlockPath {
+ /// Uncompressed data fail
+ Plain(PathBuf),
+ /// Compressed data fail
+ Compressed(PathBuf),
+}
+
impl DataBlock {
/// Query whether this block is compressed
pub fn is_compressed(&self) -> bool {
diff --git a/src/block/layout.rs b/src/block/layout.rs
new file mode 100644
index 00000000..e8339405
--- /dev/null
+++ b/src/block/layout.rs
@@ -0,0 +1,337 @@
+use std::path::PathBuf;
+
+use serde::{Deserialize, Serialize};
+
+use garage_util::config::DataDirEnum;
+use garage_util::data::Hash;
+use garage_util::error::{Error, OkOrMessage};
+use garage_util::migrate::*;
+
+type Idx = u16;
+
+const DRIVE_NPART: usize = 1024;
+
+const HASH_DRIVE_BYTES: (usize, usize) = (2, 3);
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub(crate) struct DataLayout {
+ pub(crate) data_dirs: Vec<DataDir>,
+
+ /// Primary storage location (index in data_dirs) for each partition
+ /// = the location where the data is supposed to be, blocks are always
+ /// written there (copies in other dirs may be deleted if they exist)
+ pub(crate) part_prim: Vec<Idx>,
+ /// Secondary storage locations for each partition = locations
+ /// where data blocks might be, we check from these dirs when reading
+ pub(crate) part_sec: Vec<Vec<Idx>>,
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
+pub(crate) struct DataDir {
+ pub(crate) path: PathBuf,
+ pub(crate) state: DataDirState,
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone, Copy, Eq, PartialEq)]
+pub(crate) enum DataDirState {
+ Active { capacity: u64 },
+ ReadOnly,
+}
+
+impl DataLayout {
+ pub(crate) fn initialize(dirs: &DataDirEnum) -> Result<Self, Error> {
+ let data_dirs = make_data_dirs(dirs)?;
+
+ // Split partitions proportionnally to capacity for all drives
+ // to affect primary storage location
+ let total_cap = data_dirs.iter().filter_map(|x| x.capacity()).sum::<u64>();
+ assert!(total_cap > 0);
+
+ let mut part_prim = Vec::with_capacity(DRIVE_NPART);
+ let mut cum_cap = 0;
+ for (i, dd) in data_dirs.iter().enumerate() {
+ if let DataDirState::Active { capacity } = dd.state {
+ cum_cap += capacity;
+ let n_total = (cum_cap * DRIVE_NPART as u64) / total_cap;
+ part_prim.resize(n_total as usize, i as Idx);
+ }
+ }
+ assert_eq!(cum_cap, total_cap);
+ assert_eq!(part_prim.len(), DRIVE_NPART);
+
+ // If any of the storage locations is non-empty, it probably existed before
+ // this algorithm was added, so add it as a secondary storage location for all partitions
+ // to make sure existing files are not lost
+ let mut part_sec = vec![vec![]; DRIVE_NPART];
+ for (i, dd) in data_dirs.iter().enumerate() {
+ if dir_not_empty(&dd.path)? {
+ for (sec, prim) in part_sec.iter_mut().zip(part_prim.iter()) {
+ if *prim != i as Idx {
+ sec.push(i as Idx);
+ }
+ }
+ }
+ }
+
+ Ok(Self {
+ data_dirs,
+ part_prim,
+ part_sec,
+ })
+ }
+
+ pub(crate) fn update(&mut self, dirs: &DataDirEnum) -> Result<(), Error> {
+ // Make list of new data directories, exit if nothing changed
+ let data_dirs = make_data_dirs(dirs)?;
+ if data_dirs == self.data_dirs {
+ return Ok(());
+ }
+
+ let total_cap = data_dirs.iter().filter_map(|x| x.capacity()).sum::<u64>();
+ assert!(total_cap > 0);
+
+ // Compute mapping of old indices to new indices
+ let old2new = self
+ .data_dirs
+ .iter()
+ .map(|x| {
+ data_dirs
+ .iter()
+ .position(|y| y.path == x.path)
+ .map(|x| x as Idx)
+ })
+ .collect::<Vec<_>>();
+
+ // Compute secondary location list for partitions based on existing
+ // folders, translating indices from old to new
+ let mut part_sec = self
+ .part_sec
+ .iter()
+ .map(|dl| {
+ dl.iter()
+ .filter_map(|old| old2new.get(*old as usize).copied().flatten())
+ .collect::<Vec<_>>()
+ })
+ .collect::<Vec<_>>();
+
+ // Compute a vector that, for each data dir,
+ // contains the list of partitions primarily stored on that drive
+ let mut dir_prim = vec![vec![]; data_dirs.len()];
+ for (ipart, prim) in self.part_prim.iter().enumerate() {
+ if let Some(new) = old2new.get(*prim as usize).copied().flatten() {
+ dir_prim[new as usize].push(ipart);
+ }
+ }
+
+ // Compute the target number of partitions per data directory
+ let mut cum_cap = 0;
+ let mut npart_per_dir = vec![0; data_dirs.len()];
+ for (idir, dd) in data_dirs.iter().enumerate() {
+ if let DataDirState::Active { capacity } = dd.state {
+ let begin = (cum_cap * DRIVE_NPART as u64) / total_cap;
+ cum_cap += capacity;
+ let end = (cum_cap * DRIVE_NPART as u64) / total_cap;
+ npart_per_dir[idir] = (end - begin) as usize;
+ }
+ }
+ assert_eq!(cum_cap, total_cap);
+ assert_eq!(npart_per_dir.iter().sum::<usize>(), DRIVE_NPART);
+
+ // For all directories that have too many primary partitions,
+ // move that partition to secondary
+ for (idir, (parts, tgt_npart)) in dir_prim.iter_mut().zip(npart_per_dir.iter()).enumerate()
+ {
+ while parts.len() > *tgt_npart {
+ let part = parts.pop().unwrap();
+ if !part_sec[part].contains(&(idir as Idx)) {
+ part_sec[part].push(idir as Idx);
+ }
+ }
+ }
+
+ // Calculate the vector of primary partition dir index
+ let mut part_prim = vec![None; DRIVE_NPART];
+ for (idir, parts) in dir_prim.iter().enumerate() {
+ for part in parts.iter() {
+ assert!(part_prim[*part].is_none());
+ part_prim[*part] = Some(idir as Idx)
+ }
+ }
+
+ // Calculate a vector of unassigned partitions
+ let mut unassigned = part_prim
+ .iter()
+ .enumerate()
+ .filter(|(_, dir)| dir.is_none())
+ .map(|(ipart, _)| ipart)
+ .collect::<Vec<_>>();
+
+ // For all directories that don't have enough primary partitions,
+ // add partitions from unassigned
+ for (idir, (parts, tgt_npart)) in dir_prim.iter_mut().zip(npart_per_dir.iter()).enumerate()
+ {
+ if parts.len() < *tgt_npart {
+ let required = *tgt_npart - parts.len();
+ assert!(unassigned.len() >= required);
+ for _ in 0..required {
+ let new_part = unassigned.pop().unwrap();
+ part_prim[new_part] = Some(idir as Idx);
+ part_sec[new_part].retain(|x| *x != idir as Idx);
+ }
+ }
+ }
+
+ // Sanity checks
+ assert!(part_prim.iter().all(|x| x.is_some()));
+ assert!(unassigned.is_empty());
+
+ // Transform part_prim from vec of Option<Idx> to vec of Idx
+ let part_prim = part_prim
+ .into_iter()
+ .map(|x| x.unwrap())
+ .collect::<Vec<_>>();
+ assert!(part_prim.iter().all(|p| data_dirs
+ .get(*p as usize)
+ .and_then(|x| x.capacity())
+ .unwrap_or(0)
+ > 0));
+
+ // If any of the newly added storage locations is non-empty,
+ // it might have been removed and added again and might contain data,
+ // so add it as a secondary storage location for all partitions
+ // to make sure existing files are not lost
+ for (i, dd) in data_dirs.iter().enumerate() {
+ if self.data_dirs.iter().any(|ed| ed.path == dd.path) {
+ continue;
+ }
+ if dir_not_empty(&dd.path)? {
+ for (sec, prim) in part_sec.iter_mut().zip(part_prim.iter()) {
+ if *prim != i as Idx && !sec.contains(&(i as Idx)) {
+ sec.push(i as Idx);
+ }
+ }
+ }
+ }
+
+ // Apply newly generated config
+ *self = Self {
+ data_dirs,
+ part_prim,
+ part_sec,
+ };
+ Ok(())
+ }
+
+ pub(crate) fn primary_block_dir(&self, hash: &Hash) -> PathBuf {
+ let ipart = self.partition_from(hash);
+ let idir = self.part_prim[ipart] as usize;
+ self.block_dir_from(hash, &self.data_dirs[idir].path)
+ }
+
+ pub(crate) fn secondary_block_dirs<'a>(
+ &'a self,
+ hash: &'a Hash,
+ ) -> impl Iterator<Item = PathBuf> + 'a {
+ let ipart = self.partition_from(hash);
+ self.part_sec[ipart]
+ .iter()
+ .map(move |idir| self.block_dir_from(hash, &self.data_dirs[*idir as usize].path))
+ }
+
+ fn partition_from(&self, hash: &Hash) -> usize {
+ u16::from_be_bytes([
+ hash.as_slice()[HASH_DRIVE_BYTES.0],
+ hash.as_slice()[HASH_DRIVE_BYTES.1],
+ ]) as usize % DRIVE_NPART
+ }
+
+ fn block_dir_from(&self, hash: &Hash, dir: &PathBuf) -> PathBuf {
+ let mut path = dir.clone();
+ path.push(hex::encode(&hash.as_slice()[0..1]));
+ path.push(hex::encode(&hash.as_slice()[1..2]));
+ path
+ }
+
+ pub(crate) fn without_secondary_locations(&self) -> Self {
+ Self {
+ data_dirs: self.data_dirs.clone(),
+ part_prim: self.part_prim.clone(),
+ part_sec: self.part_sec.iter().map(|_| vec![]).collect::<Vec<_>>(),
+ }
+ }
+}
+
+impl InitialFormat for DataLayout {
+ const VERSION_MARKER: &'static [u8] = b"G09bmdl";
+}
+
+impl DataDir {
+ pub fn capacity(&self) -> Option<u64> {
+ match self.state {
+ DataDirState::Active { capacity } => Some(capacity),
+ _ => None,
+ }
+ }
+}
+
+fn make_data_dirs(dirs: &DataDirEnum) -> Result<Vec<DataDir>, Error> {
+ let mut data_dirs = vec![];
+ match dirs {
+ DataDirEnum::Single(path) => data_dirs.push(DataDir {
+ path: path.clone(),
+ state: DataDirState::Active {
+ capacity: 1_000_000_000, // whatever, doesn't matter
+ },
+ }),
+ DataDirEnum::Multiple(dirs) => {
+ let mut ok = false;
+ for dir in dirs.iter() {
+ let state = match &dir.capacity {
+ Some(cap) if dir.read_only == false => {
+ let capacity = cap.parse::<bytesize::ByteSize>()
+ .ok_or_message("invalid capacity value")?.as_u64();
+ if capacity == 0 {
+ return Err(Error::Message(format!("data directory {} should have non-zero capacity", dir.path.to_string_lossy())));
+ }
+ ok = true;
+ DataDirState::Active {
+ capacity,
+ }
+ }
+ None if dir.read_only == true => {
+ DataDirState::ReadOnly
+ }
+ _ => return Err(Error::Message(format!("data directories in data_dir should have a capacity value or be marked read_only, not the case for {}", dir.path.to_string_lossy()))),
+ };
+ data_dirs.push(DataDir {
+ path: dir.path.clone(),
+ state,
+ });
+ }
+ if !ok {
+ return Err(Error::Message(
+ "incorrect data_dir configuration, no primary writable directory specified"
+ .into(),
+ ));
+ }
+ }
+ }
+ Ok(data_dirs)
+}
+
+fn dir_not_empty(path: &PathBuf) -> Result<bool, Error> {
+ for entry in std::fs::read_dir(&path)? {
+ let dir = entry?;
+ if dir.file_type()?.is_dir()
+ && dir
+ .file_name()
+ .into_string()
+ .ok()
+ .and_then(|hex| hex::decode(&hex).ok())
+ .is_some()
+ {
+ return Ok(true);
+ }
+ }
+ Ok(false)
+}
diff --git a/src/block/lib.rs b/src/block/lib.rs
index d2814f77..c9ff2845 100644
--- a/src/block/lib.rs
+++ b/src/block/lib.rs
@@ -6,5 +6,6 @@ pub mod repair;
pub mod resync;
mod block;
+mod layout;
mod metrics;
mod rc;
diff --git a/src/block/manager.rs b/src/block/manager.rs
index 3ece9a8a..2d1b5c67 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -3,7 +3,7 @@ use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
-use arc_swap::ArcSwapOption;
+use arc_swap::{ArcSwap, ArcSwapOption};
use async_trait::async_trait;
use bytes::Bytes;
use rand::prelude::*;
@@ -25,10 +25,11 @@ use garage_rpc::rpc_helper::netapp::stream::{stream_asyncread, ByteStream};
use garage_db as db;
use garage_util::background::{vars, BackgroundRunner};
+use garage_util::config::DataDirEnum;
use garage_util::data::*;
use garage_util::error::*;
use garage_util::metrics::RecordDuration;
-use garage_util::persister::PersisterShared;
+use garage_util::persister::{Persister, PersisterShared};
use garage_util::time::msec_to_rfc3339;
use garage_rpc::rpc_helper::OrderTag;
@@ -38,6 +39,7 @@ use garage_rpc::*;
use garage_table::replication::{TableReplication, TableShardedReplication};
use crate::block::*;
+use crate::layout::*;
use crate::metrics::*;
use crate::rc::*;
use crate::repair::*;
@@ -77,12 +79,16 @@ impl Rpc for BlockRpc {
pub struct BlockManager {
/// Replication strategy, allowing to find on which node blocks should be located
pub replication: TableShardedReplication,
- /// Directory in which block are stored
- pub data_dir: PathBuf,
+ /// Data layout
+ pub(crate) data_layout: ArcSwap<DataLayout>,
+ /// Data layout persister
+ pub(crate) data_layout_persister: Persister<DataLayout>,
+
+ data_fsync: bool,
compression_level: Option<i32>,
- mutation_lock: [Mutex<BlockManagerLocked>; 256],
+ mutation_lock: Vec<Mutex<BlockManagerLocked>>,
pub(crate) rc: BlockRc,
pub resync: BlockResyncManager,
@@ -105,6 +111,9 @@ pub struct BlockResyncErrorInfo {
pub next_try: u64,
}
+// The number of different mutexes used to parallelize write access to data blocks
+const MUTEX_COUNT: usize = 256;
+
// This custom struct contains functions that must only be ran
// when the lock is held. We ensure that it is the case by storing
// it INSIDE a Mutex.
@@ -113,11 +122,29 @@ struct BlockManagerLocked();
impl BlockManager {
pub fn new(
db: &db::Db,
- data_dir: PathBuf,
+ data_dir: DataDirEnum,
+ data_fsync: bool,
compression_level: Option<i32>,
replication: TableShardedReplication,
system: Arc<System>,
- ) -> Arc<Self> {
+ ) -> Result<Arc<Self>, Error> {
+ // Load or compute layout, i.e. assignment of data blocks to the different data directories
+ let data_layout_persister: Persister<DataLayout> =
+ Persister::new(&system.metadata_dir, "data_layout");
+ let data_layout = match data_layout_persister.load() {
+ Ok(mut layout) => {
+ layout
+ .update(&data_dir)
+ .ok_or_message("invalid data_dir config")?;
+ layout
+ }
+ Err(_) => DataLayout::initialize(&data_dir).ok_or_message("invalid data_dir config")?,
+ };
+ data_layout_persister
+ .save(&data_layout)
+ .expect("cannot save data_layout");
+
+ // Open metadata tables
let rc = db
.open_tree("block_local_rc")
.expect("Unable to open block_local_rc tree");
@@ -140,9 +167,14 @@ impl BlockManager {
let block_manager = Arc::new(Self {
replication,
- data_dir,
+ data_layout: ArcSwap::new(Arc::new(data_layout)),
+ data_layout_persister,
+ data_fsync,
compression_level,
- mutation_lock: [(); 256].map(|_| Mutex::new(BlockManagerLocked())),
+ mutation_lock: vec![(); MUTEX_COUNT]
+ .iter()
+ .map(|_| Mutex::new(BlockManagerLocked()))
+ .collect::<Vec<_>>(),
rc,
resync,
system,
@@ -154,7 +186,7 @@ impl BlockManager {
block_manager.endpoint.set_handler(block_manager.clone());
block_manager.scrub_persister.set_with(|_| ()).unwrap();
- block_manager
+ Ok(block_manager)
}
pub fn spawn_workers(self: &Arc<Self>, bg: &BackgroundRunner) {
@@ -201,44 +233,10 @@ impl BlockManager {
hash: &Hash,
order_tag: Option<OrderTag>,
) -> Result<(DataBlockHeader, ByteStream), Error> {
- let who = self.replication.read_nodes(hash);
- let who = self.system.rpc.request_order(&who);
-
- for node in who.iter() {
- let node_id = NodeID::from(*node);
- let rpc = self.endpoint.call_streaming(
- &node_id,
- BlockRpc::GetBlock(*hash, order_tag),
- PRIO_NORMAL | PRIO_SECONDARY,
- );
- tokio::select! {
- res = rpc => {
- let res = match res {
- Ok(res) => res,
- Err(e) => {
- debug!("Node {:?} returned error: {}", node, e);
- continue;
- }
- };
- let (header, stream) = match res.into_parts() {
- (Ok(BlockRpc::PutBlock { hash: _, header }), Some(stream)) => (header, stream),
- _ => {
- debug!("Node {:?} returned a malformed response", node);
- continue;
- }
- };
- return Ok((header, stream));
- }
- _ = tokio::time::sleep(self.system.rpc.rpc_timeout()) => {
- debug!("Node {:?} didn't return block in time, trying next.", node);
- }
- };
- }
-
- Err(Error::Message(format!(
- "Unable to read block {:?}: no node returned a valid block",
- hash
- )))
+ self.rpc_get_raw_block_internal(hash, order_tag, |header, stream| async move {
+ Ok((header, stream))
+ })
+ .await
}
/// Ask nodes that might have a (possibly compressed) block for it
@@ -248,6 +246,24 @@ impl BlockManager {
hash: &Hash,
order_tag: Option<OrderTag>,
) -> Result<DataBlock, Error> {
+ self.rpc_get_raw_block_internal(hash, order_tag, |header, stream| async move {
+ read_stream_to_end(stream)
+ .await
+ .map(|data| DataBlock::from_parts(header, data))
+ })
+ .await
+ }
+
+ async fn rpc_get_raw_block_internal<F, Fut, T>(
+ &self,
+ hash: &Hash,
+ order_tag: Option<OrderTag>,
+ f: F,
+ ) -> Result<T, Error>
+ where
+ F: Fn(DataBlockHeader, ByteStream) -> Fut,
+ Fut: futures::Future<Output = Result<T, Error>>,
+ {
let who = self.replication.read_nodes(hash);
let who = self.system.rpc.request_order(&who);
@@ -263,34 +279,41 @@ impl BlockManager {
let res = match res {
Ok(res) => res,
Err(e) => {
- debug!("Node {:?} returned error: {}", node, e);
+ debug!("Get block {:?}: node {:?} could not be contacted: {}", hash, node, e);
continue;
}
};
let (header, stream) = match res.into_parts() {
(Ok(BlockRpc::PutBlock { hash: _, header }), Some(stream)) => (header, stream),
- _ => {
- debug!("Node {:?} returned a malformed response", node);
+ (Ok(_), _) => {
+ debug!("Get block {:?}: node {:?} returned a malformed response", hash, node);
+ continue;
+ }
+ (Err(e), _) => {
+ debug!("Get block {:?}: node {:?} returned error: {}", hash, node, e);
continue;
}
};
- match read_stream_to_end(stream).await {
- Ok(bytes) => return Ok(DataBlock::from_parts(header, bytes)),
+ match f(header, stream).await {
+ Ok(ret) => return Ok(ret),
Err(e) => {
- debug!("Error reading stream from node {:?}: {}", node, e);
+ debug!("Get block {:?}: error reading stream from node {:?}: {}", hash, node, e);
}
}
}
+ // TODO: sleep less long (fail early), initiate a second request earlier
+ // if the first one doesn't succeed rapidly
+ // TODO: keep first request running when initiating a new one and take the
+ // one that finishes earlier
_ = tokio::time::sleep(self.system.rpc.rpc_timeout()) => {
- debug!("Node {:?} didn't return block in time, trying next.", node);
+ debug!("Get block {:?}: node {:?} didn't return block in time, trying next.", hash, node);
}
};
}
- Err(Error::Message(format!(
- "Unable to read block {:?}: no node returned a valid block",
- hash
- )))
+ let msg = format!("Get block {:?}: no node returned a valid block", hash);
+ debug!("{}", msg);
+ Err(Error::Message(msg))
}
// ---- Public interface ----
@@ -468,8 +491,6 @@ impl BlockManager {
pub(crate) async fn write_block(&self, hash: &Hash, data: &DataBlock) -> Result<(), Error> {
let tracer = opentelemetry::global::tracer("garage");
- let write_size = data.inner_buffer().len() as u64;
-
self.lock_mutate(hash)
.await
.write_block(hash, data, self)
@@ -479,8 +500,6 @@ impl BlockManager {
))
.await?;
- self.metrics.bytes_written.add(write_size);
-
Ok(())
}
@@ -507,36 +526,42 @@ impl BlockManager {
/// Read block from disk, verifying it's integrity
pub(crate) async fn read_block(&self, hash: &Hash) -> Result<DataBlock, Error> {
- let data = self
- .read_block_internal(hash)
- .bound_record_duration(&self.metrics.block_read_duration)
- .await?;
-
- self.metrics
- .bytes_read
- .add(data.inner_buffer().len() as u64);
-
- Ok(data)
+ let tracer = opentelemetry::global::tracer("garage");
+ async {
+ match self.find_block(hash).await {
+ Some(p) => self.read_block_from(hash, &p).await,
+ None => {
+ // Not found but maybe we should have had it ??
+ self.resync
+ .put_to_resync(hash, 2 * self.system.rpc.rpc_timeout())?;
+ return Err(Error::Message(format!(
+ "block {:?} not found on node",
+ hash
+ )));
+ }
+ }
+ }
+ .bound_record_duration(&self.metrics.block_read_duration)
+ .with_context(Context::current_with_span(
+ tracer.start("BlockManager::read_block"),
+ ))
+ .await
}
- async fn read_block_internal(&self, hash: &Hash) -> Result<DataBlock, Error> {
- let mut path = self.block_path(hash);
- let compressed = match self.is_block_compressed(hash).await {
- Ok(c) => c,
- Err(e) => {
- // Not found but maybe we should have had it ??
- self.resync
- .put_to_resync(hash, 2 * self.system.rpc.rpc_timeout())?;
- return Err(Into::into(e));
- }
+ pub(crate) async fn read_block_from(
+ &self,
+ hash: &Hash,
+ block_path: &DataBlockPath,
+ ) -> Result<DataBlock, Error> {
+ let (path, compressed) = match block_path {
+ DataBlockPath::Plain(p) => (p, false),
+ DataBlockPath::Compressed(p) => (p, true),
};
- if compressed {
- path.set_extension("zst");
- }
- let mut f = fs::File::open(&path).await?;
+ let mut f = fs::File::open(&path).await?;
let mut data = vec![];
f.read_to_end(&mut data).await?;
+ self.metrics.bytes_read.add(data.len() as u64);
drop(f);
let data = if compressed {
@@ -548,29 +573,27 @@ impl BlockManager {
if data.verify(*hash).is_err() {
self.metrics.corruption_counter.add(1);
+ warn!(
+ "Block {:?} is corrupted. Renaming to .corrupted and resyncing.",
+ hash
+ );
self.lock_mutate(hash)
.await
- .move_block_to_corrupted(hash, self)
+ .move_block_to_corrupted(block_path)
.await?;
self.resync.put_to_resync(hash, Duration::from_millis(0))?;
+
return Err(Error::CorruptData(*hash));
}
Ok(data)
}
- /// Check if this node has a block and whether it needs it
- pub(crate) async fn check_block_status(&self, hash: &Hash) -> Result<BlockStatus, Error> {
- self.lock_mutate(hash)
- .await
- .check_block_status(hash, self)
- .await
- }
-
/// Check if this node should have a block, but don't actually have it
async fn need_block(&self, hash: &Hash) -> Result<bool, Error> {
- let BlockStatus { exists, needed } = self.check_block_status(hash).await?;
- Ok(needed.is_nonzero() && !exists)
+ let rc = self.rc.get_block_rc(hash)?;
+ let exists = self.find_block(hash).await.is_some();
+ Ok(rc.is_nonzero() && !exists)
}
/// Delete block if it is not needed anymore
@@ -581,59 +604,65 @@ impl BlockManager {
.await
}
- /// Utility: gives the path of the directory in which a block should be found
- fn block_dir(&self, hash: &Hash) -> PathBuf {
- let mut path = self.data_dir.clone();
- path.push(hex::encode(&hash.as_slice()[0..1]));
- path.push(hex::encode(&hash.as_slice()[1..2]));
- path
- }
-
- /// Utility: give the full path where a block should be found, minus extension if block is
- /// compressed
- fn block_path(&self, hash: &Hash) -> PathBuf {
- let mut path = self.block_dir(hash);
- path.push(hex::encode(hash.as_ref()));
- path
- }
+ /// Find the path where a block is currently stored
+ pub(crate) async fn find_block(&self, hash: &Hash) -> Option<DataBlockPath> {
+ let data_layout = self.data_layout.load_full();
+ let dirs = Some(data_layout.primary_block_dir(hash))
+ .into_iter()
+ .chain(data_layout.secondary_block_dirs(hash));
+ let filename = hex::encode(hash.as_ref());
- /// Utility: check if block is stored compressed. Error if block is not stored
- async fn is_block_compressed(&self, hash: &Hash) -> Result<bool, Error> {
- let mut path = self.block_path(hash);
+ for dir in dirs {
+ let mut path = dir;
+ path.push(&filename);
- // If compression is disabled on node - check for the raw block
- // first and then a compressed one (as compression may have been
- // previously enabled).
- match self.compression_level {
- None => {
+ if self.compression_level.is_none() {
+ // If compression is disabled on node - check for the raw block
+ // first and then a compressed one (as compression may have been
+ // previously enabled).
if fs::metadata(&path).await.is_ok() {
- return Ok(false);
+ return Some(DataBlockPath::Plain(path));
}
-
path.set_extension("zst");
-
- fs::metadata(&path).await.map(|_| true).map_err(Into::into)
- }
- _ => {
+ if fs::metadata(&path).await.is_ok() {
+ return Some(DataBlockPath::Compressed(path));
+ }
+ } else {
path.set_extension("zst");
-
if fs::metadata(&path).await.is_ok() {
- return Ok(true);
+ return Some(DataBlockPath::Compressed(path));
}
-
path.set_extension("");
-
- fs::metadata(&path).await.map(|_| false).map_err(Into::into)
+ if fs::metadata(&path).await.is_ok() {
+ return Some(DataBlockPath::Plain(path));
+ }
}
}
+
+ None
+ }
+
+ /// Rewrite a block at the primary location for its path and delete the old path.
+ /// Returns the number of bytes read/written
+ pub(crate) async fn fix_block_location(
+ &self,
+ hash: &Hash,
+ wrong_path: DataBlockPath,
+ ) -> Result<usize, Error> {
+ self.lock_mutate(hash)
+ .await
+ .fix_block_location(hash, wrong_path, self)
+ .await
}
async fn lock_mutate(&self, hash: &Hash) -> MutexGuard<'_, BlockManagerLocked> {
let tracer = opentelemetry::global::tracer("garage");
- self.mutation_lock[hash.as_slice()[0] as usize]
+ let ilock = u16::from_be_bytes([hash.as_slice()[0], hash.as_slice()[1]]) as usize
+ % self.mutation_lock.len();
+ self.mutation_lock[ilock]
.lock()
.with_context(Context::current_with_span(
- tracer.start("Acquire mutation_lock"),
+ tracer.start(format!("Acquire mutation_lock #{}", ilock)),
))
.await
}
@@ -646,7 +675,7 @@ impl StreamingEndpointHandler<BlockRpc> for BlockManager {
BlockRpc::PutBlock { hash, header } => Resp::new(
self.handle_put_block(*hash, *header, message.take_stream())
.await
- .map(|_| BlockRpc::Ok),
+ .map(|()| BlockRpc::Ok),
),
BlockRpc::GetBlock(h, order_tag) => self.handle_get_block(h, *order_tag).await,
BlockRpc::NeedBlockQuery(h) => {
@@ -657,66 +686,78 @@ impl StreamingEndpointHandler<BlockRpc> for BlockManager {
}
}
-pub(crate) struct BlockStatus {
- pub(crate) exists: bool,
- pub(crate) needed: RcEntry,
-}
-
impl BlockManagerLocked {
- async fn check_block_status(
+ async fn write_block(
&self,
hash: &Hash,
+ data: &DataBlock,
mgr: &BlockManager,
- ) -> Result<BlockStatus, Error> {
- let exists = mgr.is_block_compressed(hash).await.is_ok();
- let needed = mgr.rc.get_block_rc(hash)?;
-
- Ok(BlockStatus { exists, needed })
+ ) -> Result<(), Error> {
+ let existing_path = mgr.find_block(hash).await;
+ self.write_block_inner(hash, data, mgr, existing_path).await
}
- async fn write_block(
+ async fn write_block_inner(
&self,
hash: &Hash,
data: &DataBlock,
mgr: &BlockManager,
+ existing_path: Option<DataBlockPath>,
) -> Result<(), Error> {
let compressed = data.is_compressed();
let data = data.inner_buffer();
- let mut path = mgr.block_dir(hash);
- let directory = path.clone();
- path.push(hex::encode(hash));
+ let directory = mgr.data_layout.load().primary_block_dir(hash);
- fs::create_dir_all(&directory).await?;
+ let mut tgt_path = directory.clone();
+ tgt_path.push(hex::encode(hash));
+ if compressed {
+ tgt_path.set_extension("zst");
+ }
- let to_delete = match (mgr.is_block_compressed(hash).await, compressed) {
- (Ok(true), _) => return Ok(()),
- (Ok(false), false) => return Ok(()),
- (Ok(false), true) => {
- let path_to_delete = path.clone();
- path.set_extension("zst");
- Some(path_to_delete)
- }
- (Err(_), compressed) => {
- if compressed {
- path.set_extension("zst");
- }
- None
- }
+ let to_delete = match (existing_path, compressed) {
+ // If the block is stored in the wrong directory,
+ // write it again at the correct path and delete the old path
+ (Some(DataBlockPath::Plain(p)), false) if p != tgt_path => Some(p),
+ (Some(DataBlockPath::Compressed(p)), true) if p != tgt_path => Some(p),
+
+ // If the block is already stored not compressed but we have a compressed
+ // copy, write the compressed copy and delete the uncompressed one
+ (Some(DataBlockPath::Plain(plain_path)), true) => Some(plain_path),
+
+ // If the block is already stored compressed,
+ // keep the stored copy, we have nothing to do
+ (Some(DataBlockPath::Compressed(_)), _) => return Ok(()),
+
+ // If the block is already stored not compressed,
+ // and we don't have a compressed copy either,
+ // keep the stored copy, we have nothing to do
+ (Some(DataBlockPath::Plain(_)), false) => return Ok(()),
+
+ // If the block isn't stored already, just store what is given to us
+ (None, _) => None,
};
+ assert!(to_delete.as_ref() != Some(&tgt_path));
- let mut path_tmp = path.clone();
+ let mut path_tmp = tgt_path.clone();
let tmp_extension = format!("tmp{}", hex::encode(thread_rng().gen::<[u8; 4]>()));
path_tmp.set_extension(tmp_extension);
+ fs::create_dir_all(&directory).await?;
+
let mut delete_on_drop = DeleteOnDrop(Some(path_tmp.clone()));
let mut f = fs::File::create(&path_tmp).await?;
f.write_all(data).await?;
- f.sync_all().await?;
+ mgr.metrics.bytes_written.add(data.len() as u64);
+
+ if mgr.data_fsync {
+ f.sync_all().await?;
+ }
+
drop(f);
- fs::rename(path_tmp, path).await?;
+ fs::rename(path_tmp, tgt_path).await?;
delete_on_drop.cancel();
@@ -724,52 +765,67 @@ impl BlockManagerLocked {
fs::remove_file(to_delete).await?;
}
- // We want to ensure that when this function returns, data is properly persisted
- // to disk. The first step is the sync_all above that does an fsync on the data file.
- // Now, we do an fsync on the containing directory, to ensure that the rename
- // is persisted properly. See:
- // http://thedjbway.b0llix.net/qmail/syncdir.html
- let dir = fs::OpenOptions::new()
- .read(true)
- .mode(0)
- .open(directory)
- .await?;
- dir.sync_all().await?;
- drop(dir);
+ if mgr.data_fsync {
+ // We want to ensure that when this function returns, data is properly persisted
+ // to disk. The first step is the sync_all above that does an fsync on the data file.
+ // Now, we do an fsync on the containing directory, to ensure that the rename
+ // is persisted properly. See:
+ // http://thedjbway.b0llix.net/qmail/syncdir.html
+ let dir = fs::OpenOptions::new()
+ .read(true)
+ .mode(0)
+ .open(directory)
+ .await?;
+ dir.sync_all().await?;
+ drop(dir);
+ }
Ok(())
}
- async fn move_block_to_corrupted(&self, hash: &Hash, mgr: &BlockManager) -> Result<(), Error> {
- warn!(
- "Block {:?} is corrupted. Renaming to .corrupted and resyncing.",
- hash
- );
- let mut path = mgr.block_path(hash);
- let mut path2 = path.clone();
- if mgr.is_block_compressed(hash).await? {
- path.set_extension("zst");
- path2.set_extension("zst.corrupted");
- } else {
- path2.set_extension("corrupted");
- }
+ async fn move_block_to_corrupted(&self, block_path: &DataBlockPath) -> Result<(), Error> {
+ let (path, path2) = match block_path {
+ DataBlockPath::Plain(p) => {
+ let mut p2 = p.clone();
+ p2.set_extension("corrupted");
+ (p, p2)
+ }
+ DataBlockPath::Compressed(p) => {
+ let mut p2 = p.clone();
+ p2.set_extension("zst.corrupted");
+ (p, p2)
+ }
+ };
+
fs::rename(path, path2).await?;
Ok(())
}
async fn delete_if_unneeded(&self, hash: &Hash, mgr: &BlockManager) -> Result<(), Error> {
- let BlockStatus { exists, needed } = self.check_block_status(hash, mgr).await?;
-
- if exists && needed.is_deletable() {
- let mut path = mgr.block_path(hash);
- if mgr.is_block_compressed(hash).await? {
- path.set_extension("zst");
+ let rc = mgr.rc.get_block_rc(hash)?;
+ if rc.is_deletable() {
+ while let Some(path) = mgr.find_block(hash).await {
+ let path = match path {
+ DataBlockPath::Plain(p) | DataBlockPath::Compressed(p) => p,
+ };
+ fs::remove_file(path).await?;
+ mgr.metrics.delete_counter.add(1);
}
- fs::remove_file(path).await?;
- mgr.metrics.delete_counter.add(1);
}
Ok(())
}
+
+ async fn fix_block_location(
+ &self,
+ hash: &Hash,
+ wrong_path: DataBlockPath,
+ mgr: &BlockManager,
+ ) -> Result<usize, Error> {
+ let data = mgr.read_block_from(hash, &wrong_path).await?;
+ self.write_block_inner(hash, &data, mgr, Some(wrong_path))
+ .await?;
+ Ok(data.inner_buffer().len())
+ }
}
async fn read_stream_to_end(mut stream: ByteStream) -> Result<Bytes, Error> {
diff --git a/src/block/rc.rs b/src/block/rc.rs
index 94cb5eea..b6afb277 100644
--- a/src/block/rc.rs
+++ b/src/block/rc.rs
@@ -56,7 +56,7 @@ impl BlockRc {
/// deletion time has passed
pub(crate) fn clear_deleted_block_rc(&self, hash: &Hash) -> Result<(), Error> {
let now = now_msec();
- self.rc.db().transaction(|mut tx| {
+ self.rc.db().transaction(|tx| {
let rcval = RcEntry::parse_opt(tx.get(&self.rc, hash)?);
match rcval {
RcEntry::Deletable { at_time } if now > at_time => {
@@ -64,7 +64,7 @@ impl BlockRc {
}
_ => (),
};
- tx.commit(())
+ Ok(())
})?;
Ok(())
}
diff --git a/src/block/repair.rs b/src/block/repair.rs
index 71093d69..77ee0d14 100644
--- a/src/block/repair.rs
+++ b/src/block/repair.rs
@@ -17,6 +17,7 @@ use garage_util::persister::PersisterShared;
use garage_util::time::*;
use garage_util::tranquilizer::Tranquilizer;
+use crate::block::*;
use crate::manager::*;
// Full scrub every 25 days with a random element of 10 days mixed in below
@@ -136,7 +137,7 @@ impl Worker for RepairWorker {
// Lists all blocks on disk and adds them to the resync queue.
// This allows us to find blocks we are storing but don't actually need,
// so that we can offload them if necessary and then delete them locally.
- if let Some(hash) = bi.next().await? {
+ if let Some((_path, hash)) = bi.next().await? {
self.manager
.resync
.put_to_resync(&hash, Duration::from_secs(0))?;
@@ -175,7 +176,9 @@ mod v081 {
}
mod v082 {
+ use garage_util::data::Hash;
use serde::{Deserialize, Serialize};
+ use std::path::PathBuf;
use super::v081;
@@ -185,6 +188,27 @@ mod v082 {
pub(crate) time_last_complete_scrub: u64,
pub(crate) time_next_run_scrub: u64,
pub(crate) corruptions_detected: u64,
+ #[serde(default)]
+ pub(crate) checkpoint: Option<BlockStoreIterator>,
+ }
+
+ #[derive(Serialize, Deserialize, Clone)]
+ pub struct BlockStoreIterator {
+ pub todo: Vec<BsiTodo>,
+ }
+
+ #[derive(Serialize, Deserialize, Clone)]
+ pub enum BsiTodo {
+ Directory {
+ path: PathBuf,
+ progress_min: u64,
+ progress_max: u64,
+ },
+ File {
+ path: PathBuf,
+ hash: Hash,
+ progress: u64,
+ },
}
impl garage_util::migrate::Migrate for ScrubWorkerPersisted {
@@ -199,6 +223,7 @@ mod v082 {
time_last_complete_scrub: old.time_last_complete_scrub,
time_next_run_scrub: randomize_next_scrub_run_time(old.time_last_complete_scrub),
corruptions_detected: old.corruptions_detected,
+ checkpoint: None,
}
}
}
@@ -235,14 +260,23 @@ impl Default for ScrubWorkerPersisted {
time_next_run_scrub: randomize_next_scrub_run_time(now_msec()),
tranquility: INITIAL_SCRUB_TRANQUILITY,
corruptions_detected: 0,
+ checkpoint: None,
}
}
}
#[derive(Default)]
enum ScrubWorkerState {
- Running(BlockStoreIterator),
- Paused(BlockStoreIterator, u64), // u64 = time when to resume scrub
+ Running {
+ iterator: BlockStoreIterator,
+ // time of the last checkpoint
+ t_cp: u64,
+ },
+ Paused {
+ iterator: BlockStoreIterator,
+ // time at which the scrub should be resumed
+ t_resume: u64,
+ },
#[default]
Finished,
}
@@ -261,10 +295,17 @@ impl ScrubWorker {
rx_cmd: mpsc::Receiver<ScrubWorkerCommand>,
persister: PersisterShared<ScrubWorkerPersisted>,
) -> Self {
+ let work = match persister.get_with(|x| x.checkpoint.clone()) {
+ None => ScrubWorkerState::Finished,
+ Some(iterator) => ScrubWorkerState::Running {
+ iterator,
+ t_cp: now_msec(),
+ },
+ };
Self {
manager,
rx_cmd,
- work: ScrubWorkerState::Finished,
+ work,
tranquilizer: Tranquilizer::new(30),
persister,
}
@@ -277,7 +318,16 @@ impl ScrubWorker {
ScrubWorkerState::Finished => {
info!("Scrub worker initializing, now performing datastore scrub");
let iterator = BlockStoreIterator::new(&self.manager);
- ScrubWorkerState::Running(iterator)
+ if let Err(e) = self
+ .persister
+ .set_with(|x| x.checkpoint = Some(iterator.clone()))
+ {
+ error!("Could not save scrub checkpoint: {}", e);
+ }
+ ScrubWorkerState::Running {
+ iterator,
+ t_cp: now_msec(),
+ }
}
work => {
error!("Cannot start scrub worker: already running!");
@@ -287,8 +337,18 @@ impl ScrubWorker {
}
ScrubWorkerCommand::Pause(dur) => {
self.work = match std::mem::take(&mut self.work) {
- ScrubWorkerState::Running(it) | ScrubWorkerState::Paused(it, _) => {
- ScrubWorkerState::Paused(it, now_msec() + dur.as_millis() as u64)
+ ScrubWorkerState::Running { iterator, .. }
+ | ScrubWorkerState::Paused { iterator, .. } => {
+ if let Err(e) = self
+ .persister
+ .set_with(|x| x.checkpoint = Some(iterator.clone()))
+ {
+ error!("Could not save scrub checkpoint: {}", e);
+ }
+ ScrubWorkerState::Paused {
+ iterator,
+ t_resume: now_msec() + dur.as_millis() as u64,
+ }
}
work => {
error!("Cannot pause scrub worker: not running!");
@@ -298,7 +358,10 @@ impl ScrubWorker {
}
ScrubWorkerCommand::Resume => {
self.work = match std::mem::take(&mut self.work) {
- ScrubWorkerState::Paused(it, _) => ScrubWorkerState::Running(it),
+ ScrubWorkerState::Paused { iterator, .. } => ScrubWorkerState::Running {
+ iterator,
+ t_cp: now_msec(),
+ },
work => {
error!("Cannot resume scrub worker: not paused!");
work
@@ -307,7 +370,10 @@ impl ScrubWorker {
}
ScrubWorkerCommand::Cancel => {
self.work = match std::mem::take(&mut self.work) {
- ScrubWorkerState::Running(_) | ScrubWorkerState::Paused(_, _) => {
+ ScrubWorkerState::Running { .. } | ScrubWorkerState::Paused { .. } => {
+ if let Err(e) = self.persister.set_with(|x| x.checkpoint = None) {
+ error!("Could not save scrub checkpoint: {}", e);
+ }
ScrubWorkerState::Finished
}
work => {
@@ -343,12 +409,15 @@ impl Worker for ScrubWorker {
..Default::default()
};
match &self.work {
- ScrubWorkerState::Running(bsi) => {
- s.progress = Some(format!("{:.2}%", bsi.progress() * 100.));
+ ScrubWorkerState::Running { iterator, .. } => {
+ s.progress = Some(format!("{:.2}%", iterator.progress() * 100.));
}
- ScrubWorkerState::Paused(bsi, rt) => {
- s.progress = Some(format!("{:.2}%", bsi.progress() * 100.));
- s.freeform = vec![format!("Scrub paused, resumes at {}", msec_to_rfc3339(*rt))];
+ ScrubWorkerState::Paused { iterator, t_resume } => {
+ s.progress = Some(format!("{:.2}%", iterator.progress() * 100.));
+ s.freeform = vec![format!(
+ "Scrub paused, resumes at {}",
+ msec_to_rfc3339(*t_resume)
+ )];
}
ScrubWorkerState::Finished => {
s.freeform = vec![
@@ -374,9 +443,11 @@ impl Worker for ScrubWorker {
};
match &mut self.work {
- ScrubWorkerState::Running(bsi) => {
+ ScrubWorkerState::Running { iterator, t_cp } => {
self.tranquilizer.reset();
- if let Some(hash) = bsi.next().await? {
+ let now = now_msec();
+
+ if let Some((_path, hash)) = iterator.next().await? {
match self.manager.read_block(&hash).await {
Err(Error::CorruptData(_)) => {
error!("Found corrupt data block during scrub: {:?}", hash);
@@ -385,16 +456,23 @@ impl Worker for ScrubWorker {
Err(e) => return Err(e),
_ => (),
};
+
+ if now - *t_cp > 60 * 1000 {
+ self.persister
+ .set_with(|p| p.checkpoint = Some(iterator.clone()))?;
+ *t_cp = now;
+ }
+
Ok(self
.tranquilizer
.tranquilize_worker(self.persister.get_with(|p| p.tranquility)))
} else {
- let now = now_msec();
let next_scrub_timestamp = randomize_next_scrub_run_time(now);
self.persister.set_with(|p| {
p.time_last_complete_scrub = now;
p.time_next_run_scrub = next_scrub_timestamp;
+ p.checkpoint = None;
})?;
self.work = ScrubWorkerState::Finished;
self.tranquilizer.clear();
@@ -413,8 +491,8 @@ impl Worker for ScrubWorker {
async fn wait_for_work(&mut self) -> WorkerState {
let (wait_until, command) = match &self.work {
- ScrubWorkerState::Running(_) => return WorkerState::Busy,
- ScrubWorkerState::Paused(_, resume_time) => (*resume_time, ScrubWorkerCommand::Resume),
+ ScrubWorkerState::Running { .. } => return WorkerState::Busy,
+ ScrubWorkerState::Paused { t_resume, .. } => (*t_resume, ScrubWorkerCommand::Resume),
ScrubWorkerState::Finished => (
self.persister.get_with(|p| p.time_next_run_scrub),
ScrubWorkerCommand::Start,
@@ -437,110 +515,250 @@ impl Worker for ScrubWorker {
}
match &self.work {
- ScrubWorkerState::Running(_) => WorkerState::Busy,
+ ScrubWorkerState::Running { .. } => WorkerState::Busy,
_ => WorkerState::Idle,
}
}
}
// ---- ---- ----
-// UTILITY FOR ENUMERATING THE BLOCK STORE
+// THIRD KIND OF REPAIR: REBALANCING DATA BLOCKS
+// between multiple storage locations.
+// This is a one-shot repair operation that can be launched,
+// checks everything, and then exits.
// ---- ---- ----
-struct BlockStoreIterator {
- path: Vec<ReadingDir>,
+pub struct RebalanceWorker {
+ manager: Arc<BlockManager>,
+ block_iter: BlockStoreIterator,
+ t_started: u64,
+ t_finished: Option<u64>,
+ moved: usize,
+ moved_bytes: u64,
}
-enum ReadingDir {
- Pending(PathBuf),
- Read {
- subpaths: Vec<fs::DirEntry>,
- pos: usize,
- },
+impl RebalanceWorker {
+ pub fn new(manager: Arc<BlockManager>) -> Self {
+ let block_iter = BlockStoreIterator::new(&manager);
+ Self {
+ manager,
+ block_iter,
+ t_started: now_msec(),
+ t_finished: None,
+ moved: 0,
+ moved_bytes: 0,
+ }
+ }
+}
+
+#[async_trait]
+impl Worker for RebalanceWorker {
+ fn name(&self) -> String {
+ "Block rebalance worker".into()
+ }
+
+ fn status(&self) -> WorkerStatus {
+ let t_cur = self.t_finished.unwrap_or_else(|| now_msec());
+ let rate = self.moved_bytes / std::cmp::max(1, (t_cur - self.t_started) / 1000);
+ let mut freeform = vec![
+ format!("Blocks moved: {}", self.moved),
+ format!(
+ "Bytes moved: {} ({}/s)",
+ bytesize::ByteSize::b(self.moved_bytes),
+ bytesize::ByteSize::b(rate)
+ ),
+ format!("Started: {}", msec_to_rfc3339(self.t_started)),
+ ];
+ if let Some(t_fin) = self.t_finished {
+ freeform.push(format!("Finished: {}", msec_to_rfc3339(t_fin)))
+ }
+ WorkerStatus {
+ progress: Some(format!("{:.2}%", self.block_iter.progress() * 100.)),
+ freeform,
+ ..Default::default()
+ }
+ }
+
+ async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
+ if let Some((path, hash)) = self.block_iter.next().await? {
+ let prim_loc = self.manager.data_layout.load().primary_block_dir(&hash);
+ if path.ancestors().all(|x| x != prim_loc) {
+ let block_path = match path.extension() {
+ None => DataBlockPath::Plain(path.clone()),
+ Some(x) if x.to_str() == Some("zst") => DataBlockPath::Compressed(path.clone()),
+ _ => {
+ warn!("not rebalancing file: {}", path.to_string_lossy());
+ return Ok(WorkerState::Busy);
+ }
+ };
+ // block is not in its primary location,
+ // move it there (reading and re-writing does the trick)
+ debug!("rebalance: moving block {:?} => {:?}", block_path, prim_loc);
+ let block_len = self.manager.fix_block_location(&hash, block_path).await?;
+ self.moved += 1;
+ self.moved_bytes += block_len as u64;
+ }
+ Ok(WorkerState::Busy)
+ } else {
+ // all blocks are in their primary location:
+ // - the ones we moved now are
+ // - the ones written in the meantime always were, because we only
+ // write to primary locations
+ // so we can safely remove all secondary locations from the data layout
+ let new_layout = self
+ .manager
+ .data_layout
+ .load_full()
+ .without_secondary_locations();
+ self.manager
+ .data_layout_persister
+ .save_async(&new_layout)
+ .await?;
+ self.manager.data_layout.store(Arc::new(new_layout));
+ self.t_finished = Some(now_msec());
+ Ok(WorkerState::Done)
+ }
+ }
+
+ async fn wait_for_work(&mut self) -> WorkerState {
+ unreachable!()
+ }
}
+// ---- ---- ----
+// UTILITY FOR ENUMERATING THE BLOCK STORE
+// ---- ---- ----
+
+const PROGRESS_FP: u64 = 1_000_000_000;
+
impl BlockStoreIterator {
fn new(manager: &BlockManager) -> Self {
- let root_dir = manager.data_dir.clone();
- Self {
- path: vec![ReadingDir::Pending(root_dir)],
+ let data_layout = manager.data_layout.load_full();
+
+ let mut dir_cap = vec![0; data_layout.data_dirs.len()];
+ for prim in data_layout.part_prim.iter() {
+ dir_cap[*prim as usize] += 1;
+ }
+ for sec_vec in data_layout.part_sec.iter() {
+ for sec in sec_vec.iter() {
+ dir_cap[*sec as usize] += 1;
+ }
}
+ let sum_cap = dir_cap.iter().sum::<usize>() as u64;
+
+ let mut cum_cap = 0;
+ let mut todo = vec![];
+ for (dir, cap) in data_layout.data_dirs.iter().zip(dir_cap.into_iter()) {
+ let progress_min = (cum_cap * PROGRESS_FP) / sum_cap;
+ let progress_max = ((cum_cap + cap as u64) * PROGRESS_FP) / sum_cap;
+ cum_cap += cap as u64;
+
+ todo.push(BsiTodo::Directory {
+ path: dir.path.clone(),
+ progress_min,
+ progress_max,
+ });
+ }
+ // entries are processed back-to-front (because of .pop()),
+ // so reverse entries to process them in increasing progress bounds
+ todo.reverse();
+
+ let ret = Self { todo };
+ debug_assert!(ret.progress_invariant());
+
+ ret
}
/// Returns progress done, between 0 and 1
fn progress(&self) -> f32 {
- if self.path.is_empty() {
- 1.0
- } else {
- let mut ret = 0.0;
- let mut next_div = 1;
- for p in self.path.iter() {
- match p {
- ReadingDir::Pending(_) => break,
- ReadingDir::Read { subpaths, pos } => {
- next_div *= subpaths.len();
- ret += ((*pos - 1) as f32) / (next_div as f32);
- }
- }
- }
- ret
- }
+ self.todo
+ .last()
+ .map(|x| match x {
+ BsiTodo::Directory { progress_min, .. } => *progress_min,
+ BsiTodo::File { progress, .. } => *progress,
+ })
+ .map(|x| x as f32 / PROGRESS_FP as f32)
+ .unwrap_or(1.0)
}
- async fn next(&mut self) -> Result<Option<Hash>, Error> {
+ async fn next(&mut self) -> Result<Option<(PathBuf, Hash)>, Error> {
loop {
- let last_path = match self.path.last_mut() {
+ match self.todo.pop() {
None => return Ok(None),
- Some(lp) => lp,
- };
-
- if let ReadingDir::Pending(path) = last_path {
- let mut reader = fs::read_dir(&path).await?;
- let mut subpaths = vec![];
- while let Some(ent) = reader.next_entry().await? {
- subpaths.push(ent);
- }
- *last_path = ReadingDir::Read { subpaths, pos: 0 };
- }
+ Some(BsiTodo::Directory {
+ path,
+ progress_min,
+ progress_max,
+ }) => {
+ let istart = self.todo.len();
+
+ let mut reader = fs::read_dir(&path).await?;
+ while let Some(ent) = reader.next_entry().await? {
+ let name = if let Ok(n) = ent.file_name().into_string() {
+ n
+ } else {
+ continue;
+ };
+ let ft = ent.file_type().await?;
+ if ft.is_dir() && hex::decode(&name).is_ok() {
+ self.todo.push(BsiTodo::Directory {
+ path: ent.path(),
+ progress_min: 0,
+ progress_max: 0,
+ });
+ } else if ft.is_file() {
+ let filename = name.split_once('.').map(|(f, _)| f).unwrap_or(&name);
+ if filename.len() == 64 {
+ if let Ok(h) = hex::decode(filename) {
+ let mut hash = [0u8; 32];
+ hash.copy_from_slice(&h);
+ self.todo.push(BsiTodo::File {
+ path: ent.path(),
+ hash: hash.into(),
+ progress: 0,
+ });
+ }
+ }
+ }
+ }
- let (subpaths, pos) = match *last_path {
- ReadingDir::Read {
- ref subpaths,
- ref mut pos,
- } => (subpaths, pos),
- ReadingDir::Pending(_) => unreachable!(),
- };
-
- let data_dir_ent = match subpaths.get(*pos) {
- None => {
- self.path.pop();
- continue;
- }
- Some(ent) => {
- *pos += 1;
- ent
+ let count = self.todo.len() - istart;
+ for (i, ent) in self.todo[istart..].iter_mut().enumerate() {
+ let p1 = progress_min
+ + ((progress_max - progress_min) * i as u64) / count as u64;
+ let p2 = progress_min
+ + ((progress_max - progress_min) * (i + 1) as u64) / count as u64;
+ match ent {
+ BsiTodo::Directory {
+ progress_min,
+ progress_max,
+ ..
+ } => {
+ *progress_min = p1;
+ *progress_max = p2;
+ }
+ BsiTodo::File { progress, .. } => {
+ *progress = p1;
+ }
+ }
+ }
+ self.todo[istart..].reverse();
+ debug_assert!(self.progress_invariant());
}
- };
-
- let name = data_dir_ent.file_name();
- let name = if let Ok(n) = name.into_string() {
- n
- } else {
- continue;
- };
- let ent_type = data_dir_ent.file_type().await?;
-
- let name = name.strip_suffix(".zst").unwrap_or(&name);
- if name.len() == 2 && hex::decode(name).is_ok() && ent_type.is_dir() {
- let path = data_dir_ent.path();
- self.path.push(ReadingDir::Pending(path));
- } else if name.len() == 64 {
- if let Ok(h) = hex::decode(name) {
- let mut hash = [0u8; 32];
- hash.copy_from_slice(&h);
- return Ok(Some(hash.into()));
+ Some(BsiTodo::File { path, hash, .. }) => {
+ return Ok(Some((path, hash)));
}
}
}
}
+
+ // for debug_assert!
+ fn progress_invariant(&self) -> bool {
+ let iter = self.todo.iter().map(|x| match x {
+ BsiTodo::Directory { progress_min, .. } => progress_min,
+ BsiTodo::File { progress, .. } => progress,
+ });
+ let iter_1 = iter.clone().skip(1);
+ iter.zip(iter_1).all(|(prev, next)| prev >= next)
+ }
}
diff --git a/src/block/resync.rs b/src/block/resync.rs
index ea280ad4..9c1da4a7 100644
--- a/src/block/resync.rs
+++ b/src/block/resync.rs
@@ -41,7 +41,7 @@ pub(crate) const RESYNC_RETRY_DELAY: Duration = Duration::from_secs(60);
pub(crate) const RESYNC_RETRY_DELAY_MAX_BACKOFF_POWER: u64 = 6;
// No more than 4 resync workers can be running in the system
-pub(crate) const MAX_RESYNC_WORKERS: usize = 4;
+pub(crate) const MAX_RESYNC_WORKERS: usize = 8;
// Resync tranquility is initially set to 2, but can be changed in the CLI
// and the updated version is persisted over Garage restarts
const INITIAL_RESYNC_TRANQUILITY: u32 = 2;
@@ -359,20 +359,23 @@ impl BlockResyncManager {
}
async fn resync_block(&self, manager: &BlockManager, hash: &Hash) -> Result<(), Error> {
- let BlockStatus { exists, needed } = manager.check_block_status(hash).await?;
+ let existing_path = manager.find_block(hash).await;
+ let exists = existing_path.is_some();
+ let rc = manager.rc.get_block_rc(hash)?;
- if exists != needed.is_needed() || exists != needed.is_nonzero() {
+ if exists != rc.is_needed() || exists != rc.is_nonzero() {
debug!(
"Resync block {:?}: exists {}, nonzero rc {}, deletable {}",
hash,
exists,
- needed.is_nonzero(),
- needed.is_deletable(),
+ rc.is_nonzero(),
+ rc.is_deletable(),
);
}
- if exists && needed.is_deletable() {
+ if exists && rc.is_deletable() {
info!("Resync block {:?}: offloading and deleting", hash);
+ let existing_path = existing_path.unwrap();
let mut who = manager.replication.write_nodes(hash);
if who.len() < manager.replication.write_quorum() {
@@ -419,7 +422,7 @@ impl BlockResyncManager {
.add(1, &[KeyValue::new("to", format!("{:?}", node))]);
}
- let block = manager.read_block(hash).await?;
+ let block = manager.read_block_from(hash, &existing_path).await?;
let (header, bytes) = block.into_parts();
let put_block_message = Req::new(BlockRpc::PutBlock {
hash: *hash,
@@ -451,7 +454,7 @@ impl BlockResyncManager {
manager.rc.clear_deleted_block_rc(hash)?;
}
- if needed.is_nonzero() && !exists {
+ if rc.is_nonzero() && !exists {
info!(
"Resync block {:?}: fetching absent but needed block (refcount > 0)",
hash