diff options
Diffstat (limited to 'src/block/layout.rs')
-rw-r--r-- | src/block/layout.rs | 337 |
1 files changed, 337 insertions, 0 deletions
diff --git a/src/block/layout.rs b/src/block/layout.rs new file mode 100644 index 00000000..e8339405 --- /dev/null +++ b/src/block/layout.rs @@ -0,0 +1,337 @@ +use std::path::PathBuf; + +use serde::{Deserialize, Serialize}; + +use garage_util::config::DataDirEnum; +use garage_util::data::Hash; +use garage_util::error::{Error, OkOrMessage}; +use garage_util::migrate::*; + +type Idx = u16; + +const DRIVE_NPART: usize = 1024; + +const HASH_DRIVE_BYTES: (usize, usize) = (2, 3); + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub(crate) struct DataLayout { + pub(crate) data_dirs: Vec<DataDir>, + + /// Primary storage location (index in data_dirs) for each partition + /// = the location where the data is supposed to be, blocks are always + /// written there (copies in other dirs may be deleted if they exist) + pub(crate) part_prim: Vec<Idx>, + /// Secondary storage locations for each partition = locations + /// where data blocks might be, we check from these dirs when reading + pub(crate) part_sec: Vec<Vec<Idx>>, +} + +#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)] +pub(crate) struct DataDir { + pub(crate) path: PathBuf, + pub(crate) state: DataDirState, +} + +#[derive(Serialize, Deserialize, Debug, Clone, Copy, Eq, PartialEq)] +pub(crate) enum DataDirState { + Active { capacity: u64 }, + ReadOnly, +} + +impl DataLayout { + pub(crate) fn initialize(dirs: &DataDirEnum) -> Result<Self, Error> { + let data_dirs = make_data_dirs(dirs)?; + + // Split partitions proportionnally to capacity for all drives + // to affect primary storage location + let total_cap = data_dirs.iter().filter_map(|x| x.capacity()).sum::<u64>(); + assert!(total_cap > 0); + + let mut part_prim = Vec::with_capacity(DRIVE_NPART); + let mut cum_cap = 0; + for (i, dd) in data_dirs.iter().enumerate() { + if let DataDirState::Active { capacity } = dd.state { + cum_cap += capacity; + let n_total = (cum_cap * DRIVE_NPART as u64) / total_cap; + part_prim.resize(n_total as usize, i as Idx); + } + } + assert_eq!(cum_cap, total_cap); + assert_eq!(part_prim.len(), DRIVE_NPART); + + // If any of the storage locations is non-empty, it probably existed before + // this algorithm was added, so add it as a secondary storage location for all partitions + // to make sure existing files are not lost + let mut part_sec = vec![vec![]; DRIVE_NPART]; + for (i, dd) in data_dirs.iter().enumerate() { + if dir_not_empty(&dd.path)? { + for (sec, prim) in part_sec.iter_mut().zip(part_prim.iter()) { + if *prim != i as Idx { + sec.push(i as Idx); + } + } + } + } + + Ok(Self { + data_dirs, + part_prim, + part_sec, + }) + } + + pub(crate) fn update(&mut self, dirs: &DataDirEnum) -> Result<(), Error> { + // Make list of new data directories, exit if nothing changed + let data_dirs = make_data_dirs(dirs)?; + if data_dirs == self.data_dirs { + return Ok(()); + } + + let total_cap = data_dirs.iter().filter_map(|x| x.capacity()).sum::<u64>(); + assert!(total_cap > 0); + + // Compute mapping of old indices to new indices + let old2new = self + .data_dirs + .iter() + .map(|x| { + data_dirs + .iter() + .position(|y| y.path == x.path) + .map(|x| x as Idx) + }) + .collect::<Vec<_>>(); + + // Compute secondary location list for partitions based on existing + // folders, translating indices from old to new + let mut part_sec = self + .part_sec + .iter() + .map(|dl| { + dl.iter() + .filter_map(|old| old2new.get(*old as usize).copied().flatten()) + .collect::<Vec<_>>() + }) + .collect::<Vec<_>>(); + + // Compute a vector that, for each data dir, + // contains the list of partitions primarily stored on that drive + let mut dir_prim = vec![vec![]; data_dirs.len()]; + for (ipart, prim) in self.part_prim.iter().enumerate() { + if let Some(new) = old2new.get(*prim as usize).copied().flatten() { + dir_prim[new as usize].push(ipart); + } + } + + // Compute the target number of partitions per data directory + let mut cum_cap = 0; + let mut npart_per_dir = vec![0; data_dirs.len()]; + for (idir, dd) in data_dirs.iter().enumerate() { + if let DataDirState::Active { capacity } = dd.state { + let begin = (cum_cap * DRIVE_NPART as u64) / total_cap; + cum_cap += capacity; + let end = (cum_cap * DRIVE_NPART as u64) / total_cap; + npart_per_dir[idir] = (end - begin) as usize; + } + } + assert_eq!(cum_cap, total_cap); + assert_eq!(npart_per_dir.iter().sum::<usize>(), DRIVE_NPART); + + // For all directories that have too many primary partitions, + // move that partition to secondary + for (idir, (parts, tgt_npart)) in dir_prim.iter_mut().zip(npart_per_dir.iter()).enumerate() + { + while parts.len() > *tgt_npart { + let part = parts.pop().unwrap(); + if !part_sec[part].contains(&(idir as Idx)) { + part_sec[part].push(idir as Idx); + } + } + } + + // Calculate the vector of primary partition dir index + let mut part_prim = vec![None; DRIVE_NPART]; + for (idir, parts) in dir_prim.iter().enumerate() { + for part in parts.iter() { + assert!(part_prim[*part].is_none()); + part_prim[*part] = Some(idir as Idx) + } + } + + // Calculate a vector of unassigned partitions + let mut unassigned = part_prim + .iter() + .enumerate() + .filter(|(_, dir)| dir.is_none()) + .map(|(ipart, _)| ipart) + .collect::<Vec<_>>(); + + // For all directories that don't have enough primary partitions, + // add partitions from unassigned + for (idir, (parts, tgt_npart)) in dir_prim.iter_mut().zip(npart_per_dir.iter()).enumerate() + { + if parts.len() < *tgt_npart { + let required = *tgt_npart - parts.len(); + assert!(unassigned.len() >= required); + for _ in 0..required { + let new_part = unassigned.pop().unwrap(); + part_prim[new_part] = Some(idir as Idx); + part_sec[new_part].retain(|x| *x != idir as Idx); + } + } + } + + // Sanity checks + assert!(part_prim.iter().all(|x| x.is_some())); + assert!(unassigned.is_empty()); + + // Transform part_prim from vec of Option<Idx> to vec of Idx + let part_prim = part_prim + .into_iter() + .map(|x| x.unwrap()) + .collect::<Vec<_>>(); + assert!(part_prim.iter().all(|p| data_dirs + .get(*p as usize) + .and_then(|x| x.capacity()) + .unwrap_or(0) + > 0)); + + // If any of the newly added storage locations is non-empty, + // it might have been removed and added again and might contain data, + // so add it as a secondary storage location for all partitions + // to make sure existing files are not lost + for (i, dd) in data_dirs.iter().enumerate() { + if self.data_dirs.iter().any(|ed| ed.path == dd.path) { + continue; + } + if dir_not_empty(&dd.path)? { + for (sec, prim) in part_sec.iter_mut().zip(part_prim.iter()) { + if *prim != i as Idx && !sec.contains(&(i as Idx)) { + sec.push(i as Idx); + } + } + } + } + + // Apply newly generated config + *self = Self { + data_dirs, + part_prim, + part_sec, + }; + Ok(()) + } + + pub(crate) fn primary_block_dir(&self, hash: &Hash) -> PathBuf { + let ipart = self.partition_from(hash); + let idir = self.part_prim[ipart] as usize; + self.block_dir_from(hash, &self.data_dirs[idir].path) + } + + pub(crate) fn secondary_block_dirs<'a>( + &'a self, + hash: &'a Hash, + ) -> impl Iterator<Item = PathBuf> + 'a { + let ipart = self.partition_from(hash); + self.part_sec[ipart] + .iter() + .map(move |idir| self.block_dir_from(hash, &self.data_dirs[*idir as usize].path)) + } + + fn partition_from(&self, hash: &Hash) -> usize { + u16::from_be_bytes([ + hash.as_slice()[HASH_DRIVE_BYTES.0], + hash.as_slice()[HASH_DRIVE_BYTES.1], + ]) as usize % DRIVE_NPART + } + + fn block_dir_from(&self, hash: &Hash, dir: &PathBuf) -> PathBuf { + let mut path = dir.clone(); + path.push(hex::encode(&hash.as_slice()[0..1])); + path.push(hex::encode(&hash.as_slice()[1..2])); + path + } + + pub(crate) fn without_secondary_locations(&self) -> Self { + Self { + data_dirs: self.data_dirs.clone(), + part_prim: self.part_prim.clone(), + part_sec: self.part_sec.iter().map(|_| vec![]).collect::<Vec<_>>(), + } + } +} + +impl InitialFormat for DataLayout { + const VERSION_MARKER: &'static [u8] = b"G09bmdl"; +} + +impl DataDir { + pub fn capacity(&self) -> Option<u64> { + match self.state { + DataDirState::Active { capacity } => Some(capacity), + _ => None, + } + } +} + +fn make_data_dirs(dirs: &DataDirEnum) -> Result<Vec<DataDir>, Error> { + let mut data_dirs = vec![]; + match dirs { + DataDirEnum::Single(path) => data_dirs.push(DataDir { + path: path.clone(), + state: DataDirState::Active { + capacity: 1_000_000_000, // whatever, doesn't matter + }, + }), + DataDirEnum::Multiple(dirs) => { + let mut ok = false; + for dir in dirs.iter() { + let state = match &dir.capacity { + Some(cap) if dir.read_only == false => { + let capacity = cap.parse::<bytesize::ByteSize>() + .ok_or_message("invalid capacity value")?.as_u64(); + if capacity == 0 { + return Err(Error::Message(format!("data directory {} should have non-zero capacity", dir.path.to_string_lossy()))); + } + ok = true; + DataDirState::Active { + capacity, + } + } + None if dir.read_only == true => { + DataDirState::ReadOnly + } + _ => return Err(Error::Message(format!("data directories in data_dir should have a capacity value or be marked read_only, not the case for {}", dir.path.to_string_lossy()))), + }; + data_dirs.push(DataDir { + path: dir.path.clone(), + state, + }); + } + if !ok { + return Err(Error::Message( + "incorrect data_dir configuration, no primary writable directory specified" + .into(), + )); + } + } + } + Ok(data_dirs) +} + +fn dir_not_empty(path: &PathBuf) -> Result<bool, Error> { + for entry in std::fs::read_dir(&path)? { + let dir = entry?; + if dir.file_type()?.is_dir() + && dir + .file_name() + .into_string() + .ok() + .and_then(|hex| hex::decode(&hex).ok()) + .is_some() + { + return Ok(true); + } + } + Ok(false) +} |