diff options
Diffstat (limited to 'src/block/manager.rs')
-rw-r--r-- | src/block/manager.rs | 59 |
1 files changed, 27 insertions, 32 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs index ef7279e9..8ee33096 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -88,7 +88,7 @@ pub struct BlockManager { mutation_lock: Vec<Mutex<BlockManagerLocked>>, - pub(crate) rc: BlockRc, + pub rc: BlockRc, pub resync: BlockResyncManager, pub(crate) system: Arc<System>, @@ -156,7 +156,7 @@ impl BlockManager { let metrics = BlockManagerMetrics::new( config.compression_level, - rc.rc.clone(), + rc.rc_table.clone(), resync.queue.clone(), resync.errors.clone(), ); @@ -229,6 +229,12 @@ impl BlockManager { } } + /// Initialization: set how block references are recalculated + /// for repair operations + pub fn set_recalc_rc(&self, recalc: Vec<CalculateRefcount>) { + self.rc.recalc_rc.store(Some(Arc::new(recalc))); + } + /// Ask nodes that might have a (possibly compressed) block for it /// Return it as a stream with a header async fn rpc_get_raw_block_streaming( @@ -267,8 +273,10 @@ impl BlockManager { F: Fn(DataBlockStream) -> Fut, Fut: futures::Future<Output = Result<T, Error>>, { - let who = self.replication.read_nodes(hash); - let who = self.system.rpc.request_order(&who); + let who = self + .system + .rpc_helper() + .block_read_nodes_of(hash, self.system.rpc_helper()); for node in who.iter() { let node_id = NodeID::from(*node); @@ -308,15 +316,15 @@ impl BlockManager { // if the first one doesn't succeed rapidly // TODO: keep first request running when initiating a new one and take the // one that finishes earlier - _ = tokio::time::sleep(self.system.rpc.rpc_timeout()) => { + _ = tokio::time::sleep(self.system.rpc_helper().rpc_timeout()) => { debug!("Get block {:?}: node {:?} didn't return block in time, trying next.", hash, node); } }; } - let msg = format!("Get block {:?}: no node returned a valid block", hash); - debug!("{}", msg); - Err(Error::Message(msg)) + let err = Error::MissingBlock(*hash); + debug!("{}", err); + Err(err) } // ---- Public interface ---- @@ -341,26 +349,18 @@ impl BlockManager { } } - /// Ask nodes that might have a block for it, return it as one big Bytes - pub async fn rpc_get_block( - &self, - hash: &Hash, - order_tag: Option<OrderTag>, - ) -> Result<Bytes, Error> { - let stream = self.rpc_get_block_streaming(hash, order_tag).await?; - Ok(read_stream_to_end(stream).await?.into_bytes()) - } - /// Send block to nodes that should have it pub async fn rpc_put_block( &self, hash: Hash, data: Bytes, + prevent_compression: bool, order_tag: Option<OrderTag>, ) -> Result<(), Error> { - let who = self.replication.write_nodes(&hash); + let who = self.replication.write_sets(&hash); - let (header, bytes) = DataBlock::from_buffer(data, self.compression_level) + let compression_level = self.compression_level.filter(|_| !prevent_compression); + let (header, bytes) = DataBlock::from_buffer(data, compression_level) .await .into_parts(); let put_block_rpc = @@ -372,10 +372,10 @@ impl BlockManager { }; self.system - .rpc - .try_call_many( + .rpc_helper() + .try_write_many_sets( &self.endpoint, - &who[..], + who.as_ref(), put_block_rpc, RequestStrategy::with_priority(PRIO_NORMAL | PRIO_SECONDARY) .with_quorum(self.replication.write_quorum()), @@ -387,12 +387,7 @@ impl BlockManager { /// Get number of items in the refcount table pub fn rc_len(&self) -> Result<usize, Error> { - Ok(self.rc.rc.len()?) - } - - /// Get number of items in the refcount table - pub fn rc_fast_len(&self) -> Result<Option<usize>, Error> { - Ok(self.rc.rc.fast_len()?) + Ok(self.rc.rc_table.len()?) } /// Send command to start/stop/manager scrub worker @@ -410,7 +405,7 @@ impl BlockManager { /// List all resync errors pub fn list_resync_errors(&self) -> Result<Vec<BlockResyncErrorInfo>, Error> { - let mut blocks = Vec::with_capacity(self.resync.errors.len()); + let mut blocks = Vec::with_capacity(self.resync.errors.len()?); for ent in self.resync.errors.iter()? { let (hash, cnt) = ent?; let cnt = ErrorCounter::decode(&cnt); @@ -448,7 +443,7 @@ impl BlockManager { tokio::spawn(async move { if let Err(e) = this .resync - .put_to_resync(&hash, 2 * this.system.rpc.rpc_timeout()) + .put_to_resync(&hash, 2 * this.system.rpc_helper().rpc_timeout()) { error!("Block {:?} could not be put in resync queue: {}.", hash, e); } @@ -542,7 +537,7 @@ impl BlockManager { None => { // Not found but maybe we should have had it ?? self.resync - .put_to_resync(hash, 2 * self.system.rpc.rpc_timeout())?; + .put_to_resync(hash, 2 * self.system.rpc_helper().rpc_timeout())?; return Err(Error::Message(format!( "block {:?} not found on node", hash |