diff options
author | Alex Auvolat <alex@adnab.me> | 2022-03-15 12:12:12 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2022-03-23 10:25:19 +0100 |
commit | c3982a90b6b9cece7c765070c2c2be22c816ff70 (patch) | |
tree | 69e1d1e7510cb5195b40f1160b5de9ffc9f38450 /src/block/manager.rs | |
parent | c1d9854d2c5998adcad60784ec54659113c28a9f (diff) | |
download | garage-c3982a90b6b9cece7c765070c2c2be22c816ff70.tar.gz garage-c3982a90b6b9cece7c765070c2c2be22c816ff70.zip |
Move DataBlock out of manager.rs
Diffstat (limited to 'src/block/manager.rs')
-rw-r--r-- | src/block/manager.rs | 82 |
1 files changed, 4 insertions, 78 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs index f047e1d3..9665a306 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -5,7 +5,6 @@ use std::time::Duration; use async_trait::async_trait; use serde::{Deserialize, Serialize}; -use zstd::stream::{decode_all as zstd_decode, Encoder}; use futures::future::*; use futures::select; @@ -31,6 +30,7 @@ use garage_rpc::*; use garage_table::replication::{TableReplication, TableShardedReplication}; use crate::metrics::*; +use crate::block::*; /// Size under which data will be stored inlined in database instead of as files pub const INLINE_THRESHOLD: usize = 3072; @@ -71,73 +71,6 @@ pub enum BlockRpc { NeedBlockReply(bool), } -/// A possibly compressed block of data -#[derive(Debug, Serialize, Deserialize)] -pub enum DataBlock { - /// Uncompressed data - Plain(#[serde(with = "serde_bytes")] Vec<u8>), - /// Data compressed with zstd - Compressed(#[serde(with = "serde_bytes")] Vec<u8>), -} - -impl DataBlock { - /// Query whether this block is compressed - pub fn is_compressed(&self) -> bool { - matches!(self, DataBlock::Compressed(_)) - } - - /// Get the inner, possibly compressed buffer. You should probably use [`DataBlock::verify_get`] - /// instead - pub fn inner_buffer(&self) -> &[u8] { - use DataBlock::*; - let (Plain(ref res) | Compressed(ref res)) = self; - res - } - - /// Get the buffer, possibly decompressing it, and verify it's integrity. - /// For Plain block, data is compared to hash, for Compressed block, zstd checksumming system - /// is used instead. - pub fn verify_get(self, hash: Hash) -> Result<Vec<u8>, Error> { - match self { - DataBlock::Plain(data) => { - if blake2sum(&data) == hash { - Ok(data) - } else { - Err(Error::CorruptData(hash)) - } - } - DataBlock::Compressed(data) => { - zstd_decode(&data[..]).map_err(|_| Error::CorruptData(hash)) - } - } - } - - /// Verify data integrity. Allocate less than [`DataBlock::verify_get`] and don't consume self, but - /// does not return the buffer content. - pub fn verify(&self, hash: Hash) -> Result<(), Error> { - match self { - DataBlock::Plain(data) => { - if blake2sum(data) == hash { - Ok(()) - } else { - Err(Error::CorruptData(hash)) - } - } - DataBlock::Compressed(data) => zstd::stream::copy_decode(&data[..], std::io::sink()) - .map_err(|_| Error::CorruptData(hash)), - } - } - - pub fn from_buffer(data: Vec<u8>, level: Option<i32>) -> DataBlock { - if let Some(level) = level { - if let Ok(data) = zstd_encode(&data[..], level) { - return DataBlock::Compressed(data); - } - } - DataBlock::Plain(data) - } -} - impl Rpc for BlockRpc { type Response = Result<BlockRpc, Error>; } @@ -215,6 +148,8 @@ impl BlockManager { }); block_manager.endpoint.set_handler(block_manager.clone()); + block_manager.clone().spawn_background_worker(); + block_manager } @@ -494,7 +429,7 @@ impl BlockManager { // ---- Resync loop ---- - pub fn spawn_background_worker(self: Arc<Self>) { + fn spawn_background_worker(self: Arc<Self>) { // Launch a background workers for background resync loop processing let background = self.system.background.clone(); tokio::spawn(async move { @@ -1080,12 +1015,3 @@ impl ErrorCounter { self.last_try + self.delay_msec() } } - -fn zstd_encode<R: std::io::Read>(mut source: R, level: i32) -> std::io::Result<Vec<u8>> { - let mut result = Vec::<u8>::new(); - let mut encoder = Encoder::new(&mut result, level)?; - encoder.include_checksum(true)?; - std::io::copy(&mut source, &mut encoder)?; - encoder.finish()?; - Ok(result) -} |