use std::path::PathBuf; use serde::{Deserialize, Serialize}; use garage_util::config::DataDirEnum; use garage_util::data::Hash; use garage_util::error::{Error, OkOrMessage}; use garage_util::migrate::*; type Idx = u16; const DRIVE_NPART: usize = 1024; const HASH_DRIVE_BYTES: (usize, usize) = (2, 3); #[derive(Serialize, Deserialize, Debug, Clone)] pub(crate) struct DataLayout { pub(crate) data_dirs: Vec<DataDir>, /// Primary storage location (index in data_dirs) for each partition /// = the location where the data is supposed to be, blocks are always /// written there (copies in other dirs may be deleted if they exist) pub(crate) part_prim: Vec<Idx>, /// Secondary storage locations for each partition = locations /// where data blocks might be, we check from these dirs when reading pub(crate) part_sec: Vec<Vec<Idx>>, } #[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)] pub(crate) struct DataDir { pub(crate) path: PathBuf, pub(crate) state: DataDirState, } #[derive(Serialize, Deserialize, Debug, Clone, Copy, Eq, PartialEq)] pub(crate) enum DataDirState { Active { capacity: u64 }, ReadOnly, } impl DataLayout { pub(crate) fn initialize(dirs: &DataDirEnum) -> Result<Self, Error> { let data_dirs = make_data_dirs(dirs)?; // Split partitions proportionnally to capacity for all drives // to affect primary storage location let total_cap = data_dirs.iter().filter_map(|x| x.capacity()).sum::<u64>(); assert!(total_cap > 0); let mut part_prim = Vec::with_capacity(DRIVE_NPART); let mut cum_cap = 0; for (i, dd) in data_dirs.iter().enumerate() { if let DataDirState::Active { capacity } = dd.state { cum_cap += capacity; let n_total = (cum_cap * DRIVE_NPART as u64) / total_cap; part_prim.resize(n_total as usize, i as Idx); } } assert_eq!(cum_cap, total_cap); assert_eq!(part_prim.len(), DRIVE_NPART); // If any of the storage locations is non-empty, it probably existed before // this algorithm was added, so add it as a secondary storage location for all partitions // to make sure existing files are not lost let mut part_sec = vec![vec![]; DRIVE_NPART]; for (i, dd) in data_dirs.iter().enumerate() { if dir_not_empty(&dd.path)? { for (sec, prim) in part_sec.iter_mut().zip(part_prim.iter()) { if *prim != i as Idx { sec.push(i as Idx); } } } } Ok(Self { data_dirs, part_prim, part_sec, }) } pub(crate) fn update(&mut self, dirs: &DataDirEnum) -> Result<(), Error> { // Make list of new data directories, exit if nothing changed let data_dirs = make_data_dirs(dirs)?; if data_dirs == self.data_dirs { return Ok(()); } let total_cap = data_dirs.iter().filter_map(|x| x.capacity()).sum::<u64>(); assert!(total_cap > 0); // Compute mapping of old indices to new indices let old2new = self .data_dirs .iter() .map(|x| { data_dirs .iter() .position(|y| y.path == x.path) .map(|x| x as Idx) }) .collect::<Vec<_>>(); // Compute secondary location list for partitions based on existing // folders, translating indices from old to new let mut part_sec = self .part_sec .iter() .map(|dl| { dl.iter() .filter_map(|old| old2new.get(*old as usize).copied().flatten()) .collect::<Vec<_>>() }) .collect::<Vec<_>>(); // Compute a vector that, for each data dir, // contains the list of partitions primarily stored on that drive let mut dir_prim = vec![vec![]; data_dirs.len()]; for (ipart, prim) in self.part_prim.iter().enumerate() { if let Some(new) = old2new.get(*prim as usize).copied().flatten() { dir_prim[new as usize].push(ipart); } } // Compute the target number of partitions per data directory let mut cum_cap = 0; let mut npart_per_dir = vec![0; data_dirs.len()]; for (idir, dd) in data_dirs.iter().enumerate() { if let DataDirState::Active { capacity } = dd.state { let begin = (cum_cap * DRIVE_NPART as u64) / total_cap; cum_cap += capacity; let end = (cum_cap * DRIVE_NPART as u64) / total_cap; npart_per_dir[idir] = (end - begin) as usize; } } assert_eq!(cum_cap, total_cap); assert_eq!(npart_per_dir.iter().sum::<usize>(), DRIVE_NPART); // For all directories that have too many primary partitions, // move that partition to secondary for (idir, (parts, tgt_npart)) in dir_prim.iter_mut().zip(npart_per_dir.iter()).enumerate() { while parts.len() > *tgt_npart { let part = parts.pop().unwrap(); if !part_sec[part].contains(&(idir as Idx)) { part_sec[part].push(idir as Idx); } } } // Calculate the vector of primary partition dir index let mut part_prim = vec![None; DRIVE_NPART]; for (idir, parts) in dir_prim.iter().enumerate() { for part in parts.iter() { assert!(part_prim[*part].is_none()); part_prim[*part] = Some(idir as Idx) } } // Calculate a vector of unassigned partitions let mut unassigned = part_prim .iter() .enumerate() .filter(|(_, dir)| dir.is_none()) .map(|(ipart, _)| ipart) .collect::<Vec<_>>(); // For all directories that don't have enough primary partitions, // add partitions from unassigned for (idir, (parts, tgt_npart)) in dir_prim.iter_mut().zip(npart_per_dir.iter()).enumerate() { if parts.len() < *tgt_npart { let required = *tgt_npart - parts.len(); assert!(unassigned.len() >= required); for _ in 0..required { let new_part = unassigned.pop().unwrap(); part_prim[new_part] = Some(idir as Idx); part_sec[new_part].retain(|x| *x != idir as Idx); } } } // Sanity checks assert!(part_prim.iter().all(|x| x.is_some())); assert!(unassigned.is_empty()); // Transform part_prim from vec of Option<Idx> to vec of Idx let part_prim = part_prim .into_iter() .map(|x| x.unwrap()) .collect::<Vec<_>>(); assert!(part_prim.iter().all(|p| data_dirs .get(*p as usize) .and_then(|x| x.capacity()) .unwrap_or(0) > 0)); // If any of the newly added storage locations is non-empty, // it might have been removed and added again and might contain data, // so add it as a secondary storage location for all partitions // to make sure existing files are not lost let mut part_sec = vec![vec![]; DRIVE_NPART]; for (i, dd) in data_dirs.iter().enumerate() { if self.data_dirs.iter().any(|ed| ed.path == dd.path) { continue; } if dir_not_empty(&dd.path)? { for (sec, prim) in part_sec.iter_mut().zip(part_prim.iter()) { if *prim != i as Idx && !sec.contains(&(i as Idx)) { sec.push(i as Idx); } } } } // Apply newly generated config *self = Self { data_dirs, part_prim, part_sec, }; Ok(()) } pub(crate) fn primary_block_dir(&self, hash: &Hash) -> PathBuf { let ipart = self.partition_from(hash); let idir = self.part_prim[ipart] as usize; self.block_dir_from(hash, &self.data_dirs[idir].path) } pub(crate) fn secondary_block_dirs<'a>( &'a self, hash: &'a Hash, ) -> impl Iterator<Item = PathBuf> + 'a { let ipart = self.partition_from(hash); self.part_sec[ipart] .iter() .map(move |idir| self.block_dir_from(hash, &self.data_dirs[*idir as usize].path)) } fn partition_from(&self, hash: &Hash) -> usize { u16::from_be_bytes([ hash.as_slice()[HASH_DRIVE_BYTES.0], hash.as_slice()[HASH_DRIVE_BYTES.1], ]) as usize % DRIVE_NPART } fn block_dir_from(&self, hash: &Hash, dir: &PathBuf) -> PathBuf { let mut path = dir.clone(); path.push(hex::encode(&hash.as_slice()[0..1])); path.push(hex::encode(&hash.as_slice()[1..2])); path } pub(crate) fn without_secondary_locations(&self) -> Self { Self { data_dirs: self.data_dirs.clone(), part_prim: self.part_prim.clone(), part_sec: self.part_sec.iter().map(|_| vec![]).collect::<Vec<_>>(), } } } impl InitialFormat for DataLayout { const VERSION_MARKER: &'static [u8] = b"G09bmdl"; } impl DataDir { pub fn capacity(&self) -> Option<u64> { match self.state { DataDirState::Active { capacity } => Some(capacity), _ => None, } } } fn make_data_dirs(dirs: &DataDirEnum) -> Result<Vec<DataDir>, Error> { let mut data_dirs = vec![]; match dirs { DataDirEnum::Single(path) => data_dirs.push(DataDir { path: path.clone(), state: DataDirState::Active { capacity: 1_000_000_000, // whatever, doesn't matter }, }), DataDirEnum::Multiple(dirs) => { let mut ok = false; for dir in dirs.iter() { let state = match &dir.capacity { Some(cap) if dir.read_only == false => { let capacity = cap.parse::<bytesize::ByteSize>() .ok_or_message("invalid capacity value")?.as_u64(); if capacity == 0 { return Err(Error::Message(format!("data directory {} should have non-zero capacity", dir.path.to_string_lossy()))); } ok = true; DataDirState::Active { capacity, } } None if dir.read_only == true => { DataDirState::ReadOnly } _ => return Err(Error::Message(format!("data directories in data_dir should have a capacity value or be marked read_only, not the case for {}", dir.path.to_string_lossy()))), }; data_dirs.push(DataDir { path: dir.path.clone(), state, }); } if !ok { return Err(Error::Message( "incorrect data_dir configuration, no primary writable directory specified" .into(), )); } } } Ok(data_dirs) } fn dir_not_empty(path: &PathBuf) -> Result<bool, Error> { for entry in std::fs::read_dir(&path)? { let dir = entry?; if dir.file_type()?.is_dir() && dir .file_name() .into_string() .ok() .and_then(|hex| hex::decode(&hex).ok()) .is_some() { return Ok(true); } } Ok(false) }