aboutsummaryrefslogtreecommitdiff
path: root/src/util
diff options
context:
space:
mode:
Diffstat (limited to 'src/util')
-rw-r--r--src/util/Cargo.toml7
-rw-r--r--src/util/config.rs105
-rw-r--r--src/util/encode.rs6
-rw-r--r--src/util/lib.rs1
-rw-r--r--src/util/migrate.rs6
-rw-r--r--src/util/socket_address.rs44
6 files changed, 146 insertions, 23 deletions
diff --git a/src/util/Cargo.toml b/src/util/Cargo.toml
index f72051b9..6554ac13 100644
--- a/src/util/Cargo.toml
+++ b/src/util/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "garage_util"
-version = "0.8.2"
+version = "0.9.0"
authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license = "AGPL-3.0"
@@ -20,6 +20,7 @@ arc-swap = "1.0"
async-trait = "0.1"
blake2 = "0.10"
bytes = "1.0"
+bytesize = "1.2"
digest = "0.10"
err-derive = "0.3"
hexdump = "0.1"
@@ -31,7 +32,7 @@ rand = "0.8"
sha2 = "0.10"
chrono = "0.4"
-rmp-serde = "0.15"
+rmp-serde = "1.1.2"
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
serde_json = "1.0"
toml = "0.6"
@@ -39,7 +40,7 @@ toml = "0.6"
futures = "0.3"
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
-netapp = "0.5"
+netapp = "0.10"
http = "0.2"
hyper = "0.14"
diff --git a/src/util/config.rs b/src/util/config.rs
index 1da95b2f..ad5c8e1f 100644
--- a/src/util/config.rs
+++ b/src/util/config.rs
@@ -1,4 +1,5 @@
//! Contains type and functions related to Garage configuration file
+use std::convert::TryFrom;
use std::io::Read;
use std::net::SocketAddr;
use std::path::PathBuf;
@@ -6,6 +7,7 @@ use std::path::PathBuf;
use serde::{de, Deserialize};
use crate::error::Error;
+use crate::socket_address::UnixOrTCPSocketAddress;
/// Represent the whole configuration
#[derive(Deserialize, Debug, Clone)]
@@ -13,10 +15,20 @@ pub struct Config {
/// Path where to store metadata. Should be fast, but low volume
pub metadata_dir: PathBuf,
/// Path where to store data. Can be slower, but need higher volume
- pub data_dir: PathBuf,
+ pub data_dir: DataDirEnum,
+
+ /// Whether to fsync after all metadata transactions (disabled by default)
+ #[serde(default)]
+ pub metadata_fsync: bool,
+ /// Whether to fsync after all data block writes (disabled by default)
+ #[serde(default)]
+ pub data_fsync: bool,
/// Size of data blocks to save to disk
- #[serde(default = "default_block_size")]
+ #[serde(
+ deserialize_with = "deserialize_capacity",
+ default = "default_block_size"
+ )]
pub block_size: usize,
/// Replication mode. Supported values:
@@ -66,12 +78,19 @@ pub struct Config {
pub db_engine: String,
/// Sled cache size, in bytes
- #[serde(default = "default_sled_cache_capacity")]
- pub sled_cache_capacity: u64,
+ #[serde(
+ deserialize_with = "deserialize_capacity",
+ default = "default_sled_cache_capacity"
+ )]
+ pub sled_cache_capacity: usize,
/// Sled flush interval in milliseconds
#[serde(default = "default_sled_flush_every_ms")]
pub sled_flush_every_ms: u64,
+ /// LMDB map size
+ #[serde(deserialize_with = "deserialize_capacity", default)]
+ pub lmdb_map_size: usize,
+
// -- APIs
/// Configuration for S3 api
pub s3_api: S3ApiConfig,
@@ -87,11 +106,31 @@ pub struct Config {
pub admin: AdminConfig,
}
+/// Value for data_dir: either a single directory or a list of dirs with attributes
+#[derive(Deserialize, Debug, Clone)]
+#[serde(untagged)]
+pub enum DataDirEnum {
+ Single(PathBuf),
+ Multiple(Vec<DataDir>),
+}
+
+#[derive(Deserialize, Debug, Clone)]
+pub struct DataDir {
+ /// Path to the data directory
+ pub path: PathBuf,
+ /// Capacity of the drive (required if read_only is false)
+ #[serde(default)]
+ pub capacity: Option<String>,
+ /// Whether this is a legacy read-only path (capacity should be None)
+ #[serde(default)]
+ pub read_only: bool,
+}
+
/// Configuration for S3 api
#[derive(Deserialize, Debug, Clone)]
pub struct S3ApiConfig {
/// Address and port to bind for api serving
- pub api_bind_addr: Option<SocketAddr>,
+ pub api_bind_addr: Option<UnixOrTCPSocketAddress>,
/// S3 region to use
pub s3_region: String,
/// Suffix to remove from domain name to find bucket. If None,
@@ -103,14 +142,14 @@ pub struct S3ApiConfig {
#[derive(Deserialize, Debug, Clone)]
pub struct K2VApiConfig {
/// Address and port to bind for api serving
- pub api_bind_addr: SocketAddr,
+ pub api_bind_addr: UnixOrTCPSocketAddress,
}
/// Configuration for serving files as normal web server
#[derive(Deserialize, Debug, Clone)]
pub struct WebConfig {
/// Address and port to bind for web serving
- pub bind_addr: SocketAddr,
+ pub bind_addr: UnixOrTCPSocketAddress,
/// Suffix to remove from domain name to find bucket
pub root_domain: String,
}
@@ -119,7 +158,7 @@ pub struct WebConfig {
#[derive(Deserialize, Debug, Clone, Default)]
pub struct AdminConfig {
/// Address and port to bind for admin API serving
- pub api_bind_addr: Option<SocketAddr>,
+ pub api_bind_addr: Option<UnixOrTCPSocketAddress>,
/// Bearer token to use to scrape metrics
pub metrics_token: Option<String>,
@@ -183,10 +222,10 @@ pub struct KubernetesDiscoveryConfig {
}
fn default_db_engine() -> String {
- "sled".into()
+ "lmdb".into()
}
-fn default_sled_cache_capacity() -> u64 {
+fn default_sled_cache_capacity() -> usize {
128 * 1024 * 1024
}
fn default_sled_flush_every_ms() -> u64 {
@@ -266,8 +305,6 @@ fn deserialize_compression<'de, D>(deserializer: D) -> Result<Option<i32>, D::Er
where
D: de::Deserializer<'de>,
{
- use std::convert::TryFrom;
-
struct OptionVisitor;
impl<'de> serde::de::Visitor<'de> for OptionVisitor {
@@ -312,6 +349,50 @@ where
deserializer.deserialize_any(OptionVisitor)
}
+fn deserialize_capacity<'de, D>(deserializer: D) -> Result<usize, D::Error>
+where
+ D: de::Deserializer<'de>,
+{
+ struct CapacityVisitor;
+
+ impl<'de> serde::de::Visitor<'de> for CapacityVisitor {
+ type Value = usize;
+ fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
+ formatter.write_str("int or '<capacity>'")
+ }
+
+ fn visit_str<E>(self, value: &str) -> Result<Self::Value, E>
+ where
+ E: de::Error,
+ {
+ value
+ .parse::<bytesize::ByteSize>()
+ .map(|x| x.as_u64())
+ .map_err(|e| E::custom(format!("invalid capacity value: {}", e)))
+ .and_then(|v| {
+ usize::try_from(v)
+ .map_err(|_| E::custom("capacity value out of bound".to_owned()))
+ })
+ }
+
+ fn visit_i64<E>(self, v: i64) -> Result<Self::Value, E>
+ where
+ E: de::Error,
+ {
+ usize::try_from(v).map_err(|_| E::custom("capacity value out of bound".to_owned()))
+ }
+
+ fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>
+ where
+ E: de::Error,
+ {
+ usize::try_from(v).map_err(|_| E::custom("capacity value out of bound".to_owned()))
+ }
+ }
+
+ deserializer.deserialize_any(CapacityVisitor)
+}
+
#[cfg(test)]
mod tests {
use crate::error::Error;
diff --git a/src/util/encode.rs b/src/util/encode.rs
index 1cd3198f..a9ab9a35 100644
--- a/src/util/encode.rs
+++ b/src/util/encode.rs
@@ -8,9 +8,7 @@ where
T: Serialize + ?Sized,
{
let mut wr = Vec::with_capacity(128);
- let mut se = rmp_serde::Serializer::new(&mut wr)
- .with_struct_map()
- .with_string_variants();
+ let mut se = rmp_serde::Serializer::new(&mut wr).with_struct_map();
val.serialize(&mut se)?;
Ok(wr)
}
@@ -22,7 +20,7 @@ pub fn nonversioned_decode<T>(bytes: &[u8]) -> Result<T, rmp_serde::decode::Erro
where
T: for<'de> Deserialize<'de> + ?Sized,
{
- rmp_serde::decode::from_read_ref::<_, T>(bytes)
+ rmp_serde::decode::from_slice::<_>(bytes)
}
/// Serialize to JSON, truncating long result
diff --git a/src/util/lib.rs b/src/util/lib.rs
index 15f0f829..7df77959 100644
--- a/src/util/lib.rs
+++ b/src/util/lib.rs
@@ -14,6 +14,7 @@ pub mod forwarded_headers;
pub mod metrics;
pub mod migrate;
pub mod persister;
+pub mod socket_address;
pub mod time;
pub mod tranquilizer;
pub mod version;
diff --git a/src/util/migrate.rs b/src/util/migrate.rs
index b9cce08a..45147c74 100644
--- a/src/util/migrate.rs
+++ b/src/util/migrate.rs
@@ -19,7 +19,7 @@ pub trait Migrate: Serialize + for<'de> Deserialize<'de> + 'static {
fn decode(bytes: &[u8]) -> Option<Self> {
let marker_len = Self::VERSION_MARKER.len();
if bytes.get(..marker_len) == Some(Self::VERSION_MARKER) {
- if let Ok(value) = rmp_serde::decode::from_read_ref::<_, Self>(&bytes[marker_len..]) {
+ if let Ok(value) = rmp_serde::decode::from_slice::<_>(&bytes[marker_len..]) {
return Some(value);
}
}
@@ -31,9 +31,7 @@ pub trait Migrate: Serialize + for<'de> Deserialize<'de> + 'static {
fn encode(&self) -> Result<Vec<u8>, rmp_serde::encode::Error> {
let mut wr = Vec::with_capacity(128);
wr.extend_from_slice(Self::VERSION_MARKER);
- let mut se = rmp_serde::Serializer::new(&mut wr)
- .with_struct_map()
- .with_string_variants();
+ let mut se = rmp_serde::Serializer::new(&mut wr).with_struct_map();
self.serialize(&mut se)?;
Ok(wr)
}
diff --git a/src/util/socket_address.rs b/src/util/socket_address.rs
new file mode 100644
index 00000000..f01225f6
--- /dev/null
+++ b/src/util/socket_address.rs
@@ -0,0 +1,44 @@
+use std::fmt::{Debug, Display, Formatter};
+use std::net::SocketAddr;
+use std::path::PathBuf;
+use std::str::FromStr;
+
+use serde::de::Error;
+use serde::{Deserialize, Deserializer};
+
+#[derive(Debug, Clone)]
+pub enum UnixOrTCPSocketAddress {
+ TCPSocket(SocketAddr),
+ UnixSocket(PathBuf),
+}
+
+impl Display for UnixOrTCPSocketAddress {
+ fn fmt(&self, formatter: &mut Formatter<'_>) -> std::fmt::Result {
+ match self {
+ UnixOrTCPSocketAddress::TCPSocket(address) => write!(formatter, "http://{}", address),
+ UnixOrTCPSocketAddress::UnixSocket(path) => {
+ write!(formatter, "http+unix://{}", path.to_string_lossy())
+ }
+ }
+ }
+}
+
+impl<'de> Deserialize<'de> for UnixOrTCPSocketAddress {
+ fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+ where
+ D: Deserializer<'de>,
+ {
+ let string = String::deserialize(deserializer)?;
+ let string = string.as_str();
+
+ if string.starts_with("/") {
+ Ok(UnixOrTCPSocketAddress::UnixSocket(
+ PathBuf::from_str(string).map_err(Error::custom)?,
+ ))
+ } else {
+ Ok(UnixOrTCPSocketAddress::TCPSocket(
+ SocketAddr::from_str(string).map_err(Error::custom)?,
+ ))
+ }
+ }
+}