diff options
Diffstat (limited to 'src/block')
-rw-r--r-- | src/block/Cargo.toml | 2 | ||||
-rw-r--r-- | src/block/block.rs | 2 | ||||
-rw-r--r-- | src/block/lib.rs | 2 | ||||
-rw-r--r-- | src/block/manager.rs | 34 | ||||
-rw-r--r-- | src/block/resync.rs | 8 |
5 files changed, 22 insertions, 26 deletions
diff --git a/src/block/Cargo.toml b/src/block/Cargo.toml index 0f6fb5bb..b5763120 100644 --- a/src/block/Cargo.toml +++ b/src/block/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "garage_block" -version = "0.9.3" +version = "0.10.0" authors = ["Alex Auvolat <alex@adnab.me>"] edition = "2018" license = "AGPL-3.0" diff --git a/src/block/block.rs b/src/block/block.rs index 504d11f8..bd95680e 100644 --- a/src/block/block.rs +++ b/src/block/block.rs @@ -96,7 +96,7 @@ impl DataBlock { } } -fn zstd_encode<R: std::io::Read>(mut source: R, level: i32) -> std::io::Result<Vec<u8>> { +pub fn zstd_encode<R: std::io::Read>(mut source: R, level: i32) -> std::io::Result<Vec<u8>> { let mut result = Vec::<u8>::new(); let mut encoder = Encoder::new(&mut result, level)?; encoder.include_checksum(true)?; diff --git a/src/block/lib.rs b/src/block/lib.rs index c9ff2845..6c4711ef 100644 --- a/src/block/lib.rs +++ b/src/block/lib.rs @@ -9,3 +9,5 @@ mod block; mod layout; mod metrics; mod rc; + +pub use block::zstd_encode; diff --git a/src/block/manager.rs b/src/block/manager.rs index 890ea8b7..c7e4df17 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -261,8 +261,10 @@ impl BlockManager { F: Fn(DataBlockStream) -> Fut, Fut: futures::Future<Output = Result<T, Error>>, { - let who = self.replication.read_nodes(hash); - let who = self.system.rpc.request_order(&who); + let who = self + .system + .rpc_helper() + .block_read_nodes_of(hash, self.system.rpc_helper()); for node in who.iter() { let node_id = NodeID::from(*node); @@ -302,7 +304,7 @@ impl BlockManager { // if the first one doesn't succeed rapidly // TODO: keep first request running when initiating a new one and take the // one that finishes earlier - _ = tokio::time::sleep(self.system.rpc.rpc_timeout()) => { + _ = tokio::time::sleep(self.system.rpc_helper().rpc_timeout()) => { debug!("Get block {:?}: node {:?} didn't return block in time, trying next.", hash, node); } }; @@ -335,26 +337,18 @@ impl BlockManager { } } - /// Ask nodes that might have a block for it, return it as one big Bytes - pub async fn rpc_get_block( - &self, - hash: &Hash, - order_tag: Option<OrderTag>, - ) -> Result<Bytes, Error> { - let stream = self.rpc_get_block_streaming(hash, order_tag).await?; - Ok(read_stream_to_end(stream).await?.into_bytes()) - } - /// Send block to nodes that should have it pub async fn rpc_put_block( &self, hash: Hash, data: Bytes, + prevent_compression: bool, order_tag: Option<OrderTag>, ) -> Result<(), Error> { - let who = self.replication.write_nodes(&hash); + let who = self.replication.write_sets(&hash); - let (header, bytes) = DataBlock::from_buffer(data, self.compression_level) + let compression_level = self.compression_level.filter(|_| !prevent_compression); + let (header, bytes) = DataBlock::from_buffer(data, compression_level) .await .into_parts(); let put_block_rpc = @@ -366,10 +360,10 @@ impl BlockManager { }; self.system - .rpc - .try_call_many( + .rpc_helper() + .try_write_many_sets( &self.endpoint, - &who[..], + who.as_ref(), put_block_rpc, RequestStrategy::with_priority(PRIO_NORMAL | PRIO_SECONDARY) .with_quorum(self.replication.write_quorum()), @@ -442,7 +436,7 @@ impl BlockManager { tokio::spawn(async move { if let Err(e) = this .resync - .put_to_resync(&hash, 2 * this.system.rpc.rpc_timeout()) + .put_to_resync(&hash, 2 * this.system.rpc_helper().rpc_timeout()) { error!("Block {:?} could not be put in resync queue: {}.", hash, e); } @@ -536,7 +530,7 @@ impl BlockManager { None => { // Not found but maybe we should have had it ?? self.resync - .put_to_resync(hash, 2 * self.system.rpc.rpc_timeout())?; + .put_to_resync(hash, 2 * self.system.rpc_helper().rpc_timeout())?; return Err(Error::Message(format!( "block {:?} not found on node", hash diff --git a/src/block/resync.rs b/src/block/resync.rs index 9c1da4a7..15f210e4 100644 --- a/src/block/resync.rs +++ b/src/block/resync.rs @@ -377,7 +377,7 @@ impl BlockResyncManager { info!("Resync block {:?}: offloading and deleting", hash); let existing_path = existing_path.unwrap(); - let mut who = manager.replication.write_nodes(hash); + let mut who = manager.replication.storage_nodes(hash); if who.len() < manager.replication.write_quorum() { return Err(Error::Message("Not trying to offload block because we don't have a quorum of nodes to write to".to_string())); } @@ -385,7 +385,7 @@ impl BlockResyncManager { let who_needs_resps = manager .system - .rpc + .rpc_helper() .call_many( &manager.endpoint, &who, @@ -431,10 +431,10 @@ impl BlockResyncManager { .with_stream_from_buffer(bytes); manager .system - .rpc + .rpc_helper() .try_call_many( &manager.endpoint, - &need_nodes[..], + &need_nodes, put_block_message, RequestStrategy::with_priority(PRIO_BACKGROUND) .with_quorum(need_nodes.len()), |