aboutsummaryrefslogtreecommitdiff
path: root/src/util
diff options
context:
space:
mode:
Diffstat (limited to 'src/util')
-rw-r--r--src/util/Cargo.toml2
-rw-r--r--src/util/async_hash.rs61
-rw-r--r--src/util/config.rs44
-rw-r--r--src/util/data.rs35
-rw-r--r--src/util/error.rs10
-rw-r--r--src/util/lib.rs1
6 files changed, 63 insertions, 90 deletions
diff --git a/src/util/Cargo.toml b/src/util/Cargo.toml
index 72581c16..e4c31460 100644
--- a/src/util/Cargo.toml
+++ b/src/util/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "garage_util"
-version = "0.9.3"
+version = "0.10.0"
authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license = "AGPL-3.0"
diff --git a/src/util/async_hash.rs b/src/util/async_hash.rs
deleted file mode 100644
index 5631ea6b..00000000
--- a/src/util/async_hash.rs
+++ /dev/null
@@ -1,61 +0,0 @@
-use bytes::Bytes;
-use digest::Digest;
-
-use tokio::sync::mpsc;
-use tokio::task::JoinHandle;
-
-use crate::data::*;
-
-/// Compute the sha256 of a slice,
-/// spawning on a tokio thread for CPU-intensive processing
-/// The argument has to be an owned Bytes, as it is moved out to a new thread.
-pub async fn async_sha256sum(data: Bytes) -> Hash {
- tokio::task::spawn_blocking(move || sha256sum(&data))
- .await
- .unwrap()
-}
-
-/// Compute the blake2sum of a slice,
-/// spawning on a tokio thread for CPU-intensive processing.
-/// The argument has to be an owned Bytes, as it is moved out to a new thread.
-pub async fn async_blake2sum(data: Bytes) -> Hash {
- tokio::task::spawn_blocking(move || blake2sum(&data))
- .await
- .unwrap()
-}
-
-// ----
-
-pub struct AsyncHasher<D: Digest> {
- sendblk: mpsc::Sender<Bytes>,
- task: JoinHandle<digest::Output<D>>,
-}
-
-impl<D: Digest> AsyncHasher<D> {
- pub fn new() -> Self {
- let (sendblk, mut recvblk) = mpsc::channel::<Bytes>(1);
- let task = tokio::task::spawn_blocking(move || {
- let mut digest = D::new();
- while let Some(blk) = recvblk.blocking_recv() {
- digest.update(&blk[..]);
- }
- digest.finalize()
- });
- Self { sendblk, task }
- }
-
- pub async fn update(&self, b: Bytes) {
- self.sendblk.send(b).await.unwrap();
- }
-
- pub async fn finalize(self) -> digest::Output<D> {
- drop(self.sendblk);
- self.task.await.unwrap()
- }
-}
-
-impl<D: Digest> Default for AsyncHasher<D> {
- fn default() -> Self {
- Self::new()
- }
-}
diff --git a/src/util/config.rs b/src/util/config.rs
index 8ecbdfbb..c5a24f76 100644
--- a/src/util/config.rs
+++ b/src/util/config.rs
@@ -38,12 +38,20 @@ pub struct Config {
)]
pub block_size: usize,
- /// Replication mode. Supported values:
- /// - none, 1 -> no replication
- /// - 2 -> 2-way replication
- /// - 3 -> 3-way replication
- // (we can add more aliases for this later)
- pub replication_mode: String,
+ /// Number of replicas. Can be any positive integer, but uneven numbers are more favorable.
+ /// - 1 for single-node clusters, or to disable replication
+ /// - 3 is the recommended and supported setting.
+ #[serde(default)]
+ pub replication_factor: Option<usize>,
+
+ /// Consistency mode for all for requests through this node
+ /// - Degraded -> Disable read quorum
+ /// - Dangerous -> Disable read and write quorum
+ #[serde(default = "default_consistency_mode")]
+ pub consistency_mode: String,
+
+ /// Legacy option
+ pub replication_mode: Option<String>,
/// Zstd compression level used on data blocks
#[serde(
@@ -87,20 +95,10 @@ pub struct Config {
pub kubernetes_discovery: Option<KubernetesDiscoveryConfig>,
// -- DB
- /// Database engine to use for metadata (options: sled, sqlite, lmdb)
+ /// Database engine to use for metadata (options: sqlite, lmdb)
#[serde(default = "default_db_engine")]
pub db_engine: String,
- /// Sled cache size, in bytes
- #[serde(
- deserialize_with = "deserialize_capacity",
- default = "default_sled_cache_capacity"
- )]
- pub sled_cache_capacity: usize,
- /// Sled flush interval in milliseconds
- #[serde(default = "default_sled_flush_every_ms")]
- pub sled_flush_every_ms: u64,
-
/// LMDB map size
#[serde(deserialize_with = "deserialize_capacity", default)]
pub lmdb_map_size: usize,
@@ -246,16 +244,14 @@ fn default_db_engine() -> String {
"lmdb".into()
}
-fn default_sled_cache_capacity() -> usize {
- 128 * 1024 * 1024
-}
-fn default_sled_flush_every_ms() -> u64 {
- 2000
-}
fn default_block_size() -> usize {
1048576
}
+fn default_consistency_mode() -> String {
+ "consistent".into()
+}
+
fn default_compression() -> Option<i32> {
Some(1)
}
@@ -367,7 +363,7 @@ mod tests {
r#"
metadata_dir = "/tmp/garage/meta"
data_dir = "/tmp/garage/data"
- replication_mode = "3"
+ replication_factor = 3
rpc_bind_addr = "[::]:3901"
rpc_secret = "foo"
diff --git a/src/util/data.rs b/src/util/data.rs
index 2579fd1b..1fe7dfe0 100644
--- a/src/util/data.rs
+++ b/src/util/data.rs
@@ -83,6 +83,19 @@ impl FixedBytes32 {
ret.copy_from_slice(by);
Some(Self(ret))
}
+ /// Return the next hash
+ pub fn increment(&self) -> Option<Self> {
+ let mut ret = *self;
+ for byte in ret.0.iter_mut().rev() {
+ if *byte == u8::MAX {
+ *byte = 0;
+ } else {
+ *byte = *byte + 1;
+ return Some(ret);
+ }
+ }
+ return None;
+ }
}
impl From<garage_net::NodeID> for FixedBytes32 {
@@ -140,3 +153,25 @@ pub fn fasthash(data: &[u8]) -> FastHash {
pub fn gen_uuid() -> Uuid {
rand::thread_rng().gen::<[u8; 32]>().into()
}
+
+#[cfg(test)]
+mod test {
+ use super::*;
+
+ #[test]
+ fn test_increment() {
+ let zero: FixedBytes32 = [0u8; 32].into();
+ let mut one: FixedBytes32 = [0u8; 32].into();
+ one.0[31] = 1;
+ let max: FixedBytes32 = [0xFFu8; 32].into();
+ assert_eq!(zero.increment(), Some(one));
+ assert_eq!(max.increment(), None);
+
+ let mut test: FixedBytes32 = [0u8; 32].into();
+ let i = 0x198DF97209F8FFFFu64;
+ test.0[24..32].copy_from_slice(&u64::to_be_bytes(i));
+ let mut test2: FixedBytes32 = [0u8; 32].into();
+ test2.0[24..32].copy_from_slice(&u64::to_be_bytes(i + 1));
+ assert_eq!(test.increment(), Some(test2));
+ }
+}
diff --git a/src/util/error.rs b/src/util/error.rs
index e73d88ba..75fd3f9c 100644
--- a/src/util/error.rs
+++ b/src/util/error.rs
@@ -55,13 +55,14 @@ pub enum Error {
Timeout,
#[error(
- display = "Could not reach quorum of {}. {} of {} request succeeded, others returned errors: {:?}",
+ display = "Could not reach quorum of {} (sets={:?}). {} of {} request succeeded, others returned errors: {:?}",
_0,
_1,
_2,
- _3
+ _3,
+ _4
)]
- Quorum(usize, usize, usize, Vec<String>),
+ Quorum(usize, Option<usize>, usize, usize, Vec<String>),
#[error(display = "Unexpected RPC message: {}", _0)]
UnexpectedRpcMessage(String),
@@ -69,6 +70,9 @@ pub enum Error {
#[error(display = "Corrupt data: does not match hash {:?}", _0)]
CorruptData(Hash),
+ #[error(display = "Missing block {:?}: no node returned a valid block", _0)]
+ MissingBlock(Hash),
+
#[error(display = "{}", _0)]
Message(String),
}
diff --git a/src/util/lib.rs b/src/util/lib.rs
index 7df77959..8b035ff0 100644
--- a/src/util/lib.rs
+++ b/src/util/lib.rs
@@ -3,7 +3,6 @@
#[macro_use]
extern crate tracing;
-pub mod async_hash;
pub mod background;
pub mod config;
pub mod crdt;