diff options
Diffstat (limited to 'src/block/manager.rs')
-rw-r--r-- | src/block/manager.rs | 458 |
1 files changed, 257 insertions, 201 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs index 3ece9a8a..2d1b5c67 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -3,7 +3,7 @@ use std::pin::Pin; use std::sync::Arc; use std::time::Duration; -use arc_swap::ArcSwapOption; +use arc_swap::{ArcSwap, ArcSwapOption}; use async_trait::async_trait; use bytes::Bytes; use rand::prelude::*; @@ -25,10 +25,11 @@ use garage_rpc::rpc_helper::netapp::stream::{stream_asyncread, ByteStream}; use garage_db as db; use garage_util::background::{vars, BackgroundRunner}; +use garage_util::config::DataDirEnum; use garage_util::data::*; use garage_util::error::*; use garage_util::metrics::RecordDuration; -use garage_util::persister::PersisterShared; +use garage_util::persister::{Persister, PersisterShared}; use garage_util::time::msec_to_rfc3339; use garage_rpc::rpc_helper::OrderTag; @@ -38,6 +39,7 @@ use garage_rpc::*; use garage_table::replication::{TableReplication, TableShardedReplication}; use crate::block::*; +use crate::layout::*; use crate::metrics::*; use crate::rc::*; use crate::repair::*; @@ -77,12 +79,16 @@ impl Rpc for BlockRpc { pub struct BlockManager { /// Replication strategy, allowing to find on which node blocks should be located pub replication: TableShardedReplication, - /// Directory in which block are stored - pub data_dir: PathBuf, + /// Data layout + pub(crate) data_layout: ArcSwap<DataLayout>, + /// Data layout persister + pub(crate) data_layout_persister: Persister<DataLayout>, + + data_fsync: bool, compression_level: Option<i32>, - mutation_lock: [Mutex<BlockManagerLocked>; 256], + mutation_lock: Vec<Mutex<BlockManagerLocked>>, pub(crate) rc: BlockRc, pub resync: BlockResyncManager, @@ -105,6 +111,9 @@ pub struct BlockResyncErrorInfo { pub next_try: u64, } +// The number of different mutexes used to parallelize write access to data blocks +const MUTEX_COUNT: usize = 256; + // This custom struct contains functions that must only be ran // when the lock is held. We ensure that it is the case by storing // it INSIDE a Mutex. @@ -113,11 +122,29 @@ struct BlockManagerLocked(); impl BlockManager { pub fn new( db: &db::Db, - data_dir: PathBuf, + data_dir: DataDirEnum, + data_fsync: bool, compression_level: Option<i32>, replication: TableShardedReplication, system: Arc<System>, - ) -> Arc<Self> { + ) -> Result<Arc<Self>, Error> { + // Load or compute layout, i.e. assignment of data blocks to the different data directories + let data_layout_persister: Persister<DataLayout> = + Persister::new(&system.metadata_dir, "data_layout"); + let data_layout = match data_layout_persister.load() { + Ok(mut layout) => { + layout + .update(&data_dir) + .ok_or_message("invalid data_dir config")?; + layout + } + Err(_) => DataLayout::initialize(&data_dir).ok_or_message("invalid data_dir config")?, + }; + data_layout_persister + .save(&data_layout) + .expect("cannot save data_layout"); + + // Open metadata tables let rc = db .open_tree("block_local_rc") .expect("Unable to open block_local_rc tree"); @@ -140,9 +167,14 @@ impl BlockManager { let block_manager = Arc::new(Self { replication, - data_dir, + data_layout: ArcSwap::new(Arc::new(data_layout)), + data_layout_persister, + data_fsync, compression_level, - mutation_lock: [(); 256].map(|_| Mutex::new(BlockManagerLocked())), + mutation_lock: vec![(); MUTEX_COUNT] + .iter() + .map(|_| Mutex::new(BlockManagerLocked())) + .collect::<Vec<_>>(), rc, resync, system, @@ -154,7 +186,7 @@ impl BlockManager { block_manager.endpoint.set_handler(block_manager.clone()); block_manager.scrub_persister.set_with(|_| ()).unwrap(); - block_manager + Ok(block_manager) } pub fn spawn_workers(self: &Arc<Self>, bg: &BackgroundRunner) { @@ -201,44 +233,10 @@ impl BlockManager { hash: &Hash, order_tag: Option<OrderTag>, ) -> Result<(DataBlockHeader, ByteStream), Error> { - let who = self.replication.read_nodes(hash); - let who = self.system.rpc.request_order(&who); - - for node in who.iter() { - let node_id = NodeID::from(*node); - let rpc = self.endpoint.call_streaming( - &node_id, - BlockRpc::GetBlock(*hash, order_tag), - PRIO_NORMAL | PRIO_SECONDARY, - ); - tokio::select! { - res = rpc => { - let res = match res { - Ok(res) => res, - Err(e) => { - debug!("Node {:?} returned error: {}", node, e); - continue; - } - }; - let (header, stream) = match res.into_parts() { - (Ok(BlockRpc::PutBlock { hash: _, header }), Some(stream)) => (header, stream), - _ => { - debug!("Node {:?} returned a malformed response", node); - continue; - } - }; - return Ok((header, stream)); - } - _ = tokio::time::sleep(self.system.rpc.rpc_timeout()) => { - debug!("Node {:?} didn't return block in time, trying next.", node); - } - }; - } - - Err(Error::Message(format!( - "Unable to read block {:?}: no node returned a valid block", - hash - ))) + self.rpc_get_raw_block_internal(hash, order_tag, |header, stream| async move { + Ok((header, stream)) + }) + .await } /// Ask nodes that might have a (possibly compressed) block for it @@ -248,6 +246,24 @@ impl BlockManager { hash: &Hash, order_tag: Option<OrderTag>, ) -> Result<DataBlock, Error> { + self.rpc_get_raw_block_internal(hash, order_tag, |header, stream| async move { + read_stream_to_end(stream) + .await + .map(|data| DataBlock::from_parts(header, data)) + }) + .await + } + + async fn rpc_get_raw_block_internal<F, Fut, T>( + &self, + hash: &Hash, + order_tag: Option<OrderTag>, + f: F, + ) -> Result<T, Error> + where + F: Fn(DataBlockHeader, ByteStream) -> Fut, + Fut: futures::Future<Output = Result<T, Error>>, + { let who = self.replication.read_nodes(hash); let who = self.system.rpc.request_order(&who); @@ -263,34 +279,41 @@ impl BlockManager { let res = match res { Ok(res) => res, Err(e) => { - debug!("Node {:?} returned error: {}", node, e); + debug!("Get block {:?}: node {:?} could not be contacted: {}", hash, node, e); continue; } }; let (header, stream) = match res.into_parts() { (Ok(BlockRpc::PutBlock { hash: _, header }), Some(stream)) => (header, stream), - _ => { - debug!("Node {:?} returned a malformed response", node); + (Ok(_), _) => { + debug!("Get block {:?}: node {:?} returned a malformed response", hash, node); + continue; + } + (Err(e), _) => { + debug!("Get block {:?}: node {:?} returned error: {}", hash, node, e); continue; } }; - match read_stream_to_end(stream).await { - Ok(bytes) => return Ok(DataBlock::from_parts(header, bytes)), + match f(header, stream).await { + Ok(ret) => return Ok(ret), Err(e) => { - debug!("Error reading stream from node {:?}: {}", node, e); + debug!("Get block {:?}: error reading stream from node {:?}: {}", hash, node, e); } } } + // TODO: sleep less long (fail early), initiate a second request earlier + // if the first one doesn't succeed rapidly + // TODO: keep first request running when initiating a new one and take the + // one that finishes earlier _ = tokio::time::sleep(self.system.rpc.rpc_timeout()) => { - debug!("Node {:?} didn't return block in time, trying next.", node); + debug!("Get block {:?}: node {:?} didn't return block in time, trying next.", hash, node); } }; } - Err(Error::Message(format!( - "Unable to read block {:?}: no node returned a valid block", - hash - ))) + let msg = format!("Get block {:?}: no node returned a valid block", hash); + debug!("{}", msg); + Err(Error::Message(msg)) } // ---- Public interface ---- @@ -468,8 +491,6 @@ impl BlockManager { pub(crate) async fn write_block(&self, hash: &Hash, data: &DataBlock) -> Result<(), Error> { let tracer = opentelemetry::global::tracer("garage"); - let write_size = data.inner_buffer().len() as u64; - self.lock_mutate(hash) .await .write_block(hash, data, self) @@ -479,8 +500,6 @@ impl BlockManager { )) .await?; - self.metrics.bytes_written.add(write_size); - Ok(()) } @@ -507,36 +526,42 @@ impl BlockManager { /// Read block from disk, verifying it's integrity pub(crate) async fn read_block(&self, hash: &Hash) -> Result<DataBlock, Error> { - let data = self - .read_block_internal(hash) - .bound_record_duration(&self.metrics.block_read_duration) - .await?; - - self.metrics - .bytes_read - .add(data.inner_buffer().len() as u64); - - Ok(data) + let tracer = opentelemetry::global::tracer("garage"); + async { + match self.find_block(hash).await { + Some(p) => self.read_block_from(hash, &p).await, + None => { + // Not found but maybe we should have had it ?? + self.resync + .put_to_resync(hash, 2 * self.system.rpc.rpc_timeout())?; + return Err(Error::Message(format!( + "block {:?} not found on node", + hash + ))); + } + } + } + .bound_record_duration(&self.metrics.block_read_duration) + .with_context(Context::current_with_span( + tracer.start("BlockManager::read_block"), + )) + .await } - async fn read_block_internal(&self, hash: &Hash) -> Result<DataBlock, Error> { - let mut path = self.block_path(hash); - let compressed = match self.is_block_compressed(hash).await { - Ok(c) => c, - Err(e) => { - // Not found but maybe we should have had it ?? - self.resync - .put_to_resync(hash, 2 * self.system.rpc.rpc_timeout())?; - return Err(Into::into(e)); - } + pub(crate) async fn read_block_from( + &self, + hash: &Hash, + block_path: &DataBlockPath, + ) -> Result<DataBlock, Error> { + let (path, compressed) = match block_path { + DataBlockPath::Plain(p) => (p, false), + DataBlockPath::Compressed(p) => (p, true), }; - if compressed { - path.set_extension("zst"); - } - let mut f = fs::File::open(&path).await?; + let mut f = fs::File::open(&path).await?; let mut data = vec![]; f.read_to_end(&mut data).await?; + self.metrics.bytes_read.add(data.len() as u64); drop(f); let data = if compressed { @@ -548,29 +573,27 @@ impl BlockManager { if data.verify(*hash).is_err() { self.metrics.corruption_counter.add(1); + warn!( + "Block {:?} is corrupted. Renaming to .corrupted and resyncing.", + hash + ); self.lock_mutate(hash) .await - .move_block_to_corrupted(hash, self) + .move_block_to_corrupted(block_path) .await?; self.resync.put_to_resync(hash, Duration::from_millis(0))?; + return Err(Error::CorruptData(*hash)); } Ok(data) } - /// Check if this node has a block and whether it needs it - pub(crate) async fn check_block_status(&self, hash: &Hash) -> Result<BlockStatus, Error> { - self.lock_mutate(hash) - .await - .check_block_status(hash, self) - .await - } - /// Check if this node should have a block, but don't actually have it async fn need_block(&self, hash: &Hash) -> Result<bool, Error> { - let BlockStatus { exists, needed } = self.check_block_status(hash).await?; - Ok(needed.is_nonzero() && !exists) + let rc = self.rc.get_block_rc(hash)?; + let exists = self.find_block(hash).await.is_some(); + Ok(rc.is_nonzero() && !exists) } /// Delete block if it is not needed anymore @@ -581,59 +604,65 @@ impl BlockManager { .await } - /// Utility: gives the path of the directory in which a block should be found - fn block_dir(&self, hash: &Hash) -> PathBuf { - let mut path = self.data_dir.clone(); - path.push(hex::encode(&hash.as_slice()[0..1])); - path.push(hex::encode(&hash.as_slice()[1..2])); - path - } - - /// Utility: give the full path where a block should be found, minus extension if block is - /// compressed - fn block_path(&self, hash: &Hash) -> PathBuf { - let mut path = self.block_dir(hash); - path.push(hex::encode(hash.as_ref())); - path - } + /// Find the path where a block is currently stored + pub(crate) async fn find_block(&self, hash: &Hash) -> Option<DataBlockPath> { + let data_layout = self.data_layout.load_full(); + let dirs = Some(data_layout.primary_block_dir(hash)) + .into_iter() + .chain(data_layout.secondary_block_dirs(hash)); + let filename = hex::encode(hash.as_ref()); - /// Utility: check if block is stored compressed. Error if block is not stored - async fn is_block_compressed(&self, hash: &Hash) -> Result<bool, Error> { - let mut path = self.block_path(hash); + for dir in dirs { + let mut path = dir; + path.push(&filename); - // If compression is disabled on node - check for the raw block - // first and then a compressed one (as compression may have been - // previously enabled). - match self.compression_level { - None => { + if self.compression_level.is_none() { + // If compression is disabled on node - check for the raw block + // first and then a compressed one (as compression may have been + // previously enabled). if fs::metadata(&path).await.is_ok() { - return Ok(false); + return Some(DataBlockPath::Plain(path)); } - path.set_extension("zst"); - - fs::metadata(&path).await.map(|_| true).map_err(Into::into) - } - _ => { + if fs::metadata(&path).await.is_ok() { + return Some(DataBlockPath::Compressed(path)); + } + } else { path.set_extension("zst"); - if fs::metadata(&path).await.is_ok() { - return Ok(true); + return Some(DataBlockPath::Compressed(path)); } - path.set_extension(""); - - fs::metadata(&path).await.map(|_| false).map_err(Into::into) + if fs::metadata(&path).await.is_ok() { + return Some(DataBlockPath::Plain(path)); + } } } + + None + } + + /// Rewrite a block at the primary location for its path and delete the old path. + /// Returns the number of bytes read/written + pub(crate) async fn fix_block_location( + &self, + hash: &Hash, + wrong_path: DataBlockPath, + ) -> Result<usize, Error> { + self.lock_mutate(hash) + .await + .fix_block_location(hash, wrong_path, self) + .await } async fn lock_mutate(&self, hash: &Hash) -> MutexGuard<'_, BlockManagerLocked> { let tracer = opentelemetry::global::tracer("garage"); - self.mutation_lock[hash.as_slice()[0] as usize] + let ilock = u16::from_be_bytes([hash.as_slice()[0], hash.as_slice()[1]]) as usize + % self.mutation_lock.len(); + self.mutation_lock[ilock] .lock() .with_context(Context::current_with_span( - tracer.start("Acquire mutation_lock"), + tracer.start(format!("Acquire mutation_lock #{}", ilock)), )) .await } @@ -646,7 +675,7 @@ impl StreamingEndpointHandler<BlockRpc> for BlockManager { BlockRpc::PutBlock { hash, header } => Resp::new( self.handle_put_block(*hash, *header, message.take_stream()) .await - .map(|_| BlockRpc::Ok), + .map(|()| BlockRpc::Ok), ), BlockRpc::GetBlock(h, order_tag) => self.handle_get_block(h, *order_tag).await, BlockRpc::NeedBlockQuery(h) => { @@ -657,66 +686,78 @@ impl StreamingEndpointHandler<BlockRpc> for BlockManager { } } -pub(crate) struct BlockStatus { - pub(crate) exists: bool, - pub(crate) needed: RcEntry, -} - impl BlockManagerLocked { - async fn check_block_status( + async fn write_block( &self, hash: &Hash, + data: &DataBlock, mgr: &BlockManager, - ) -> Result<BlockStatus, Error> { - let exists = mgr.is_block_compressed(hash).await.is_ok(); - let needed = mgr.rc.get_block_rc(hash)?; - - Ok(BlockStatus { exists, needed }) + ) -> Result<(), Error> { + let existing_path = mgr.find_block(hash).await; + self.write_block_inner(hash, data, mgr, existing_path).await } - async fn write_block( + async fn write_block_inner( &self, hash: &Hash, data: &DataBlock, mgr: &BlockManager, + existing_path: Option<DataBlockPath>, ) -> Result<(), Error> { let compressed = data.is_compressed(); let data = data.inner_buffer(); - let mut path = mgr.block_dir(hash); - let directory = path.clone(); - path.push(hex::encode(hash)); + let directory = mgr.data_layout.load().primary_block_dir(hash); - fs::create_dir_all(&directory).await?; + let mut tgt_path = directory.clone(); + tgt_path.push(hex::encode(hash)); + if compressed { + tgt_path.set_extension("zst"); + } - let to_delete = match (mgr.is_block_compressed(hash).await, compressed) { - (Ok(true), _) => return Ok(()), - (Ok(false), false) => return Ok(()), - (Ok(false), true) => { - let path_to_delete = path.clone(); - path.set_extension("zst"); - Some(path_to_delete) - } - (Err(_), compressed) => { - if compressed { - path.set_extension("zst"); - } - None - } + let to_delete = match (existing_path, compressed) { + // If the block is stored in the wrong directory, + // write it again at the correct path and delete the old path + (Some(DataBlockPath::Plain(p)), false) if p != tgt_path => Some(p), + (Some(DataBlockPath::Compressed(p)), true) if p != tgt_path => Some(p), + + // If the block is already stored not compressed but we have a compressed + // copy, write the compressed copy and delete the uncompressed one + (Some(DataBlockPath::Plain(plain_path)), true) => Some(plain_path), + + // If the block is already stored compressed, + // keep the stored copy, we have nothing to do + (Some(DataBlockPath::Compressed(_)), _) => return Ok(()), + + // If the block is already stored not compressed, + // and we don't have a compressed copy either, + // keep the stored copy, we have nothing to do + (Some(DataBlockPath::Plain(_)), false) => return Ok(()), + + // If the block isn't stored already, just store what is given to us + (None, _) => None, }; + assert!(to_delete.as_ref() != Some(&tgt_path)); - let mut path_tmp = path.clone(); + let mut path_tmp = tgt_path.clone(); let tmp_extension = format!("tmp{}", hex::encode(thread_rng().gen::<[u8; 4]>())); path_tmp.set_extension(tmp_extension); + fs::create_dir_all(&directory).await?; + let mut delete_on_drop = DeleteOnDrop(Some(path_tmp.clone())); let mut f = fs::File::create(&path_tmp).await?; f.write_all(data).await?; - f.sync_all().await?; + mgr.metrics.bytes_written.add(data.len() as u64); + + if mgr.data_fsync { + f.sync_all().await?; + } + drop(f); - fs::rename(path_tmp, path).await?; + fs::rename(path_tmp, tgt_path).await?; delete_on_drop.cancel(); @@ -724,52 +765,67 @@ impl BlockManagerLocked { fs::remove_file(to_delete).await?; } - // We want to ensure that when this function returns, data is properly persisted - // to disk. The first step is the sync_all above that does an fsync on the data file. - // Now, we do an fsync on the containing directory, to ensure that the rename - // is persisted properly. See: - // http://thedjbway.b0llix.net/qmail/syncdir.html - let dir = fs::OpenOptions::new() - .read(true) - .mode(0) - .open(directory) - .await?; - dir.sync_all().await?; - drop(dir); + if mgr.data_fsync { + // We want to ensure that when this function returns, data is properly persisted + // to disk. The first step is the sync_all above that does an fsync on the data file. + // Now, we do an fsync on the containing directory, to ensure that the rename + // is persisted properly. See: + // http://thedjbway.b0llix.net/qmail/syncdir.html + let dir = fs::OpenOptions::new() + .read(true) + .mode(0) + .open(directory) + .await?; + dir.sync_all().await?; + drop(dir); + } Ok(()) } - async fn move_block_to_corrupted(&self, hash: &Hash, mgr: &BlockManager) -> Result<(), Error> { - warn!( - "Block {:?} is corrupted. Renaming to .corrupted and resyncing.", - hash - ); - let mut path = mgr.block_path(hash); - let mut path2 = path.clone(); - if mgr.is_block_compressed(hash).await? { - path.set_extension("zst"); - path2.set_extension("zst.corrupted"); - } else { - path2.set_extension("corrupted"); - } + async fn move_block_to_corrupted(&self, block_path: &DataBlockPath) -> Result<(), Error> { + let (path, path2) = match block_path { + DataBlockPath::Plain(p) => { + let mut p2 = p.clone(); + p2.set_extension("corrupted"); + (p, p2) + } + DataBlockPath::Compressed(p) => { + let mut p2 = p.clone(); + p2.set_extension("zst.corrupted"); + (p, p2) + } + }; + fs::rename(path, path2).await?; Ok(()) } async fn delete_if_unneeded(&self, hash: &Hash, mgr: &BlockManager) -> Result<(), Error> { - let BlockStatus { exists, needed } = self.check_block_status(hash, mgr).await?; - - if exists && needed.is_deletable() { - let mut path = mgr.block_path(hash); - if mgr.is_block_compressed(hash).await? { - path.set_extension("zst"); + let rc = mgr.rc.get_block_rc(hash)?; + if rc.is_deletable() { + while let Some(path) = mgr.find_block(hash).await { + let path = match path { + DataBlockPath::Plain(p) | DataBlockPath::Compressed(p) => p, + }; + fs::remove_file(path).await?; + mgr.metrics.delete_counter.add(1); } - fs::remove_file(path).await?; - mgr.metrics.delete_counter.add(1); } Ok(()) } + + async fn fix_block_location( + &self, + hash: &Hash, + wrong_path: DataBlockPath, + mgr: &BlockManager, + ) -> Result<usize, Error> { + let data = mgr.read_block_from(hash, &wrong_path).await?; + self.write_block_inner(hash, &data, mgr, Some(wrong_path)) + .await?; + Ok(data.inner_buffer().len()) + } } async fn read_stream_to_end(mut stream: ByteStream) -> Result<Bytes, Error> { |