aboutsummaryrefslogtreecommitdiff
path: root/src/block
diff options
context:
space:
mode:
Diffstat (limited to 'src/block')
-rw-r--r--src/block/manager.rs96
1 files changed, 44 insertions, 52 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs
index 5bad34d4..d18d3f4c 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -88,7 +88,7 @@ pub struct BlockManager {
data_fsync: bool,
compression_level: Option<i32>,
- mutation_lock: [Mutex<BlockManagerLocked>; 256],
+ mutation_lock: Vec<Mutex<BlockManagerLocked>>,
pub(crate) rc: BlockRc,
pub resync: BlockResyncManager,
@@ -111,6 +111,9 @@ pub struct BlockResyncErrorInfo {
pub next_try: u64,
}
+// The number of different mutexes used to parallelize write access to data blocks
+const MUTEX_COUNT: usize = 256;
+
// This custom struct contains functions that must only be ran
// when the lock is held. We ensure that it is the case by storing
// it INSIDE a Mutex.
@@ -124,21 +127,24 @@ impl BlockManager {
compression_level: Option<i32>,
replication: TableShardedReplication,
system: Arc<System>,
- ) -> Arc<Self> {
- // TODO don't panic, report error
+ ) -> Result<Arc<Self>, Error> {
+ // Load or compute layout, i.e. assignment of data blocks to the different data directories
let layout_persister: Persister<DataLayout> =
Persister::new(&system.metadata_dir, "data_layout");
let data_layout = match layout_persister.load() {
Ok(mut layout) => {
- layout.update(&data_dir).expect("invalid data_dir config");
+ layout
+ .update(&data_dir)
+ .ok_or_message("invalid data_dir config")?;
layout
}
- Err(_) => DataLayout::initialize(&data_dir).expect("invalid data_dir config"),
+ Err(_) => DataLayout::initialize(&data_dir).ok_or_message("invalid data_dir config")?,
};
layout_persister
.save(&data_layout)
.expect("cannot save data_layout");
+ // Open metadata tables
let rc = db
.open_tree("block_local_rc")
.expect("Unable to open block_local_rc tree");
@@ -165,7 +171,10 @@ impl BlockManager {
data_layout,
data_fsync,
compression_level,
- mutation_lock: [(); 256].map(|_| Mutex::new(BlockManagerLocked())),
+ mutation_lock: vec![(); MUTEX_COUNT]
+ .iter()
+ .map(|_| Mutex::new(BlockManagerLocked()))
+ .collect::<Vec<_>>(),
rc,
resync,
system,
@@ -177,7 +186,7 @@ impl BlockManager {
block_manager.endpoint.set_handler(block_manager.clone());
block_manager.scrub_persister.set_with(|_| ()).unwrap();
- block_manager
+ Ok(block_manager)
}
pub fn spawn_workers(self: &Arc<Self>, bg: &BackgroundRunner) {
@@ -224,44 +233,10 @@ impl BlockManager {
hash: &Hash,
order_tag: Option<OrderTag>,
) -> Result<(DataBlockHeader, ByteStream), Error> {
- let who = self.replication.read_nodes(hash);
- let who = self.system.rpc.request_order(&who);
-
- for node in who.iter() {
- let node_id = NodeID::from(*node);
- let rpc = self.endpoint.call_streaming(
- &node_id,
- BlockRpc::GetBlock(*hash, order_tag),
- PRIO_NORMAL | PRIO_SECONDARY,
- );
- tokio::select! {
- res = rpc => {
- let res = match res {
- Ok(res) => res,
- Err(e) => {
- debug!("Node {:?} returned error: {}", node, e);
- continue;
- }
- };
- let (header, stream) = match res.into_parts() {
- (Ok(BlockRpc::PutBlock { hash: _, header }), Some(stream)) => (header, stream),
- _ => {
- debug!("Node {:?} returned a malformed response", node);
- continue;
- }
- };
- return Ok((header, stream));
- }
- _ = tokio::time::sleep(self.system.rpc.rpc_timeout()) => {
- debug!("Node {:?} didn't return block in time, trying next.", node);
- }
- };
- }
-
- Err(Error::Message(format!(
- "Unable to read block {:?}: no node returned a valid block",
- hash
- )))
+ self.rpc_get_raw_block_internal(hash, order_tag, |header, stream| async move {
+ Ok((header, stream))
+ })
+ .await
}
/// Ask nodes that might have a (possibly compressed) block for it
@@ -271,6 +246,24 @@ impl BlockManager {
hash: &Hash,
order_tag: Option<OrderTag>,
) -> Result<DataBlock, Error> {
+ self.rpc_get_raw_block_internal(hash, order_tag, |header, stream| async move {
+ read_stream_to_end(stream)
+ .await
+ .map(|data| DataBlock::from_parts(header, data))
+ })
+ .await
+ }
+
+ async fn rpc_get_raw_block_internal<F, Fut, T>(
+ &self,
+ hash: &Hash,
+ order_tag: Option<OrderTag>,
+ f: F,
+ ) -> Result<T, Error>
+ where
+ F: Fn(DataBlockHeader, ByteStream) -> Fut,
+ Fut: futures::Future<Output = Result<T, Error>>,
+ {
let who = self.replication.read_nodes(hash);
let who = self.system.rpc.request_order(&who);
@@ -297,13 +290,17 @@ impl BlockManager {
continue;
}
};
- match read_stream_to_end(stream).await {
- Ok(bytes) => return Ok(DataBlock::from_parts(header, bytes)),
+ match f(header, stream).await {
+ Ok(ret) => return Ok(ret),
Err(e) => {
debug!("Error reading stream from node {:?}: {}", node, e);
}
}
}
+ // TODO: sleep less long (fail early), initiate a second request earlier
+ // if the first one doesn't succeed rapidly
+ // TODO: keep first request running when initiating a new one and take the
+ // one that finishes earlier
_ = tokio::time::sleep(self.system.rpc.rpc_timeout()) => {
debug!("Node {:?} didn't return block in time, trying next.", node);
}
@@ -680,11 +677,6 @@ impl StreamingEndpointHandler<BlockRpc> for BlockManager {
}
}
-pub(crate) struct BlockStatus {
- pub(crate) exists: bool,
- pub(crate) needed: RcEntry,
-}
-
impl BlockManagerLocked {
async fn write_block(
&self,