aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2023-09-04 14:49:49 +0200
committerAlex Auvolat <alex@adnab.me>2023-09-06 16:35:28 +0200
commit71c0188055e25aa1c00d0226f0ca99ce323310a6 (patch)
treee02f35033a601a502672e6c0294e5cc06ff3b563
parent4b4f2000f45a83b4dad3f2a8fd8392a245a30286 (diff)
downloadgarage-71c0188055e25aa1c00d0226f0ca99ce323310a6.tar.gz
garage-71c0188055e25aa1c00d0226f0ca99ce323310a6.zip
block manager: skeleton for multi-hdd support
-rw-r--r--src/block/layout.rs57
-rw-r--r--src/block/lib.rs1
-rw-r--r--src/block/manager.rs29
-rw-r--r--src/block/repair.rs217
-rw-r--r--src/model/garage.rs18
-rw-r--r--src/rpc/system.rs23
-rw-r--r--src/util/config.rs22
7 files changed, 277 insertions, 90 deletions
diff --git a/src/block/layout.rs b/src/block/layout.rs
new file mode 100644
index 00000000..cbc326d8
--- /dev/null
+++ b/src/block/layout.rs
@@ -0,0 +1,57 @@
+use std::path::PathBuf;
+
+use serde::{Deserialize, Serialize};
+
+use garage_util::config::DataDirEnum;
+use garage_util::data::Hash;
+use garage_util::migrate::*;
+
+pub const DRIVE_NPART: usize = 1024;
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub(crate) struct DataLayout {
+ pub(crate) data_dirs: Vec<DataDir>,
+ pub(crate) partitions: Vec<Partition>,
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub(crate) struct DataDir {
+ pub(crate) path: PathBuf,
+ pub(crate) state: DataDirState,
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub(crate) enum DataDirState {
+ Active { capacity: u64 },
+ ReadOnly,
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub(crate) struct Partition {
+ pub(crate) prim: usize,
+ pub(crate) sec: Vec<usize>,
+}
+
+impl DataLayout {
+ pub(crate) fn initialize(dirs: &DataDirEnum) -> Self {
+ todo!()
+ }
+
+ pub(crate) fn update(&mut self, dirs: &DataDirEnum) -> Self {
+ todo!()
+ }
+
+ pub(crate) fn data_dir(&self, hash: &Hash) -> PathBuf {
+ todo!()
+ /*
+ let mut path = self.data_dir.clone();
+ path.push(hex::encode(&hash.as_slice()[0..1]));
+ path.push(hex::encode(&hash.as_slice()[1..2]));
+ path
+ */
+ }
+}
+
+impl InitialFormat for DataLayout {
+ const VERSION_MARKER: &'static [u8] = b"G09bmdl";
+}
diff --git a/src/block/lib.rs b/src/block/lib.rs
index d2814f77..c9ff2845 100644
--- a/src/block/lib.rs
+++ b/src/block/lib.rs
@@ -6,5 +6,6 @@ pub mod repair;
pub mod resync;
mod block;
+mod layout;
mod metrics;
mod rc;
diff --git a/src/block/manager.rs b/src/block/manager.rs
index c7e4cd03..18a2686e 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -25,10 +25,11 @@ use garage_rpc::rpc_helper::netapp::stream::{stream_asyncread, ByteStream};
use garage_db as db;
use garage_util::background::{vars, BackgroundRunner};
+use garage_util::config::DataDirEnum;
use garage_util::data::*;
use garage_util::error::*;
use garage_util::metrics::RecordDuration;
-use garage_util::persister::PersisterShared;
+use garage_util::persister::{Persister, PersisterShared};
use garage_util::time::msec_to_rfc3339;
use garage_rpc::rpc_helper::OrderTag;
@@ -38,6 +39,7 @@ use garage_rpc::*;
use garage_table::replication::{TableReplication, TableShardedReplication};
use crate::block::*;
+use crate::layout::*;
use crate::metrics::*;
use crate::rc::*;
use crate::repair::*;
@@ -77,8 +79,11 @@ impl Rpc for BlockRpc {
pub struct BlockManager {
/// Replication strategy, allowing to find on which node blocks should be located
pub replication: TableShardedReplication,
- /// Directory in which block are stored
- pub data_dir: PathBuf,
+
+ /// Directory/ies in which block are stored
+ pub data_dir: DataDirEnum,
+ /// Data layout
+ pub(crate) data_layout: DataLayout,
data_fsync: bool,
compression_level: Option<i32>,
@@ -114,12 +119,22 @@ struct BlockManagerLocked();
impl BlockManager {
pub fn new(
db: &db::Db,
- data_dir: PathBuf,
+ data_dir: DataDirEnum,
data_fsync: bool,
compression_level: Option<i32>,
replication: TableShardedReplication,
system: Arc<System>,
) -> Arc<Self> {
+ let layout_persister: Persister<DataLayout> =
+ Persister::new(&system.metadata_dir, "data_layout");
+ let data_layout = match layout_persister.load() {
+ Ok(mut layout) => {
+ layout.update(&data_dir);
+ layout
+ }
+ Err(_) => DataLayout::initialize(&data_dir),
+ };
+
let rc = db
.open_tree("block_local_rc")
.expect("Unable to open block_local_rc tree");
@@ -143,6 +158,7 @@ impl BlockManager {
let block_manager = Arc::new(Self {
replication,
data_dir,
+ data_layout,
data_fsync,
compression_level,
mutation_lock: [(); 256].map(|_| Mutex::new(BlockManagerLocked())),
@@ -586,10 +602,7 @@ impl BlockManager {
/// Utility: gives the path of the directory in which a block should be found
fn block_dir(&self, hash: &Hash) -> PathBuf {
- let mut path = self.data_dir.clone();
- path.push(hex::encode(&hash.as_slice()[0..1]));
- path.push(hex::encode(&hash.as_slice()[1..2]));
- path
+ self.data_layout.data_dir(hash)
}
/// Utility: give the full path where a block should be found, minus extension if block is
diff --git a/src/block/repair.rs b/src/block/repair.rs
index 71093d69..d5e2e168 100644
--- a/src/block/repair.rs
+++ b/src/block/repair.rs
@@ -17,6 +17,7 @@ use garage_util::persister::PersisterShared;
use garage_util::time::*;
use garage_util::tranquilizer::Tranquilizer;
+use crate::layout::*;
use crate::manager::*;
// Full scrub every 25 days with a random element of 10 days mixed in below
@@ -136,7 +137,7 @@ impl Worker for RepairWorker {
// Lists all blocks on disk and adds them to the resync queue.
// This allows us to find blocks we are storing but don't actually need,
// so that we can offload them if necessary and then delete them locally.
- if let Some(hash) = bi.next().await? {
+ if let Some((_path, hash)) = bi.next().await? {
self.manager
.resync
.put_to_resync(&hash, Duration::from_secs(0))?;
@@ -376,7 +377,7 @@ impl Worker for ScrubWorker {
match &mut self.work {
ScrubWorkerState::Running(bsi) => {
self.tranquilizer.reset();
- if let Some(hash) = bsi.next().await? {
+ if let Some((_path, hash)) = bsi.next().await? {
match self.manager.read_block(&hash).await {
Err(Error::CorruptData(_)) => {
error!("Found corrupt data block during scrub: {:?}", hash);
@@ -447,100 +448,166 @@ impl Worker for ScrubWorker {
// UTILITY FOR ENUMERATING THE BLOCK STORE
// ---- ---- ----
+const PROGRESS_FP: u64 = 1_000_000_000;
+
struct BlockStoreIterator {
- path: Vec<ReadingDir>,
+ todo: Vec<BsiTodo>,
}
-enum ReadingDir {
- Pending(PathBuf),
- Read {
- subpaths: Vec<fs::DirEntry>,
- pos: usize,
+enum BsiTodo {
+ Directory {
+ path: PathBuf,
+ progress_min: u64,
+ progress_max: u64,
+ },
+ File {
+ path: PathBuf,
+ filename: String,
+ progress: u64,
},
}
impl BlockStoreIterator {
fn new(manager: &BlockManager) -> Self {
- let root_dir = manager.data_dir.clone();
- Self {
- path: vec![ReadingDir::Pending(root_dir)],
+ let min_cap = manager
+ .data_layout
+ .data_dirs
+ .iter()
+ .filter_map(|x| match x.state {
+ DataDirState::Active { capacity } => Some(capacity),
+ _ => None,
+ })
+ .min()
+ .unwrap_or(0);
+
+ let sum_cap = manager
+ .data_layout
+ .data_dirs
+ .iter()
+ .map(|x| match x.state {
+ DataDirState::Active { capacity } => capacity,
+ _ => min_cap, // approximation
+ })
+ .sum::<u64>() as u128;
+
+ let mut cum_cap = 0;
+ let mut todo = vec![];
+ for dir in manager.data_layout.data_dirs.iter() {
+ let cap = match dir.state {
+ DataDirState::Active { capacity } => capacity,
+ _ => min_cap,
+ };
+
+ let progress_min = ((cum_cap as u128 * PROGRESS_FP as u128) / (sum_cap as u128)) as u64;
+ let progress_max =
+ (((cum_cap + cap) as u128 * PROGRESS_FP as u128) / (sum_cap as u128)) as u64;
+ cum_cap += cap;
+
+ todo.push(BsiTodo::Directory {
+ path: dir.path.clone(),
+ progress_min,
+ progress_max,
+ });
}
+ // entries are processed back-to-front (because of .pop()),
+ // so reverse entries to process them in increasing progress bounds
+ todo.reverse();
+
+ let ret = Self { todo };
+ debug_assert!(ret.progress_invariant());
+
+ ret
}
/// Returns progress done, between 0 and 1
fn progress(&self) -> f32 {
- if self.path.is_empty() {
- 1.0
- } else {
- let mut ret = 0.0;
- let mut next_div = 1;
- for p in self.path.iter() {
- match p {
- ReadingDir::Pending(_) => break,
- ReadingDir::Read { subpaths, pos } => {
- next_div *= subpaths.len();
- ret += ((*pos - 1) as f32) / (next_div as f32);
- }
- }
- }
- ret
- }
+ self.todo
+ .last()
+ .map(|x| match x {
+ BsiTodo::Directory { progress_min, .. } => *progress_min,
+ BsiTodo::File { progress, .. } => *progress,
+ })
+ .map(|x| x as f32 / PROGRESS_FP as f32)
+ .unwrap_or(1.0)
}
- async fn next(&mut self) -> Result<Option<Hash>, Error> {
+ async fn next(&mut self) -> Result<Option<(PathBuf, Hash)>, Error> {
loop {
- let last_path = match self.path.last_mut() {
+ match self.todo.pop() {
None => return Ok(None),
- Some(lp) => lp,
- };
-
- if let ReadingDir::Pending(path) = last_path {
- let mut reader = fs::read_dir(&path).await?;
- let mut subpaths = vec![];
- while let Some(ent) = reader.next_entry().await? {
- subpaths.push(ent);
- }
- *last_path = ReadingDir::Read { subpaths, pos: 0 };
- }
-
- let (subpaths, pos) = match *last_path {
- ReadingDir::Read {
- ref subpaths,
- ref mut pos,
- } => (subpaths, pos),
- ReadingDir::Pending(_) => unreachable!(),
- };
+ Some(BsiTodo::Directory {
+ path,
+ progress_min,
+ progress_max,
+ }) => {
+ let istart = self.todo.len();
+
+ let mut reader = fs::read_dir(&path).await?;
+ while let Some(ent) = reader.next_entry().await? {
+ let name = if let Ok(n) = ent.file_name().into_string() {
+ n
+ } else {
+ continue;
+ };
+ let ft = ent.file_type().await?;
+ if ft.is_dir() && hex::decode(&name).is_ok() {
+ self.todo.push(BsiTodo::Directory {
+ path: ent.path(),
+ progress_min: 0,
+ progress_max: 0,
+ });
+ } else if ft.is_file() {
+ self.todo.push(BsiTodo::File {
+ path: ent.path(),
+ filename: name,
+ progress: 0,
+ });
+ }
+ }
- let data_dir_ent = match subpaths.get(*pos) {
- None => {
- self.path.pop();
- continue;
- }
- Some(ent) => {
- *pos += 1;
- ent
+ let count = self.todo.len() - istart;
+ for (i, ent) in self.todo[istart..].iter_mut().enumerate() {
+ let p1 = progress_min
+ + ((progress_max - progress_min) * i as u64) / count as u64;
+ let p2 = progress_min
+ + ((progress_max - progress_min) * (i + 1) as u64) / count as u64;
+ match ent {
+ BsiTodo::Directory {
+ progress_min,
+ progress_max,
+ ..
+ } => {
+ *progress_min = p1;
+ *progress_max = p2;
+ }
+ BsiTodo::File { progress, .. } => {
+ *progress = p1;
+ }
+ }
+ }
+ self.todo[istart..].reverse();
+ debug_assert!(self.progress_invariant());
}
- };
-
- let name = data_dir_ent.file_name();
- let name = if let Ok(n) = name.into_string() {
- n
- } else {
- continue;
- };
- let ent_type = data_dir_ent.file_type().await?;
-
- let name = name.strip_suffix(".zst").unwrap_or(&name);
- if name.len() == 2 && hex::decode(name).is_ok() && ent_type.is_dir() {
- let path = data_dir_ent.path();
- self.path.push(ReadingDir::Pending(path));
- } else if name.len() == 64 {
- if let Ok(h) = hex::decode(name) {
- let mut hash = [0u8; 32];
- hash.copy_from_slice(&h);
- return Ok(Some(hash.into()));
+ Some(BsiTodo::File { path, filename, .. }) => {
+ let filename = filename.strip_suffix(".zst").unwrap_or(&filename);
+ if filename.len() == 64 {
+ if let Ok(h) = hex::decode(filename) {
+ let mut hash = [0u8; 32];
+ hash.copy_from_slice(&h);
+ return Ok(Some((path, hash.into())));
+ }
+ }
}
}
}
}
+
+ fn progress_invariant(&self) -> bool {
+ let iter = self.todo.iter().map(|x| match x {
+ BsiTodo::Directory { progress_min, .. } => progress_min,
+ BsiTodo::File { progress, .. } => progress,
+ });
+ let iter_1 = iter.clone().skip(1);
+ iter.zip(iter_1).all(|(prev, next)| prev >= next)
+ }
}
diff --git a/src/model/garage.rs b/src/model/garage.rs
index 981430fb..d6eebfb0 100644
--- a/src/model/garage.rs
+++ b/src/model/garage.rs
@@ -92,8 +92,22 @@ impl Garage {
// Create meta dir and data dir if they don't exist already
std::fs::create_dir_all(&config.metadata_dir)
.ok_or_message("Unable to create Garage metadata directory")?;
- std::fs::create_dir_all(&config.data_dir)
- .ok_or_message("Unable to create Garage data directory")?;
+ match &config.data_dir {
+ DataDirEnum::Single(data_dir) => {
+ std::fs::create_dir_all(data_dir).ok_or_message(format!(
+ "Unable to create Garage data directory: {}",
+ data_dir.to_string_lossy()
+ ))?;
+ }
+ DataDirEnum::Multiple(data_dirs) => {
+ for dir in data_dirs {
+ std::fs::create_dir_all(&dir.path).ok_or_message(format!(
+ "Unable to create Garage data directory: {}",
+ dir.path.to_string_lossy()
+ ))?;
+ }
+ }
+ }
info!("Opening database...");
let mut db_path = config.metadata_dir.clone();
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index 1675e70e..c5751d5d 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -22,9 +22,9 @@ use netapp::peering::fullmesh::FullMeshPeeringStrategy;
use netapp::util::parse_and_resolve_peer_addr_async;
use netapp::{NetApp, NetworkKey, NodeID, NodeKey};
-use garage_util::config::Config;
#[cfg(feature = "kubernetes-discovery")]
use garage_util::config::KubernetesDiscoveryConfig;
+use garage_util::config::{Config, DataDirEnum};
use garage_util::data::*;
use garage_util::error::*;
use garage_util::persister::Persister;
@@ -119,7 +119,7 @@ pub struct System {
/// Path to metadata directory
pub metadata_dir: PathBuf,
/// Path to data directory
- pub data_dir: PathBuf,
+ pub data_dir: DataDirEnum,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -890,7 +890,12 @@ impl NodeStatus {
}
}
- fn update_disk_usage(&mut self, meta_dir: &Path, data_dir: &Path, metrics: &SystemMetrics) {
+ fn update_disk_usage(
+ &mut self,
+ meta_dir: &Path,
+ data_dir: &DataDirEnum,
+ metrics: &SystemMetrics,
+ ) {
use systemstat::{Platform, System};
let mounts = System::new().mounts().unwrap_or_default();
@@ -903,7 +908,17 @@ impl NodeStatus {
};
self.meta_disk_avail = mount_avail(meta_dir);
- self.data_disk_avail = mount_avail(data_dir);
+ self.data_disk_avail = match data_dir {
+ DataDirEnum::Single(dir) => mount_avail(dir),
+ DataDirEnum::Multiple(dirs) => {
+ dirs.iter()
+ .map(|d| mount_avail(&d.path))
+ .fold(Some((0, 0)), |acc, cur| match (acc, cur) {
+ (Some((x, y)), Some((a, b))) => Some((x + a, y + b)),
+ _ => None,
+ })
+ }
+ };
if let Some((avail, total)) = self.meta_disk_avail {
metrics
diff --git a/src/util/config.rs b/src/util/config.rs
index eeb17e0e..9d00fe82 100644
--- a/src/util/config.rs
+++ b/src/util/config.rs
@@ -13,7 +13,7 @@ pub struct Config {
/// Path where to store metadata. Should be fast, but low volume
pub metadata_dir: PathBuf,
/// Path where to store data. Can be slower, but need higher volume
- pub data_dir: PathBuf,
+ pub data_dir: DataDirEnum,
/// Whether to fsync after all metadata transactions (disabled by default)
#[serde(default)]
@@ -94,6 +94,26 @@ pub struct Config {
pub admin: AdminConfig,
}
+/// Value for data_dir: either a single directory or a list of dirs with attributes
+#[derive(Deserialize, Debug, Clone)]
+#[serde(untagged)]
+pub enum DataDirEnum {
+ Single(PathBuf),
+ Multiple(Vec<DataDir>),
+}
+
+#[derive(Deserialize, Debug, Clone)]
+pub struct DataDir {
+ /// Path to the data directory
+ pub path: PathBuf,
+ /// Capacity of the drive (required if read_only is false)
+ #[serde(default)]
+ pub capacity: Option<String>,
+ /// Whether this is a legacy read-only path (capacity should be None)
+ #[serde(default)]
+ pub read_only: bool,
+}
+
/// Configuration for S3 api
#[derive(Deserialize, Debug, Clone)]
pub struct S3ApiConfig {