aboutsummaryrefslogtreecommitdiff
path: root/src/block
diff options
context:
space:
mode:
Diffstat (limited to 'src/block')
-rw-r--r--src/block/Cargo.toml2
-rw-r--r--src/block/block.rs2
-rw-r--r--src/block/lib.rs2
-rw-r--r--src/block/manager.rs34
-rw-r--r--src/block/resync.rs8
5 files changed, 22 insertions, 26 deletions
diff --git a/src/block/Cargo.toml b/src/block/Cargo.toml
index 0f6fb5bb..b5763120 100644
--- a/src/block/Cargo.toml
+++ b/src/block/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "garage_block"
-version = "0.9.3"
+version = "0.10.0"
authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license = "AGPL-3.0"
diff --git a/src/block/block.rs b/src/block/block.rs
index 504d11f8..bd95680e 100644
--- a/src/block/block.rs
+++ b/src/block/block.rs
@@ -96,7 +96,7 @@ impl DataBlock {
}
}
-fn zstd_encode<R: std::io::Read>(mut source: R, level: i32) -> std::io::Result<Vec<u8>> {
+pub fn zstd_encode<R: std::io::Read>(mut source: R, level: i32) -> std::io::Result<Vec<u8>> {
let mut result = Vec::<u8>::new();
let mut encoder = Encoder::new(&mut result, level)?;
encoder.include_checksum(true)?;
diff --git a/src/block/lib.rs b/src/block/lib.rs
index c9ff2845..6c4711ef 100644
--- a/src/block/lib.rs
+++ b/src/block/lib.rs
@@ -9,3 +9,5 @@ mod block;
mod layout;
mod metrics;
mod rc;
+
+pub use block::zstd_encode;
diff --git a/src/block/manager.rs b/src/block/manager.rs
index 890ea8b7..c7e4df17 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -261,8 +261,10 @@ impl BlockManager {
F: Fn(DataBlockStream) -> Fut,
Fut: futures::Future<Output = Result<T, Error>>,
{
- let who = self.replication.read_nodes(hash);
- let who = self.system.rpc.request_order(&who);
+ let who = self
+ .system
+ .rpc_helper()
+ .block_read_nodes_of(hash, self.system.rpc_helper());
for node in who.iter() {
let node_id = NodeID::from(*node);
@@ -302,7 +304,7 @@ impl BlockManager {
// if the first one doesn't succeed rapidly
// TODO: keep first request running when initiating a new one and take the
// one that finishes earlier
- _ = tokio::time::sleep(self.system.rpc.rpc_timeout()) => {
+ _ = tokio::time::sleep(self.system.rpc_helper().rpc_timeout()) => {
debug!("Get block {:?}: node {:?} didn't return block in time, trying next.", hash, node);
}
};
@@ -335,26 +337,18 @@ impl BlockManager {
}
}
- /// Ask nodes that might have a block for it, return it as one big Bytes
- pub async fn rpc_get_block(
- &self,
- hash: &Hash,
- order_tag: Option<OrderTag>,
- ) -> Result<Bytes, Error> {
- let stream = self.rpc_get_block_streaming(hash, order_tag).await?;
- Ok(read_stream_to_end(stream).await?.into_bytes())
- }
-
/// Send block to nodes that should have it
pub async fn rpc_put_block(
&self,
hash: Hash,
data: Bytes,
+ prevent_compression: bool,
order_tag: Option<OrderTag>,
) -> Result<(), Error> {
- let who = self.replication.write_nodes(&hash);
+ let who = self.replication.write_sets(&hash);
- let (header, bytes) = DataBlock::from_buffer(data, self.compression_level)
+ let compression_level = self.compression_level.filter(|_| !prevent_compression);
+ let (header, bytes) = DataBlock::from_buffer(data, compression_level)
.await
.into_parts();
let put_block_rpc =
@@ -366,10 +360,10 @@ impl BlockManager {
};
self.system
- .rpc
- .try_call_many(
+ .rpc_helper()
+ .try_write_many_sets(
&self.endpoint,
- &who[..],
+ who.as_ref(),
put_block_rpc,
RequestStrategy::with_priority(PRIO_NORMAL | PRIO_SECONDARY)
.with_quorum(self.replication.write_quorum()),
@@ -442,7 +436,7 @@ impl BlockManager {
tokio::spawn(async move {
if let Err(e) = this
.resync
- .put_to_resync(&hash, 2 * this.system.rpc.rpc_timeout())
+ .put_to_resync(&hash, 2 * this.system.rpc_helper().rpc_timeout())
{
error!("Block {:?} could not be put in resync queue: {}.", hash, e);
}
@@ -536,7 +530,7 @@ impl BlockManager {
None => {
// Not found but maybe we should have had it ??
self.resync
- .put_to_resync(hash, 2 * self.system.rpc.rpc_timeout())?;
+ .put_to_resync(hash, 2 * self.system.rpc_helper().rpc_timeout())?;
return Err(Error::Message(format!(
"block {:?} not found on node",
hash
diff --git a/src/block/resync.rs b/src/block/resync.rs
index 9c1da4a7..15f210e4 100644
--- a/src/block/resync.rs
+++ b/src/block/resync.rs
@@ -377,7 +377,7 @@ impl BlockResyncManager {
info!("Resync block {:?}: offloading and deleting", hash);
let existing_path = existing_path.unwrap();
- let mut who = manager.replication.write_nodes(hash);
+ let mut who = manager.replication.storage_nodes(hash);
if who.len() < manager.replication.write_quorum() {
return Err(Error::Message("Not trying to offload block because we don't have a quorum of nodes to write to".to_string()));
}
@@ -385,7 +385,7 @@ impl BlockResyncManager {
let who_needs_resps = manager
.system
- .rpc
+ .rpc_helper()
.call_many(
&manager.endpoint,
&who,
@@ -431,10 +431,10 @@ impl BlockResyncManager {
.with_stream_from_buffer(bytes);
manager
.system
- .rpc
+ .rpc_helper()
.try_call_many(
&manager.endpoint,
- &need_nodes[..],
+ &need_nodes,
put_block_message,
RequestStrategy::with_priority(PRIO_BACKGROUND)
.with_quorum(need_nodes.len()),