aboutsummaryrefslogtreecommitdiff
path: root/src/block
diff options
context:
space:
mode:
Diffstat (limited to 'src/block')
-rw-r--r--src/block/block.rs17
-rw-r--r--src/block/manager.rs6
2 files changed, 15 insertions, 8 deletions
diff --git a/src/block/block.rs b/src/block/block.rs
index 4d3fbcb8..f17bd2c0 100644
--- a/src/block/block.rs
+++ b/src/block/block.rs
@@ -1,3 +1,4 @@
+use bytes::Bytes;
use serde::{Deserialize, Serialize};
use zstd::stream::{decode_all as zstd_decode, Encoder};
@@ -61,13 +62,17 @@ impl DataBlock {
}
}
- pub fn from_buffer(data: Vec<u8>, level: Option<i32>) -> DataBlock {
- if let Some(level) = level {
- if let Ok(data) = zstd_encode(&data[..], level) {
- return DataBlock::Compressed(data);
+ pub async fn from_buffer(data: Bytes, level: Option<i32>) -> DataBlock {
+ tokio::task::spawn_blocking(move || {
+ if let Some(level) = level {
+ if let Ok(data) = zstd_encode(&data[..], level) {
+ return DataBlock::Compressed(data);
+ }
}
- }
- DataBlock::Plain(data)
+ DataBlock::Plain(data.to_vec()) // TODO: remove to_vec here
+ })
+ .await
+ .unwrap()
}
}
diff --git a/src/block/manager.rs b/src/block/manager.rs
index 017ba9da..890c247d 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -5,6 +5,7 @@ use std::time::Duration;
use arc_swap::ArcSwapOption;
use async_trait::async_trait;
+use bytes::Bytes;
use serde::{Deserialize, Serialize};
use futures::future::*;
@@ -211,14 +212,15 @@ impl BlockManager {
}
/// Send block to nodes that should have it
- pub async fn rpc_put_block(&self, hash: Hash, data: Vec<u8>) -> Result<(), Error> {
+ pub async fn rpc_put_block(&self, hash: Hash, data: Bytes) -> Result<(), Error> {
let who = self.replication.write_nodes(&hash);
- let data = DataBlock::from_buffer(data, self.compression_level);
+ let data = DataBlock::from_buffer(data, self.compression_level).await;
self.system
.rpc
.try_call_many(
&self.endpoint,
&who[..],
+ // TODO: remove to_vec() here
BlockRpc::PutBlock { hash, data },
RequestStrategy::with_priority(PRIO_NORMAL)
.with_quorum(self.replication.write_quorum())