diff options
Diffstat (limited to 'src/util')
-rw-r--r-- | src/util/Cargo.toml | 7 | ||||
-rw-r--r-- | src/util/config.rs | 105 | ||||
-rw-r--r-- | src/util/encode.rs | 6 | ||||
-rw-r--r-- | src/util/lib.rs | 1 | ||||
-rw-r--r-- | src/util/migrate.rs | 6 | ||||
-rw-r--r-- | src/util/socket_address.rs | 44 |
6 files changed, 146 insertions, 23 deletions
diff --git a/src/util/Cargo.toml b/src/util/Cargo.toml index f72051b9..6554ac13 100644 --- a/src/util/Cargo.toml +++ b/src/util/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "garage_util" -version = "0.8.2" +version = "0.9.0" authors = ["Alex Auvolat <alex@adnab.me>"] edition = "2018" license = "AGPL-3.0" @@ -20,6 +20,7 @@ arc-swap = "1.0" async-trait = "0.1" blake2 = "0.10" bytes = "1.0" +bytesize = "1.2" digest = "0.10" err-derive = "0.3" hexdump = "0.1" @@ -31,7 +32,7 @@ rand = "0.8" sha2 = "0.10" chrono = "0.4" -rmp-serde = "0.15" +rmp-serde = "1.1.2" serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } serde_json = "1.0" toml = "0.6" @@ -39,7 +40,7 @@ toml = "0.6" futures = "0.3" tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } -netapp = "0.5" +netapp = "0.10" http = "0.2" hyper = "0.14" diff --git a/src/util/config.rs b/src/util/config.rs index 1da95b2f..ad5c8e1f 100644 --- a/src/util/config.rs +++ b/src/util/config.rs @@ -1,4 +1,5 @@ //! Contains type and functions related to Garage configuration file +use std::convert::TryFrom; use std::io::Read; use std::net::SocketAddr; use std::path::PathBuf; @@ -6,6 +7,7 @@ use std::path::PathBuf; use serde::{de, Deserialize}; use crate::error::Error; +use crate::socket_address::UnixOrTCPSocketAddress; /// Represent the whole configuration #[derive(Deserialize, Debug, Clone)] @@ -13,10 +15,20 @@ pub struct Config { /// Path where to store metadata. Should be fast, but low volume pub metadata_dir: PathBuf, /// Path where to store data. Can be slower, but need higher volume - pub data_dir: PathBuf, + pub data_dir: DataDirEnum, + + /// Whether to fsync after all metadata transactions (disabled by default) + #[serde(default)] + pub metadata_fsync: bool, + /// Whether to fsync after all data block writes (disabled by default) + #[serde(default)] + pub data_fsync: bool, /// Size of data blocks to save to disk - #[serde(default = "default_block_size")] + #[serde( + deserialize_with = "deserialize_capacity", + default = "default_block_size" + )] pub block_size: usize, /// Replication mode. Supported values: @@ -66,12 +78,19 @@ pub struct Config { pub db_engine: String, /// Sled cache size, in bytes - #[serde(default = "default_sled_cache_capacity")] - pub sled_cache_capacity: u64, + #[serde( + deserialize_with = "deserialize_capacity", + default = "default_sled_cache_capacity" + )] + pub sled_cache_capacity: usize, /// Sled flush interval in milliseconds #[serde(default = "default_sled_flush_every_ms")] pub sled_flush_every_ms: u64, + /// LMDB map size + #[serde(deserialize_with = "deserialize_capacity", default)] + pub lmdb_map_size: usize, + // -- APIs /// Configuration for S3 api pub s3_api: S3ApiConfig, @@ -87,11 +106,31 @@ pub struct Config { pub admin: AdminConfig, } +/// Value for data_dir: either a single directory or a list of dirs with attributes +#[derive(Deserialize, Debug, Clone)] +#[serde(untagged)] +pub enum DataDirEnum { + Single(PathBuf), + Multiple(Vec<DataDir>), +} + +#[derive(Deserialize, Debug, Clone)] +pub struct DataDir { + /// Path to the data directory + pub path: PathBuf, + /// Capacity of the drive (required if read_only is false) + #[serde(default)] + pub capacity: Option<String>, + /// Whether this is a legacy read-only path (capacity should be None) + #[serde(default)] + pub read_only: bool, +} + /// Configuration for S3 api #[derive(Deserialize, Debug, Clone)] pub struct S3ApiConfig { /// Address and port to bind for api serving - pub api_bind_addr: Option<SocketAddr>, + pub api_bind_addr: Option<UnixOrTCPSocketAddress>, /// S3 region to use pub s3_region: String, /// Suffix to remove from domain name to find bucket. If None, @@ -103,14 +142,14 @@ pub struct S3ApiConfig { #[derive(Deserialize, Debug, Clone)] pub struct K2VApiConfig { /// Address and port to bind for api serving - pub api_bind_addr: SocketAddr, + pub api_bind_addr: UnixOrTCPSocketAddress, } /// Configuration for serving files as normal web server #[derive(Deserialize, Debug, Clone)] pub struct WebConfig { /// Address and port to bind for web serving - pub bind_addr: SocketAddr, + pub bind_addr: UnixOrTCPSocketAddress, /// Suffix to remove from domain name to find bucket pub root_domain: String, } @@ -119,7 +158,7 @@ pub struct WebConfig { #[derive(Deserialize, Debug, Clone, Default)] pub struct AdminConfig { /// Address and port to bind for admin API serving - pub api_bind_addr: Option<SocketAddr>, + pub api_bind_addr: Option<UnixOrTCPSocketAddress>, /// Bearer token to use to scrape metrics pub metrics_token: Option<String>, @@ -183,10 +222,10 @@ pub struct KubernetesDiscoveryConfig { } fn default_db_engine() -> String { - "sled".into() + "lmdb".into() } -fn default_sled_cache_capacity() -> u64 { +fn default_sled_cache_capacity() -> usize { 128 * 1024 * 1024 } fn default_sled_flush_every_ms() -> u64 { @@ -266,8 +305,6 @@ fn deserialize_compression<'de, D>(deserializer: D) -> Result<Option<i32>, D::Er where D: de::Deserializer<'de>, { - use std::convert::TryFrom; - struct OptionVisitor; impl<'de> serde::de::Visitor<'de> for OptionVisitor { @@ -312,6 +349,50 @@ where deserializer.deserialize_any(OptionVisitor) } +fn deserialize_capacity<'de, D>(deserializer: D) -> Result<usize, D::Error> +where + D: de::Deserializer<'de>, +{ + struct CapacityVisitor; + + impl<'de> serde::de::Visitor<'de> for CapacityVisitor { + type Value = usize; + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + formatter.write_str("int or '<capacity>'") + } + + fn visit_str<E>(self, value: &str) -> Result<Self::Value, E> + where + E: de::Error, + { + value + .parse::<bytesize::ByteSize>() + .map(|x| x.as_u64()) + .map_err(|e| E::custom(format!("invalid capacity value: {}", e))) + .and_then(|v| { + usize::try_from(v) + .map_err(|_| E::custom("capacity value out of bound".to_owned())) + }) + } + + fn visit_i64<E>(self, v: i64) -> Result<Self::Value, E> + where + E: de::Error, + { + usize::try_from(v).map_err(|_| E::custom("capacity value out of bound".to_owned())) + } + + fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E> + where + E: de::Error, + { + usize::try_from(v).map_err(|_| E::custom("capacity value out of bound".to_owned())) + } + } + + deserializer.deserialize_any(CapacityVisitor) +} + #[cfg(test)] mod tests { use crate::error::Error; diff --git a/src/util/encode.rs b/src/util/encode.rs index 1cd3198f..a9ab9a35 100644 --- a/src/util/encode.rs +++ b/src/util/encode.rs @@ -8,9 +8,7 @@ where T: Serialize + ?Sized, { let mut wr = Vec::with_capacity(128); - let mut se = rmp_serde::Serializer::new(&mut wr) - .with_struct_map() - .with_string_variants(); + let mut se = rmp_serde::Serializer::new(&mut wr).with_struct_map(); val.serialize(&mut se)?; Ok(wr) } @@ -22,7 +20,7 @@ pub fn nonversioned_decode<T>(bytes: &[u8]) -> Result<T, rmp_serde::decode::Erro where T: for<'de> Deserialize<'de> + ?Sized, { - rmp_serde::decode::from_read_ref::<_, T>(bytes) + rmp_serde::decode::from_slice::<_>(bytes) } /// Serialize to JSON, truncating long result diff --git a/src/util/lib.rs b/src/util/lib.rs index 15f0f829..7df77959 100644 --- a/src/util/lib.rs +++ b/src/util/lib.rs @@ -14,6 +14,7 @@ pub mod forwarded_headers; pub mod metrics; pub mod migrate; pub mod persister; +pub mod socket_address; pub mod time; pub mod tranquilizer; pub mod version; diff --git a/src/util/migrate.rs b/src/util/migrate.rs index b9cce08a..45147c74 100644 --- a/src/util/migrate.rs +++ b/src/util/migrate.rs @@ -19,7 +19,7 @@ pub trait Migrate: Serialize + for<'de> Deserialize<'de> + 'static { fn decode(bytes: &[u8]) -> Option<Self> { let marker_len = Self::VERSION_MARKER.len(); if bytes.get(..marker_len) == Some(Self::VERSION_MARKER) { - if let Ok(value) = rmp_serde::decode::from_read_ref::<_, Self>(&bytes[marker_len..]) { + if let Ok(value) = rmp_serde::decode::from_slice::<_>(&bytes[marker_len..]) { return Some(value); } } @@ -31,9 +31,7 @@ pub trait Migrate: Serialize + for<'de> Deserialize<'de> + 'static { fn encode(&self) -> Result<Vec<u8>, rmp_serde::encode::Error> { let mut wr = Vec::with_capacity(128); wr.extend_from_slice(Self::VERSION_MARKER); - let mut se = rmp_serde::Serializer::new(&mut wr) - .with_struct_map() - .with_string_variants(); + let mut se = rmp_serde::Serializer::new(&mut wr).with_struct_map(); self.serialize(&mut se)?; Ok(wr) } diff --git a/src/util/socket_address.rs b/src/util/socket_address.rs new file mode 100644 index 00000000..f01225f6 --- /dev/null +++ b/src/util/socket_address.rs @@ -0,0 +1,44 @@ +use std::fmt::{Debug, Display, Formatter}; +use std::net::SocketAddr; +use std::path::PathBuf; +use std::str::FromStr; + +use serde::de::Error; +use serde::{Deserialize, Deserializer}; + +#[derive(Debug, Clone)] +pub enum UnixOrTCPSocketAddress { + TCPSocket(SocketAddr), + UnixSocket(PathBuf), +} + +impl Display for UnixOrTCPSocketAddress { + fn fmt(&self, formatter: &mut Formatter<'_>) -> std::fmt::Result { + match self { + UnixOrTCPSocketAddress::TCPSocket(address) => write!(formatter, "http://{}", address), + UnixOrTCPSocketAddress::UnixSocket(path) => { + write!(formatter, "http+unix://{}", path.to_string_lossy()) + } + } + } +} + +impl<'de> Deserialize<'de> for UnixOrTCPSocketAddress { + fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> + where + D: Deserializer<'de>, + { + let string = String::deserialize(deserializer)?; + let string = string.as_str(); + + if string.starts_with("/") { + Ok(UnixOrTCPSocketAddress::UnixSocket( + PathBuf::from_str(string).map_err(Error::custom)?, + )) + } else { + Ok(UnixOrTCPSocketAddress::TCPSocket( + SocketAddr::from_str(string).map_err(Error::custom)?, + )) + } + } +} |