aboutsummaryrefslogtreecommitdiff
path: root/src/block/manager.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/block/manager.rs')
-rw-r--r--src/block/manager.rs423
1 files changed, 235 insertions, 188 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs
index c7e4cd03..2d1b5c67 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -3,7 +3,7 @@ use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
-use arc_swap::ArcSwapOption;
+use arc_swap::{ArcSwap, ArcSwapOption};
use async_trait::async_trait;
use bytes::Bytes;
use rand::prelude::*;
@@ -25,10 +25,11 @@ use garage_rpc::rpc_helper::netapp::stream::{stream_asyncread, ByteStream};
use garage_db as db;
use garage_util::background::{vars, BackgroundRunner};
+use garage_util::config::DataDirEnum;
use garage_util::data::*;
use garage_util::error::*;
use garage_util::metrics::RecordDuration;
-use garage_util::persister::PersisterShared;
+use garage_util::persister::{Persister, PersisterShared};
use garage_util::time::msec_to_rfc3339;
use garage_rpc::rpc_helper::OrderTag;
@@ -38,6 +39,7 @@ use garage_rpc::*;
use garage_table::replication::{TableReplication, TableShardedReplication};
use crate::block::*;
+use crate::layout::*;
use crate::metrics::*;
use crate::rc::*;
use crate::repair::*;
@@ -77,13 +79,16 @@ impl Rpc for BlockRpc {
pub struct BlockManager {
/// Replication strategy, allowing to find on which node blocks should be located
pub replication: TableShardedReplication,
- /// Directory in which block are stored
- pub data_dir: PathBuf,
+
+ /// Data layout
+ pub(crate) data_layout: ArcSwap<DataLayout>,
+ /// Data layout persister
+ pub(crate) data_layout_persister: Persister<DataLayout>,
data_fsync: bool,
compression_level: Option<i32>,
- mutation_lock: [Mutex<BlockManagerLocked>; 256],
+ mutation_lock: Vec<Mutex<BlockManagerLocked>>,
pub(crate) rc: BlockRc,
pub resync: BlockResyncManager,
@@ -106,6 +111,9 @@ pub struct BlockResyncErrorInfo {
pub next_try: u64,
}
+// The number of different mutexes used to parallelize write access to data blocks
+const MUTEX_COUNT: usize = 256;
+
// This custom struct contains functions that must only be ran
// when the lock is held. We ensure that it is the case by storing
// it INSIDE a Mutex.
@@ -114,12 +122,29 @@ struct BlockManagerLocked();
impl BlockManager {
pub fn new(
db: &db::Db,
- data_dir: PathBuf,
+ data_dir: DataDirEnum,
data_fsync: bool,
compression_level: Option<i32>,
replication: TableShardedReplication,
system: Arc<System>,
- ) -> Arc<Self> {
+ ) -> Result<Arc<Self>, Error> {
+ // Load or compute layout, i.e. assignment of data blocks to the different data directories
+ let data_layout_persister: Persister<DataLayout> =
+ Persister::new(&system.metadata_dir, "data_layout");
+ let data_layout = match data_layout_persister.load() {
+ Ok(mut layout) => {
+ layout
+ .update(&data_dir)
+ .ok_or_message("invalid data_dir config")?;
+ layout
+ }
+ Err(_) => DataLayout::initialize(&data_dir).ok_or_message("invalid data_dir config")?,
+ };
+ data_layout_persister
+ .save(&data_layout)
+ .expect("cannot save data_layout");
+
+ // Open metadata tables
let rc = db
.open_tree("block_local_rc")
.expect("Unable to open block_local_rc tree");
@@ -142,10 +167,14 @@ impl BlockManager {
let block_manager = Arc::new(Self {
replication,
- data_dir,
+ data_layout: ArcSwap::new(Arc::new(data_layout)),
+ data_layout_persister,
data_fsync,
compression_level,
- mutation_lock: [(); 256].map(|_| Mutex::new(BlockManagerLocked())),
+ mutation_lock: vec![(); MUTEX_COUNT]
+ .iter()
+ .map(|_| Mutex::new(BlockManagerLocked()))
+ .collect::<Vec<_>>(),
rc,
resync,
system,
@@ -157,7 +186,7 @@ impl BlockManager {
block_manager.endpoint.set_handler(block_manager.clone());
block_manager.scrub_persister.set_with(|_| ()).unwrap();
- block_manager
+ Ok(block_manager)
}
pub fn spawn_workers(self: &Arc<Self>, bg: &BackgroundRunner) {
@@ -204,44 +233,10 @@ impl BlockManager {
hash: &Hash,
order_tag: Option<OrderTag>,
) -> Result<(DataBlockHeader, ByteStream), Error> {
- let who = self.replication.read_nodes(hash);
- let who = self.system.rpc.request_order(&who);
-
- for node in who.iter() {
- let node_id = NodeID::from(*node);
- let rpc = self.endpoint.call_streaming(
- &node_id,
- BlockRpc::GetBlock(*hash, order_tag),
- PRIO_NORMAL | PRIO_SECONDARY,
- );
- tokio::select! {
- res = rpc => {
- let res = match res {
- Ok(res) => res,
- Err(e) => {
- debug!("Node {:?} returned error: {}", node, e);
- continue;
- }
- };
- let (header, stream) = match res.into_parts() {
- (Ok(BlockRpc::PutBlock { hash: _, header }), Some(stream)) => (header, stream),
- _ => {
- debug!("Node {:?} returned a malformed response", node);
- continue;
- }
- };
- return Ok((header, stream));
- }
- _ = tokio::time::sleep(self.system.rpc.rpc_timeout()) => {
- debug!("Node {:?} didn't return block in time, trying next.", node);
- }
- };
- }
-
- Err(Error::Message(format!(
- "Unable to read block {:?}: no node returned a valid block",
- hash
- )))
+ self.rpc_get_raw_block_internal(hash, order_tag, |header, stream| async move {
+ Ok((header, stream))
+ })
+ .await
}
/// Ask nodes that might have a (possibly compressed) block for it
@@ -251,6 +246,24 @@ impl BlockManager {
hash: &Hash,
order_tag: Option<OrderTag>,
) -> Result<DataBlock, Error> {
+ self.rpc_get_raw_block_internal(hash, order_tag, |header, stream| async move {
+ read_stream_to_end(stream)
+ .await
+ .map(|data| DataBlock::from_parts(header, data))
+ })
+ .await
+ }
+
+ async fn rpc_get_raw_block_internal<F, Fut, T>(
+ &self,
+ hash: &Hash,
+ order_tag: Option<OrderTag>,
+ f: F,
+ ) -> Result<T, Error>
+ where
+ F: Fn(DataBlockHeader, ByteStream) -> Fut,
+ Fut: futures::Future<Output = Result<T, Error>>,
+ {
let who = self.replication.read_nodes(hash);
let who = self.system.rpc.request_order(&who);
@@ -266,34 +279,41 @@ impl BlockManager {
let res = match res {
Ok(res) => res,
Err(e) => {
- debug!("Node {:?} returned error: {}", node, e);
+ debug!("Get block {:?}: node {:?} could not be contacted: {}", hash, node, e);
continue;
}
};
let (header, stream) = match res.into_parts() {
(Ok(BlockRpc::PutBlock { hash: _, header }), Some(stream)) => (header, stream),
- _ => {
- debug!("Node {:?} returned a malformed response", node);
+ (Ok(_), _) => {
+ debug!("Get block {:?}: node {:?} returned a malformed response", hash, node);
+ continue;
+ }
+ (Err(e), _) => {
+ debug!("Get block {:?}: node {:?} returned error: {}", hash, node, e);
continue;
}
};
- match read_stream_to_end(stream).await {
- Ok(bytes) => return Ok(DataBlock::from_parts(header, bytes)),
+ match f(header, stream).await {
+ Ok(ret) => return Ok(ret),
Err(e) => {
- debug!("Error reading stream from node {:?}: {}", node, e);
+ debug!("Get block {:?}: error reading stream from node {:?}: {}", hash, node, e);
}
}
}
+ // TODO: sleep less long (fail early), initiate a second request earlier
+ // if the first one doesn't succeed rapidly
+ // TODO: keep first request running when initiating a new one and take the
+ // one that finishes earlier
_ = tokio::time::sleep(self.system.rpc.rpc_timeout()) => {
- debug!("Node {:?} didn't return block in time, trying next.", node);
+ debug!("Get block {:?}: node {:?} didn't return block in time, trying next.", hash, node);
}
};
}
- Err(Error::Message(format!(
- "Unable to read block {:?}: no node returned a valid block",
- hash
- )))
+ let msg = format!("Get block {:?}: no node returned a valid block", hash);
+ debug!("{}", msg);
+ Err(Error::Message(msg))
}
// ---- Public interface ----
@@ -471,8 +491,6 @@ impl BlockManager {
pub(crate) async fn write_block(&self, hash: &Hash, data: &DataBlock) -> Result<(), Error> {
let tracer = opentelemetry::global::tracer("garage");
- let write_size = data.inner_buffer().len() as u64;
-
self.lock_mutate(hash)
.await
.write_block(hash, data, self)
@@ -482,8 +500,6 @@ impl BlockManager {
))
.await?;
- self.metrics.bytes_written.add(write_size);
-
Ok(())
}
@@ -510,36 +526,42 @@ impl BlockManager {
/// Read block from disk, verifying it's integrity
pub(crate) async fn read_block(&self, hash: &Hash) -> Result<DataBlock, Error> {
- let data = self
- .read_block_internal(hash)
- .bound_record_duration(&self.metrics.block_read_duration)
- .await?;
-
- self.metrics
- .bytes_read
- .add(data.inner_buffer().len() as u64);
-
- Ok(data)
+ let tracer = opentelemetry::global::tracer("garage");
+ async {
+ match self.find_block(hash).await {
+ Some(p) => self.read_block_from(hash, &p).await,
+ None => {
+ // Not found but maybe we should have had it ??
+ self.resync
+ .put_to_resync(hash, 2 * self.system.rpc.rpc_timeout())?;
+ return Err(Error::Message(format!(
+ "block {:?} not found on node",
+ hash
+ )));
+ }
+ }
+ }
+ .bound_record_duration(&self.metrics.block_read_duration)
+ .with_context(Context::current_with_span(
+ tracer.start("BlockManager::read_block"),
+ ))
+ .await
}
- async fn read_block_internal(&self, hash: &Hash) -> Result<DataBlock, Error> {
- let mut path = self.block_path(hash);
- let compressed = match self.is_block_compressed(hash).await {
- Ok(c) => c,
- Err(e) => {
- // Not found but maybe we should have had it ??
- self.resync
- .put_to_resync(hash, 2 * self.system.rpc.rpc_timeout())?;
- return Err(Into::into(e));
- }
+ pub(crate) async fn read_block_from(
+ &self,
+ hash: &Hash,
+ block_path: &DataBlockPath,
+ ) -> Result<DataBlock, Error> {
+ let (path, compressed) = match block_path {
+ DataBlockPath::Plain(p) => (p, false),
+ DataBlockPath::Compressed(p) => (p, true),
};
- if compressed {
- path.set_extension("zst");
- }
- let mut f = fs::File::open(&path).await?;
+ let mut f = fs::File::open(&path).await?;
let mut data = vec![];
f.read_to_end(&mut data).await?;
+ self.metrics.bytes_read.add(data.len() as u64);
drop(f);
let data = if compressed {
@@ -551,29 +573,27 @@ impl BlockManager {
if data.verify(*hash).is_err() {
self.metrics.corruption_counter.add(1);
+ warn!(
+ "Block {:?} is corrupted. Renaming to .corrupted and resyncing.",
+ hash
+ );
self.lock_mutate(hash)
.await
- .move_block_to_corrupted(hash, self)
+ .move_block_to_corrupted(block_path)
.await?;
self.resync.put_to_resync(hash, Duration::from_millis(0))?;
+
return Err(Error::CorruptData(*hash));
}
Ok(data)
}
- /// Check if this node has a block and whether it needs it
- pub(crate) async fn check_block_status(&self, hash: &Hash) -> Result<BlockStatus, Error> {
- self.lock_mutate(hash)
- .await
- .check_block_status(hash, self)
- .await
- }
-
/// Check if this node should have a block, but don't actually have it
async fn need_block(&self, hash: &Hash) -> Result<bool, Error> {
- let BlockStatus { exists, needed } = self.check_block_status(hash).await?;
- Ok(needed.is_nonzero() && !exists)
+ let rc = self.rc.get_block_rc(hash)?;
+ let exists = self.find_block(hash).await.is_some();
+ Ok(rc.is_nonzero() && !exists)
}
/// Delete block if it is not needed anymore
@@ -584,59 +604,65 @@ impl BlockManager {
.await
}
- /// Utility: gives the path of the directory in which a block should be found
- fn block_dir(&self, hash: &Hash) -> PathBuf {
- let mut path = self.data_dir.clone();
- path.push(hex::encode(&hash.as_slice()[0..1]));
- path.push(hex::encode(&hash.as_slice()[1..2]));
- path
- }
-
- /// Utility: give the full path where a block should be found, minus extension if block is
- /// compressed
- fn block_path(&self, hash: &Hash) -> PathBuf {
- let mut path = self.block_dir(hash);
- path.push(hex::encode(hash.as_ref()));
- path
- }
+ /// Find the path where a block is currently stored
+ pub(crate) async fn find_block(&self, hash: &Hash) -> Option<DataBlockPath> {
+ let data_layout = self.data_layout.load_full();
+ let dirs = Some(data_layout.primary_block_dir(hash))
+ .into_iter()
+ .chain(data_layout.secondary_block_dirs(hash));
+ let filename = hex::encode(hash.as_ref());
- /// Utility: check if block is stored compressed. Error if block is not stored
- async fn is_block_compressed(&self, hash: &Hash) -> Result<bool, Error> {
- let mut path = self.block_path(hash);
+ for dir in dirs {
+ let mut path = dir;
+ path.push(&filename);
- // If compression is disabled on node - check for the raw block
- // first and then a compressed one (as compression may have been
- // previously enabled).
- match self.compression_level {
- None => {
+ if self.compression_level.is_none() {
+ // If compression is disabled on node - check for the raw block
+ // first and then a compressed one (as compression may have been
+ // previously enabled).
if fs::metadata(&path).await.is_ok() {
- return Ok(false);
+ return Some(DataBlockPath::Plain(path));
}
-
path.set_extension("zst");
-
- fs::metadata(&path).await.map(|_| true).map_err(Into::into)
- }
- _ => {
+ if fs::metadata(&path).await.is_ok() {
+ return Some(DataBlockPath::Compressed(path));
+ }
+ } else {
path.set_extension("zst");
-
if fs::metadata(&path).await.is_ok() {
- return Ok(true);
+ return Some(DataBlockPath::Compressed(path));
}
-
path.set_extension("");
-
- fs::metadata(&path).await.map(|_| false).map_err(Into::into)
+ if fs::metadata(&path).await.is_ok() {
+ return Some(DataBlockPath::Plain(path));
+ }
}
}
+
+ None
+ }
+
+ /// Rewrite a block at the primary location for its path and delete the old path.
+ /// Returns the number of bytes read/written
+ pub(crate) async fn fix_block_location(
+ &self,
+ hash: &Hash,
+ wrong_path: DataBlockPath,
+ ) -> Result<usize, Error> {
+ self.lock_mutate(hash)
+ .await
+ .fix_block_location(hash, wrong_path, self)
+ .await
}
async fn lock_mutate(&self, hash: &Hash) -> MutexGuard<'_, BlockManagerLocked> {
let tracer = opentelemetry::global::tracer("garage");
- self.mutation_lock[hash.as_slice()[0] as usize]
+ let ilock = u16::from_be_bytes([hash.as_slice()[0], hash.as_slice()[1]]) as usize
+ % self.mutation_lock.len();
+ self.mutation_lock[ilock]
.lock()
.with_context(Context::current_with_span(
- tracer.start("Acquire mutation_lock"),
+ tracer.start(format!("Acquire mutation_lock #{}", ilock)),
))
.await
}
@@ -649,7 +675,7 @@ impl StreamingEndpointHandler<BlockRpc> for BlockManager {
BlockRpc::PutBlock { hash, header } => Resp::new(
self.handle_put_block(*hash, *header, message.take_stream())
.await
- .map(|_| BlockRpc::Ok),
+ .map(|()| BlockRpc::Ok),
),
BlockRpc::GetBlock(h, order_tag) => self.handle_get_block(h, *order_tag).await,
BlockRpc::NeedBlockQuery(h) => {
@@ -660,62 +686,70 @@ impl StreamingEndpointHandler<BlockRpc> for BlockManager {
}
}
-pub(crate) struct BlockStatus {
- pub(crate) exists: bool,
- pub(crate) needed: RcEntry,
-}
-
impl BlockManagerLocked {
- async fn check_block_status(
+ async fn write_block(
&self,
hash: &Hash,
+ data: &DataBlock,
mgr: &BlockManager,
- ) -> Result<BlockStatus, Error> {
- let exists = mgr.is_block_compressed(hash).await.is_ok();
- let needed = mgr.rc.get_block_rc(hash)?;
-
- Ok(BlockStatus { exists, needed })
+ ) -> Result<(), Error> {
+ let existing_path = mgr.find_block(hash).await;
+ self.write_block_inner(hash, data, mgr, existing_path).await
}
- async fn write_block(
+ async fn write_block_inner(
&self,
hash: &Hash,
data: &DataBlock,
mgr: &BlockManager,
+ existing_path: Option<DataBlockPath>,
) -> Result<(), Error> {
let compressed = data.is_compressed();
let data = data.inner_buffer();
- let mut path = mgr.block_dir(hash);
- let directory = path.clone();
- path.push(hex::encode(hash));
+ let directory = mgr.data_layout.load().primary_block_dir(hash);
- fs::create_dir_all(&directory).await?;
+ let mut tgt_path = directory.clone();
+ tgt_path.push(hex::encode(hash));
+ if compressed {
+ tgt_path.set_extension("zst");
+ }
- let to_delete = match (mgr.is_block_compressed(hash).await, compressed) {
- (Ok(true), _) => return Ok(()),
- (Ok(false), false) => return Ok(()),
- (Ok(false), true) => {
- let path_to_delete = path.clone();
- path.set_extension("zst");
- Some(path_to_delete)
- }
- (Err(_), compressed) => {
- if compressed {
- path.set_extension("zst");
- }
- None
- }
+ let to_delete = match (existing_path, compressed) {
+ // If the block is stored in the wrong directory,
+ // write it again at the correct path and delete the old path
+ (Some(DataBlockPath::Plain(p)), false) if p != tgt_path => Some(p),
+ (Some(DataBlockPath::Compressed(p)), true) if p != tgt_path => Some(p),
+
+ // If the block is already stored not compressed but we have a compressed
+ // copy, write the compressed copy and delete the uncompressed one
+ (Some(DataBlockPath::Plain(plain_path)), true) => Some(plain_path),
+
+ // If the block is already stored compressed,
+ // keep the stored copy, we have nothing to do
+ (Some(DataBlockPath::Compressed(_)), _) => return Ok(()),
+
+ // If the block is already stored not compressed,
+ // and we don't have a compressed copy either,
+ // keep the stored copy, we have nothing to do
+ (Some(DataBlockPath::Plain(_)), false) => return Ok(()),
+
+ // If the block isn't stored already, just store what is given to us
+ (None, _) => None,
};
+ assert!(to_delete.as_ref() != Some(&tgt_path));
- let mut path_tmp = path.clone();
+ let mut path_tmp = tgt_path.clone();
let tmp_extension = format!("tmp{}", hex::encode(thread_rng().gen::<[u8; 4]>()));
path_tmp.set_extension(tmp_extension);
+ fs::create_dir_all(&directory).await?;
+
let mut delete_on_drop = DeleteOnDrop(Some(path_tmp.clone()));
let mut f = fs::File::create(&path_tmp).await?;
f.write_all(data).await?;
+ mgr.metrics.bytes_written.add(data.len() as u64);
if mgr.data_fsync {
f.sync_all().await?;
@@ -723,7 +757,7 @@ impl BlockManagerLocked {
drop(f);
- fs::rename(path_tmp, path).await?;
+ fs::rename(path_tmp, tgt_path).await?;
delete_on_drop.cancel();
@@ -749,36 +783,49 @@ impl BlockManagerLocked {
Ok(())
}
- async fn move_block_to_corrupted(&self, hash: &Hash, mgr: &BlockManager) -> Result<(), Error> {
- warn!(
- "Block {:?} is corrupted. Renaming to .corrupted and resyncing.",
- hash
- );
- let mut path = mgr.block_path(hash);
- let mut path2 = path.clone();
- if mgr.is_block_compressed(hash).await? {
- path.set_extension("zst");
- path2.set_extension("zst.corrupted");
- } else {
- path2.set_extension("corrupted");
- }
+ async fn move_block_to_corrupted(&self, block_path: &DataBlockPath) -> Result<(), Error> {
+ let (path, path2) = match block_path {
+ DataBlockPath::Plain(p) => {
+ let mut p2 = p.clone();
+ p2.set_extension("corrupted");
+ (p, p2)
+ }
+ DataBlockPath::Compressed(p) => {
+ let mut p2 = p.clone();
+ p2.set_extension("zst.corrupted");
+ (p, p2)
+ }
+ };
+
fs::rename(path, path2).await?;
Ok(())
}
async fn delete_if_unneeded(&self, hash: &Hash, mgr: &BlockManager) -> Result<(), Error> {
- let BlockStatus { exists, needed } = self.check_block_status(hash, mgr).await?;
-
- if exists && needed.is_deletable() {
- let mut path = mgr.block_path(hash);
- if mgr.is_block_compressed(hash).await? {
- path.set_extension("zst");
+ let rc = mgr.rc.get_block_rc(hash)?;
+ if rc.is_deletable() {
+ while let Some(path) = mgr.find_block(hash).await {
+ let path = match path {
+ DataBlockPath::Plain(p) | DataBlockPath::Compressed(p) => p,
+ };
+ fs::remove_file(path).await?;
+ mgr.metrics.delete_counter.add(1);
}
- fs::remove_file(path).await?;
- mgr.metrics.delete_counter.add(1);
}
Ok(())
}
+
+ async fn fix_block_location(
+ &self,
+ hash: &Hash,
+ wrong_path: DataBlockPath,
+ mgr: &BlockManager,
+ ) -> Result<usize, Error> {
+ let data = mgr.read_block_from(hash, &wrong_path).await?;
+ self.write_block_inner(hash, &data, mgr, Some(wrong_path))
+ .await?;
+ Ok(data.inner_buffer().len())
+ }
}
async fn read_stream_to_end(mut stream: ByteStream) -> Result<Bytes, Error> {