aboutsummaryrefslogtreecommitdiff
path: root/src/block/manager.rs
diff options
context:
space:
mode:
authorAlex <alex@adnab.me>2024-04-10 15:23:12 +0000
committerAlex <alex@adnab.me>2024-04-10 15:23:12 +0000
commit1779fd40c0fe676bedda0d40f647d7fe8b0f1e7e (patch)
tree47e42c4e6ae47590fbb5c8f94e90a23bf04c1674 /src/block/manager.rs
parentb47706809cc9d28d1328bafdf9756e96388cca24 (diff)
parentff093ddbb8485409f389abe7b5e569cb38d222d2 (diff)
downloadgarage-1779fd40c0fe676bedda0d40f647d7fe8b0f1e7e.tar.gz
garage-1779fd40c0fe676bedda0d40f647d7fe8b0f1e7e.zip
Merge pull request 'Garage v1.0' (#683) from next-0.10 into main
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/683
Diffstat (limited to 'src/block/manager.rs')
-rw-r--r--src/block/manager.rs59
1 files changed, 27 insertions, 32 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs
index 62829a24..40b177a2 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -89,7 +89,7 @@ pub struct BlockManager {
mutation_lock: Vec<Mutex<BlockManagerLocked>>,
- pub(crate) rc: BlockRc,
+ pub rc: BlockRc,
pub resync: BlockResyncManager,
pub(crate) system: Arc<System>,
@@ -158,7 +158,7 @@ impl BlockManager {
let metrics = BlockManagerMetrics::new(
config.compression_level,
- rc.rc.clone(),
+ rc.rc_table.clone(),
resync.queue.clone(),
resync.errors.clone(),
buffer_kb_semaphore.clone(),
@@ -233,6 +233,12 @@ impl BlockManager {
}
}
+ /// Initialization: set how block references are recalculated
+ /// for repair operations
+ pub fn set_recalc_rc(&self, recalc: Vec<CalculateRefcount>) {
+ self.rc.recalc_rc.store(Some(Arc::new(recalc)));
+ }
+
/// Ask nodes that might have a (possibly compressed) block for it
/// Return it as a stream with a header
async fn rpc_get_raw_block_streaming(
@@ -279,8 +285,10 @@ impl BlockManager {
F: Fn(DataBlockStream) -> Fut,
Fut: futures::Future<Output = Result<T, Error>>,
{
- let who = self.replication.read_nodes(hash);
- let who = self.system.rpc.request_order(&who);
+ let who = self
+ .system
+ .rpc_helper()
+ .block_read_nodes_of(hash, self.system.rpc_helper());
for node in who.iter() {
let node_id = NodeID::from(*node);
@@ -320,15 +328,15 @@ impl BlockManager {
// if the first one doesn't succeed rapidly
// TODO: keep first request running when initiating a new one and take the
// one that finishes earlier
- _ = tokio::time::sleep(self.system.rpc.rpc_timeout()) => {
+ _ = tokio::time::sleep(self.system.rpc_helper().rpc_timeout()) => {
debug!("Get block {:?}: node {:?} didn't return block in time, trying next.", hash, node);
}
};
}
- let msg = format!("Get block {:?}: no node returned a valid block", hash);
- debug!("{}", msg);
- Err(Error::Message(msg))
+ let err = Error::MissingBlock(*hash);
+ debug!("{}", err);
+ Err(err)
}
// ---- Public interface ----
@@ -355,26 +363,18 @@ impl BlockManager {
}
}
- /// Ask nodes that might have a block for it, return it as one big Bytes
- pub async fn rpc_get_block(
- &self,
- hash: &Hash,
- order_tag: Option<OrderTag>,
- ) -> Result<Bytes, Error> {
- let stream = self.rpc_get_block_streaming(hash, order_tag).await?;
- Ok(read_stream_to_end(stream).await?.into_bytes())
- }
-
/// Send block to nodes that should have it
pub async fn rpc_put_block(
&self,
hash: Hash,
data: Bytes,
+ prevent_compression: bool,
order_tag: Option<OrderTag>,
) -> Result<(), Error> {
- let who = self.replication.write_nodes(&hash);
+ let who = self.replication.write_sets(&hash);
- let (header, bytes) = DataBlock::from_buffer(data, self.compression_level)
+ let compression_level = self.compression_level.filter(|_| !prevent_compression);
+ let (header, bytes) = DataBlock::from_buffer(data, compression_level)
.await
.into_parts();
@@ -394,10 +394,10 @@ impl BlockManager {
};
self.system
- .rpc
- .try_call_many(
+ .rpc_helper()
+ .try_write_many_sets(
&self.endpoint,
- &who[..],
+ who.as_ref(),
put_block_rpc,
RequestStrategy::with_priority(PRIO_NORMAL | PRIO_SECONDARY)
.with_drop_on_completion(permit)
@@ -410,12 +410,7 @@ impl BlockManager {
/// Get number of items in the refcount table
pub fn rc_len(&self) -> Result<usize, Error> {
- Ok(self.rc.rc.len()?)
- }
-
- /// Get number of items in the refcount table
- pub fn rc_fast_len(&self) -> Result<Option<usize>, Error> {
- Ok(self.rc.rc.fast_len()?)
+ Ok(self.rc.rc_table.len()?)
}
/// Send command to start/stop/manager scrub worker
@@ -433,7 +428,7 @@ impl BlockManager {
/// List all resync errors
pub fn list_resync_errors(&self) -> Result<Vec<BlockResyncErrorInfo>, Error> {
- let mut blocks = Vec::with_capacity(self.resync.errors.len());
+ let mut blocks = Vec::with_capacity(self.resync.errors.len()?);
for ent in self.resync.errors.iter()? {
let (hash, cnt) = ent?;
let cnt = ErrorCounter::decode(&cnt);
@@ -471,7 +466,7 @@ impl BlockManager {
tokio::spawn(async move {
if let Err(e) = this
.resync
- .put_to_resync(&hash, 2 * this.system.rpc.rpc_timeout())
+ .put_to_resync(&hash, 2 * this.system.rpc_helper().rpc_timeout())
{
error!("Block {:?} could not be put in resync queue: {}.", hash, e);
}
@@ -565,7 +560,7 @@ impl BlockManager {
None => {
// Not found but maybe we should have had it ??
self.resync
- .put_to_resync(hash, 2 * self.system.rpc.rpc_timeout())?;
+ .put_to_resync(hash, 2 * self.system.rpc_helper().rpc_timeout())?;
return Err(Error::Message(format!(
"block {:?} not found on node",
hash