aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-03-15 12:12:12 +0100
committerAlex Auvolat <alex@adnab.me>2022-03-23 10:25:19 +0100
commitc3982a90b6b9cece7c765070c2c2be22c816ff70 (patch)
tree69e1d1e7510cb5195b40f1160b5de9ffc9f38450
parentc1d9854d2c5998adcad60784ec54659113c28a9f (diff)
downloadgarage-c3982a90b6b9cece7c765070c2c2be22c816ff70.tar.gz
garage-c3982a90b6b9cece7c765070c2c2be22c816ff70.zip
Move DataBlock out of manager.rs
-rw-r--r--src/block/block.rs81
-rw-r--r--src/block/lib.rs1
-rw-r--r--src/block/manager.rs82
-rw-r--r--src/model/garage.rs3
4 files changed, 86 insertions, 81 deletions
diff --git a/src/block/block.rs b/src/block/block.rs
new file mode 100644
index 00000000..4d3fbcb8
--- /dev/null
+++ b/src/block/block.rs
@@ -0,0 +1,81 @@
+use serde::{Deserialize, Serialize};
+use zstd::stream::{decode_all as zstd_decode, Encoder};
+
+use garage_util::data::*;
+use garage_util::error::*;
+
+/// A possibly compressed block of data
+#[derive(Debug, Serialize, Deserialize)]
+pub enum DataBlock {
+ /// Uncompressed data
+ Plain(#[serde(with = "serde_bytes")] Vec<u8>),
+ /// Data compressed with zstd
+ Compressed(#[serde(with = "serde_bytes")] Vec<u8>),
+}
+
+impl DataBlock {
+ /// Query whether this block is compressed
+ pub fn is_compressed(&self) -> bool {
+ matches!(self, DataBlock::Compressed(_))
+ }
+
+ /// Get the inner, possibly compressed buffer. You should probably use [`DataBlock::verify_get`]
+ /// instead
+ pub fn inner_buffer(&self) -> &[u8] {
+ use DataBlock::*;
+ let (Plain(ref res) | Compressed(ref res)) = self;
+ res
+ }
+
+ /// Get the buffer, possibly decompressing it, and verify it's integrity.
+ /// For Plain block, data is compared to hash, for Compressed block, zstd checksumming system
+ /// is used instead.
+ pub fn verify_get(self, hash: Hash) -> Result<Vec<u8>, Error> {
+ match self {
+ DataBlock::Plain(data) => {
+ if blake2sum(&data) == hash {
+ Ok(data)
+ } else {
+ Err(Error::CorruptData(hash))
+ }
+ }
+ DataBlock::Compressed(data) => {
+ zstd_decode(&data[..]).map_err(|_| Error::CorruptData(hash))
+ }
+ }
+ }
+
+ /// Verify data integrity. Allocate less than [`DataBlock::verify_get`] and don't consume self, but
+ /// does not return the buffer content.
+ pub fn verify(&self, hash: Hash) -> Result<(), Error> {
+ match self {
+ DataBlock::Plain(data) => {
+ if blake2sum(data) == hash {
+ Ok(())
+ } else {
+ Err(Error::CorruptData(hash))
+ }
+ }
+ DataBlock::Compressed(data) => zstd::stream::copy_decode(&data[..], std::io::sink())
+ .map_err(|_| Error::CorruptData(hash)),
+ }
+ }
+
+ pub fn from_buffer(data: Vec<u8>, level: Option<i32>) -> DataBlock {
+ if let Some(level) = level {
+ if let Ok(data) = zstd_encode(&data[..], level) {
+ return DataBlock::Compressed(data);
+ }
+ }
+ DataBlock::Plain(data)
+ }
+}
+
+fn zstd_encode<R: std::io::Read>(mut source: R, level: i32) -> std::io::Result<Vec<u8>> {
+ let mut result = Vec::<u8>::new();
+ let mut encoder = Encoder::new(&mut result, level)?;
+ encoder.include_checksum(true)?;
+ std::io::copy(&mut source, &mut encoder)?;
+ encoder.finish()?;
+ Ok(result)
+}
diff --git a/src/block/lib.rs b/src/block/lib.rs
index 47ff402d..0c67c956 100644
--- a/src/block/lib.rs
+++ b/src/block/lib.rs
@@ -3,4 +3,5 @@ extern crate tracing;
pub mod manager;
+mod block;
mod metrics;
diff --git a/src/block/manager.rs b/src/block/manager.rs
index f047e1d3..9665a306 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -5,7 +5,6 @@ use std::time::Duration;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
-use zstd::stream::{decode_all as zstd_decode, Encoder};
use futures::future::*;
use futures::select;
@@ -31,6 +30,7 @@ use garage_rpc::*;
use garage_table::replication::{TableReplication, TableShardedReplication};
use crate::metrics::*;
+use crate::block::*;
/// Size under which data will be stored inlined in database instead of as files
pub const INLINE_THRESHOLD: usize = 3072;
@@ -71,73 +71,6 @@ pub enum BlockRpc {
NeedBlockReply(bool),
}
-/// A possibly compressed block of data
-#[derive(Debug, Serialize, Deserialize)]
-pub enum DataBlock {
- /// Uncompressed data
- Plain(#[serde(with = "serde_bytes")] Vec<u8>),
- /// Data compressed with zstd
- Compressed(#[serde(with = "serde_bytes")] Vec<u8>),
-}
-
-impl DataBlock {
- /// Query whether this block is compressed
- pub fn is_compressed(&self) -> bool {
- matches!(self, DataBlock::Compressed(_))
- }
-
- /// Get the inner, possibly compressed buffer. You should probably use [`DataBlock::verify_get`]
- /// instead
- pub fn inner_buffer(&self) -> &[u8] {
- use DataBlock::*;
- let (Plain(ref res) | Compressed(ref res)) = self;
- res
- }
-
- /// Get the buffer, possibly decompressing it, and verify it's integrity.
- /// For Plain block, data is compared to hash, for Compressed block, zstd checksumming system
- /// is used instead.
- pub fn verify_get(self, hash: Hash) -> Result<Vec<u8>, Error> {
- match self {
- DataBlock::Plain(data) => {
- if blake2sum(&data) == hash {
- Ok(data)
- } else {
- Err(Error::CorruptData(hash))
- }
- }
- DataBlock::Compressed(data) => {
- zstd_decode(&data[..]).map_err(|_| Error::CorruptData(hash))
- }
- }
- }
-
- /// Verify data integrity. Allocate less than [`DataBlock::verify_get`] and don't consume self, but
- /// does not return the buffer content.
- pub fn verify(&self, hash: Hash) -> Result<(), Error> {
- match self {
- DataBlock::Plain(data) => {
- if blake2sum(data) == hash {
- Ok(())
- } else {
- Err(Error::CorruptData(hash))
- }
- }
- DataBlock::Compressed(data) => zstd::stream::copy_decode(&data[..], std::io::sink())
- .map_err(|_| Error::CorruptData(hash)),
- }
- }
-
- pub fn from_buffer(data: Vec<u8>, level: Option<i32>) -> DataBlock {
- if let Some(level) = level {
- if let Ok(data) = zstd_encode(&data[..], level) {
- return DataBlock::Compressed(data);
- }
- }
- DataBlock::Plain(data)
- }
-}
-
impl Rpc for BlockRpc {
type Response = Result<BlockRpc, Error>;
}
@@ -215,6 +148,8 @@ impl BlockManager {
});
block_manager.endpoint.set_handler(block_manager.clone());
+ block_manager.clone().spawn_background_worker();
+
block_manager
}
@@ -494,7 +429,7 @@ impl BlockManager {
// ---- Resync loop ----
- pub fn spawn_background_worker(self: Arc<Self>) {
+ fn spawn_background_worker(self: Arc<Self>) {
// Launch a background workers for background resync loop processing
let background = self.system.background.clone();
tokio::spawn(async move {
@@ -1080,12 +1015,3 @@ impl ErrorCounter {
self.last_try + self.delay_msec()
}
}
-
-fn zstd_encode<R: std::io::Read>(mut source: R, level: i32) -> std::io::Result<Vec<u8>> {
- let mut result = Vec::<u8>::new();
- let mut encoder = Encoder::new(&mut result, level)?;
- encoder.include_checksum(true)?;
- std::io::copy(&mut source, &mut encoder)?;
- encoder.finish()?;
- Ok(result)
-}
diff --git a/src/model/garage.rs b/src/model/garage.rs
index 3f2605f1..93402ca2 100644
--- a/src/model/garage.rs
+++ b/src/model/garage.rs
@@ -153,9 +153,6 @@ impl Garage {
block_ref_table,
});
- info!("Start block manager background thread...");
- garage.block_manager.clone().spawn_background_worker();
-
garage
}