aboutsummaryrefslogtreecommitdiff
path: root/src/block/manager.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-03-15 12:12:12 +0100
committerAlex Auvolat <alex@adnab.me>2022-03-23 10:25:19 +0100
commitc3982a90b6b9cece7c765070c2c2be22c816ff70 (patch)
tree69e1d1e7510cb5195b40f1160b5de9ffc9f38450 /src/block/manager.rs
parentc1d9854d2c5998adcad60784ec54659113c28a9f (diff)
downloadgarage-c3982a90b6b9cece7c765070c2c2be22c816ff70.tar.gz
garage-c3982a90b6b9cece7c765070c2c2be22c816ff70.zip
Move DataBlock out of manager.rs
Diffstat (limited to 'src/block/manager.rs')
-rw-r--r--src/block/manager.rs82
1 files changed, 4 insertions, 78 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs
index f047e1d3..9665a306 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -5,7 +5,6 @@ use std::time::Duration;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
-use zstd::stream::{decode_all as zstd_decode, Encoder};
use futures::future::*;
use futures::select;
@@ -31,6 +30,7 @@ use garage_rpc::*;
use garage_table::replication::{TableReplication, TableShardedReplication};
use crate::metrics::*;
+use crate::block::*;
/// Size under which data will be stored inlined in database instead of as files
pub const INLINE_THRESHOLD: usize = 3072;
@@ -71,73 +71,6 @@ pub enum BlockRpc {
NeedBlockReply(bool),
}
-/// A possibly compressed block of data
-#[derive(Debug, Serialize, Deserialize)]
-pub enum DataBlock {
- /// Uncompressed data
- Plain(#[serde(with = "serde_bytes")] Vec<u8>),
- /// Data compressed with zstd
- Compressed(#[serde(with = "serde_bytes")] Vec<u8>),
-}
-
-impl DataBlock {
- /// Query whether this block is compressed
- pub fn is_compressed(&self) -> bool {
- matches!(self, DataBlock::Compressed(_))
- }
-
- /// Get the inner, possibly compressed buffer. You should probably use [`DataBlock::verify_get`]
- /// instead
- pub fn inner_buffer(&self) -> &[u8] {
- use DataBlock::*;
- let (Plain(ref res) | Compressed(ref res)) = self;
- res
- }
-
- /// Get the buffer, possibly decompressing it, and verify it's integrity.
- /// For Plain block, data is compared to hash, for Compressed block, zstd checksumming system
- /// is used instead.
- pub fn verify_get(self, hash: Hash) -> Result<Vec<u8>, Error> {
- match self {
- DataBlock::Plain(data) => {
- if blake2sum(&data) == hash {
- Ok(data)
- } else {
- Err(Error::CorruptData(hash))
- }
- }
- DataBlock::Compressed(data) => {
- zstd_decode(&data[..]).map_err(|_| Error::CorruptData(hash))
- }
- }
- }
-
- /// Verify data integrity. Allocate less than [`DataBlock::verify_get`] and don't consume self, but
- /// does not return the buffer content.
- pub fn verify(&self, hash: Hash) -> Result<(), Error> {
- match self {
- DataBlock::Plain(data) => {
- if blake2sum(data) == hash {
- Ok(())
- } else {
- Err(Error::CorruptData(hash))
- }
- }
- DataBlock::Compressed(data) => zstd::stream::copy_decode(&data[..], std::io::sink())
- .map_err(|_| Error::CorruptData(hash)),
- }
- }
-
- pub fn from_buffer(data: Vec<u8>, level: Option<i32>) -> DataBlock {
- if let Some(level) = level {
- if let Ok(data) = zstd_encode(&data[..], level) {
- return DataBlock::Compressed(data);
- }
- }
- DataBlock::Plain(data)
- }
-}
-
impl Rpc for BlockRpc {
type Response = Result<BlockRpc, Error>;
}
@@ -215,6 +148,8 @@ impl BlockManager {
});
block_manager.endpoint.set_handler(block_manager.clone());
+ block_manager.clone().spawn_background_worker();
+
block_manager
}
@@ -494,7 +429,7 @@ impl BlockManager {
// ---- Resync loop ----
- pub fn spawn_background_worker(self: Arc<Self>) {
+ fn spawn_background_worker(self: Arc<Self>) {
// Launch a background workers for background resync loop processing
let background = self.system.background.clone();
tokio::spawn(async move {
@@ -1080,12 +1015,3 @@ impl ErrorCounter {
self.last_try + self.delay_msec()
}
}
-
-fn zstd_encode<R: std::io::Read>(mut source: R, level: i32) -> std::io::Result<Vec<u8>> {
- let mut result = Vec::<u8>::new();
- let mut encoder = Encoder::new(&mut result, level)?;
- encoder.include_checksum(true)?;
- std::io::copy(&mut source, &mut encoder)?;
- encoder.finish()?;
- Ok(result)
-}