aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock1
-rw-r--r--Cargo.nix3
-rw-r--r--doc/book/cookbook/real-world.md11
-rw-r--r--doc/book/operations/durability-repairs.md11
-rw-r--r--doc/book/operations/multi-hdd.md101
-rw-r--r--doc/book/operations/upgrading.md2
-rw-r--r--doc/book/reference-manual/configuration.md13
-rw-r--r--src/block/Cargo.toml1
-rw-r--r--src/block/block.rs10
-rw-r--r--src/block/layout.rs337
-rw-r--r--src/block/lib.rs1
-rw-r--r--src/block/manager.rs423
-rw-r--r--src/block/repair.rs410
-rw-r--r--src/block/resync.rs19
-rw-r--r--src/garage/cli/structs.rs3
-rw-r--r--src/garage/repair/online.rs6
-rw-r--r--src/model/garage.rs20
-rw-r--r--src/rpc/system.rs41
-rw-r--r--src/table/queue.rs2
-rw-r--r--src/util/config.rs22
20 files changed, 1125 insertions, 312 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 79b35191..1ace7cc2 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1300,6 +1300,7 @@ dependencies = [
"async-compression",
"async-trait",
"bytes",
+ "bytesize",
"futures",
"futures-util",
"garage_db",
diff --git a/Cargo.nix b/Cargo.nix
index dc30c355..b9bda61d 100644
--- a/Cargo.nix
+++ b/Cargo.nix
@@ -33,7 +33,7 @@ args@{
ignoreLockHash,
}:
let
- nixifiedLockHash = "f5b86f9d75664ba528a26ae71f07a38e9c72c78fe331420b9b639e2a099d4dad";
+ nixifiedLockHash = "685d51432f57c5ad2d5c80e725822b9c9bfd7cc632340f70aa1377c1d89117e4";
workspaceSrc = if args.workspaceSrc == null then ./. else args.workspaceSrc;
currentLockHash = builtins.hashFile "sha256" (workspaceSrc + /Cargo.lock);
lockHashIgnored = if ignoreLockHash
@@ -1844,6 +1844,7 @@ in
async_compression = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".async-compression."0.4.1" { inherit profileName; }).out;
async_trait = (buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".async-trait."0.1.73" { profileName = "__noProfile"; }).out;
bytes = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.4.0" { inherit profileName; }).out;
+ bytesize = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytesize."1.3.0" { inherit profileName; }).out;
futures = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures."0.3.28" { inherit profileName; }).out;
futures_util = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures-util."0.3.28" { inherit profileName; }).out;
garage_db = (rustPackages."unknown".garage_db."0.8.3" { inherit profileName; }).out;
diff --git a/doc/book/cookbook/real-world.md b/doc/book/cookbook/real-world.md
index 7061069f..a8fbb371 100644
--- a/doc/book/cookbook/real-world.md
+++ b/doc/book/cookbook/real-world.md
@@ -75,16 +75,11 @@ to store 2 TB of data in total.
- For the metadata storage, Garage does not do checksumming and integrity
verification on its own. If you are afraid of bitrot/data corruption,
- put your metadata directory on a BTRFS partition. Otherwise, just use regular
+ put your metadata directory on a ZFS or BTRFS partition. Otherwise, just use regular
EXT4 or XFS.
-- Having a single server with several storage drives is currently not very well
- supported in Garage ([#218](https://git.deuxfleurs.fr/Deuxfleurs/garage/issues/218)).
- For an easy setup, just put all your drives in a RAID0 or a ZFS RAIDZ array.
- If you're adventurous, you can try to format each of your disk as
- a separate XFS partition, and then run one `garage` daemon per disk drive,
- or use something like [`mergerfs`](https://github.com/trapexit/mergerfs) to merge
- all your disks in a single union filesystem that spreads load over them.
+- Servers with multiple HDDs are supported natively by Garage without resorting
+ to RAID, see [our dedicated documentation page](@/documentation/operations/multi-hdd.md).
## Get a Docker image
diff --git a/doc/book/operations/durability-repairs.md b/doc/book/operations/durability-repairs.md
index 498c8fda..b0d2c78a 100644
--- a/doc/book/operations/durability-repairs.md
+++ b/doc/book/operations/durability-repairs.md
@@ -91,6 +91,16 @@ is definitely lost, then there is no other choice than to declare your S3 object
as unrecoverable, and to delete them properly from the data store. This can be done
using the `garage block purge` command.
+## Rebalancing data directories
+
+In [multi-HDD setups](@/documentation/operations/multi-hdd.md), to ensure that
+data blocks are well balanced between storage locations, you may run a
+rebalance operation using `garage repair rebalance`. This is usefull when
+adding storage locations or when capacities of the storage locations have been
+changed. Once this is finished, Garage will know for each block of a single
+possible location where it can be, which can increase access speed. This
+operation will also move out all data from locations marked as read-only.
+
# Metadata operations
@@ -114,4 +124,3 @@ in your cluster, you can run one of the following repair procedures:
- `garage repair versions`: checks that all versions belong to a non-deleted object, and purges any orphan version
- `garage repair block_refs`: checks that all block references belong to a non-deleted object version, and purges any orphan block reference (this will then allow the blocks to be garbage-collected)
-
diff --git a/doc/book/operations/multi-hdd.md b/doc/book/operations/multi-hdd.md
new file mode 100644
index 00000000..36445b0a
--- /dev/null
+++ b/doc/book/operations/multi-hdd.md
@@ -0,0 +1,101 @@
++++
+title = "Multi-HDD support"
+weight = 15
++++
+
+
+Since v0.9, Garage natively supports nodes that have several storage drives
+for storing data blocks (not for metadata storage).
+
+## Initial setup
+
+To set up a new Garage storage node with multiple HDDs,
+format and mount all your drives in different directories,
+and use a Garage configuration as follows:
+
+```toml
+data_dir = [
+ { path = "/path/to/hdd1", capacity = "2T" },
+ { path = "/path/to/hdd2", capacity = "4T" },
+]
+```
+
+Garage will automatically balance all blocks stored by the node
+among the different specified directories, proportionnally to the
+specified capacities.
+
+## Updating the list of storage locations
+
+If you add new storage locations to your `data_dir`,
+Garage will not rebalance existing data between storage locations.
+Newly written blocks will be balanced proportionnally to the specified capacities,
+and existing data may be moved between drives to improve balancing,
+but only opportunistically when a data block is re-written (e.g. an object
+is re-uploaded, or an object with a duplicate block is uploaded).
+
+To understand precisely what is happening, we need to dive in to how Garage
+splits data among the different storage locations.
+
+First of all, Garage divides the set of all possible block hashes
+in a fixed number of slices (currently 1024), and assigns
+to each slice a primary storage location among the specified data directories.
+The number of slices having their primary location in each data directory
+is proportionnal to the capacity specified in the config file.
+
+When Garage receives a block to write, it will always write it in the primary
+directory of the slice that contains its hash.
+
+Now, to be able to not lose existing data blocks when storage locations
+are added, Garage also keeps a list of secondary data directories
+for all of the hash slices. Secondary data directories for a slice indicates
+storage locations that once were primary directories for that slice, i.e. where
+Garage knows that data blocks of that slice might be stored.
+When Garage is requested to read a certain data block,
+it will first look in the primary storage directory of its slice,
+and if it doesn't find it there it goes through all of the secondary storage
+locations until it finds it. This allows Garage to continue operating
+normally when storage locations are added, without having to shuffle
+files between drives to place them in the correct location.
+
+This relatively simple strategy works well but does not ensure that data
+is correctly balanced among drives according to their capacity.
+To rebalance data, two strategies can be used:
+
+- Lazy rebalancing: when a block is re-written (e.g. the object is re-uploaded),
+ Garage checks whether the existing copy is in the primary directory of the slice
+ or in a secondary directory. If the current copy is in a secondary directory,
+ Garage re-writes a copy in the primary directory and deletes the one from the
+ secondary directory. This might never end up rebalancing everything if there
+ are data blocks that are only read and never written.
+
+- Active rebalancing: an operator of a Garage node can explicitly launch a repair
+ procedure that rebalances the data directories, moving all blocks to their
+ primary location. Once done, all secondary locations for all hash slices are
+ removed so that they won't be checked anymore when looking for a data block.
+
+## Read-only storage locations
+
+If you would like to move all data blocks from an existing data directory to one
+or several new data directories, mark the old directory as read-only:
+
+```toml
+data_dir = [
+ { path = "/path/to/old_data", read_only = true },
+ { path = "/path/to/new_hdd1", capacity = "2T" },
+ { path = "/path/to/new_hdd2", capacity = "4T" },
+]
+```
+
+Garage will be able to read requested blocks from the read-only directory.
+Garage will also move data out of the read-only directory either progressively
+(lazy rebalancing) or if requested explicitly (active rebalancing).
+
+Once an active rebalancing has finished, your read-only directory should be empty:
+it might still contain subdirectories, but no data files. You can check that
+it contains no files using:
+
+```bash
+find -type f /path/to/old_data # should not print anything
+```
+
+at which point it can be removed from the `data_dir` list in your config file.
diff --git a/doc/book/operations/upgrading.md b/doc/book/operations/upgrading.md
index e8919a19..9a738282 100644
--- a/doc/book/operations/upgrading.md
+++ b/doc/book/operations/upgrading.md
@@ -80,6 +80,6 @@ The entire procedure would look something like this:
5. If any specific migration procedure is required, it is usually in one of the two cases:
- It can be run on online nodes after the new version has started, during regular cluster operation.
- - it has to be run offline
+ - it has to be run offline, in which case you will have to again take all nodes offline one after the other to run the repair
For this last step, please refer to the specific documentation pertaining to the version upgrade you are doing.
diff --git a/doc/book/reference-manual/configuration.md b/doc/book/reference-manual/configuration.md
index 3f6ec091..df1251c2 100644
--- a/doc/book/reference-manual/configuration.md
+++ b/doc/book/reference-manual/configuration.md
@@ -91,6 +91,19 @@ This folder can be placed on an HDD. The space available for `data_dir`
should be counted to determine a node's capacity
when [adding it to the cluster layout](@/documentation/cookbook/real-world.md).
+Since `v0.9.0`, Garage supports multiple data directories with the following syntax:
+
+```toml
+data_dir = [
+ { path = "/path/to/old_data", read_only = true },
+ { path = "/path/to/new_hdd1", capacity = "2T" },
+ { path = "/path/to/new_hdd2", capacity = "4T" },
+]
+```
+
+See [the dedicated documentation page](@/documentation/operations/multi-hdd.md)
+on how to operate Garage in such a setup.
+
### `db_engine` (since `v0.8.0`)
By default, Garage uses the Sled embedded database library
diff --git a/src/block/Cargo.toml b/src/block/Cargo.toml
index 1057b699..b77988d6 100644
--- a/src/block/Cargo.toml
+++ b/src/block/Cargo.toml
@@ -24,6 +24,7 @@ opentelemetry = "0.17"
arc-swap = "1.5"
async-trait = "0.1.7"
bytes = "1.0"
+bytesize = "1.2"
hex = "0.4"
tracing = "0.1"
rand = "0.8"
diff --git a/src/block/block.rs b/src/block/block.rs
index 935aa900..20f57aa5 100644
--- a/src/block/block.rs
+++ b/src/block/block.rs
@@ -1,3 +1,5 @@
+use std::path::PathBuf;
+
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use zstd::stream::{decode_all as zstd_decode, Encoder};
@@ -19,6 +21,14 @@ pub enum DataBlock {
Compressed(Bytes),
}
+#[derive(Debug)]
+pub enum DataBlockPath {
+ /// Uncompressed data fail
+ Plain(PathBuf),
+ /// Compressed data fail
+ Compressed(PathBuf),
+}
+
impl DataBlock {
/// Query whether this block is compressed
pub fn is_compressed(&self) -> bool {
diff --git a/src/block/layout.rs b/src/block/layout.rs
new file mode 100644
index 00000000..e8339405
--- /dev/null
+++ b/src/block/layout.rs
@@ -0,0 +1,337 @@
+use std::path::PathBuf;
+
+use serde::{Deserialize, Serialize};
+
+use garage_util::config::DataDirEnum;
+use garage_util::data::Hash;
+use garage_util::error::{Error, OkOrMessage};
+use garage_util::migrate::*;
+
+type Idx = u16;
+
+const DRIVE_NPART: usize = 1024;
+
+const HASH_DRIVE_BYTES: (usize, usize) = (2, 3);
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub(crate) struct DataLayout {
+ pub(crate) data_dirs: Vec<DataDir>,
+
+ /// Primary storage location (index in data_dirs) for each partition
+ /// = the location where the data is supposed to be, blocks are always
+ /// written there (copies in other dirs may be deleted if they exist)
+ pub(crate) part_prim: Vec<Idx>,
+ /// Secondary storage locations for each partition = locations
+ /// where data blocks might be, we check from these dirs when reading
+ pub(crate) part_sec: Vec<Vec<Idx>>,
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
+pub(crate) struct DataDir {
+ pub(crate) path: PathBuf,
+ pub(crate) state: DataDirState,
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone, Copy, Eq, PartialEq)]
+pub(crate) enum DataDirState {
+ Active { capacity: u64 },
+ ReadOnly,
+}
+
+impl DataLayout {
+ pub(crate) fn initialize(dirs: &DataDirEnum) -> Result<Self, Error> {
+ let data_dirs = make_data_dirs(dirs)?;
+
+ // Split partitions proportionnally to capacity for all drives
+ // to affect primary storage location
+ let total_cap = data_dirs.iter().filter_map(|x| x.capacity()).sum::<u64>();
+ assert!(total_cap > 0);
+
+ let mut part_prim = Vec::with_capacity(DRIVE_NPART);
+ let mut cum_cap = 0;
+ for (i, dd) in data_dirs.iter().enumerate() {
+ if let DataDirState::Active { capacity } = dd.state {
+ cum_cap += capacity;
+ let n_total = (cum_cap * DRIVE_NPART as u64) / total_cap;
+ part_prim.resize(n_total as usize, i as Idx);
+ }
+ }
+ assert_eq!(cum_cap, total_cap);
+ assert_eq!(part_prim.len(), DRIVE_NPART);
+
+ // If any of the storage locations is non-empty, it probably existed before
+ // this algorithm was added, so add it as a secondary storage location for all partitions
+ // to make sure existing files are not lost
+ let mut part_sec = vec![vec![]; DRIVE_NPART];
+ for (i, dd) in data_dirs.iter().enumerate() {
+ if dir_not_empty(&dd.path)? {
+ for (sec, prim) in part_sec.iter_mut().zip(part_prim.iter()) {
+ if *prim != i as Idx {
+ sec.push(i as Idx);
+ }
+ }
+ }
+ }
+
+ Ok(Self {
+ data_dirs,
+ part_prim,
+ part_sec,
+ })
+ }
+
+ pub(crate) fn update(&mut self, dirs: &DataDirEnum) -> Result<(), Error> {
+ // Make list of new data directories, exit if nothing changed
+ let data_dirs = make_data_dirs(dirs)?;
+ if data_dirs == self.data_dirs {
+ return Ok(());
+ }
+
+ let total_cap = data_dirs.iter().filter_map(|x| x.capacity()).sum::<u64>();
+ assert!(total_cap > 0);
+
+ // Compute mapping of old indices to new indices
+ let old2new = self
+ .data_dirs
+ .iter()
+ .map(|x| {
+ data_dirs
+ .iter()
+ .position(|y| y.path == x.path)
+ .map(|x| x as Idx)
+ })
+ .collect::<Vec<_>>();
+
+ // Compute secondary location list for partitions based on existing
+ // folders, translating indices from old to new
+ let mut part_sec = self
+ .part_sec
+ .iter()
+ .map(|dl| {
+ dl.iter()
+ .filter_map(|old| old2new.get(*old as usize).copied().flatten())
+ .collect::<Vec<_>>()
+ })
+ .collect::<Vec<_>>();
+
+ // Compute a vector that, for each data dir,
+ // contains the list of partitions primarily stored on that drive
+ let mut dir_prim = vec![vec![]; data_dirs.len()];
+ for (ipart, prim) in self.part_prim.iter().enumerate() {
+ if let Some(new) = old2new.get(*prim as usize).copied().flatten() {
+ dir_prim[new as usize].push(ipart);
+ }
+ }
+
+ // Compute the target number of partitions per data directory
+ let mut cum_cap = 0;
+ let mut npart_per_dir = vec![0; data_dirs.len()];
+ for (idir, dd) in data_dirs.iter().enumerate() {
+ if let DataDirState::Active { capacity } = dd.state {
+ let begin = (cum_cap * DRIVE_NPART as u64) / total_cap;
+ cum_cap += capacity;
+ let end = (cum_cap * DRIVE_NPART as u64) / total_cap;
+ npart_per_dir[idir] = (end - begin) as usize;
+ }
+ }
+ assert_eq!(cum_cap, total_cap);
+ assert_eq!(npart_per_dir.iter().sum::<usize>(), DRIVE_NPART);
+
+ // For all directories that have too many primary partitions,
+ // move that partition to secondary
+ for (idir, (parts, tgt_npart)) in dir_prim.iter_mut().zip(npart_per_dir.iter()).enumerate()
+ {
+ while parts.len() > *tgt_npart {
+ let part = parts.pop().unwrap();
+ if !part_sec[part].contains(&(idir as Idx)) {
+ part_sec[part].push(idir as Idx);
+ }
+ }
+ }
+
+ // Calculate the vector of primary partition dir index
+ let mut part_prim = vec![None; DRIVE_NPART];
+ for (idir, parts) in dir_prim.iter().enumerate() {
+ for part in parts.iter() {
+ assert!(part_prim[*part].is_none());
+ part_prim[*part] = Some(idir as Idx)
+ }
+ }
+
+ // Calculate a vector of unassigned partitions
+ let mut unassigned = part_prim
+ .iter()
+ .enumerate()
+ .filter(|(_, dir)| dir.is_none())
+ .map(|(ipart, _)| ipart)
+ .collect::<Vec<_>>();
+
+ // For all directories that don't have enough primary partitions,
+ // add partitions from unassigned
+ for (idir, (parts, tgt_npart)) in dir_prim.iter_mut().zip(npart_per_dir.iter()).enumerate()
+ {
+ if parts.len() < *tgt_npart {
+ let required = *tgt_npart - parts.len();
+ assert!(unassigned.len() >= required);
+ for _ in 0..required {
+ let new_part = unassigned.pop().unwrap();
+ part_prim[new_part] = Some(idir as Idx);
+ part_sec[new_part].retain(|x| *x != idir as Idx);
+ }
+ }
+ }
+
+ // Sanity checks
+ assert!(part_prim.iter().all(|x| x.is_some()));
+ assert!(unassigned.is_empty());
+
+ // Transform part_prim from vec of Option<Idx> to vec of Idx
+ let part_prim = part_prim
+ .into_iter()
+ .map(|x| x.unwrap())
+ .collect::<Vec<_>>();
+ assert!(part_prim.iter().all(|p| data_dirs
+ .get(*p as usize)
+ .and_then(|x| x.capacity())
+ .unwrap_or(0)
+ > 0));
+
+ // If any of the newly added storage locations is non-empty,
+ // it might have been removed and added again and might contain data,
+ // so add it as a secondary storage location for all partitions
+ // to make sure existing files are not lost
+ for (i, dd) in data_dirs.iter().enumerate() {
+ if self.data_dirs.iter().any(|ed| ed.path == dd.path) {
+ continue;
+ }
+ if dir_not_empty(&dd.path)? {
+ for (sec, prim) in part_sec.iter_mut().zip(part_prim.iter()) {
+ if *prim != i as Idx && !sec.contains(&(i as Idx)) {
+ sec.push(i as Idx);
+ }
+ }
+ }
+ }
+
+ // Apply newly generated config
+ *self = Self {
+ data_dirs,
+ part_prim,
+ part_sec,
+ };
+ Ok(())
+ }
+
+ pub(crate) fn primary_block_dir(&self, hash: &Hash) -> PathBuf {
+ let ipart = self.partition_from(hash);
+ let idir = self.part_prim[ipart] as usize;
+ self.block_dir_from(hash, &self.data_dirs[idir].path)
+ }
+
+ pub(crate) fn secondary_block_dirs<'a>(
+ &'a self,
+ hash: &'a Hash,
+ ) -> impl Iterator<Item = PathBuf> + 'a {
+ let ipart = self.partition_from(hash);
+ self.part_sec[ipart]
+ .iter()
+ .map(move |idir| self.block_dir_from(hash, &self.data_dirs[*idir as usize].path))
+ }
+
+ fn partition_from(&self, hash: &Hash) -> usize {
+ u16::from_be_bytes([
+ hash.as_slice()[HASH_DRIVE_BYTES.0],
+ hash.as_slice()[HASH_DRIVE_BYTES.1],
+ ]) as usize % DRIVE_NPART
+ }
+
+ fn block_dir_from(&self, hash: &Hash, dir: &PathBuf) -> PathBuf {
+ let mut path = dir.clone();
+ path.push(hex::encode(&hash.as_slice()[0..1]));
+ path.push(hex::encode(&hash.as_slice()[1..2]));
+ path
+ }
+
+ pub(crate) fn without_secondary_locations(&self) -> Self {
+ Self {
+ data_dirs: self.data_dirs.clone(),
+ part_prim: self.part_prim.clone(),
+ part_sec: self.part_sec.iter().map(|_| vec![]).collect::<Vec<_>>(),
+ }
+ }
+}
+
+impl InitialFormat for DataLayout {
+ const VERSION_MARKER: &'static [u8] = b"G09bmdl";
+}
+
+impl DataDir {
+ pub fn capacity(&self) -> Option<u64> {
+ match self.state {
+ DataDirState::Active { capacity } => Some(capacity),
+ _ => None,
+ }
+ }
+}
+
+fn make_data_dirs(dirs: &DataDirEnum) -> Result<Vec<DataDir>, Error> {
+ let mut data_dirs = vec![];
+ match dirs {
+ DataDirEnum::Single(path) => data_dirs.push(DataDir {
+ path: path.clone(),
+ state: DataDirState::Active {
+ capacity: 1_000_000_000, // whatever, doesn't matter
+ },
+ }),
+ DataDirEnum::Multiple(dirs) => {
+ let mut ok = false;
+ for dir in dirs.iter() {
+ let state = match &dir.capacity {
+ Some(cap) if dir.read_only == false => {
+ let capacity = cap.parse::<bytesize::ByteSize>()
+ .ok_or_message("invalid capacity value")?.as_u64();
+ if capacity == 0 {
+ return Err(Error::Message(format!("data directory {} should have non-zero capacity", dir.path.to_string_lossy())));
+ }
+ ok = true;
+ DataDirState::Active {
+ capacity,
+ }
+ }
+ None if dir.read_only == true => {
+ DataDirState::ReadOnly
+ }
+ _ => return Err(Error::Message(format!("data directories in data_dir should have a capacity value or be marked read_only, not the case for {}", dir.path.to_string_lossy()))),
+ };
+ data_dirs.push(DataDir {
+ path: dir.path.clone(),
+ state,
+ });
+ }
+ if !ok {
+ return Err(Error::Message(
+ "incorrect data_dir configuration, no primary writable directory specified"
+ .into(),
+ ));
+ }
+ }
+ }
+ Ok(data_dirs)
+}
+
+fn dir_not_empty(path: &PathBuf) -> Result<bool, Error> {
+ for entry in std::fs::read_dir(&path)? {
+ let dir = entry?;
+ if dir.file_type()?.is_dir()
+ && dir
+ .file_name()
+ .into_string()
+ .ok()
+ .and_then(|hex| hex::decode(&hex).ok())
+ .is_some()
+ {
+ return Ok(true);
+ }
+ }
+ Ok(false)
+}
diff --git a/src/block/lib.rs b/src/block/lib.rs
index d2814f77..c9ff2845 100644
--- a/src/block/lib.rs
+++ b/src/block/lib.rs
@@ -6,5 +6,6 @@ pub mod repair;
pub mod resync;
mod block;
+mod layout;
mod metrics;
mod rc;
diff --git a/src/block/manager.rs b/src/block/manager.rs
index c7e4cd03..2d1b5c67 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -3,7 +3,7 @@ use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
-use arc_swap::ArcSwapOption;
+use arc_swap::{ArcSwap, ArcSwapOption};
use async_trait::async_trait;
use bytes::Bytes;
use rand::prelude::*;
@@ -25,10 +25,11 @@ use garage_rpc::rpc_helper::netapp::stream::{stream_asyncread, ByteStream};
use garage_db as db;
use garage_util::background::{vars, BackgroundRunner};
+use garage_util::config::DataDirEnum;
use garage_util::data::*;
use garage_util::error::*;
use garage_util::metrics::RecordDuration;
-use garage_util::persister::PersisterShared;
+use garage_util::persister::{Persister, PersisterShared};
use garage_util::time::msec_to_rfc3339;
use garage_rpc::rpc_helper::OrderTag;
@@ -38,6 +39,7 @@ use garage_rpc::*;
use garage_table::replication::{TableReplication, TableShardedReplication};
use crate::block::*;
+use crate::layout::*;
use crate::metrics::*;
use crate::rc::*;
use crate::repair::*;
@@ -77,13 +79,16 @@ impl Rpc for BlockRpc {
pub struct BlockManager {
/// Replication strategy, allowing to find on which node blocks should be located
pub replication: TableShardedReplication,
- /// Directory in which block are stored
- pub data_dir: PathBuf,
+
+ /// Data layout
+ pub(crate) data_layout: ArcSwap<DataLayout>,
+ /// Data layout persister
+ pub(crate) data_layout_persister: Persister<DataLayout>,
data_fsync: bool,
compression_level: Option<i32>,
- mutation_lock: [Mutex<BlockManagerLocked>; 256],
+ mutation_lock: Vec<Mutex<BlockManagerLocked>>,
pub(crate) rc: BlockRc,
pub resync: BlockResyncManager,
@@ -106,6 +111,9 @@ pub struct BlockResyncErrorInfo {
pub next_try: u64,
}
+// The number of different mutexes used to parallelize write access to data blocks
+const MUTEX_COUNT: usize = 256;
+
// This custom struct contains functions that must only be ran
// when the lock is held. We ensure that it is the case by storing
// it INSIDE a Mutex.
@@ -114,12 +122,29 @@ struct BlockManagerLocked();
impl BlockManager {
pub fn new(
db: &db::Db,
- data_dir: PathBuf,
+ data_dir: DataDirEnum,
data_fsync: bool,
compression_level: Option<i32>,
replication: TableShardedReplication,
system: Arc<System>,
- ) -> Arc<Self> {
+ ) -> Result<Arc<Self>, Error> {
+ // Load or compute layout, i.e. assignment of data blocks to the different data directories
+ let data_layout_persister: Persister<DataLayout> =
+ Persister::new(&system.metadata_dir, "data_layout");
+ let data_layout = match data_layout_persister.load() {
+ Ok(mut layout) => {
+ layout
+ .update(&data_dir)
+ .ok_or_message("invalid data_dir config")?;
+ layout
+ }
+ Err(_) => DataLayout::initialize(&data_dir).ok_or_message("invalid data_dir config")?,
+ };
+ data_layout_persister
+ .save(&data_layout)
+ .expect("cannot save data_layout");
+
+ // Open metadata tables
let rc = db
.open_tree("block_local_rc")
.expect("Unable to open block_local_rc tree");
@@ -142,10 +167,14 @@ impl BlockManager {
let block_manager = Arc::new(Self {
replication,
- data_dir,
+ data_layout: ArcSwap::new(Arc::new(data_layout)),
+ data_layout_persister,
data_fsync,
compression_level,
- mutation_lock: [(); 256].map(|_| Mutex::new(BlockManagerLocked())),
+ mutation_lock: vec![(); MUTEX_COUNT]
+ .iter()
+ .map(|_| Mutex::new(BlockManagerLocked()))
+ .collect::<Vec<_>>(),
rc,
resync,
system,
@@ -157,7 +186,7 @@ impl BlockManager {
block_manager.endpoint.set_handler(block_manager.clone());
block_manager.scrub_persister.set_with(|_| ()).unwrap();
- block_manager
+ Ok(block_manager)
}
pub fn spawn_workers(self: &Arc<Self>, bg: &BackgroundRunner) {
@@ -204,44 +233,10 @@ impl BlockManager {
hash: &Hash,
order_tag: Option<OrderTag>,
) -> Result<(DataBlockHeader, ByteStream), Error> {
- let who = self.replication.read_nodes(hash);
- let who = self.system.rpc.request_order(&who);
-
- for node in who.iter() {
- let node_id = NodeID::from(*node);
- let rpc = self.endpoint.call_streaming(
- &node_id,
- BlockRpc::GetBlock(*hash, order_tag),
- PRIO_NORMAL | PRIO_SECONDARY,
- );
- tokio::select! {
- res = rpc => {
- let res = match res {
- Ok(res) => res,
- Err(e) => {
- debug!("Node {:?} returned error: {}", node, e);
- continue;
- }
- };
- let (header, stream) = match res.into_parts() {
- (Ok(BlockRpc::PutBlock { hash: _, header }), Some(stream)) => (header, stream),
- _ => {
- debug!("Node {:?} returned a malformed response", node);
- continue;
- }
- };
- return Ok((header, stream));
- }
- _ = tokio::time::sleep(self.system.rpc.rpc_timeout()) => {
- debug!("Node {:?} didn't return block in time, trying next.", node);
- }
- };
- }
-
- Err(Error::Message(format!(
- "Unable to read block {:?}: no node returned a valid block",
- hash
- )))
+ self.rpc_get_raw_block_internal(hash, order_tag, |header, stream| async move {
+ Ok((header, stream))
+ })
+ .await
}
/// Ask nodes that might have a (possibly compressed) block for it
@@ -251,6 +246,24 @@ impl BlockManager {
hash: &Hash,
order_tag: Option<OrderTag>,
) -> Result<DataBlock, Error> {
+ self.rpc_get_raw_block_internal(hash, order_tag, |header, stream| async move {
+ read_stream_to_end(stream)
+ .await
+ .map(|data| DataBlock::from_parts(header, data))
+ })
+ .await
+ }
+
+ async fn rpc_get_raw_block_internal<F, Fut, T>(
+ &self,
+ hash: &Hash,
+ order_tag: Option<OrderTag>,
+ f: F,
+ ) -> Result<T, Error>
+ where
+ F: Fn(DataBlockHeader, ByteStream) -> Fut,
+ Fut: futures::Future<Output = Result<T, Error>>,
+ {
let who = self.replication.read_nodes(hash);
let who = self.system.rpc.request_order(&who);
@@ -266,34 +279,41 @@ impl BlockManager {
let res = match res {
Ok(res) => res,
Err(e) => {
- debug!("Node {:?} returned error: {}", node, e);
+ debug!("Get block {:?}: node {:?} could not be contacted: {}", hash, node, e);
continue;
}
};
let (header, stream) = match res.into_parts() {
(Ok(BlockRpc::PutBlock { hash: _, header }), Some(stream)) => (header, stream),
- _ => {
- debug!("Node {:?} returned a malformed response", node);
+ (Ok(_), _) => {
+ debug!("Get block {:?}: node {:?} returned a malformed response", hash, node);
+ continue;
+ }
+ (Err(e), _) => {
+ debug!("Get block {:?}: node {:?} returned error: {}", hash, node, e);
continue;
}
};
- match read_stream_to_end(stream).await {
- Ok(bytes) => return Ok(DataBlock::from_parts(header, bytes)),
+ match f(header, stream).await {
+ Ok(ret) => return Ok(ret),
Err(e) => {
- debug!("Error reading stream from node {:?}: {}", node, e);
+ debug!("Get block {:?}: error reading stream from node {:?}: {}", hash, node, e);
}
}
}
+ // TODO: sleep less long (fail early), initiate a second request earlier
+ // if the first one doesn't succeed rapidly
+ // TODO: keep first request running when initiating a new one and take the
+ // one that finishes earlier
_ = tokio::time::sleep(self.system.rpc.rpc_timeout()) => {
- debug!("Node {:?} didn't return block in time, trying next.", node);
+ debug!("Get block {:?}: node {:?} didn't return block in time, trying next.", hash, node);
}
};
}
- Err(Error::Message(format!(
- "Unable to read block {:?}: no node returned a valid block",
- hash
- )))
+ let msg = format!("Get block {:?}: no node returned a valid block", hash);
+ debug!("{}", msg);
+ Err(Error::Message(msg))
}
// ---- Public interface ----
@@ -471,8 +491,6 @@ impl BlockManager {
pub(crate) async fn write_block(&self, hash: &Hash, data: &DataBlock) -> Result<(), Error> {
let tracer = opentelemetry::global::tracer("garage");
- let write_size = data.inner_buffer().len() as u64;
-
self.lock_mutate(hash)
.await
.write_block(hash, data, self)
@@ -482,8 +500,6 @@ impl BlockManager {
))
.await?;
- self.metrics.bytes_written.add(write_size);
-
Ok(())
}
@@ -510,36 +526,42 @@ impl BlockManager {
/// Read block from disk, verifying it's integrity
pub(crate) async fn read_block(&self, hash: &Hash) -> Result<DataBlock, Error> {
- let data = self
- .read_block_internal(hash)
- .bound_record_duration(&self.metrics.block_read_duration)
- .await?;
-
- self.metrics
- .bytes_read
- .add(data.inner_buffer().len() as u64);
-
- Ok(data)
+ let tracer = opentelemetry::global::tracer("garage");
+ async {
+ match self.find_block(hash).await {
+ Some(p) => self.read_block_from(hash, &p).await,
+ None => {
+ // Not found but maybe we should have had it ??
+ self.resync
+ .put_to_resync(hash, 2 * self.system.rpc.rpc_timeout())?;
+ return Err(Error::Message(format!(
+ "block {:?} not found on node",
+ hash
+ )));
+ }
+ }
+ }
+ .bound_record_duration(&self.metrics.block_read_duration)
+ .with_context(Context::current_with_span(
+ tracer.start("BlockManager::read_block"),
+ ))
+ .await
}
- async fn read_block_internal(&self, hash: &Hash) -> Result<DataBlock, Error> {
- let mut path = self.block_path(hash);
- let compressed = match self.is_block_compressed(hash).await {
- Ok(c) => c,
- Err(e) => {
- // Not found but maybe we should have had it ??
- self.resync
- .put_to_resync(hash, 2 * self.system.rpc.rpc_timeout())?;
- return Err(Into::into(e));
- }
+ pub(crate) async fn read_block_from(
+ &self,
+ hash: &Hash,
+ block_path: &DataBlockPath,
+ ) -> Result<DataBlock, Error> {
+ let (path, compressed) = match block_path {
+ DataBlockPath::Plain(p) => (p, false),
+ DataBlockPath::Compressed(p) => (p, true),
};
- if compressed {
- path.set_extension("zst");
- }
- let mut f = fs::File::open(&path).await?;
+ let mut f = fs::File::open(&path).await?;
let mut data = vec![];
f.read_to_end(&mut data).await?;
+ self.metrics.bytes_read.add(data.len() as u64);
drop(f);
let data = if compressed {
@@ -551,29 +573,27 @@ impl BlockManager {
if data.verify(*hash).is_err() {
self.metrics.corruption_counter.add(1);
+ warn!(
+ "Block {:?} is corrupted. Renaming to .corrupted and resyncing.",
+ hash
+ );
self.lock_mutate(hash)
.await
- .move_block_to_corrupted(hash, self)
+ .move_block_to_corrupted(block_path)
.await?;
self.resync.put_to_resync(hash, Duration::from_millis(0))?;
+
return Err(Error::CorruptData(*hash));
}
Ok(data)
}
- /// Check if this node has a block and whether it needs it
- pub(crate) async fn check_block_status(&self, hash: &Hash) -> Result<BlockStatus, Error> {
- self.lock_mutate(hash)
- .await
- .check_block_status(hash, self)
- .await
- }
-
/// Check if this node should have a block, but don't actually have it
async fn need_block(&self, hash: &Hash) -> Result<bool, Error> {
- let BlockStatus { exists, needed } = self.check_block_status(hash).await?;
- Ok(needed.is_nonzero() && !exists)
+ let rc = self.rc.get_block_rc(hash)?;
+ let exists = self.find_block(hash).await.is_some();
+ Ok(rc.is_nonzero() && !exists)
}
/// Delete block if it is not needed anymore
@@ -584,59 +604,65 @@ impl BlockManager {
.await
}
- /// Utility: gives the path of the directory in which a block should be found
- fn block_dir(&self, hash: &Hash) -> PathBuf {
- let mut path = self.data_dir.clone();
- path.push(hex::encode(&hash.as_slice()[0..1]));
- path.push(hex::encode(&hash.as_slice()[1..2]));
- path
- }
-
- /// Utility: give the full path where a block should be found, minus extension if block is
- /// compressed
- fn block_path(&self, hash: &Hash) -> PathBuf {
- let mut path = self.block_dir(hash);
- path.push(hex::encode(hash.as_ref()));
- path
- }
+ /// Find the path where a block is currently stored
+ pub(crate) async fn find_block(&self, hash: &Hash) -> Option<DataBlockPath> {
+ let data_layout = self.data_layout.load_full();
+ let dirs = Some(data_layout.primary_block_dir(hash))
+ .into_iter()
+ .chain(data_layout.secondary_block_dirs(hash));
+ let filename = hex::encode(hash.as_ref());
- /// Utility: check if block is stored compressed. Error if block is not stored
- async fn is_block_compressed(&self, hash: &Hash) -> Result<bool, Error> {
- let mut path = self.block_path(hash);
+ for dir in dirs {
+ let mut path = dir;
+ path.push(&filename);
- // If compression is disabled on node - check for the raw block
- // first and then a compressed one (as compression may have been
- // previously enabled).
- match self.compression_level {
- None => {
+ if self.compression_level.is_none() {
+ // If compression is disabled on node - check for the raw block
+ // first and then a compressed one (as compression may have been
+ // previously enabled).
if fs::metadata(&path).await.is_ok() {
- return Ok(false);
+ return Some(DataBlockPath::Plain(path));
}
-
path.set_extension("zst");
-
- fs::metadata(&path).await.map(|_| true).map_err(Into::into)
- }
- _ => {
+ if fs::metadata(&path).await.is_ok() {
+ return Some(DataBlockPath::Compressed(path));
+ }
+ } else {
path.set_extension("zst");
-
if fs::metadata(&path).await.is_ok() {
- return Ok(true);
+ return Some(DataBlockPath::Compressed(path));
}
-
path.set_extension("");
-
- fs::metadata(&path).await.map(|_| false).map_err(Into::into)
+ if fs::metadata(&path).await.is_ok() {
+ return Some(DataBlockPath::Plain(path));
+ }
}
}
+
+ None
+ }
+
+ /// Rewrite a block at the primary location for its path and delete the old path.
+ /// Returns the number of bytes read/written
+ pub(crate) async fn fix_block_location(
+ &self,
+ hash: &Hash,
+ wrong_path: DataBlockPath,
+ ) -> Result<usize, Error> {
+ self.lock_mutate(hash)
+ .await
+ .fix_block_location(hash, wrong_path, self)
+ .await
}
async fn lock_mutate(&self, hash: &Hash) -> MutexGuard<'_, BlockManagerLocked> {
let tracer = opentelemetry::global::tracer("garage");
- self.mutation_lock[hash.as_slice()[0] as usize]
+ let ilock = u16::from_be_bytes([hash.as_slice()[0], hash.as_slice()[1]]) as usize
+ % self.mutation_lock.len();
+ self.mutation_lock[ilock]
.lock()
.with_context(Context::current_with_span(
- tracer.start("Acquire mutation_lock"),
+ tracer.start(format!("Acquire mutation_lock #{}", ilock)),
))
.await
}
@@ -649,7 +675,7 @@ impl StreamingEndpointHandler<BlockRpc> for BlockManager {
BlockRpc::PutBlock { hash, header } => Resp::new(
self.handle_put_block(*hash, *header, message.take_stream())
.await
- .map(|_| BlockRpc::Ok),
+ .map(|()| BlockRpc::Ok),
),
BlockRpc::GetBlock(h, order_tag) => self.handle_get_block(h, *order_tag).await,
BlockRpc::NeedBlockQuery(h) => {
@@ -660,62 +686,70 @@ impl StreamingEndpointHandler<BlockRpc> for BlockManager {
}
}
-pub(crate) struct BlockStatus {
- pub(crate) exists: bool,
- pub(crate) needed: RcEntry,
-}
-
impl BlockManagerLocked {
- async fn check_block_status(
+ async fn write_block(
&self,
hash: &Hash,
+ data: &DataBlock,
mgr: &BlockManager,
- ) -> Result<BlockStatus, Error> {
- let exists = mgr.is_block_compressed(hash).await.is_ok();
- let needed = mgr.rc.get_block_rc(hash)?;
-
- Ok(BlockStatus { exists, needed })
+ ) -> Result<(), Error> {
+ let existing_path = mgr.find_block(hash).await;
+ self.write_block_inner(hash, data, mgr, existing_path).await
}
- async fn write_block(
+ async fn write_block_inner(
&self,
hash: &Hash,
data: &DataBlock,
mgr: &BlockManager,
+ existing_path: Option<DataBlockPath>,
) -> Result<(), Error> {
let compressed = data.is_compressed();
let data = data.inner_buffer();
- let mut path = mgr.block_dir(hash);
- let directory = path.clone();
- path.push(hex::encode(hash));
+ let directory = mgr.data_layout.load().primary_block_dir(hash);
- fs::create_dir_all(&directory).await?;
+ let mut tgt_path = directory.clone();
+ tgt_path.push(hex::encode(hash));
+ if compressed {
+ tgt_path.set_extension("zst");
+ }
- let to_delete = match (mgr.is_block_compressed(hash).await, compressed) {
- (Ok(true), _) => return Ok(()),
- (Ok(false), false) => return Ok(()),
- (Ok(false), true) => {
- let path_to_delete = path.clone();
- path.set_extension("zst");
- Some(path_to_delete)
- }
- (Err(_), compressed) => {
- if compressed {
- path.set_extension("zst");
- }
- None
- }
+ let to_delete = match (existing_path, compressed) {
+ // If the block is stored in the wrong directory,
+ // write it again at the correct path and delete the old path
+ (Some(DataBlockPath::Plain(p)), false) if p != tgt_path => Some(p),
+ (Some(DataBlockPath::Compressed(p)), true) if p != tgt_path => Some(p),
+
+ // If the block is already stored not compressed but we have a compressed
+ // copy, write the compressed copy and delete the uncompressed one
+ (Some(DataBlockPath::Plain(plain_path)), true) => Some(plain_path),
+
+ // If the block is already stored compressed,
+ // keep the stored copy, we have nothing to do
+ (Some(DataBlockPath::Compressed(_)), _) => return Ok(()),
+
+ // If the block is already stored not compressed,
+ // and we don't have a compressed copy either,
+ // keep the stored copy, we have nothing to do
+ (Some(DataBlockPath::Plain(_)), false) => return Ok(()),
+
+ // If the block isn't stored already, just store what is given to us
+ (None, _) => None,
};
+ assert!(to_delete.as_ref() != Some(&tgt_path));
- let mut path_tmp = path.clone();
+ let mut path_tmp = tgt_path.clone();
let tmp_extension = format!("tmp{}", hex::encode(thread_rng().gen::<[u8; 4]>()));
path_tmp.set_extension(tmp_extension);
+ fs::create_dir_all(&directory).await?;
+
let mut delete_on_drop = DeleteOnDrop(Some(path_tmp.clone()));
let mut f = fs::File::create(&path_tmp).await?;
f.write_all(data).await?;
+ mgr.metrics.bytes_written.add(data.len() as u64);
if mgr.data_fsync {
f.sync_all().await?;
@@ -723,7 +757,7 @@ impl BlockManagerLocked {
drop(f);
- fs::rename(path_tmp, path).await?;
+ fs::rename(path_tmp, tgt_path).await?;
delete_on_drop.cancel();
@@ -749,36 +783,49 @@ impl BlockManagerLocked {
Ok(())
}
- async fn move_block_to_corrupted(&self, hash: &Hash, mgr: &BlockManager) -> Result<(), Error> {
- warn!(
- "Block {:?} is corrupted. Renaming to .corrupted and resyncing.",
- hash
- );
- let mut path = mgr.block_path(hash);
- let mut path2 = path.clone();
- if mgr.is_block_compressed(hash).await? {
- path.set_extension("zst");
- path2.set_extension("zst.corrupted");
- } else {
- path2.set_extension("corrupted");
- }
+ async fn move_block_to_corrupted(&self, block_path: &DataBlockPath) -> Result<(), Error> {
+ let (path, path2) = match block_path {
+ DataBlockPath::Plain(p) => {
+ let mut p2 = p.clone();
+ p2.set_extension("corrupted");
+ (p, p2)
+ }
+ DataBlockPath::Compressed(p) => {
+ let mut p2 = p.clone();
+ p2.set_extension("zst.corrupted");
+ (p, p2)
+ }
+ };
+
fs::rename(path, path2).await?;
Ok(())
}
async fn delete_if_unneeded(&self, hash: &Hash, mgr: &BlockManager) -> Result<(), Error> {
- let BlockStatus { exists, needed } = self.check_block_status(hash, mgr).await?;
-
- if exists && needed.is_deletable() {
- let mut path = mgr.block_path(hash);
- if mgr.is_block_compressed(hash).await? {
- path.set_extension("zst");
+ let rc = mgr.rc.get_block_rc(hash)?;
+ if rc.is_deletable() {
+ while let Some(path) = mgr.find_block(hash).await {
+ let path = match path {
+ DataBlockPath::Plain(p) | DataBlockPath::Compressed(p) => p,
+ };
+ fs::remove_file(path).await?;
+ mgr.metrics.delete_counter.add(1);
}
- fs::remove_file(path).await?;
- mgr.metrics.delete_counter.add(1);
}
Ok(())
}
+
+ async fn fix_block_location(
+ &self,
+ hash: &Hash,
+ wrong_path: DataBlockPath,
+ mgr: &BlockManager,
+ ) -> Result<usize, Error> {
+ let data = mgr.read_block_from(hash, &wrong_path).await?;
+ self.write_block_inner(hash, &data, mgr, Some(wrong_path))
+ .await?;
+ Ok(data.inner_buffer().len())
+ }
}
async fn read_stream_to_end(mut stream: ByteStream) -> Result<Bytes, Error> {
diff --git a/src/block/repair.rs b/src/block/repair.rs
index 71093d69..77ee0d14 100644
--- a/src/block/repair.rs
+++ b/src/block/repair.rs
@@ -17,6 +17,7 @@ use garage_util::persister::PersisterShared;
use garage_util::time::*;
use garage_util::tranquilizer::Tranquilizer;
+use crate::block::*;
use crate::manager::*;
// Full scrub every 25 days with a random element of 10 days mixed in below
@@ -136,7 +137,7 @@ impl Worker for RepairWorker {
// Lists all blocks on disk and adds them to the resync queue.
// This allows us to find blocks we are storing but don't actually need,
// so that we can offload them if necessary and then delete them locally.
- if let Some(hash) = bi.next().await? {
+ if let Some((_path, hash)) = bi.next().await? {
self.manager
.resync
.put_to_resync(&hash, Duration::from_secs(0))?;
@@ -175,7 +176,9 @@ mod v081 {
}
mod v082 {
+ use garage_util::data::Hash;
use serde::{Deserialize, Serialize};
+ use std::path::PathBuf;
use super::v081;
@@ -185,6 +188,27 @@ mod v082 {
pub(crate) time_last_complete_scrub: u64,
pub(crate) time_next_run_scrub: u64,
pub(crate) corruptions_detected: u64,
+ #[serde(default)]
+ pub(crate) checkpoint: Option<BlockStoreIterator>,
+ }
+
+ #[derive(Serialize, Deserialize, Clone)]
+ pub struct BlockStoreIterator {
+ pub todo: Vec<BsiTodo>,
+ }
+
+ #[derive(Serialize, Deserialize, Clone)]
+ pub enum BsiTodo {
+ Directory {
+ path: PathBuf,
+ progress_min: u64,
+ progress_max: u64,
+ },
+ File {
+ path: PathBuf,
+ hash: Hash,
+ progress: u64,
+ },
}
impl garage_util::migrate::Migrate for ScrubWorkerPersisted {
@@ -199,6 +223,7 @@ mod v082 {
time_last_complete_scrub: old.time_last_complete_scrub,
time_next_run_scrub: randomize_next_scrub_run_time(old.time_last_complete_scrub),
corruptions_detected: old.corruptions_detected,
+ checkpoint: None,
}
}
}
@@ -235,14 +260,23 @@ impl Default for ScrubWorkerPersisted {
time_next_run_scrub: randomize_next_scrub_run_time(now_msec()),
tranquility: INITIAL_SCRUB_TRANQUILITY,
corruptions_detected: 0,
+ checkpoint: None,
}
}
}
#[derive(Default)]
enum ScrubWorkerState {
- Running(BlockStoreIterator),
- Paused(BlockStoreIterator, u64), // u64 = time when to resume scrub
+ Running {
+ iterator: BlockStoreIterator,
+ // time of the last checkpoint
+ t_cp: u64,
+ },
+ Paused {
+ iterator: BlockStoreIterator,
+ // time at which the scrub should be resumed
+ t_resume: u64,
+ },
#[default]
Finished,
}
@@ -261,10 +295,17 @@ impl ScrubWorker {
rx_cmd: mpsc::Receiver<ScrubWorkerCommand>,
persister: PersisterShared<ScrubWorkerPersisted>,
) -> Self {
+ let work = match persister.get_with(|x| x.checkpoint.clone()) {
+ None => ScrubWorkerState::Finished,
+ Some(iterator) => ScrubWorkerState::Running {
+ iterator,
+ t_cp: now_msec(),
+ },
+ };
Self {
manager,
rx_cmd,
- work: ScrubWorkerState::Finished,
+ work,
tranquilizer: Tranquilizer::new(30),
persister,
}
@@ -277,7 +318,16 @@ impl ScrubWorker {
ScrubWorkerState::Finished => {
info!("Scrub worker initializing, now performing datastore scrub");
let iterator = BlockStoreIterator::new(&self.manager);
- ScrubWorkerState::Running(iterator)
+ if let Err(e) = self
+ .persister
+ .set_with(|x| x.checkpoint = Some(iterator.clone()))
+ {
+ error!("Could not save scrub checkpoint: {}", e);
+ }
+ ScrubWorkerState::Running {
+ iterator,
+ t_cp: now_msec(),
+ }
}
work => {
error!("Cannot start scrub worker: already running!");
@@ -287,8 +337,18 @@ impl ScrubWorker {
}
ScrubWorkerCommand::Pause(dur) => {
self.work = match std::mem::take(&mut self.work) {
- ScrubWorkerState::Running(it) | ScrubWorkerState::Paused(it, _) => {
- ScrubWorkerState::Paused(it, now_msec() + dur.as_millis() as u64)
+ ScrubWorkerState::Running { iterator, .. }
+ | ScrubWorkerState::Paused { iterator, .. } => {
+ if let Err(e) = self
+ .persister
+ .set_with(|x| x.checkpoint = Some(iterator.clone()))
+ {
+ error!("Could not save scrub checkpoint: {}", e);
+ }
+ ScrubWorkerState::Paused {
+ iterator,
+ t_resume: now_msec() + dur.as_millis() as u64,
+ }
}
work => {
error!("Cannot pause scrub worker: not running!");
@@ -298,7 +358,10 @@ impl ScrubWorker {
}
ScrubWorkerCommand::Resume => {
self.work = match std::mem::take(&mut self.work) {
- ScrubWorkerState::Paused(it, _) => ScrubWorkerState::Running(it),
+ ScrubWorkerState::Paused { iterator, .. } => ScrubWorkerState::Running {
+ iterator,
+ t_cp: now_msec(),
+ },
work => {
error!("Cannot resume scrub worker: not paused!");
work
@@ -307,7 +370,10 @@ impl ScrubWorker {
}
ScrubWorkerCommand::Cancel => {
self.work = match std::mem::take(&mut self.work) {
- ScrubWorkerState::Running(_) | ScrubWorkerState::Paused(_, _) => {
+ ScrubWorkerState::Running { .. } | ScrubWorkerState::Paused { .. } => {
+ if let Err(e) = self.persister.set_with(|x| x.checkpoint = None) {
+ error!("Could not save scrub checkpoint: {}", e);
+ }
ScrubWorkerState::Finished
}
work => {
@@ -343,12 +409,15 @@ impl Worker for ScrubWorker {
..Default::default()
};
match &self.work {
- ScrubWorkerState::Running(bsi) => {
- s.progress = Some(format!("{:.2}%", bsi.progress() * 100.));
+ ScrubWorkerState::Running { iterator, .. } => {
+ s.progress = Some(format!("{:.2}%", iterator.progress() * 100.));
}
- ScrubWorkerState::Paused(bsi, rt) => {
- s.progress = Some(format!("{:.2}%", bsi.progress() * 100.));
- s.freeform = vec![format!("Scrub paused, resumes at {}", msec_to_rfc3339(*rt))];
+ ScrubWorkerState::Paused { iterator, t_resume } => {
+ s.progress = Some(format!("{:.2}%", iterator.progress() * 100.));
+ s.freeform = vec![format!(
+ "Scrub paused, resumes at {}",
+ msec_to_rfc3339(*t_resume)
+ )];
}
ScrubWorkerState::Finished => {
s.freeform = vec![
@@ -374,9 +443,11 @@ impl Worker for ScrubWorker {
};
match &mut self.work {
- ScrubWorkerState::Running(bsi) => {
+ ScrubWorkerState::Running { iterator, t_cp } => {
self.tranquilizer.reset();
- if let Some(hash) = bsi.next().await? {
+ let now = now_msec();
+
+ if let Some((_path, hash)) = iterator.next().await? {
match self.manager.read_block(&hash).await {
Err(Error::CorruptData(_)) => {
error!("Found corrupt data block during scrub: {:?}", hash);
@@ -385,16 +456,23 @@ impl Worker for ScrubWorker {
Err(e) => return Err(e),
_ => (),
};
+
+ if now - *t_cp > 60 * 1000 {
+ self.persister
+ .set_with(|p| p.checkpoint = Some(iterator.clone()))?;
+ *t_cp = now;
+ }
+
Ok(self
.tranquilizer
.tranquilize_worker(self.persister.get_with(|p| p.tranquility)))
} else {
- let now = now_msec();
let next_scrub_timestamp = randomize_next_scrub_run_time(now);
self.persister.set_with(|p| {
p.time_last_complete_scrub = now;
p.time_next_run_scrub = next_scrub_timestamp;
+ p.checkpoint = None;
})?;
self.work = ScrubWorkerState::Finished;
self.tranquilizer.clear();
@@ -413,8 +491,8 @@ impl Worker for ScrubWorker {
async fn wait_for_work(&mut self) -> WorkerState {
let (wait_until, command) = match &self.work {
- ScrubWorkerState::Running(_) => return WorkerState::Busy,
- ScrubWorkerState::Paused(_, resume_time) => (*resume_time, ScrubWorkerCommand::Resume),
+ ScrubWorkerState::Running { .. } => return WorkerState::Busy,
+ ScrubWorkerState::Paused { t_resume, .. } => (*t_resume, ScrubWorkerCommand::Resume),
ScrubWorkerState::Finished => (
self.persister.get_with(|p| p.time_next_run_scrub),
ScrubWorkerCommand::Start,
@@ -437,110 +515,250 @@ impl Worker for ScrubWorker {
}
match &self.work {
- ScrubWorkerState::Running(_) => WorkerState::Busy,
+ ScrubWorkerState::Running { .. } => WorkerState::Busy,
_ => WorkerState::Idle,
}
}
}
// ---- ---- ----
-// UTILITY FOR ENUMERATING THE BLOCK STORE
+// THIRD KIND OF REPAIR: REBALANCING DATA BLOCKS
+// between multiple storage locations.
+// This is a one-shot repair operation that can be launched,
+// checks everything, and then exits.
// ---- ---- ----
-struct BlockStoreIterator {
- path: Vec<ReadingDir>,
+pub struct RebalanceWorker {
+ manager: Arc<BlockManager>,
+ block_iter: BlockStoreIterator,
+ t_started: u64,
+ t_finished: Option<u64>,
+ moved: usize,
+ moved_bytes: u64,
}
-enum ReadingDir {
- Pending(PathBuf),
- Read {
- subpaths: Vec<fs::DirEntry>,
- pos: usize,
- },
+impl RebalanceWorker {
+ pub fn new(manager: Arc<BlockManager>) -> Self {
+ let block_iter = BlockStoreIterator::new(&manager);
+ Self {
+ manager,
+ block_iter,
+ t_started: now_msec(),
+ t_finished: None,
+ moved: 0,
+ moved_bytes: 0,
+ }
+ }
+}
+
+#[async_trait]
+impl Worker for RebalanceWorker {
+ fn name(&self) -> String {
+ "Block rebalance worker".into()
+ }
+
+ fn status(&self) -> WorkerStatus {
+ let t_cur = self.t_finished.unwrap_or_else(|| now_msec());
+ let rate = self.moved_bytes / std::cmp::max(1, (t_cur - self.t_started) / 1000);
+ let mut freeform = vec![
+ format!("Blocks moved: {}", self.moved),
+ format!(
+ "Bytes moved: {} ({}/s)",
+ bytesize::ByteSize::b(self.moved_bytes),
+ bytesize::ByteSize::b(rate)
+ ),
+ format!("Started: {}", msec_to_rfc3339(self.t_started)),
+ ];
+ if let Some(t_fin) = self.t_finished {
+ freeform.push(format!("Finished: {}", msec_to_rfc3339(t_fin)))
+ }
+ WorkerStatus {
+ progress: Some(format!("{:.2}%", self.block_iter.progress() * 100.)),
+ freeform,
+ ..Default::default()
+ }
+ }
+
+ async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
+ if let Some((path, hash)) = self.block_iter.next().await? {
+ let prim_loc = self.manager.data_layout.load().primary_block_dir(&hash);
+ if path.ancestors().all(|x| x != prim_loc) {
+ let block_path = match path.extension() {
+ None => DataBlockPath::Plain(path.clone()),
+ Some(x) if x.to_str() == Some("zst") => DataBlockPath::Compressed(path.clone()),
+ _ => {
+ warn!("not rebalancing file: {}", path.to_string_lossy());
+ return Ok(WorkerState::Busy);
+ }
+ };
+ // block is not in its primary location,
+ // move it there (reading and re-writing does the trick)
+ debug!("rebalance: moving block {:?} => {:?}", block_path, prim_loc);
+ let block_len = self.manager.fix_block_location(&hash, block_path).await?;
+ self.moved += 1;
+ self.moved_bytes += block_len as u64;
+ }
+ Ok(WorkerState::Busy)
+ } else {
+ // all blocks are in their primary location:
+ // - the ones we moved now are
+ // - the ones written in the meantime always were, because we only
+ // write to primary locations
+ // so we can safely remove all secondary locations from the data layout
+ let new_layout = self
+ .manager
+ .data_layout
+ .load_full()
+ .without_secondary_locations();
+ self.manager
+ .data_layout_persister
+ .save_async(&new_layout)
+ .await?;
+ self.manager.data_layout.store(Arc::new(new_layout));
+ self.t_finished = Some(now_msec());
+ Ok(WorkerState::Done)
+ }
+ }
+
+ async fn wait_for_work(&mut self) -> WorkerState {
+ unreachable!()
+ }
}
+// ---- ---- ----
+// UTILITY FOR ENUMERATING THE BLOCK STORE
+// ---- ---- ----
+
+const PROGRESS_FP: u64 = 1_000_000_000;
+
impl BlockStoreIterator {
fn new(manager: &BlockManager) -> Self {
- let root_dir = manager.data_dir.clone();
- Self {
- path: vec![ReadingDir::Pending(root_dir)],
+ let data_layout = manager.data_layout.load_full();
+
+ let mut dir_cap = vec![0; data_layout.data_dirs.len()];
+ for prim in data_layout.part_prim.iter() {
+ dir_cap[*prim as usize] += 1;
+ }
+ for sec_vec in data_layout.part_sec.iter() {
+ for sec in sec_vec.iter() {
+ dir_cap[*sec as usize] += 1;
+ }
}
+ let sum_cap = dir_cap.iter().sum::<usize>() as u64;
+
+ let mut cum_cap = 0;
+ let mut todo = vec![];
+ for (dir, cap) in data_layout.data_dirs.iter().zip(dir_cap.into_iter()) {
+ let progress_min = (cum_cap * PROGRESS_FP) / sum_cap;
+ let progress_max = ((cum_cap + cap as u64) * PROGRESS_FP) / sum_cap;
+ cum_cap += cap as u64;
+
+ todo.push(BsiTodo::Directory {
+ path: dir.path.clone(),
+ progress_min,
+ progress_max,
+ });
+ }
+ // entries are processed back-to-front (because of .pop()),
+ // so reverse entries to process them in increasing progress bounds
+ todo.reverse();
+
+ let ret = Self { todo };
+ debug_assert!(ret.progress_invariant());
+
+ ret
}
/// Returns progress done, between 0 and 1
fn progress(&self) -> f32 {
- if self.path.is_empty() {
- 1.0
- } else {
- let mut ret = 0.0;
- let mut next_div = 1;
- for p in self.path.iter() {
- match p {
- ReadingDir::Pending(_) => break,
- ReadingDir::Read { subpaths, pos } => {
- next_div *= subpaths.len();
- ret += ((*pos - 1) as f32) / (next_div as f32);
- }
- }
- }
- ret
- }
+ self.todo
+ .last()
+ .map(|x| match x {
+ BsiTodo::Directory { progress_min, .. } => *progress_min,
+ BsiTodo::File { progress, .. } => *progress,
+ })
+ .map(|x| x as f32 / PROGRESS_FP as f32)
+ .unwrap_or(1.0)
}
- async fn next(&mut self) -> Result<Option<Hash>, Error> {
+ async fn next(&mut self) -> Result<Option<(PathBuf, Hash)>, Error> {
loop {
- let last_path = match self.path.last_mut() {
+ match self.todo.pop() {
None => return Ok(None),
- Some(lp) => lp,
- };
-
- if let ReadingDir::Pending(path) = last_path {
- let mut reader = fs::read_dir(&path).await?;
- let mut subpaths = vec![];
- while let Some(ent) = reader.next_entry().await? {
- subpaths.push(ent);
- }
- *last_path = ReadingDir::Read { subpaths, pos: 0 };
- }
+ Some(BsiTodo::Directory {
+ path,
+ progress_min,
+ progress_max,
+ }) => {
+ let istart = self.todo.len();
+
+ let mut reader = fs::read_dir(&path).await?;
+ while let Some(ent) = reader.next_entry().await? {
+ let name = if let Ok(n) = ent.file_name().into_string() {
+ n
+ } else {
+ continue;
+ };
+ let ft = ent.file_type().await?;
+ if ft.is_dir() && hex::decode(&name).is_ok() {
+ self.todo.push(BsiTodo::Directory {
+ path: ent.path(),
+ progress_min: 0,
+ progress_max: 0,
+ });
+ } else if ft.is_file() {
+ let filename = name.split_once('.').map(|(f, _)| f).unwrap_or(&name);
+ if filename.len() == 64 {
+ if let Ok(h) = hex::decode(filename) {
+ let mut hash = [0u8; 32];
+ hash.copy_from_slice(&h);
+ self.todo.push(BsiTodo::File {
+ path: ent.path(),
+ hash: hash.into(),
+ progress: 0,
+ });
+ }
+ }
+ }
+ }
- let (subpaths, pos) = match *last_path {
- ReadingDir::Read {
- ref subpaths,
- ref mut pos,
- } => (subpaths, pos),
- ReadingDir::Pending(_) => unreachable!(),
- };
-
- let data_dir_ent = match subpaths.get(*pos) {
- None => {
- self.path.pop();
- continue;
- }
- Some(ent) => {
- *pos += 1;
- ent
+ let count = self.todo.len() - istart;
+ for (i, ent) in self.todo[istart..].iter_mut().enumerate() {
+ let p1 = progress_min
+ + ((progress_max - progress_min) * i as u64) / count as u64;
+ let p2 = progress_min
+ + ((progress_max - progress_min) * (i + 1) as u64) / count as u64;
+ match ent {
+ BsiTodo::Directory {
+ progress_min,
+ progress_max,
+ ..
+ } => {
+ *progress_min = p1;
+ *progress_max = p2;
+ }
+ BsiTodo::File { progress, .. } => {
+ *progress = p1;
+ }
+ }
+ }
+ self.todo[istart..].reverse();
+ debug_assert!(self.progress_invariant());
}
- };
-
- let name = data_dir_ent.file_name();
- let name = if let Ok(n) = name.into_string() {
- n
- } else {
- continue;
- };
- let ent_type = data_dir_ent.file_type().await?;
-
- let name = name.strip_suffix(".zst").unwrap_or(&name);
- if name.len() == 2 && hex::decode(name).is_ok() && ent_type.is_dir() {
- let path = data_dir_ent.path();
- self.path.push(ReadingDir::Pending(path));
- } else if name.len() == 64 {
- if let Ok(h) = hex::decode(name) {
- let mut hash = [0u8; 32];
- hash.copy_from_slice(&h);
- return Ok(Some(hash.into()));
+ Some(BsiTodo::File { path, hash, .. }) => {
+ return Ok(Some((path, hash)));
}
}
}
}
+
+ // for debug_assert!
+ fn progress_invariant(&self) -> bool {
+ let iter = self.todo.iter().map(|x| match x {
+ BsiTodo::Directory { progress_min, .. } => progress_min,
+ BsiTodo::File { progress, .. } => progress,
+ });
+ let iter_1 = iter.clone().skip(1);
+ iter.zip(iter_1).all(|(prev, next)| prev >= next)
+ }
}
diff --git a/src/block/resync.rs b/src/block/resync.rs
index ea280ad4..9c1da4a7 100644
--- a/src/block/resync.rs
+++ b/src/block/resync.rs
@@ -41,7 +41,7 @@ pub(crate) const RESYNC_RETRY_DELAY: Duration = Duration::from_secs(60);
pub(crate) const RESYNC_RETRY_DELAY_MAX_BACKOFF_POWER: u64 = 6;
// No more than 4 resync workers can be running in the system
-pub(crate) const MAX_RESYNC_WORKERS: usize = 4;
+pub(crate) const MAX_RESYNC_WORKERS: usize = 8;
// Resync tranquility is initially set to 2, but can be changed in the CLI
// and the updated version is persisted over Garage restarts
const INITIAL_RESYNC_TRANQUILITY: u32 = 2;
@@ -359,20 +359,23 @@ impl BlockResyncManager {
}
async fn resync_block(&self, manager: &BlockManager, hash: &Hash) -> Result<(), Error> {
- let BlockStatus { exists, needed } = manager.check_block_status(hash).await?;
+ let existing_path = manager.find_block(hash).await;
+ let exists = existing_path.is_some();
+ let rc = manager.rc.get_block_rc(hash)?;
- if exists != needed.is_needed() || exists != needed.is_nonzero() {
+ if exists != rc.is_needed() || exists != rc.is_nonzero() {
debug!(
"Resync block {:?}: exists {}, nonzero rc {}, deletable {}",
hash,
exists,
- needed.is_nonzero(),
- needed.is_deletable(),
+ rc.is_nonzero(),
+ rc.is_deletable(),
);
}
- if exists && needed.is_deletable() {
+ if exists && rc.is_deletable() {
info!("Resync block {:?}: offloading and deleting", hash);
+ let existing_path = existing_path.unwrap();
let mut who = manager.replication.write_nodes(hash);
if who.len() < manager.replication.write_quorum() {
@@ -419,7 +422,7 @@ impl BlockResyncManager {
.add(1, &[KeyValue::new("to", format!("{:?}", node))]);
}
- let block = manager.read_block(hash).await?;
+ let block = manager.read_block_from(hash, &existing_path).await?;
let (header, bytes) = block.into_parts();
let put_block_message = Req::new(BlockRpc::PutBlock {
hash: *hash,
@@ -451,7 +454,7 @@ impl BlockResyncManager {
manager.rc.clear_deleted_block_rc(hash)?;
}
- if needed.is_nonzero() && !exists {
+ if rc.is_nonzero() && !exists {
info!(
"Resync block {:?}: fetching absent but needed block (refcount > 0)",
hash
diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs
index 9ca4a059..fd37a24e 100644
--- a/src/garage/cli/structs.rs
+++ b/src/garage/cli/structs.rs
@@ -471,6 +471,9 @@ pub enum RepairWhat {
#[structopt(subcommand)]
cmd: ScrubCmd,
},
+ /// Rebalance data blocks among storage locations
+ #[structopt(name = "rebalance", version = garage_version())]
+ Rebalance,
}
#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs
index abfaf9f9..9e4de873 100644
--- a/src/garage/repair/online.rs
+++ b/src/garage/repair/online.rs
@@ -70,6 +70,12 @@ pub async fn launch_online_repair(
info!("Sending command to scrub worker: {:?}", cmd);
garage.block_manager.send_scrub_command(cmd).await?;
}
+ RepairWhat::Rebalance => {
+ info!("Rebalancing the stored blocks among storage locations");
+ bg.spawn_worker(garage_block::repair::RebalanceWorker::new(
+ garage.block_manager.clone(),
+ ));
+ }
}
Ok(())
}
diff --git a/src/model/garage.rs b/src/model/garage.rs
index 981430fb..721d5e3a 100644
--- a/src/model/garage.rs
+++ b/src/model/garage.rs
@@ -92,8 +92,22 @@ impl Garage {
// Create meta dir and data dir if they don't exist already
std::fs::create_dir_all(&config.metadata_dir)
.ok_or_message("Unable to create Garage metadata directory")?;
- std::fs::create_dir_all(&config.data_dir)
- .ok_or_message("Unable to create Garage data directory")?;
+ match &config.data_dir {
+ DataDirEnum::Single(data_dir) => {
+ std::fs::create_dir_all(data_dir).ok_or_message(format!(
+ "Unable to create Garage data directory: {}",
+ data_dir.to_string_lossy()
+ ))?;
+ }
+ DataDirEnum::Multiple(data_dirs) => {
+ for dir in data_dirs {
+ std::fs::create_dir_all(&dir.path).ok_or_message(format!(
+ "Unable to create Garage data directory: {}",
+ dir.path.to_string_lossy()
+ ))?;
+ }
+ }
+ }
info!("Opening database...");
let mut db_path = config.metadata_dir.clone();
@@ -237,7 +251,7 @@ impl Garage {
config.compression_level,
data_rep_param,
system.clone(),
- );
+ )?;
block_manager.register_bg_vars(&mut bg_vars);
// ---- admin tables ----
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index 1675e70e..cf480549 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -22,9 +22,9 @@ use netapp::peering::fullmesh::FullMeshPeeringStrategy;
use netapp::util::parse_and_resolve_peer_addr_async;
use netapp::{NetApp, NetworkKey, NodeID, NodeKey};
-use garage_util::config::Config;
#[cfg(feature = "kubernetes-discovery")]
use garage_util::config::KubernetesDiscoveryConfig;
+use garage_util::config::{Config, DataDirEnum};
use garage_util::data::*;
use garage_util::error::*;
use garage_util::persister::Persister;
@@ -119,7 +119,7 @@ pub struct System {
/// Path to metadata directory
pub metadata_dir: PathBuf,
/// Path to data directory
- pub data_dir: PathBuf,
+ pub data_dir: DataDirEnum,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -890,7 +890,12 @@ impl NodeStatus {
}
}
- fn update_disk_usage(&mut self, meta_dir: &Path, data_dir: &Path, metrics: &SystemMetrics) {
+ fn update_disk_usage(
+ &mut self,
+ meta_dir: &Path,
+ data_dir: &DataDirEnum,
+ metrics: &SystemMetrics,
+ ) {
use systemstat::{Platform, System};
let mounts = System::new().mounts().unwrap_or_default();
@@ -903,7 +908,35 @@ impl NodeStatus {
};
self.meta_disk_avail = mount_avail(meta_dir);
- self.data_disk_avail = mount_avail(data_dir);
+ self.data_disk_avail = match data_dir {
+ DataDirEnum::Single(dir) => mount_avail(dir),
+ DataDirEnum::Multiple(dirs) => {
+ // Take mounts corresponding to all specified data directories that
+ // can be used for writing data
+ let mounts = dirs
+ .iter()
+ .filter(|dir| dir.capacity.is_some())
+ .map(|dir| {
+ mounts
+ .iter()
+ .filter(|mnt| dir.path.starts_with(&mnt.fs_mounted_on))
+ .max_by_key(|mnt| mnt.fs_mounted_on.len())
+ })
+ .collect::<Vec<_>>();
+ if mounts.iter().any(|x| x.is_none()) {
+ None // could not get info for at least one mount
+ } else {
+ // dedup mounts in case several data directories are on the same filesystem
+ let mut mounts = mounts.iter().map(|x| x.unwrap()).collect::<Vec<_>>();
+ mounts.sort_by(|x, y| x.fs_mounted_on.cmp(&y.fs_mounted_on));
+ mounts.dedup_by(|x, y| x.fs_mounted_on == y.fs_mounted_on);
+ // calculate sum of available and total space
+ Some(mounts.iter().fold((0, 0), |(x, y), mnt| {
+ (x + mnt.avail.as_u64(), y + mnt.total.as_u64())
+ }))
+ }
+ }
+ };
if let Some((avail, total)) = self.meta_disk_avail {
metrics
diff --git a/src/table/queue.rs b/src/table/queue.rs
index 0857209b..096ac8b4 100644
--- a/src/table/queue.rs
+++ b/src/table/queue.rs
@@ -12,7 +12,7 @@ use crate::replication::*;
use crate::schema::*;
use crate::table::*;
-const BATCH_SIZE: usize = 100;
+const BATCH_SIZE: usize = 1024;
pub(crate) struct InsertQueueWorker<F, R>(pub(crate) Arc<Table<F, R>>)
where
diff --git a/src/util/config.rs b/src/util/config.rs
index eeb17e0e..9d00fe82 100644
--- a/src/util/config.rs
+++ b/src/util/config.rs
@@ -13,7 +13,7 @@ pub struct Config {
/// Path where to store metadata. Should be fast, but low volume
pub metadata_dir: PathBuf,
/// Path where to store data. Can be slower, but need higher volume
- pub data_dir: PathBuf,
+ pub data_dir: DataDirEnum,
/// Whether to fsync after all metadata transactions (disabled by default)
#[serde(default)]
@@ -94,6 +94,26 @@ pub struct Config {
pub admin: AdminConfig,
}
+/// Value for data_dir: either a single directory or a list of dirs with attributes
+#[derive(Deserialize, Debug, Clone)]
+#[serde(untagged)]
+pub enum DataDirEnum {
+ Single(PathBuf),
+ Multiple(Vec<DataDir>),
+}
+
+#[derive(Deserialize, Debug, Clone)]
+pub struct DataDir {
+ /// Path to the data directory
+ pub path: PathBuf,
+ /// Capacity of the drive (required if read_only is false)
+ #[serde(default)]
+ pub capacity: Option<String>,
+ /// Whether this is a legacy read-only path (capacity should be None)
+ #[serde(default)]
+ pub read_only: bool,
+}
+
/// Configuration for S3 api
#[derive(Deserialize, Debug, Clone)]
pub struct S3ApiConfig {