From c3982a90b6b9cece7c765070c2c2be22c816ff70 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 15 Mar 2022 12:12:12 +0100 Subject: Move DataBlock out of manager.rs --- src/block/manager.rs | 82 +++------------------------------------------------- 1 file changed, 4 insertions(+), 78 deletions(-) (limited to 'src/block/manager.rs') diff --git a/src/block/manager.rs b/src/block/manager.rs index f047e1d3..9665a306 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -5,7 +5,6 @@ use std::time::Duration; use async_trait::async_trait; use serde::{Deserialize, Serialize}; -use zstd::stream::{decode_all as zstd_decode, Encoder}; use futures::future::*; use futures::select; @@ -31,6 +30,7 @@ use garage_rpc::*; use garage_table::replication::{TableReplication, TableShardedReplication}; use crate::metrics::*; +use crate::block::*; /// Size under which data will be stored inlined in database instead of as files pub const INLINE_THRESHOLD: usize = 3072; @@ -71,73 +71,6 @@ pub enum BlockRpc { NeedBlockReply(bool), } -/// A possibly compressed block of data -#[derive(Debug, Serialize, Deserialize)] -pub enum DataBlock { - /// Uncompressed data - Plain(#[serde(with = "serde_bytes")] Vec), - /// Data compressed with zstd - Compressed(#[serde(with = "serde_bytes")] Vec), -} - -impl DataBlock { - /// Query whether this block is compressed - pub fn is_compressed(&self) -> bool { - matches!(self, DataBlock::Compressed(_)) - } - - /// Get the inner, possibly compressed buffer. You should probably use [`DataBlock::verify_get`] - /// instead - pub fn inner_buffer(&self) -> &[u8] { - use DataBlock::*; - let (Plain(ref res) | Compressed(ref res)) = self; - res - } - - /// Get the buffer, possibly decompressing it, and verify it's integrity. - /// For Plain block, data is compared to hash, for Compressed block, zstd checksumming system - /// is used instead. - pub fn verify_get(self, hash: Hash) -> Result, Error> { - match self { - DataBlock::Plain(data) => { - if blake2sum(&data) == hash { - Ok(data) - } else { - Err(Error::CorruptData(hash)) - } - } - DataBlock::Compressed(data) => { - zstd_decode(&data[..]).map_err(|_| Error::CorruptData(hash)) - } - } - } - - /// Verify data integrity. Allocate less than [`DataBlock::verify_get`] and don't consume self, but - /// does not return the buffer content. - pub fn verify(&self, hash: Hash) -> Result<(), Error> { - match self { - DataBlock::Plain(data) => { - if blake2sum(data) == hash { - Ok(()) - } else { - Err(Error::CorruptData(hash)) - } - } - DataBlock::Compressed(data) => zstd::stream::copy_decode(&data[..], std::io::sink()) - .map_err(|_| Error::CorruptData(hash)), - } - } - - pub fn from_buffer(data: Vec, level: Option) -> DataBlock { - if let Some(level) = level { - if let Ok(data) = zstd_encode(&data[..], level) { - return DataBlock::Compressed(data); - } - } - DataBlock::Plain(data) - } -} - impl Rpc for BlockRpc { type Response = Result; } @@ -215,6 +148,8 @@ impl BlockManager { }); block_manager.endpoint.set_handler(block_manager.clone()); + block_manager.clone().spawn_background_worker(); + block_manager } @@ -494,7 +429,7 @@ impl BlockManager { // ---- Resync loop ---- - pub fn spawn_background_worker(self: Arc) { + fn spawn_background_worker(self: Arc) { // Launch a background workers for background resync loop processing let background = self.system.background.clone(); tokio::spawn(async move { @@ -1080,12 +1015,3 @@ impl ErrorCounter { self.last_try + self.delay_msec() } } - -fn zstd_encode(mut source: R, level: i32) -> std::io::Result> { - let mut result = Vec::::new(); - let mut encoder = Encoder::new(&mut result, level)?; - encoder.include_checksum(true)?; - std::io::copy(&mut source, &mut encoder)?; - encoder.finish()?; - Ok(result) -} -- cgit v1.2.3