diff options
Diffstat (limited to 'src/block/manager.rs')
-rw-r--r-- | src/block/manager.rs | 96 |
1 files changed, 44 insertions, 52 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs index 5bad34d4..d18d3f4c 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -88,7 +88,7 @@ pub struct BlockManager { data_fsync: bool, compression_level: Option<i32>, - mutation_lock: [Mutex<BlockManagerLocked>; 256], + mutation_lock: Vec<Mutex<BlockManagerLocked>>, pub(crate) rc: BlockRc, pub resync: BlockResyncManager, @@ -111,6 +111,9 @@ pub struct BlockResyncErrorInfo { pub next_try: u64, } +// The number of different mutexes used to parallelize write access to data blocks +const MUTEX_COUNT: usize = 256; + // This custom struct contains functions that must only be ran // when the lock is held. We ensure that it is the case by storing // it INSIDE a Mutex. @@ -124,21 +127,24 @@ impl BlockManager { compression_level: Option<i32>, replication: TableShardedReplication, system: Arc<System>, - ) -> Arc<Self> { - // TODO don't panic, report error + ) -> Result<Arc<Self>, Error> { + // Load or compute layout, i.e. assignment of data blocks to the different data directories let layout_persister: Persister<DataLayout> = Persister::new(&system.metadata_dir, "data_layout"); let data_layout = match layout_persister.load() { Ok(mut layout) => { - layout.update(&data_dir).expect("invalid data_dir config"); + layout + .update(&data_dir) + .ok_or_message("invalid data_dir config")?; layout } - Err(_) => DataLayout::initialize(&data_dir).expect("invalid data_dir config"), + Err(_) => DataLayout::initialize(&data_dir).ok_or_message("invalid data_dir config")?, }; layout_persister .save(&data_layout) .expect("cannot save data_layout"); + // Open metadata tables let rc = db .open_tree("block_local_rc") .expect("Unable to open block_local_rc tree"); @@ -165,7 +171,10 @@ impl BlockManager { data_layout, data_fsync, compression_level, - mutation_lock: [(); 256].map(|_| Mutex::new(BlockManagerLocked())), + mutation_lock: vec![(); MUTEX_COUNT] + .iter() + .map(|_| Mutex::new(BlockManagerLocked())) + .collect::<Vec<_>>(), rc, resync, system, @@ -177,7 +186,7 @@ impl BlockManager { block_manager.endpoint.set_handler(block_manager.clone()); block_manager.scrub_persister.set_with(|_| ()).unwrap(); - block_manager + Ok(block_manager) } pub fn spawn_workers(self: &Arc<Self>, bg: &BackgroundRunner) { @@ -224,44 +233,10 @@ impl BlockManager { hash: &Hash, order_tag: Option<OrderTag>, ) -> Result<(DataBlockHeader, ByteStream), Error> { - let who = self.replication.read_nodes(hash); - let who = self.system.rpc.request_order(&who); - - for node in who.iter() { - let node_id = NodeID::from(*node); - let rpc = self.endpoint.call_streaming( - &node_id, - BlockRpc::GetBlock(*hash, order_tag), - PRIO_NORMAL | PRIO_SECONDARY, - ); - tokio::select! { - res = rpc => { - let res = match res { - Ok(res) => res, - Err(e) => { - debug!("Node {:?} returned error: {}", node, e); - continue; - } - }; - let (header, stream) = match res.into_parts() { - (Ok(BlockRpc::PutBlock { hash: _, header }), Some(stream)) => (header, stream), - _ => { - debug!("Node {:?} returned a malformed response", node); - continue; - } - }; - return Ok((header, stream)); - } - _ = tokio::time::sleep(self.system.rpc.rpc_timeout()) => { - debug!("Node {:?} didn't return block in time, trying next.", node); - } - }; - } - - Err(Error::Message(format!( - "Unable to read block {:?}: no node returned a valid block", - hash - ))) + self.rpc_get_raw_block_internal(hash, order_tag, |header, stream| async move { + Ok((header, stream)) + }) + .await } /// Ask nodes that might have a (possibly compressed) block for it @@ -271,6 +246,24 @@ impl BlockManager { hash: &Hash, order_tag: Option<OrderTag>, ) -> Result<DataBlock, Error> { + self.rpc_get_raw_block_internal(hash, order_tag, |header, stream| async move { + read_stream_to_end(stream) + .await + .map(|data| DataBlock::from_parts(header, data)) + }) + .await + } + + async fn rpc_get_raw_block_internal<F, Fut, T>( + &self, + hash: &Hash, + order_tag: Option<OrderTag>, + f: F, + ) -> Result<T, Error> + where + F: Fn(DataBlockHeader, ByteStream) -> Fut, + Fut: futures::Future<Output = Result<T, Error>>, + { let who = self.replication.read_nodes(hash); let who = self.system.rpc.request_order(&who); @@ -297,13 +290,17 @@ impl BlockManager { continue; } }; - match read_stream_to_end(stream).await { - Ok(bytes) => return Ok(DataBlock::from_parts(header, bytes)), + match f(header, stream).await { + Ok(ret) => return Ok(ret), Err(e) => { debug!("Error reading stream from node {:?}: {}", node, e); } } } + // TODO: sleep less long (fail early), initiate a second request earlier + // if the first one doesn't succeed rapidly + // TODO: keep first request running when initiating a new one and take the + // one that finishes earlier _ = tokio::time::sleep(self.system.rpc.rpc_timeout()) => { debug!("Node {:?} didn't return block in time, trying next.", node); } @@ -680,11 +677,6 @@ impl StreamingEndpointHandler<BlockRpc> for BlockManager { } } -pub(crate) struct BlockStatus { - pub(crate) exists: bool, - pub(crate) needed: RcEntry, -} - impl BlockManagerLocked { async fn write_block( &self, |