diff options
author | Alex <alex@adnab.me> | 2024-04-10 15:23:12 +0000 |
---|---|---|
committer | Alex <alex@adnab.me> | 2024-04-10 15:23:12 +0000 |
commit | 1779fd40c0fe676bedda0d40f647d7fe8b0f1e7e (patch) | |
tree | 47e42c4e6ae47590fbb5c8f94e90a23bf04c1674 /src/block/manager.rs | |
parent | b47706809cc9d28d1328bafdf9756e96388cca24 (diff) | |
parent | ff093ddbb8485409f389abe7b5e569cb38d222d2 (diff) | |
download | garage-1779fd40c0fe676bedda0d40f647d7fe8b0f1e7e.tar.gz garage-1779fd40c0fe676bedda0d40f647d7fe8b0f1e7e.zip |
Merge pull request 'Garage v1.0' (#683) from next-0.10 into main
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/683
Diffstat (limited to 'src/block/manager.rs')
-rw-r--r-- | src/block/manager.rs | 59 |
1 files changed, 27 insertions, 32 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs index 62829a24..40b177a2 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -89,7 +89,7 @@ pub struct BlockManager { mutation_lock: Vec<Mutex<BlockManagerLocked>>, - pub(crate) rc: BlockRc, + pub rc: BlockRc, pub resync: BlockResyncManager, pub(crate) system: Arc<System>, @@ -158,7 +158,7 @@ impl BlockManager { let metrics = BlockManagerMetrics::new( config.compression_level, - rc.rc.clone(), + rc.rc_table.clone(), resync.queue.clone(), resync.errors.clone(), buffer_kb_semaphore.clone(), @@ -233,6 +233,12 @@ impl BlockManager { } } + /// Initialization: set how block references are recalculated + /// for repair operations + pub fn set_recalc_rc(&self, recalc: Vec<CalculateRefcount>) { + self.rc.recalc_rc.store(Some(Arc::new(recalc))); + } + /// Ask nodes that might have a (possibly compressed) block for it /// Return it as a stream with a header async fn rpc_get_raw_block_streaming( @@ -279,8 +285,10 @@ impl BlockManager { F: Fn(DataBlockStream) -> Fut, Fut: futures::Future<Output = Result<T, Error>>, { - let who = self.replication.read_nodes(hash); - let who = self.system.rpc.request_order(&who); + let who = self + .system + .rpc_helper() + .block_read_nodes_of(hash, self.system.rpc_helper()); for node in who.iter() { let node_id = NodeID::from(*node); @@ -320,15 +328,15 @@ impl BlockManager { // if the first one doesn't succeed rapidly // TODO: keep first request running when initiating a new one and take the // one that finishes earlier - _ = tokio::time::sleep(self.system.rpc.rpc_timeout()) => { + _ = tokio::time::sleep(self.system.rpc_helper().rpc_timeout()) => { debug!("Get block {:?}: node {:?} didn't return block in time, trying next.", hash, node); } }; } - let msg = format!("Get block {:?}: no node returned a valid block", hash); - debug!("{}", msg); - Err(Error::Message(msg)) + let err = Error::MissingBlock(*hash); + debug!("{}", err); + Err(err) } // ---- Public interface ---- @@ -355,26 +363,18 @@ impl BlockManager { } } - /// Ask nodes that might have a block for it, return it as one big Bytes - pub async fn rpc_get_block( - &self, - hash: &Hash, - order_tag: Option<OrderTag>, - ) -> Result<Bytes, Error> { - let stream = self.rpc_get_block_streaming(hash, order_tag).await?; - Ok(read_stream_to_end(stream).await?.into_bytes()) - } - /// Send block to nodes that should have it pub async fn rpc_put_block( &self, hash: Hash, data: Bytes, + prevent_compression: bool, order_tag: Option<OrderTag>, ) -> Result<(), Error> { - let who = self.replication.write_nodes(&hash); + let who = self.replication.write_sets(&hash); - let (header, bytes) = DataBlock::from_buffer(data, self.compression_level) + let compression_level = self.compression_level.filter(|_| !prevent_compression); + let (header, bytes) = DataBlock::from_buffer(data, compression_level) .await .into_parts(); @@ -394,10 +394,10 @@ impl BlockManager { }; self.system - .rpc - .try_call_many( + .rpc_helper() + .try_write_many_sets( &self.endpoint, - &who[..], + who.as_ref(), put_block_rpc, RequestStrategy::with_priority(PRIO_NORMAL | PRIO_SECONDARY) .with_drop_on_completion(permit) @@ -410,12 +410,7 @@ impl BlockManager { /// Get number of items in the refcount table pub fn rc_len(&self) -> Result<usize, Error> { - Ok(self.rc.rc.len()?) - } - - /// Get number of items in the refcount table - pub fn rc_fast_len(&self) -> Result<Option<usize>, Error> { - Ok(self.rc.rc.fast_len()?) + Ok(self.rc.rc_table.len()?) } /// Send command to start/stop/manager scrub worker @@ -433,7 +428,7 @@ impl BlockManager { /// List all resync errors pub fn list_resync_errors(&self) -> Result<Vec<BlockResyncErrorInfo>, Error> { - let mut blocks = Vec::with_capacity(self.resync.errors.len()); + let mut blocks = Vec::with_capacity(self.resync.errors.len()?); for ent in self.resync.errors.iter()? { let (hash, cnt) = ent?; let cnt = ErrorCounter::decode(&cnt); @@ -471,7 +466,7 @@ impl BlockManager { tokio::spawn(async move { if let Err(e) = this .resync - .put_to_resync(&hash, 2 * this.system.rpc.rpc_timeout()) + .put_to_resync(&hash, 2 * this.system.rpc_helper().rpc_timeout()) { error!("Block {:?} could not be put in resync queue: {}.", hash, e); } @@ -565,7 +560,7 @@ impl BlockManager { None => { // Not found but maybe we should have had it ?? self.resync - .put_to_resync(hash, 2 * self.system.rpc.rpc_timeout())?; + .put_to_resync(hash, 2 * self.system.rpc_helper().rpc_timeout())?; return Err(Error::Message(format!( "block {:?} not found on node", hash |