aboutsummaryrefslogtreecommitdiff
path: root/src/block/manager.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/block/manager.rs')
-rw-r--r--src/block/manager.rs59
1 files changed, 27 insertions, 32 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs
index 34d854b9..82db2cab 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -88,7 +88,7 @@ pub struct BlockManager {
mutation_lock: Vec<Mutex<BlockManagerLocked>>,
- pub(crate) rc: BlockRc,
+ pub rc: BlockRc,
pub resync: BlockResyncManager,
pub(crate) system: Arc<System>,
@@ -154,7 +154,7 @@ impl BlockManager {
let metrics = BlockManagerMetrics::new(
config.compression_level,
- rc.rc.clone(),
+ rc.rc_table.clone(),
resync.queue.clone(),
resync.errors.clone(),
);
@@ -227,6 +227,12 @@ impl BlockManager {
}
}
+ /// Initialization: set how block references are recalculated
+ /// for repair operations
+ pub fn set_recalc_rc(&self, recalc: Vec<CalculateRefcount>) {
+ self.rc.recalc_rc.store(Some(Arc::new(recalc)));
+ }
+
/// Ask nodes that might have a (possibly compressed) block for it
/// Return it as a stream with a header
async fn rpc_get_raw_block_streaming(
@@ -265,8 +271,10 @@ impl BlockManager {
F: Fn(DataBlockStream) -> Fut,
Fut: futures::Future<Output = Result<T, Error>>,
{
- let who = self.replication.read_nodes(hash);
- let who = self.system.rpc.request_order(&who);
+ let who = self
+ .system
+ .rpc_helper()
+ .block_read_nodes_of(hash, self.system.rpc_helper());
for node in who.iter() {
let node_id = NodeID::from(*node);
@@ -306,15 +314,15 @@ impl BlockManager {
// if the first one doesn't succeed rapidly
// TODO: keep first request running when initiating a new one and take the
// one that finishes earlier
- _ = tokio::time::sleep(self.system.rpc.rpc_timeout()) => {
+ _ = tokio::time::sleep(self.system.rpc_helper().rpc_timeout()) => {
debug!("Get block {:?}: node {:?} didn't return block in time, trying next.", hash, node);
}
};
}
- let msg = format!("Get block {:?}: no node returned a valid block", hash);
- debug!("{}", msg);
- Err(Error::Message(msg))
+ let err = Error::MissingBlock(*hash);
+ debug!("{}", err);
+ Err(err)
}
// ---- Public interface ----
@@ -339,26 +347,18 @@ impl BlockManager {
}
}
- /// Ask nodes that might have a block for it, return it as one big Bytes
- pub async fn rpc_get_block(
- &self,
- hash: &Hash,
- order_tag: Option<OrderTag>,
- ) -> Result<Bytes, Error> {
- let stream = self.rpc_get_block_streaming(hash, order_tag).await?;
- Ok(read_stream_to_end(stream).await?.into_bytes())
- }
-
/// Send block to nodes that should have it
pub async fn rpc_put_block(
&self,
hash: Hash,
data: Bytes,
+ prevent_compression: bool,
order_tag: Option<OrderTag>,
) -> Result<(), Error> {
- let who = self.replication.write_nodes(&hash);
+ let who = self.replication.write_sets(&hash);
- let (header, bytes) = DataBlock::from_buffer(data, self.compression_level)
+ let compression_level = self.compression_level.filter(|_| !prevent_compression);
+ let (header, bytes) = DataBlock::from_buffer(data, compression_level)
.await
.into_parts();
let put_block_rpc =
@@ -370,10 +370,10 @@ impl BlockManager {
};
self.system
- .rpc
- .try_call_many(
+ .rpc_helper()
+ .try_write_many_sets(
&self.endpoint,
- &who[..],
+ who.as_ref(),
put_block_rpc,
RequestStrategy::with_priority(PRIO_NORMAL | PRIO_SECONDARY)
.with_quorum(self.replication.write_quorum()),
@@ -385,12 +385,7 @@ impl BlockManager {
/// Get number of items in the refcount table
pub fn rc_len(&self) -> Result<usize, Error> {
- Ok(self.rc.rc.len()?)
- }
-
- /// Get number of items in the refcount table
- pub fn rc_fast_len(&self) -> Result<Option<usize>, Error> {
- Ok(self.rc.rc.fast_len()?)
+ Ok(self.rc.rc_table.len()?)
}
/// Send command to start/stop/manager scrub worker
@@ -408,7 +403,7 @@ impl BlockManager {
/// List all resync errors
pub fn list_resync_errors(&self) -> Result<Vec<BlockResyncErrorInfo>, Error> {
- let mut blocks = Vec::with_capacity(self.resync.errors.len());
+ let mut blocks = Vec::with_capacity(self.resync.errors.len()?);
for ent in self.resync.errors.iter()? {
let (hash, cnt) = ent?;
let cnt = ErrorCounter::decode(&cnt);
@@ -446,7 +441,7 @@ impl BlockManager {
tokio::spawn(async move {
if let Err(e) = this
.resync
- .put_to_resync(&hash, 2 * this.system.rpc.rpc_timeout())
+ .put_to_resync(&hash, 2 * this.system.rpc_helper().rpc_timeout())
{
error!("Block {:?} could not be put in resync queue: {}.", hash, e);
}
@@ -540,7 +535,7 @@ impl BlockManager {
None => {
// Not found but maybe we should have had it ??
self.resync
- .put_to_resync(hash, 2 * self.system.rpc.rpc_timeout())?;
+ .put_to_resync(hash, 2 * self.system.rpc_helper().rpc_timeout())?;
return Err(Error::Message(format!(
"block {:?} not found on node",
hash