diff options
author | Alex Auvolat <alex@adnab.me> | 2022-03-15 12:12:12 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2022-03-23 10:25:19 +0100 |
commit | c3982a90b6b9cece7c765070c2c2be22c816ff70 (patch) | |
tree | 69e1d1e7510cb5195b40f1160b5de9ffc9f38450 | |
parent | c1d9854d2c5998adcad60784ec54659113c28a9f (diff) | |
download | garage-c3982a90b6b9cece7c765070c2c2be22c816ff70.tar.gz garage-c3982a90b6b9cece7c765070c2c2be22c816ff70.zip |
Move DataBlock out of manager.rs
-rw-r--r-- | src/block/block.rs | 81 | ||||
-rw-r--r-- | src/block/lib.rs | 1 | ||||
-rw-r--r-- | src/block/manager.rs | 82 | ||||
-rw-r--r-- | src/model/garage.rs | 3 |
4 files changed, 86 insertions, 81 deletions
diff --git a/src/block/block.rs b/src/block/block.rs new file mode 100644 index 00000000..4d3fbcb8 --- /dev/null +++ b/src/block/block.rs @@ -0,0 +1,81 @@ +use serde::{Deserialize, Serialize}; +use zstd::stream::{decode_all as zstd_decode, Encoder}; + +use garage_util::data::*; +use garage_util::error::*; + +/// A possibly compressed block of data +#[derive(Debug, Serialize, Deserialize)] +pub enum DataBlock { + /// Uncompressed data + Plain(#[serde(with = "serde_bytes")] Vec<u8>), + /// Data compressed with zstd + Compressed(#[serde(with = "serde_bytes")] Vec<u8>), +} + +impl DataBlock { + /// Query whether this block is compressed + pub fn is_compressed(&self) -> bool { + matches!(self, DataBlock::Compressed(_)) + } + + /// Get the inner, possibly compressed buffer. You should probably use [`DataBlock::verify_get`] + /// instead + pub fn inner_buffer(&self) -> &[u8] { + use DataBlock::*; + let (Plain(ref res) | Compressed(ref res)) = self; + res + } + + /// Get the buffer, possibly decompressing it, and verify it's integrity. + /// For Plain block, data is compared to hash, for Compressed block, zstd checksumming system + /// is used instead. + pub fn verify_get(self, hash: Hash) -> Result<Vec<u8>, Error> { + match self { + DataBlock::Plain(data) => { + if blake2sum(&data) == hash { + Ok(data) + } else { + Err(Error::CorruptData(hash)) + } + } + DataBlock::Compressed(data) => { + zstd_decode(&data[..]).map_err(|_| Error::CorruptData(hash)) + } + } + } + + /// Verify data integrity. Allocate less than [`DataBlock::verify_get`] and don't consume self, but + /// does not return the buffer content. + pub fn verify(&self, hash: Hash) -> Result<(), Error> { + match self { + DataBlock::Plain(data) => { + if blake2sum(data) == hash { + Ok(()) + } else { + Err(Error::CorruptData(hash)) + } + } + DataBlock::Compressed(data) => zstd::stream::copy_decode(&data[..], std::io::sink()) + .map_err(|_| Error::CorruptData(hash)), + } + } + + pub fn from_buffer(data: Vec<u8>, level: Option<i32>) -> DataBlock { + if let Some(level) = level { + if let Ok(data) = zstd_encode(&data[..], level) { + return DataBlock::Compressed(data); + } + } + DataBlock::Plain(data) + } +} + +fn zstd_encode<R: std::io::Read>(mut source: R, level: i32) -> std::io::Result<Vec<u8>> { + let mut result = Vec::<u8>::new(); + let mut encoder = Encoder::new(&mut result, level)?; + encoder.include_checksum(true)?; + std::io::copy(&mut source, &mut encoder)?; + encoder.finish()?; + Ok(result) +} diff --git a/src/block/lib.rs b/src/block/lib.rs index 47ff402d..0c67c956 100644 --- a/src/block/lib.rs +++ b/src/block/lib.rs @@ -3,4 +3,5 @@ extern crate tracing; pub mod manager; +mod block; mod metrics; diff --git a/src/block/manager.rs b/src/block/manager.rs index f047e1d3..9665a306 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -5,7 +5,6 @@ use std::time::Duration; use async_trait::async_trait; use serde::{Deserialize, Serialize}; -use zstd::stream::{decode_all as zstd_decode, Encoder}; use futures::future::*; use futures::select; @@ -31,6 +30,7 @@ use garage_rpc::*; use garage_table::replication::{TableReplication, TableShardedReplication}; use crate::metrics::*; +use crate::block::*; /// Size under which data will be stored inlined in database instead of as files pub const INLINE_THRESHOLD: usize = 3072; @@ -71,73 +71,6 @@ pub enum BlockRpc { NeedBlockReply(bool), } -/// A possibly compressed block of data -#[derive(Debug, Serialize, Deserialize)] -pub enum DataBlock { - /// Uncompressed data - Plain(#[serde(with = "serde_bytes")] Vec<u8>), - /// Data compressed with zstd - Compressed(#[serde(with = "serde_bytes")] Vec<u8>), -} - -impl DataBlock { - /// Query whether this block is compressed - pub fn is_compressed(&self) -> bool { - matches!(self, DataBlock::Compressed(_)) - } - - /// Get the inner, possibly compressed buffer. You should probably use [`DataBlock::verify_get`] - /// instead - pub fn inner_buffer(&self) -> &[u8] { - use DataBlock::*; - let (Plain(ref res) | Compressed(ref res)) = self; - res - } - - /// Get the buffer, possibly decompressing it, and verify it's integrity. - /// For Plain block, data is compared to hash, for Compressed block, zstd checksumming system - /// is used instead. - pub fn verify_get(self, hash: Hash) -> Result<Vec<u8>, Error> { - match self { - DataBlock::Plain(data) => { - if blake2sum(&data) == hash { - Ok(data) - } else { - Err(Error::CorruptData(hash)) - } - } - DataBlock::Compressed(data) => { - zstd_decode(&data[..]).map_err(|_| Error::CorruptData(hash)) - } - } - } - - /// Verify data integrity. Allocate less than [`DataBlock::verify_get`] and don't consume self, but - /// does not return the buffer content. - pub fn verify(&self, hash: Hash) -> Result<(), Error> { - match self { - DataBlock::Plain(data) => { - if blake2sum(data) == hash { - Ok(()) - } else { - Err(Error::CorruptData(hash)) - } - } - DataBlock::Compressed(data) => zstd::stream::copy_decode(&data[..], std::io::sink()) - .map_err(|_| Error::CorruptData(hash)), - } - } - - pub fn from_buffer(data: Vec<u8>, level: Option<i32>) -> DataBlock { - if let Some(level) = level { - if let Ok(data) = zstd_encode(&data[..], level) { - return DataBlock::Compressed(data); - } - } - DataBlock::Plain(data) - } -} - impl Rpc for BlockRpc { type Response = Result<BlockRpc, Error>; } @@ -215,6 +148,8 @@ impl BlockManager { }); block_manager.endpoint.set_handler(block_manager.clone()); + block_manager.clone().spawn_background_worker(); + block_manager } @@ -494,7 +429,7 @@ impl BlockManager { // ---- Resync loop ---- - pub fn spawn_background_worker(self: Arc<Self>) { + fn spawn_background_worker(self: Arc<Self>) { // Launch a background workers for background resync loop processing let background = self.system.background.clone(); tokio::spawn(async move { @@ -1080,12 +1015,3 @@ impl ErrorCounter { self.last_try + self.delay_msec() } } - -fn zstd_encode<R: std::io::Read>(mut source: R, level: i32) -> std::io::Result<Vec<u8>> { - let mut result = Vec::<u8>::new(); - let mut encoder = Encoder::new(&mut result, level)?; - encoder.include_checksum(true)?; - std::io::copy(&mut source, &mut encoder)?; - encoder.finish()?; - Ok(result) -} diff --git a/src/model/garage.rs b/src/model/garage.rs index 3f2605f1..93402ca2 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -153,9 +153,6 @@ impl Garage { block_ref_table, }); - info!("Start block manager background thread..."); - garage.block_manager.clone().spawn_background_worker(); - garage } |