diff options
38 files changed, 393 insertions, 79 deletions
diff --git a/src/api/api_server.rs b/src/api/api_server.rs index 2feb0e3a..dcc9f478 100644 --- a/src/api/api_server.rs +++ b/src/api/api_server.rs @@ -20,6 +20,7 @@ use crate::s3_get::*; use crate::s3_list::*; use crate::s3_put::*; +/// Run the S3 API server pub async fn run_api_server( garage: Arc<Garage>, shutdown_signal: impl Future<Output = ()>, diff --git a/src/api/encoding.rs b/src/api/encoding.rs index 25999207..63c5dee2 100644 --- a/src/api/encoding.rs +++ b/src/api/encoding.rs @@ -1,9 +1,13 @@ +//! Module containing various helpers for encoding + +/// Escape &str for xml inclusion pub fn xml_escape(s: &str) -> String { s.replace("<", "<") .replace(">", ">") .replace("\"", """) } +/// Encode &str for use in a URI pub fn uri_encode(string: &str, encode_slash: bool) -> String { let mut result = String::with_capacity(string.len() * 2); for c in string.chars() { @@ -24,6 +28,7 @@ pub fn uri_encode(string: &str, encode_slash: bool) -> String { result } +/// Encode &str either as an uri, or a valid string for xml inclusion pub fn xml_encode_key(k: &str, urlencode: bool) -> String { if urlencode { uri_encode(k, true) diff --git a/src/api/error.rs b/src/api/error.rs index 42a7ab10..ad0174ad 100644 --- a/src/api/error.rs +++ b/src/api/error.rs @@ -3,44 +3,57 @@ use hyper::StatusCode; use garage_util::error::Error as GarageError; +/// Errors of this crate #[derive(Debug, Error)] pub enum Error { // Category: internal error + /// Error related to deeper parts of Garage #[error(display = "Internal error: {}", _0)] InternalError(#[error(source)] GarageError), + /// Error related to Hyper #[error(display = "Internal error (Hyper error): {}", _0)] Hyper(#[error(source)] hyper::Error), + /// Error related to HTTP #[error(display = "Internal error (HTTP error): {}", _0)] HTTP(#[error(source)] http::Error), // Category: cannot process + /// No proper api key was used, or the signature was invalid #[error(display = "Forbidden: {}", _0)] Forbidden(String), + /// The object requested don't exists #[error(display = "Not found")] NotFound, // Category: bad request + /// The request contained an invalid UTF-8 sequence in its path or in other parameters #[error(display = "Invalid UTF-8: {}", _0)] InvalidUTF8Str(#[error(source)] std::str::Utf8Error), + /// The request used an invalid path #[error(display = "Invalid UTF-8: {}", _0)] InvalidUTF8String(#[error(source)] std::string::FromUtf8Error), + /// Some base64 encoded data was badly encoded #[error(display = "Invalid base64: {}", _0)] InvalidBase64(#[error(source)] base64::DecodeError), + /// The client sent invalid XML data #[error(display = "Invalid XML: {}", _0)] InvalidXML(String), + /// The client sent a header with invalid value #[error(display = "Invalid header value: {}", _0)] InvalidHeader(#[error(source)] hyper::header::ToStrError), + /// The client sent a range header with invalid value #[error(display = "Invalid HTTP range: {:?}", _0)] InvalidRange(#[error(from)] http_range::HttpRangeParseError), + /// The client sent an invalid request #[error(display = "Bad request: {}", _0)] BadRequest(String), } @@ -52,6 +65,7 @@ impl From<roxmltree::Error> for Error { } impl Error { + /// Get the HTTP status code that best represents the meaning of the error for the client pub fn http_status_code(&self) -> StatusCode { match self { Error::NotFound => StatusCode::NOT_FOUND, @@ -65,6 +79,7 @@ impl Error { } } +/// Trait to map error to the Bad Request error code pub trait OkOrBadRequest { type S2; fn ok_or_bad_request(self, reason: &'static str) -> Self::S2; @@ -93,6 +108,7 @@ impl<T> OkOrBadRequest for Option<T> { } } +/// Trait to map an error to an Internal Error code pub trait OkOrInternalError { type S2; fn ok_or_internal_error(self, reason: &'static str) -> Self::S2; diff --git a/src/api/lib.rs b/src/api/lib.rs index 9bb07925..be7e37c8 100644 --- a/src/api/lib.rs +++ b/src/api/lib.rs @@ -1,15 +1,19 @@ +//! Crate for serving a S3 compatible API #[macro_use] extern crate log; -pub mod error; +mod error; +pub use error::Error; -pub mod encoding; +mod encoding; -pub mod api_server; -pub mod signature; +mod api_server; +pub use api_server::run_api_server; -pub mod s3_copy; -pub mod s3_delete; +mod signature; + +mod s3_copy; +mod s3_delete; pub mod s3_get; -pub mod s3_list; -pub mod s3_put; +mod s3_list; +mod s3_put; diff --git a/src/api/s3_get.rs b/src/api/s3_get.rs index 2590c9bd..15c0ed0a 100644 --- a/src/api/s3_get.rs +++ b/src/api/s3_get.rs @@ -1,3 +1,4 @@ +//! Function related to GET and HEAD requests use std::sync::Arc; use std::time::{Duration, UNIX_EPOCH}; @@ -79,6 +80,7 @@ fn try_answer_cached( } } +/// Handle HEAD request pub async fn handle_head( garage: Arc<Garage>, req: &Request<Body>, @@ -118,6 +120,7 @@ pub async fn handle_head( Ok(response) } +/// Handle GET request pub async fn handle_get( garage: Arc<Garage>, req: &Request<Body>, @@ -224,7 +227,7 @@ pub async fn handle_get( } } -pub async fn handle_get_range( +async fn handle_get_range( garage: Arc<Garage>, version: &ObjectVersion, version_data: &ObjectVersionData, diff --git a/src/garage/cli.rs b/src/garage/cli.rs index eb8275a9..55cd222b 100644 --- a/src/garage/cli.rs +++ b/src/garage/cli.rs @@ -80,7 +80,7 @@ pub struct ConfigureNodeOpt { #[structopt(short = "c", long = "capacity")] capacity: Option<u32>, - /// Optionnal node tag + /// Optional node tag #[structopt(short = "t", long = "tag")] tag: Option<String>, diff --git a/src/garage/main.rs b/src/garage/main.rs index 6c86d0fb..a78e0f03 100644 --- a/src/garage/main.rs +++ b/src/garage/main.rs @@ -1,4 +1,5 @@ #![recursion_limit = "1024"] +//! Garage CLI, used to interact with a running Garage instance, and to launch a Garage instance #[macro_use] extern crate log; @@ -25,7 +26,7 @@ use cli::*; #[derive(StructOpt, Debug)] #[structopt(name = "garage")] -pub struct Opt { +struct Opt { /// RPC connect to this host to execute client operations #[structopt(short = "h", long = "rpc-host", default_value = "127.0.0.1:3901")] pub rpc_host: SocketAddr, diff --git a/src/garage/server.rs b/src/garage/server.rs index feb858e4..97a9bec2 100644 --- a/src/garage/server.rs +++ b/src/garage/server.rs @@ -8,10 +8,10 @@ use garage_util::background::*; use garage_util::config::*; use garage_util::error::Error; -use garage_api::api_server; +use garage_api::run_api_server; use garage_model::garage::Garage; use garage_rpc::rpc_server::RpcServer; -use garage_web::web_server; +use garage_web::run_web_server; use crate::admin_rpc::*; @@ -62,8 +62,8 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { info!("Initializing RPC and API servers..."); let run_rpc_server = Arc::new(rpc_server).run(wait_from(watch_cancel.clone())); - let api_server = api_server::run_api_server(garage.clone(), wait_from(watch_cancel.clone())); - let web_server = web_server::run_web_server(garage, wait_from(watch_cancel.clone())); + let api_server = run_api_server(garage.clone(), wait_from(watch_cancel.clone())); + let web_server = run_web_server(garage, wait_from(watch_cancel.clone())); futures::try_join!( bootstrap.map(|rv| { diff --git a/src/model/block.rs b/src/model/block.rs index 0d9af38f..5f428fe1 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -18,12 +18,13 @@ use garage_rpc::membership::System; use garage_rpc::rpc_client::*; use garage_rpc::rpc_server::*; -use garage_table::replication::{sharded::TableShardedReplication, TableReplication}; +use garage_table::replication::{TableReplication, TableShardedReplication}; use crate::block_ref_table::*; use crate::garage::Garage; +/// Size under which data will be stored inlined in database instead of as files pub const INLINE_THRESHOLD: usize = 3072; pub const BACKGROUND_WORKERS: u64 = 1; @@ -33,28 +34,41 @@ const BLOCK_GC_TIMEOUT: Duration = Duration::from_secs(60); const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(5); const RESYNC_RETRY_TIMEOUT: Duration = Duration::from_secs(10); +/// RPC messages used to share blocks of data between nodes #[derive(Debug, Serialize, Deserialize)] pub enum Message { Ok, + /// Message to ask for a block of data, by hash GetBlock(Hash), + /// Message to send a block of data, either because requested, of for first delivery of new + /// block PutBlock(PutBlockMessage), + /// Ask other node if they should have this block, but don't actually have it NeedBlockQuery(Hash), + /// Response : whether the node do require that block NeedBlockReply(bool), } +/// Structure used to send a block #[derive(Debug, Serialize, Deserialize)] pub struct PutBlockMessage { + /// Hash of the block pub hash: Hash, + /// Content of the block #[serde(with = "serde_bytes")] pub data: Vec<u8>, } impl RpcMessage for Message {} +/// The block manager, handling block exchange between nodes, and block storage on local node pub struct BlockManager { + /// Replication strategy, allowing to find on which node blocks should be located pub replication: TableShardedReplication, + /// Directory in which block are stored pub data_dir: PathBuf, + /// Lock to prevent concurrent edition of the directory pub data_dir_lock: Mutex<()>, rc: sled::Tree, @@ -128,7 +142,8 @@ impl BlockManager { } pub fn spawn_background_worker(self: Arc<Self>) { - // Launch 2 simultaneous workers for background resync loop preprocessing + // Launch 2 simultaneous workers for background resync loop preprocessing <= TODO actually this + // launches only one worker with current value of BACKGROUND_WORKERS for i in 0..BACKGROUND_WORKERS { let bm2 = self.clone(); let background = self.system.background.clone(); @@ -141,7 +156,8 @@ impl BlockManager { } } - pub async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result<Message, Error> { + /// Write a block to disk + async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result<Message, Error> { let _lock = self.data_dir_lock.lock().await; let mut path = self.block_dir(hash); @@ -159,7 +175,8 @@ impl BlockManager { Ok(Message::Ok) } - pub async fn read_block(&self, hash: &Hash) -> Result<Message, Error> { + /// Read block from disk, verifying it's integrity + async fn read_block(&self, hash: &Hash) -> Result<Message, Error> { let path = self.block_path(hash); let mut f = match fs::File::open(&path).await { @@ -190,7 +207,8 @@ impl BlockManager { Ok(Message::PutBlock(PutBlockMessage { hash: *hash, data })) } - pub async fn need_block(&self, hash: &Hash) -> Result<bool, Error> { + /// Check if this node should have a block, but don't actually have it + async fn need_block(&self, hash: &Hash) -> Result<bool, Error> { let needed = self .rc .get(hash.as_ref())? @@ -217,6 +235,8 @@ impl BlockManager { path } + /// Increment the number of time a block is used, putting it to resynchronization if it is + /// required, but not known pub fn block_incref(&self, hash: &Hash) -> Result<(), Error> { let old_rc = self.rc.fetch_and_update(&hash, |old| { let old_v = old.map(u64_from_be_bytes).unwrap_or(0); @@ -229,6 +249,7 @@ impl BlockManager { Ok(()) } + /// Decrement the number of time a block is used pub fn block_decref(&self, hash: &Hash) -> Result<(), Error> { let new_rc = self.rc.update_and_fetch(&hash, |old| { let old_v = old.map(u64_from_be_bytes).unwrap_or(0); @@ -388,6 +409,7 @@ impl BlockManager { Ok(()) } + /// Ask nodes that might have a block for it pub async fn rpc_get_block(&self, hash: &Hash) -> Result<Vec<u8>, Error> { let who = self.replication.read_nodes(&hash); let resps = self @@ -412,6 +434,7 @@ impl BlockManager { ))) } + /// Send block to nodes that should have it pub async fn rpc_put_block(&self, hash: Hash, data: Vec<u8>) -> Result<(), Error> { let who = self.replication.write_nodes(&hash); self.rpc_client @@ -498,6 +521,7 @@ impl BlockManager { .boxed() } + /// Get lenght of resync queue pub fn resync_queue_len(&self) -> usize { self.resync_queue.len() } diff --git a/src/model/block_ref_table.rs b/src/model/block_ref_table.rs index e4372717..1f0c7bb0 100644 --- a/src/model/block_ref_table.rs +++ b/src/model/block_ref_table.rs @@ -10,13 +10,14 @@ use crate::block::*; #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct BlockRef { - // Primary key + /// Hash of the block, used as partition key pub block: Hash, - // Sort key + /// Id of the Version for the object containing this block, used as sorting key pub version: UUID, // Keep track of deleted status + /// Is the Version that contains this block deleted pub deleted: crdt::Bool, } diff --git a/src/model/bucket_table.rs b/src/model/bucket_table.rs index 6330dced..6a4b021d 100644 --- a/src/model/bucket_table.rs +++ b/src/model/bucket_table.rs @@ -12,15 +12,18 @@ use crate::key_table::PermissionSet; /// - A bucket has 2 states, Present or Deleted and parameters make sense only if present. #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct Bucket { - // Primary key + /// Name of the bucket pub name: String, - + /// State, and configuration if not deleted, of the bucket pub state: crdt::LWW<BucketState>, } +/// State of a bucket #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub enum BucketState { + /// The bucket is deleted Deleted, + /// The bucket exists Present(BucketParams), } @@ -37,9 +40,12 @@ impl CRDT for BucketState { } } +/// Configuration for a bucket #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct BucketParams { + /// Map of key with access to the bucket, and what kind of access they give pub authorized_keys: crdt::LWWMap<String, PermissionSet>, + /// Is the bucket served as http pub website: crdt::LWW<bool>, } @@ -51,6 +57,7 @@ impl CRDT for BucketParams { } impl BucketParams { + /// Initializes a new instance of the Bucket struct pub fn new() -> Self { BucketParams { authorized_keys: crdt::LWWMap::new(), @@ -60,15 +67,21 @@ impl BucketParams { } impl Bucket { + /// Create a new bucket pub fn new(name: String) -> Self { Bucket { name, state: crdt::LWW::new(BucketState::Present(BucketParams::new())), } } + + /// Returns true if this represents a deleted bucket pub fn is_deleted(&self) -> bool { *self.state.get() == BucketState::Deleted } + + /// Return the list of authorized keys, when each was updated, and the permission associated to + /// the key pub fn authorized_keys(&self) -> &[(String, u64, PermissionSet)] { match self.state.get() { BucketState::Deleted => &[], diff --git a/src/model/garage.rs b/src/model/garage.rs index 5f7a67c9..797a91e5 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -7,8 +7,8 @@ use garage_rpc::membership::System; use garage_rpc::rpc_client::RpcHttpClient; use garage_rpc::rpc_server::RpcServer; -use garage_table::replication::fullcopy::*; -use garage_table::replication::sharded::*; +use garage_table::replication::TableFullReplication; +use garage_table::replication::TableShardedReplication; use garage_table::*; use crate::block::*; @@ -18,15 +18,23 @@ use crate::key_table::*; use crate::object_table::*; use crate::version_table::*; +/// An entire Garage full of data pub struct Garage { + /// The parsed configuration Garage is running pub config: Config, + /// The local database pub db: sled::Db, + /// A background job runner pub background: Arc<BackgroundRunner>, + /// The membership manager pub system: Arc<System>, + /// The block manager pub block_manager: Arc<BlockManager>, + /// Table containing informations about buckets pub bucket_table: Arc<Table<BucketTable, TableFullReplication>>, + /// Table containing informations about api keys pub key_table: Arc<Table<KeyTable, TableFullReplication>>, pub object_table: Arc<Table<ObjectTable, TableShardedReplication>>, @@ -35,6 +43,7 @@ pub struct Garage { } impl Garage { + /// Create and run garage pub fn new( config: Config, db: sled::Db, diff --git a/src/model/key_table.rs b/src/model/key_table.rs index fcca3835..578f8683 100644 --- a/src/model/key_table.rs +++ b/src/model/key_table.rs @@ -3,26 +3,28 @@ use serde::{Deserialize, Serialize}; use garage_table::crdt::*; use garage_table::*; +/// An api key #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct Key { - // Primary key + /// The id of the key (immutable), used as partition key pub key_id: String, - // Associated secret key (immutable) + /// The secret_key associated pub secret_key: String, - // Name + /// Name for the key pub name: crdt::LWW<String>, - // Deletion + /// Is the key deleted pub deleted: crdt::Bool, - // Authorized keys - pub authorized_buckets: crdt::LWWMap<String, PermissionSet>, + /// Buckets in which the key is authorized. Empty if `Key` is deleted // CRDT interaction: deleted implies authorized_buckets is empty + pub authorized_buckets: crdt::LWWMap<String, PermissionSet>, } impl Key { + /// Create a new key pub fn new(name: String) -> Self { let key_id = format!("GK{}", hex::encode(&rand::random::<[u8; 12]>()[..])); let secret_key = hex::encode(&rand::random::<[u8; 32]>()[..]); @@ -34,6 +36,8 @@ impl Key { authorized_buckets: crdt::LWWMap::new(), } } + + /// Import a key from it's parts pub fn import(key_id: &str, secret_key: &str, name: &str) -> Self { Self { key_id: key_id.to_string(), @@ -43,6 +47,8 @@ impl Key { authorized_buckets: crdt::LWWMap::new(), } } + + /// Create a new Key which can me merged to mark an existing key deleted pub fn delete(key_id: String) -> Self { Self { key_id, @@ -52,13 +58,16 @@ impl Key { authorized_buckets: crdt::LWWMap::new(), } } - /// Add an authorized bucket, only if it wasn't there before + + /// Check if `Key` is allowed to read in bucket pub fn allow_read(&self, bucket: &str) -> bool { self.authorized_buckets .get(&bucket.to_string()) .map(|x| x.allow_read) .unwrap_or(false) } + + /// Check if `Key` is allowed to write in bucket pub fn allow_write(&self, bucket: &str) -> bool { self.authorized_buckets .get(&bucket.to_string()) @@ -67,9 +76,12 @@ impl Key { } } +/// Permission given to a key in a bucket #[derive(PartialOrd, Ord, PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] pub struct PermissionSet { + /// The key can be used to read the bucket pub allow_read: bool, + /// The key can be used to write in the bucket pub allow_write: bool, } diff --git a/src/model/object_table.rs b/src/model/object_table.rs index 34ac798a..ff42d065 100644 --- a/src/model/object_table.rs +++ b/src/model/object_table.rs @@ -6,24 +6,26 @@ use garage_util::background::BackgroundRunner; use garage_util::data::*; use garage_table::crdt::*; -use garage_table::replication::sharded::*; +use garage_table::replication::TableShardedReplication; use garage_table::*; use crate::version_table::*; +/// An object #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct Object { - // Primary key + /// The bucket in which the object is stored, used as partition key pub bucket: String, - // Sort key + /// The key at which the object is stored in its bucket, used as sorting key pub key: String, - // Data + /// The list of currenty stored versions of the object versions: Vec<ObjectVersion>, } impl Object { + /// Create an object from parts pub fn new(bucket: String, key: String, versions: Vec<ObjectVersion>) -> Self { let mut ret = Self { bucket, @@ -36,6 +38,7 @@ impl Object { } ret } + /// Adds a version if it wasn't already present pub fn add_version(&mut self, new: ObjectVersion) -> Result<(), ()> { match self @@ -49,23 +52,32 @@ impl Object { Ok(_) => Err(()), } } + + /// Get a list of currently stored versions of `Object` pub fn versions(&self) -> &[ObjectVersion] { &self.versions[..] } } +/// Informations about a version of an object #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct ObjectVersion { + /// Id of the version pub uuid: UUID, + /// Timestamp of when the object was created pub timestamp: u64, - + /// State of the version pub state: ObjectVersionState, } +/// State of an object version #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub enum ObjectVersionState { + /// The version is being received Uploading(ObjectVersionHeaders), + /// The version is fully received Complete(ObjectVersionData), + /// The version uploaded containded errors or the upload was explicitly aborted Aborted, } @@ -90,10 +102,15 @@ impl CRDT for ObjectVersionState { } } +/// Data about an object version #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] pub enum ObjectVersionData { + /// The object was deleted, this Version is a tombstone to mark it as such DeleteMarker, + /// The object is short, it's stored inlined Inline(ObjectVersionMeta, #[serde(with = "serde_bytes")] Vec<u8>), + /// The object is not short, Hash of first block is stored here, next segments hashes are + /// stored in the version table FirstBlock(ObjectVersionMeta, Hash), } @@ -101,16 +118,23 @@ impl AutoCRDT for ObjectVersionData { const WARN_IF_DIFFERENT: bool = true; } +/// Metadata about the object version #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] pub struct ObjectVersionMeta { + /// Headers to send to the client pub headers: ObjectVersionHeaders, + /// Size of the object pub size: u64, + /// etag of the object pub etag: String, } +/// Additional headers for an object #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] pub struct ObjectVersionHeaders { + /// Content type of the object pub content_type: String, + /// Any other http headers to send pub other: BTreeMap<String, String>, } @@ -118,18 +142,24 @@ impl ObjectVersion { fn cmp_key(&self) -> (u64, UUID) { (self.timestamp, self.uuid) } + + /// Is the object version currently being uploaded pub fn is_uploading(&self) -> bool { match self.state { ObjectVersionState::Uploading(_) => true, _ => false, } } + + /// Is the object version completely received pub fn is_complete(&self) -> bool { match self.state { ObjectVersionState::Complete(_) => true, _ => false, } } + + /// Is the object version available (received and not a tombstone) pub fn is_data(&self) -> bool { match self.state { ObjectVersionState::Complete(ObjectVersionData::DeleteMarker) => false, diff --git a/src/model/version_table.rs b/src/model/version_table.rs index 841fbfea..bb836868 100644 --- a/src/model/version_table.rs +++ b/src/model/version_table.rs @@ -5,26 +5,32 @@ use garage_util::background::BackgroundRunner; use garage_util::data::*; use garage_table::crdt::*; -use garage_table::replication::sharded::*; +use garage_table::replication::TableShardedReplication; use garage_table::*; use crate::block_ref_table::*; +/// A version of an object #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct Version { - // Primary key + /// UUID of the version, used as partition key pub uuid: UUID, // Actual data: the blocks for this version // In the case of a multipart upload, also store the etags // of individual parts and check them when doing CompleteMultipartUpload + /// Is this version deleted pub deleted: crdt::Bool, + /// list of blocks of data composing the version pub blocks: crdt::Map<VersionBlockKey, VersionBlock>, + /// Etag of each part in case of a multipart upload, empty otherwise pub parts_etags: crdt::Map<u64, String>, // Back link to bucket+key so that we can figure if // this was deleted later on + /// Bucket in which the related object is stored pub bucket: String, + /// Key in which the related object is stored pub key: String, } @@ -43,7 +49,9 @@ impl Version { #[derive(PartialEq, Eq, Clone, Copy, Debug, Serialize, Deserialize)] pub struct VersionBlockKey { + /// Number of the part pub part_number: u64, + /// Offset of this sub-segment in its part pub offset: u64, } @@ -61,9 +69,12 @@ impl PartialOrd for VersionBlockKey { } } +/// Informations about a single block #[derive(PartialEq, Eq, Ord, PartialOrd, Clone, Copy, Debug, Serialize, Deserialize)] pub struct VersionBlock { + /// Hash of the block pub hash: Hash, + /// Size of the block pub size: u64, } diff --git a/src/rpc/lib.rs b/src/rpc/lib.rs index 00e31f57..96561d0e 100644 --- a/src/rpc/lib.rs +++ b/src/rpc/lib.rs @@ -1,7 +1,9 @@ +//! Crate containing rpc related functions and types used in Garage + #[macro_use] extern crate log; -pub mod consul; +mod consul; pub(crate) mod tls_util; pub mod membership; diff --git a/src/rpc/membership.rs b/src/rpc/membership.rs index 9fb24ad4..5f7bbc96 100644 --- a/src/rpc/membership.rs +++ b/src/rpc/membership.rs @@ -1,3 +1,4 @@ +//! Module containing structs related to membership management use std::collections::HashMap; use std::fmt::Write as FmtWrite; use std::io::{Read, Write}; @@ -30,20 +31,29 @@ const DISCOVERY_INTERVAL: Duration = Duration::from_secs(60); const PING_TIMEOUT: Duration = Duration::from_secs(2); const MAX_FAILURES_BEFORE_CONSIDERED_DOWN: usize = 5; +/// RPC endpoint used for calls related to membership pub const MEMBERSHIP_RPC_PATH: &str = "_membership"; +/// RPC messages related to membership #[derive(Debug, Serialize, Deserialize)] pub enum Message { + /// Response to successfull advertisements Ok, + /// Message sent to detect other nodes status Ping(PingMessage), + /// Ask other node for the nodes it knows. Answered with AdvertiseNodesUp PullStatus, + /// Ask other node its config. Answered with AdvertiseConfig PullConfig, + /// Advertisement of nodes the host knows up. Sent spontanously or in response to PullStatus AdvertiseNodesUp(Vec<AdvertisedNode>), + /// Advertisement of nodes config. Sent spontanously or in response to PullConfig AdvertiseConfig(NetworkConfig), } impl RpcMessage for Message {} +/// A ping, containing informations about status and config #[derive(Debug, Serialize, Deserialize)] pub struct PingMessage { id: UUID, @@ -55,18 +65,25 @@ pub struct PingMessage { state_info: StateInfo, } +/// A node advertisement #[derive(Clone, Debug, Serialize, Deserialize)] pub struct AdvertisedNode { + /// Id of the node this advertisement relates to pub id: UUID, + /// IP and port of the node pub addr: SocketAddr, + /// Is the node considered up pub is_up: bool, + /// When was the node last seen up, in milliseconds since UNIX epoch pub last_seen: u64, pub state_info: StateInfo, } +/// This node's membership manager pub struct System { + /// The id of this node pub id: UUID, persist_config: Persister<NetworkConfig>, @@ -79,10 +96,12 @@ pub struct System { rpc_client: Arc<RpcClient<Message>>, pub(crate) status: watch::Receiver<Arc<Status>>, + /// The ring pub ring: watch::Receiver<Arc<Ring>>, update_lock: Mutex<Updaters>, + /// The job runner of this node pub background: Arc<BackgroundRunner>, } @@ -91,21 +110,29 @@ struct Updaters { update_ring: watch::Sender<Arc<Ring>>, } +/// The status of each nodes, viewed by this node #[derive(Debug, Clone)] pub struct Status { + /// Mapping of each node id to its known status pub nodes: HashMap<UUID, Arc<StatusEntry>>, + /// Hash of `nodes`, used to detect when nodes have different views of the cluster pub hash: Hash, } +/// The status of a single node #[derive(Debug)] pub struct StatusEntry { + /// The IP and port used to connect to this node pub addr: SocketAddr, + /// Last time this node was seen pub last_seen: u64, + /// Number of consecutive pings sent without reply to this node pub num_failures: AtomicUsize, pub state_info: StateInfo, } impl StatusEntry { + /// is the node associated to this entry considered up pub fn is_up(&self) -> bool { self.num_failures.load(Ordering::SeqCst) < MAX_FAILURES_BEFORE_CONSIDERED_DOWN } @@ -195,6 +222,7 @@ fn gen_node_id(metadata_dir: &PathBuf) -> Result<UUID, Error> { } impl System { + /// Create this node's membership manager pub fn new( metadata_dir: PathBuf, rpc_http_client: Arc<RpcHttpClient>, @@ -279,6 +307,7 @@ impl System { }); } + /// Get an RPC client pub fn rpc_client<M: RpcMessage + 'static>(self: &Arc<Self>, path: &str) -> Arc<RpcClient<M>> { RpcClient::new( RpcAddrClient::new(self.rpc_http_client.clone(), path.to_string()), @@ -287,6 +316,7 @@ impl System { ) } + /// Save network configuration to disc async fn save_network_config(self: Arc<Self>) -> Result<(), Error> { let ring = self.ring.borrow().clone(); self.persist_config @@ -319,6 +349,7 @@ impl System { self.rpc_client.call_many(&to[..], msg, timeout).await; } + /// Perform bootstraping, starting the ping loop pub async fn bootstrap( self: Arc<Self>, peers: Vec<SocketAddr>, @@ -386,6 +417,8 @@ impl System { } } else if let Some(id) = id_option { if let Some(st) = status.nodes.get_mut(id) { + // we need to increment failure counter as call was done using by_addr so the + // counter was not auto-incremented st.num_failures.fetch_add(1, Ordering::SeqCst); if !st.is_up() { warn!("Node {:?} seems to be down.", id); diff --git a/src/rpc/ring.rs b/src/rpc/ring.rs index 2e997523..bffd7f1f 100644 --- a/src/rpc/ring.rs +++ b/src/rpc/ring.rs @@ -1,3 +1,5 @@ +//! Module containing types related to computing nodes which should receive a copy of data blocks +//! and metadata use std::collections::{HashMap, HashSet}; use std::convert::TryInto; @@ -8,23 +10,30 @@ use garage_util::data::*; // A partition number is encoded on 16 bits, // i.e. we have up to 2**16 partitions. // (in practice we have exactly 2**PARTITION_BITS partitions) +/// A partition id, stored on 16 bits pub type Partition = u16; // TODO: make this constant parametrizable in the config file // For deployments with many nodes it might make sense to bump // it up to 10. // Maximum value : 16 +/// How many bits from the hash are used to make partitions. Higher numbers means more fairness in +/// presence of numerous nodes, but exponentially bigger ring. Max 16 pub const PARTITION_BITS: usize = 8; const PARTITION_MASK_U16: u16 = ((1 << PARTITION_BITS) - 1) << (16 - PARTITION_BITS); // TODO: make this constant paraetrizable in the config file // (most deployments use a replication factor of 3, so...) +/// The maximum number of time an object might get replicated pub const MAX_REPLICATION: usize = 3; +/// The user-defined configuration of the cluster's nodes #[derive(Clone, Debug, Serialize, Deserialize)] pub struct NetworkConfig { + /// Map of each node's id to it's configuration pub members: HashMap<UUID, NetworkConfigEntry>, + /// Version of this config pub version: u64, } @@ -37,26 +46,40 @@ impl NetworkConfig { } } +/// The overall configuration of one (possibly remote) node #[derive(Clone, Debug, Serialize, Deserialize)] pub struct NetworkConfigEntry { + /// Datacenter at which this entry belong. This infromation might be used to perform a better + /// geodistribution pub datacenter: String, + /// The (relative) capacity of the node pub capacity: u32, + /// A tag to recognize the entry, not used for other things than display pub tag: String, } +/// A ring distributing fairly objects to nodes #[derive(Clone)] pub struct Ring { + /// The network configuration used to generate this ring pub config: NetworkConfig, + /// The list of entries in the ring pub ring: Vec<RingEntry>, } +/// An entry in the ring #[derive(Clone, Debug)] pub struct RingEntry { + /// The prefix of the Hash of object which should use this entry pub location: Hash, + /// The nodes in which a matching object should get stored pub nodes: [UUID; MAX_REPLICATION], } impl Ring { + // TODO this function MUST be refactored, it's 100 lines long, with a 50 lines loop, going up to 6 + // levels of imbrication. It is basically impossible to test, maintain, or understand for an + // outsider. pub(crate) fn new(config: NetworkConfig) -> Self { // Create a vector of partition indices (0 to 2**PARTITION_BITS-1) let partitions_idx = (0usize..(1usize << PARTITION_BITS)).collect::<Vec<_>>(); @@ -166,20 +189,16 @@ impl Ring { }) .collect::<Vec<_>>(); - // eprintln!("RING: --"); - // for e in ring.iter() { - // eprintln!("{:?}", e); - // } - // eprintln!("END --"); - Self { config, ring } } + /// Get the partition in which data would fall on pub fn partition_of(&self, from: &Hash) -> Partition { let top = u16::from_be_bytes(from.as_slice()[0..2].try_into().unwrap()); top >> (16 - PARTITION_BITS) } + /// Get the list of partitions and the first hash of a partition key that would fall in it pub fn partitions(&self) -> Vec<(Partition, Hash)> { let mut ret = vec![]; @@ -193,6 +212,8 @@ impl Ring { ret } + // TODO rename this function as it no longer walk the ring + /// Walk the ring to find the n servers in which data should be replicated pub fn walk_ring(&self, from: &Hash, n: usize) -> Vec<UUID> { if self.ring.len() != 1 << PARTITION_BITS { warn!("Ring not yet ready, read/writes will be lost!"); @@ -201,12 +222,15 @@ impl Ring { let top = u16::from_be_bytes(from.as_slice()[0..2].try_into().unwrap()); let partition_idx = (top >> (16 - PARTITION_BITS)) as usize; + // TODO why computing two time in the same way and asserting? assert_eq!(partition_idx, self.partition_of(from) as usize); let partition = &self.ring[partition_idx]; let partition_top = u16::from_be_bytes(partition.location.as_slice()[0..2].try_into().unwrap()); + // TODO is this an assertion on the validity of PARTITION_MASK_U16? If so, it should + // probably be a test more than a runtime assertion assert_eq!(partition_top & PARTITION_MASK_U16, top & PARTITION_MASK_U16); assert!(n <= partition.nodes.len()); diff --git a/src/rpc/rpc_client.rs b/src/rpc/rpc_client.rs index 5e76e215..8a6cc721 100644 --- a/src/rpc/rpc_client.rs +++ b/src/rpc/rpc_client.rs @@ -1,3 +1,4 @@ +//! Contain structs related to making RPCs use std::borrow::Borrow; use std::marker::PhantomData; use std::net::SocketAddr; @@ -26,14 +27,19 @@ use crate::tls_util; const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10); +/// Strategy to apply when making RPC #[derive(Copy, Clone)] pub struct RequestStrategy { + /// Max time to wait for reponse pub rs_timeout: Duration, + /// Min number of response to consider the request successful pub rs_quorum: usize, + /// Should requests be dropped after enough response are received pub rs_interrupt_after_quorum: bool, } impl RequestStrategy { + /// Create a RequestStrategy with default timeout and not interrupting when quorum reached pub fn with_quorum(quorum: usize) -> Self { RequestStrategy { rs_timeout: DEFAULT_TIMEOUT, @@ -41,19 +47,25 @@ impl RequestStrategy { rs_interrupt_after_quorum: false, } } + /// Set timeout of the strategy pub fn with_timeout(mut self, timeout: Duration) -> Self { self.rs_timeout = timeout; self } + /// Set if requests can be dropped after quorum has been reached + /// In general true for read requests, and false for write pub fn interrupt_after_quorum(mut self, interrupt: bool) -> Self { self.rs_interrupt_after_quorum = interrupt; self } } +/// Shortcut for a boxed async function taking a message, and resolving to another message or an +/// error pub type LocalHandlerFn<M> = Box<dyn Fn(Arc<M>) -> Pin<Box<dyn Future<Output = Result<M, Error>> + Send>> + Send + Sync>; +/// Client used to send RPC pub struct RpcClient<M: RpcMessage> { status: watch::Receiver<Arc<Status>>, background: Arc<BackgroundRunner>, @@ -64,6 +76,7 @@ pub struct RpcClient<M: RpcMessage> { } impl<M: RpcMessage + 'static> RpcClient<M> { + /// Create a new RpcClient from an address, a job runner, and the status of all RPC servers pub fn new( rac: RpcAddrClient<M>, background: Arc<BackgroundRunner>, @@ -77,6 +90,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> { }) } + /// Set the local handler, to process RPC to this node without network usage pub fn set_local_handler<F, Fut>(&self, my_id: UUID, handler: F) where F: Fn(Arc<M>) -> Fut + Send + Sync + 'static, @@ -90,14 +104,17 @@ impl<M: RpcMessage + 'static> RpcClient<M> { self.local_handler.swap(Some(Arc::new((my_id, handler)))); } + /// Get a RPC client to make calls using node's SocketAddr instead of its ID pub fn by_addr(&self) -> &RpcAddrClient<M> { &self.rpc_addr_client } + /// Make a RPC call pub async fn call(&self, to: UUID, msg: M, timeout: Duration) -> Result<M, Error> { self.call_arc(to, Arc::new(msg), timeout).await } + /// Make a RPC call from a message stored in an Arc pub async fn call_arc(&self, to: UUID, msg: Arc<M>, timeout: Duration) -> Result<M, Error> { if let Some(lh) = self.local_handler.load_full() { let (my_id, local_handler) = lh.as_ref(); @@ -134,6 +151,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> { } } + /// Make a RPC call to multiple servers, returning a Vec containing each result pub async fn call_many(&self, to: &[UUID], msg: M, timeout: Duration) -> Vec<Result<M, Error>> { let msg = Arc::new(msg); let mut resp_stream = to @@ -148,6 +166,8 @@ impl<M: RpcMessage + 'static> RpcClient<M> { results } + /// Make a RPC call to multiple servers, returning either a Vec of responses, or an error if + /// strategy could not be respected due to too many errors pub async fn try_call_many( self: &Arc<Self>, to: &[UUID], @@ -207,6 +227,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> { } } +/// Thin wrapper arround an `RpcHttpClient` specifying the path of the request pub struct RpcAddrClient<M: RpcMessage> { phantom: PhantomData<M>, @@ -215,6 +236,7 @@ pub struct RpcAddrClient<M: RpcMessage> { } impl<M: RpcMessage> RpcAddrClient<M> { + /// Create an RpcAddrClient from an HTTP client and the endpoint to reach for RPCs pub fn new(http_client: Arc<RpcHttpClient>, path: String) -> Self { Self { phantom: PhantomData::default(), @@ -223,6 +245,7 @@ impl<M: RpcMessage> RpcAddrClient<M> { } } + /// Make a RPC pub async fn call<MB>( &self, to_addr: &SocketAddr, @@ -238,6 +261,7 @@ impl<M: RpcMessage> RpcAddrClient<M> { } } +/// HTTP client used to make RPCs pub struct RpcHttpClient { request_limiter: Semaphore, method: ClientMethod, @@ -249,6 +273,7 @@ enum ClientMethod { } impl RpcHttpClient { + /// Create a new RpcHttpClient pub fn new( max_concurrent_requests: usize, tls_config: &Option<TlsConfig>, @@ -279,6 +304,7 @@ impl RpcHttpClient { }) } + /// Make a RPC async fn call<M, MB>( &self, path: &str, diff --git a/src/rpc/rpc_server.rs b/src/rpc/rpc_server.rs index 0d82d796..4419a6f0 100644 --- a/src/rpc/rpc_server.rs +++ b/src/rpc/rpc_server.rs @@ -1,3 +1,4 @@ +//! Contains structs related to receiving RPCs use std::collections::HashMap; use std::net::SocketAddr; use std::pin::Pin; @@ -22,13 +23,17 @@ use garage_util::error::Error; use crate::tls_util; +/// Trait for messages that can be sent as RPC pub trait RpcMessage: Serialize + for<'de> Deserialize<'de> + Send + Sync {} type ResponseFuture = Pin<Box<dyn Future<Output = Result<Response<Body>, Error>> + Send>>; type Handler = Box<dyn Fn(Request<Body>, SocketAddr) -> ResponseFuture + Send + Sync>; +/// Structure handling RPCs pub struct RpcServer { + /// The address the RpcServer will bind pub bind_addr: SocketAddr, + /// The tls configuration used for RPC pub tls_config: Option<TlsConfig>, handlers: HashMap<String, Handler>, @@ -87,6 +92,7 @@ where } impl RpcServer { + /// Create a new RpcServer pub fn new(bind_addr: SocketAddr, tls_config: Option<TlsConfig>) -> Self { Self { bind_addr, @@ -95,6 +101,7 @@ impl RpcServer { } } + /// Add handler handling request made to `name` pub fn add_handler<M, F, Fut>(&mut self, name: String, handler: F) where M: RpcMessage + 'static, @@ -156,6 +163,7 @@ impl RpcServer { } } + /// Run the RpcServer pub async fn run( self: Arc<Self>, shutdown_signal: impl Future<Output = ()>, diff --git a/src/table/crdt/lww.rs b/src/table/crdt/lww.rs index 25ecdb07..3b1b2406 100644 --- a/src/table/crdt/lww.rs +++ b/src/table/crdt/lww.rs @@ -34,7 +34,7 @@ use crate::crdt::crdt::*; /// and may differ from what you observed with your atomic clock! /// /// This scheme is used by AWS S3 or Soundcloud and often without knowing -/// in entreprise when reconciliating databases with ad-hoc scripts. +/// in enterprise when reconciliating databases with ad-hoc scripts. #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] pub struct LWW<T> { ts: u64, diff --git a/src/table/crdt/map.rs b/src/table/crdt/map.rs index 1193e6db..c4a30a26 100644 --- a/src/table/crdt/map.rs +++ b/src/table/crdt/map.rs @@ -37,6 +37,7 @@ where Self { vals: vec![(k, v)] } } + /// Add a value to the map pub fn put(&mut self, k: K, v: V) { self.merge(&Self::put_mutator(k, v)); } diff --git a/src/table/gc.rs b/src/table/gc.rs index e52bf599..694a3789 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -74,7 +74,7 @@ where while !*must_exit.borrow() { match self.gc_loop_iter().await { Ok(true) => { - // Stuff was done, loop imediately + // Stuff was done, loop immediately continue; } Ok(false) => { diff --git a/src/table/lib.rs b/src/table/lib.rs index 3b73163b..c3e14ab8 100644 --- a/src/table/lib.rs +++ b/src/table/lib.rs @@ -8,10 +8,10 @@ pub mod schema; pub mod util; pub mod data; -pub mod gc; -pub mod merkle; +mod gc; +mod merkle; pub mod replication; -pub mod sync; +mod sync; pub mod table; pub use schema::*; diff --git a/src/table/replication/fullcopy.rs b/src/table/replication/fullcopy.rs index bd658f63..a6b4c98c 100644 --- a/src/table/replication/fullcopy.rs +++ b/src/table/replication/fullcopy.rs @@ -6,19 +6,19 @@ use garage_util::data::*; use crate::replication::*; +/// Full replication schema: all nodes store everything +/// Writes are disseminated in an epidemic manner in the network +/// Advantage: do all reads locally, extremely fast +/// Inconvenient: only suitable to reasonably small tables #[derive(Clone)] pub struct TableFullReplication { + /// The membership manager of this node pub system: Arc<System>, + /// Max number of faults allowed while replicating a record pub max_faults: usize, } impl TableReplication for TableFullReplication { - // Full replication schema: all nodes store everything - // Writes are disseminated in an epidemic manner in the network - - // Advantage: do all reads locally, extremely fast - // Inconvenient: only suitable to reasonably small tables - fn read_nodes(&self, _hash: &Hash) -> Vec<UUID> { vec![self.system.id] } diff --git a/src/table/replication/mod.rs b/src/table/replication/mod.rs index d43d7f19..dfcb026a 100644 --- a/src/table/replication/mod.rs +++ b/src/table/replication/mod.rs @@ -1,6 +1,8 @@ mod parameters; -pub mod fullcopy; -pub mod sharded; +mod fullcopy; +mod sharded; +pub use fullcopy::TableFullReplication; pub use parameters::*; +pub use sharded::TableShardedReplication; diff --git a/src/table/replication/parameters.rs b/src/table/replication/parameters.rs index e46bd172..c2c78c8b 100644 --- a/src/table/replication/parameters.rs +++ b/src/table/replication/parameters.rs @@ -2,20 +2,25 @@ use garage_rpc::ring::*; use garage_util::data::*; +/// Trait to describe how a table shall be replicated pub trait TableReplication: Send + Sync { // See examples in table_sharded.rs and table_fullcopy.rs // To understand various replication methods - // Which nodes to send reads from + /// Which nodes to send read requests to fn read_nodes(&self, hash: &Hash) -> Vec<UUID>; + /// Responses needed to consider a read succesfull fn read_quorum(&self) -> usize; - // Which nodes to send writes to + /// Which nodes to send writes to fn write_nodes(&self, hash: &Hash) -> Vec<UUID>; + /// Responses needed to consider a write succesfull fn write_quorum(&self) -> usize; fn max_write_errors(&self) -> usize; // Accessing partitions, for Merkle tree & sync + /// Get partition for data with given hash fn partition_of(&self, hash: &Hash) -> Partition; + /// List of existing partitions fn partitions(&self) -> Vec<(Partition, Hash)>; } diff --git a/src/table/replication/sharded.rs b/src/table/replication/sharded.rs index dce74b03..f2d89729 100644 --- a/src/table/replication/sharded.rs +++ b/src/table/replication/sharded.rs @@ -6,22 +6,25 @@ use garage_util::data::*; use crate::replication::*; +/// Sharded replication schema: +/// - based on the ring of nodes, a certain set of neighbors +/// store entries, given as a function of the position of the +/// entry's hash in the ring +/// - reads are done on all of the nodes that replicate the data +/// - writes as well #[derive(Clone)] pub struct TableShardedReplication { + /// The membership manager of this node pub system: Arc<System>, + /// How many time each data should be replicated pub replication_factor: usize, + /// How many nodes to contact for a read, should be at most `replication_factor` pub read_quorum: usize, + /// How many nodes to contact for a write, should be at most `replication_factor` pub write_quorum: usize, } impl TableReplication for TableShardedReplication { - // Sharded replication schema: - // - based on the ring of nodes, a certain set of neighbors - // store entries, given as a function of the position of the - // entry's hash in the ring - // - reads are done on all of the nodes that replicate the data - // - writes as well - fn read_nodes(&self, hash: &Hash) -> Vec<UUID> { let ring = self.system.ring.borrow().clone(); ring.walk_ring(&hash, self.replication_factor) diff --git a/src/table/schema.rs b/src/table/schema.rs index 4d754664..13517271 100644 --- a/src/table/schema.rs +++ b/src/table/schema.rs @@ -4,7 +4,9 @@ use garage_util::data::*; use crate::crdt::CRDT; +/// Trait for field used to partition data pub trait PartitionKey { + /// Get the key used to partition fn hash(&self) -> Hash; } @@ -20,7 +22,9 @@ impl PartitionKey for Hash { } } +/// Trait for field used to sort data pub trait SortKey { + /// Get the key used to sort fn sort_key(&self) -> &[u8]; } @@ -36,25 +40,34 @@ impl SortKey for Hash { } } +/// Trait for an entry in a table. It must be sortable and partitionnable. pub trait Entry<P: PartitionKey, S: SortKey>: CRDT + PartialEq + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync { + /// Get the key used to partition fn partition_key(&self) -> &P; + /// Get the key used to sort fn sort_key(&self) -> &S; + /// Is the entry a tombstone? Default implementation always return false fn is_tombstone(&self) -> bool { false } } +/// Trait for the schema used in a table pub trait TableSchema: Send + Sync { + /// The partition key used in that table type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync; + /// The sort key used int that table type S: SortKey + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync; + /// They type for an entry in that table type E: Entry<Self::P, Self::S>; type Filter: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync; // Action to take if not able to decode current version: // try loading from an older version + /// Try migrating an entry from an older version fn try_migrate(_bytes: &[u8]) -> Option<Self::E> { None } @@ -65,7 +78,5 @@ pub trait TableSchema: Send + Sync { // to stderr. fn updated(&self, _old: Option<Self::E>, _new: Option<Self::E>) {} - fn matches_filter(_entry: &Self::E, _filter: &Self::Filter) -> bool { - true - } + fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool; } diff --git a/src/util/background.rs b/src/util/background.rs index b5eb8bc8..bfdaaf1e 100644 --- a/src/util/background.rs +++ b/src/util/background.rs @@ -1,3 +1,4 @@ +//! Job runner for futures and async functions use core::future::Future; use std::pin::Pin; use std::sync::Arc; @@ -12,14 +13,15 @@ use crate::error::Error; type JobOutput = Result<(), Error>; type Job = Pin<Box<dyn Future<Output = JobOutput> + Send>>; +/// Job runner for futures and async functions pub struct BackgroundRunner { - pub stop_signal: watch::Receiver<bool>, - + stop_signal: watch::Receiver<bool>, queue_in: mpsc::UnboundedSender<(Job, bool)>, worker_in: mpsc::UnboundedSender<tokio::task::JoinHandle<()>>, } impl BackgroundRunner { + /// Create a new BackgroundRunner pub fn new( n_runners: usize, stop_signal: watch::Receiver<bool>, @@ -103,7 +105,7 @@ impl BackgroundRunner { (bgrunner, await_all_done) } - // Spawn a task to be run in background + /// Spawn a task to be run in background pub fn spawn<T>(&self, job: T) where T: Future<Output = JobOutput> + Send + 'static, @@ -115,6 +117,8 @@ impl BackgroundRunner { .unwrap(); } + /// Spawn a task to be run in background. It may get discarded before running if spawned while + /// the runner is stopping pub fn spawn_cancellable<T>(&self, job: T) where T: Future<Output = JobOutput> + Send + 'static, diff --git a/src/util/config.rs b/src/util/config.rs index 9ff67711..bb70467b 100644 --- a/src/util/config.rs +++ b/src/util/config.rs @@ -1,3 +1,4 @@ +//! Contains type and functions related to Garage configuration file use std::io::Read; use std::net::SocketAddr; use std::path::PathBuf; @@ -6,57 +7,82 @@ use serde::{de, Deserialize}; use crate::error::Error; +/// Represent the whole configuration #[derive(Deserialize, Debug, Clone)] pub struct Config { + /// Path where to store metadata. Should be fast, but low volume pub metadata_dir: PathBuf, + /// Path where to store data. Can be slower, but need higher volume pub data_dir: PathBuf, + /// Address to bind for RPC pub rpc_bind_addr: SocketAddr, + /// Bootstrap peers RPC address #[serde(deserialize_with = "deserialize_vec_addr")] pub bootstrap_peers: Vec<SocketAddr>, + /// Consule host to connect to to discover more peers pub consul_host: Option<String>, + /// Consul service name to use pub consul_service_name: Option<String>, + /// Max number of concurrent RPC request #[serde(default = "default_max_concurrent_rpc_requests")] pub max_concurrent_rpc_requests: usize, + /// Size of data blocks to save to disk #[serde(default = "default_block_size")] pub block_size: usize, #[serde(default = "default_control_write_max_faults")] pub control_write_max_faults: usize, + /// How many nodes should hold a copy of meta data #[serde(default = "default_replication_factor")] pub meta_replication_factor: usize, + /// How many nodes should hold a copy of data #[serde(default = "default_replication_factor")] pub data_replication_factor: usize, + /// Configuration for RPC TLS pub rpc_tls: Option<TlsConfig>, + /// Configuration for S3 api pub s3_api: ApiConfig, + /// Configuration for serving files as normal web server pub s3_web: WebConfig, } +/// Configuration for RPC TLS #[derive(Deserialize, Debug, Clone)] pub struct TlsConfig { + /// Path to certificate autority used for all nodes pub ca_cert: String, + /// Path to public certificate for this node pub node_cert: String, + /// Path to private key for this node pub node_key: String, } +/// Configuration for S3 api #[derive(Deserialize, Debug, Clone)] pub struct ApiConfig { + /// Address and port to bind for api serving pub api_bind_addr: SocketAddr, + /// S3 region to use pub s3_region: String, } +/// Configuration for serving files as normal web server #[derive(Deserialize, Debug, Clone)] pub struct WebConfig { + /// Address and port to bind for web serving pub bind_addr: SocketAddr, + /// Suffix to remove from domain name to find bucket pub root_domain: String, + /// Suffix to add when user-agent request path end with "/" pub index: String, } @@ -73,6 +99,7 @@ fn default_control_write_max_faults() -> usize { 1 } +/// Read and parse configuration pub fn read_config(config_file: PathBuf) -> Result<Config, Error> { let mut file = std::fs::OpenOptions::new() .read(true) diff --git a/src/util/data.rs b/src/util/data.rs index 8cd6dd96..34ee8a18 100644 --- a/src/util/data.rs +++ b/src/util/data.rs @@ -1,8 +1,10 @@ +//! Contains common types and functions related to serialization and integrity use rand::Rng; use serde::de::{self, Visitor}; use serde::{Deserialize, Deserializer, Serialize, Serializer}; use std::fmt; +/// An array of 32 bytes #[derive(Default, PartialOrd, Ord, Clone, Hash, PartialEq, Copy)] pub struct FixedBytes32([u8; 32]); @@ -61,15 +63,20 @@ impl Serialize for FixedBytes32 { } impl FixedBytes32 { + /// Access the content as a slice pub fn as_slice(&self) -> &[u8] { &self.0[..] } + /// Access the content as a mutable slice pub fn as_slice_mut(&mut self) -> &mut [u8] { &mut self.0[..] } + /// Copy to a slice pub fn to_vec(&self) -> Vec<u8> { self.0.to_vec() } + /// Try building a FixedBytes32 from a slice + /// Return None if the slice is not 32 bytes long pub fn try_from(by: &[u8]) -> Option<Self> { if by.len() != 32 { return None; @@ -80,9 +87,12 @@ impl FixedBytes32 { } } +/// A 32 bytes UUID pub type UUID = FixedBytes32; +/// A 256 bit cryptographic hash, can be sha256 or blake2 depending on provenance pub type Hash = FixedBytes32; +/// Compute the sha256 of a slice pub fn sha256sum(data: &[u8]) -> Hash { use sha2::{Digest, Sha256}; @@ -93,6 +103,7 @@ pub fn sha256sum(data: &[u8]) -> Hash { hash.into() } +/// Compute the blake2 of a slice pub fn blake2sum(data: &[u8]) -> Hash { use blake2::{Blake2b, Digest}; @@ -103,8 +114,10 @@ pub fn blake2sum(data: &[u8]) -> Hash { hash.into() } +/// A 64 bit non cryptographic hash pub type FastHash = u64; +/// Compute a (non cryptographic) of a slice pub fn fasthash(data: &[u8]) -> FastHash { use xxhash_rust::xxh3::Xxh3; @@ -113,12 +126,14 @@ pub fn fasthash(data: &[u8]) -> FastHash { h.digest() } +/// Generate a random 32 bytes UUID pub fn gen_uuid() -> UUID { rand::thread_rng().gen::<[u8; 32]>().into() } // RMP serialization with names of fields and variants +/// Serialize to MessagePack pub fn rmp_to_vec_all_named<T>(val: &T) -> Result<Vec<u8>, rmp_serde::encode::Error> where T: Serialize + ?Sized, @@ -131,10 +146,13 @@ where Ok(wr) } +/// Serialize to JSON, truncating long result pub fn debug_serialize<T: Serialize>(x: T) -> String { match serde_json::to_string(&x) { Ok(ss) => { if ss.len() > 100 { + // TODO this can panic if 100 is not a codepoint boundary, but inside a 2 Bytes + // (or more) codepoint ss[..100].to_string() } else { ss diff --git a/src/util/error.rs b/src/util/error.rs index a9bf0824..32dccbe6 100644 --- a/src/util/error.rs +++ b/src/util/error.rs @@ -1,9 +1,11 @@ +//! Module containing error types used in Garage use err_derive::Error; use hyper::StatusCode; use std::io; use crate::data::*; +/// RPC related errors #[derive(Debug, Error)] pub enum RPCError { #[error(display = "Node is down: {:?}.", _0)] @@ -28,6 +30,7 @@ pub enum RPCError { TooManyErrors(Vec<String>), } +/// Regroup all Garage errors #[derive(Debug, Error)] pub enum Error { #[error(display = "IO error: {}", _0)] diff --git a/src/util/lib.rs b/src/util/lib.rs index 055e9ab0..c080e3a3 100644 --- a/src/util/lib.rs +++ b/src/util/lib.rs @@ -1,3 +1,5 @@ +//! Crate containing common functions and types used in Garage + #[macro_use] extern crate log; diff --git a/src/util/time.rs b/src/util/time.rs index 148860e0..dfedcb26 100644 --- a/src/util/time.rs +++ b/src/util/time.rs @@ -1,6 +1,8 @@ +//! Module containing helper functions to manipulate time use chrono::{SecondsFormat, TimeZone, Utc}; use std::time::{SystemTime, UNIX_EPOCH}; +/// Returns milliseconds since UNIX Epoch pub fn now_msec() -> u64 { SystemTime::now() .duration_since(UNIX_EPOCH) @@ -8,6 +10,8 @@ pub fn now_msec() -> u64 { .as_millis() as u64 } +/// Convert a timestamp represented as milliseconds since UNIX Epoch to +/// its RFC3339 representation, such as "2021-01-01T12:30:00Z" pub fn msec_to_rfc3339(msecs: u64) -> String { let secs = msecs as i64 / 1000; let nanos = (msecs as i64 % 1000) as u32 * 1_000_000; diff --git a/src/web/error.rs b/src/web/error.rs index 14bc3b75..f6afbb42 100644 --- a/src/web/error.rs +++ b/src/web/error.rs @@ -3,30 +3,37 @@ use hyper::StatusCode; use garage_util::error::Error as GarageError; +/// Errors of this crate #[derive(Debug, Error)] pub enum Error { + /// An error received from the API crate #[error(display = "API error: {}", _0)] - ApiError(#[error(source)] garage_api::error::Error), + ApiError(#[error(source)] garage_api::Error), // Category: internal error + /// Error internal to garage #[error(display = "Internal error: {}", _0)] InternalError(#[error(source)] GarageError), + /// The file does not exist #[error(display = "Not found")] NotFound, - // Category: bad request + /// The request contained an invalid UTF-8 sequence in its path or in other parameters #[error(display = "Invalid UTF-8: {}", _0)] InvalidUTF8(#[error(source)] std::str::Utf8Error), + /// The client send a header with invalid value #[error(display = "Invalid header value: {}", _0)] InvalidHeader(#[error(source)] hyper::header::ToStrError), + /// The client sent a request without host, or with unsupported method #[error(display = "Bad request: {}", _0)] BadRequest(String), } impl Error { + /// Transform errors into http status code pub fn http_status_code(&self) -> StatusCode { match self { Error::NotFound => StatusCode::NOT_FOUND, diff --git a/src/web/lib.rs b/src/web/lib.rs index f28937b9..c06492a3 100644 --- a/src/web/lib.rs +++ b/src/web/lib.rs @@ -1,6 +1,9 @@ +//! Crate for handling web serving of s3 bucket #[macro_use] extern crate log; -pub mod error; +mod error; +pub use error::Error; -pub mod web_server; +mod web_server; +pub use web_server::run_web_server; diff --git a/src/web/web_server.rs b/src/web/web_server.rs index cfde2bcc..9635eca6 100644 --- a/src/web/web_server.rs +++ b/src/web/web_server.rs @@ -18,6 +18,7 @@ use garage_model::garage::Garage; use garage_table::*; use garage_util::error::Error as GarageError; +/// Run a web server pub async fn run_web_server( garage: Arc<Garage>, shutdown_signal: impl Future<Output = ()>, |