diff options
Diffstat (limited to 'src/util')
-rw-r--r-- | src/util/background.rs | 10 | ||||
-rw-r--r-- | src/util/config.rs | 27 | ||||
-rw-r--r-- | src/util/data.rs | 18 | ||||
-rw-r--r-- | src/util/error.rs | 3 | ||||
-rw-r--r-- | src/util/lib.rs | 2 | ||||
-rw-r--r-- | src/util/time.rs | 4 |
6 files changed, 61 insertions, 3 deletions
diff --git a/src/util/background.rs b/src/util/background.rs index b5eb8bc8..bfdaaf1e 100644 --- a/src/util/background.rs +++ b/src/util/background.rs @@ -1,3 +1,4 @@ +//! Job runner for futures and async functions use core::future::Future; use std::pin::Pin; use std::sync::Arc; @@ -12,14 +13,15 @@ use crate::error::Error; type JobOutput = Result<(), Error>; type Job = Pin<Box<dyn Future<Output = JobOutput> + Send>>; +/// Job runner for futures and async functions pub struct BackgroundRunner { - pub stop_signal: watch::Receiver<bool>, - + stop_signal: watch::Receiver<bool>, queue_in: mpsc::UnboundedSender<(Job, bool)>, worker_in: mpsc::UnboundedSender<tokio::task::JoinHandle<()>>, } impl BackgroundRunner { + /// Create a new BackgroundRunner pub fn new( n_runners: usize, stop_signal: watch::Receiver<bool>, @@ -103,7 +105,7 @@ impl BackgroundRunner { (bgrunner, await_all_done) } - // Spawn a task to be run in background + /// Spawn a task to be run in background pub fn spawn<T>(&self, job: T) where T: Future<Output = JobOutput> + Send + 'static, @@ -115,6 +117,8 @@ impl BackgroundRunner { .unwrap(); } + /// Spawn a task to be run in background. It may get discarded before running if spawned while + /// the runner is stopping pub fn spawn_cancellable<T>(&self, job: T) where T: Future<Output = JobOutput> + Send + 'static, diff --git a/src/util/config.rs b/src/util/config.rs index 9ff67711..bb70467b 100644 --- a/src/util/config.rs +++ b/src/util/config.rs @@ -1,3 +1,4 @@ +//! Contains type and functions related to Garage configuration file use std::io::Read; use std::net::SocketAddr; use std::path::PathBuf; @@ -6,57 +7,82 @@ use serde::{de, Deserialize}; use crate::error::Error; +/// Represent the whole configuration #[derive(Deserialize, Debug, Clone)] pub struct Config { + /// Path where to store metadata. Should be fast, but low volume pub metadata_dir: PathBuf, + /// Path where to store data. Can be slower, but need higher volume pub data_dir: PathBuf, + /// Address to bind for RPC pub rpc_bind_addr: SocketAddr, + /// Bootstrap peers RPC address #[serde(deserialize_with = "deserialize_vec_addr")] pub bootstrap_peers: Vec<SocketAddr>, + /// Consule host to connect to to discover more peers pub consul_host: Option<String>, + /// Consul service name to use pub consul_service_name: Option<String>, + /// Max number of concurrent RPC request #[serde(default = "default_max_concurrent_rpc_requests")] pub max_concurrent_rpc_requests: usize, + /// Size of data blocks to save to disk #[serde(default = "default_block_size")] pub block_size: usize, #[serde(default = "default_control_write_max_faults")] pub control_write_max_faults: usize, + /// How many nodes should hold a copy of meta data #[serde(default = "default_replication_factor")] pub meta_replication_factor: usize, + /// How many nodes should hold a copy of data #[serde(default = "default_replication_factor")] pub data_replication_factor: usize, + /// Configuration for RPC TLS pub rpc_tls: Option<TlsConfig>, + /// Configuration for S3 api pub s3_api: ApiConfig, + /// Configuration for serving files as normal web server pub s3_web: WebConfig, } +/// Configuration for RPC TLS #[derive(Deserialize, Debug, Clone)] pub struct TlsConfig { + /// Path to certificate autority used for all nodes pub ca_cert: String, + /// Path to public certificate for this node pub node_cert: String, + /// Path to private key for this node pub node_key: String, } +/// Configuration for S3 api #[derive(Deserialize, Debug, Clone)] pub struct ApiConfig { + /// Address and port to bind for api serving pub api_bind_addr: SocketAddr, + /// S3 region to use pub s3_region: String, } +/// Configuration for serving files as normal web server #[derive(Deserialize, Debug, Clone)] pub struct WebConfig { + /// Address and port to bind for web serving pub bind_addr: SocketAddr, + /// Suffix to remove from domain name to find bucket pub root_domain: String, + /// Suffix to add when user-agent request path end with "/" pub index: String, } @@ -73,6 +99,7 @@ fn default_control_write_max_faults() -> usize { 1 } +/// Read and parse configuration pub fn read_config(config_file: PathBuf) -> Result<Config, Error> { let mut file = std::fs::OpenOptions::new() .read(true) diff --git a/src/util/data.rs b/src/util/data.rs index 8cd6dd96..34ee8a18 100644 --- a/src/util/data.rs +++ b/src/util/data.rs @@ -1,8 +1,10 @@ +//! Contains common types and functions related to serialization and integrity use rand::Rng; use serde::de::{self, Visitor}; use serde::{Deserialize, Deserializer, Serialize, Serializer}; use std::fmt; +/// An array of 32 bytes #[derive(Default, PartialOrd, Ord, Clone, Hash, PartialEq, Copy)] pub struct FixedBytes32([u8; 32]); @@ -61,15 +63,20 @@ impl Serialize for FixedBytes32 { } impl FixedBytes32 { + /// Access the content as a slice pub fn as_slice(&self) -> &[u8] { &self.0[..] } + /// Access the content as a mutable slice pub fn as_slice_mut(&mut self) -> &mut [u8] { &mut self.0[..] } + /// Copy to a slice pub fn to_vec(&self) -> Vec<u8> { self.0.to_vec() } + /// Try building a FixedBytes32 from a slice + /// Return None if the slice is not 32 bytes long pub fn try_from(by: &[u8]) -> Option<Self> { if by.len() != 32 { return None; @@ -80,9 +87,12 @@ impl FixedBytes32 { } } +/// A 32 bytes UUID pub type UUID = FixedBytes32; +/// A 256 bit cryptographic hash, can be sha256 or blake2 depending on provenance pub type Hash = FixedBytes32; +/// Compute the sha256 of a slice pub fn sha256sum(data: &[u8]) -> Hash { use sha2::{Digest, Sha256}; @@ -93,6 +103,7 @@ pub fn sha256sum(data: &[u8]) -> Hash { hash.into() } +/// Compute the blake2 of a slice pub fn blake2sum(data: &[u8]) -> Hash { use blake2::{Blake2b, Digest}; @@ -103,8 +114,10 @@ pub fn blake2sum(data: &[u8]) -> Hash { hash.into() } +/// A 64 bit non cryptographic hash pub type FastHash = u64; +/// Compute a (non cryptographic) of a slice pub fn fasthash(data: &[u8]) -> FastHash { use xxhash_rust::xxh3::Xxh3; @@ -113,12 +126,14 @@ pub fn fasthash(data: &[u8]) -> FastHash { h.digest() } +/// Generate a random 32 bytes UUID pub fn gen_uuid() -> UUID { rand::thread_rng().gen::<[u8; 32]>().into() } // RMP serialization with names of fields and variants +/// Serialize to MessagePack pub fn rmp_to_vec_all_named<T>(val: &T) -> Result<Vec<u8>, rmp_serde::encode::Error> where T: Serialize + ?Sized, @@ -131,10 +146,13 @@ where Ok(wr) } +/// Serialize to JSON, truncating long result pub fn debug_serialize<T: Serialize>(x: T) -> String { match serde_json::to_string(&x) { Ok(ss) => { if ss.len() > 100 { + // TODO this can panic if 100 is not a codepoint boundary, but inside a 2 Bytes + // (or more) codepoint ss[..100].to_string() } else { ss diff --git a/src/util/error.rs b/src/util/error.rs index a9bf0824..32dccbe6 100644 --- a/src/util/error.rs +++ b/src/util/error.rs @@ -1,9 +1,11 @@ +//! Module containing error types used in Garage use err_derive::Error; use hyper::StatusCode; use std::io; use crate::data::*; +/// RPC related errors #[derive(Debug, Error)] pub enum RPCError { #[error(display = "Node is down: {:?}.", _0)] @@ -28,6 +30,7 @@ pub enum RPCError { TooManyErrors(Vec<String>), } +/// Regroup all Garage errors #[derive(Debug, Error)] pub enum Error { #[error(display = "IO error: {}", _0)] diff --git a/src/util/lib.rs b/src/util/lib.rs index 055e9ab0..c080e3a3 100644 --- a/src/util/lib.rs +++ b/src/util/lib.rs @@ -1,3 +1,5 @@ +//! Crate containing common functions and types used in Garage + #[macro_use] extern crate log; diff --git a/src/util/time.rs b/src/util/time.rs index 148860e0..dfedcb26 100644 --- a/src/util/time.rs +++ b/src/util/time.rs @@ -1,6 +1,8 @@ +//! Module containing helper functions to manipulate time use chrono::{SecondsFormat, TimeZone, Utc}; use std::time::{SystemTime, UNIX_EPOCH}; +/// Returns milliseconds since UNIX Epoch pub fn now_msec() -> u64 { SystemTime::now() .duration_since(UNIX_EPOCH) @@ -8,6 +10,8 @@ pub fn now_msec() -> u64 { .as_millis() as u64 } +/// Convert a timestamp represented as milliseconds since UNIX Epoch to +/// its RFC3339 representation, such as "2021-01-01T12:30:00Z" pub fn msec_to_rfc3339(msecs: u64) -> String { let secs = msecs as i64 / 1000; let nanos = (msecs as i64 % 1000) as u32 * 1_000_000; |