aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/api/api_server.rs1
-rw-r--r--src/api/encoding.rs5
-rw-r--r--src/api/error.rs16
-rw-r--r--src/api/lib.rs20
-rw-r--r--src/api/s3_get.rs5
-rw-r--r--src/garage/cli.rs2
-rw-r--r--src/garage/main.rs3
-rw-r--r--src/garage/server.rs8
-rw-r--r--src/model/block.rs34
-rw-r--r--src/model/block_ref_table.rs5
-rw-r--r--src/model/bucket_table.rs17
-rw-r--r--src/model/garage.rs13
-rw-r--r--src/model/key_table.rs26
-rw-r--r--src/model/object_table.rs40
-rw-r--r--src/model/version_table.rs15
-rw-r--r--src/rpc/lib.rs4
-rw-r--r--src/rpc/membership.rs33
-rw-r--r--src/rpc/ring.rs36
-rw-r--r--src/rpc/rpc_client.rs26
-rw-r--r--src/rpc/rpc_server.rs8
-rw-r--r--src/table/crdt/lww.rs2
-rw-r--r--src/table/crdt/map.rs1
-rw-r--r--src/table/gc.rs2
-rw-r--r--src/table/lib.rs6
-rw-r--r--src/table/replication/fullcopy.rs12
-rw-r--r--src/table/replication/mod.rs6
-rw-r--r--src/table/replication/parameters.rs9
-rw-r--r--src/table/replication/sharded.rs17
-rw-r--r--src/table/schema.rs17
-rw-r--r--src/util/background.rs10
-rw-r--r--src/util/config.rs27
-rw-r--r--src/util/data.rs18
-rw-r--r--src/util/error.rs3
-rw-r--r--src/util/lib.rs2
-rw-r--r--src/util/time.rs4
-rw-r--r--src/web/error.rs11
-rw-r--r--src/web/lib.rs7
-rw-r--r--src/web/web_server.rs1
38 files changed, 393 insertions, 79 deletions
diff --git a/src/api/api_server.rs b/src/api/api_server.rs
index 2feb0e3a..dcc9f478 100644
--- a/src/api/api_server.rs
+++ b/src/api/api_server.rs
@@ -20,6 +20,7 @@ use crate::s3_get::*;
use crate::s3_list::*;
use crate::s3_put::*;
+/// Run the S3 API server
pub async fn run_api_server(
garage: Arc<Garage>,
shutdown_signal: impl Future<Output = ()>,
diff --git a/src/api/encoding.rs b/src/api/encoding.rs
index 25999207..63c5dee2 100644
--- a/src/api/encoding.rs
+++ b/src/api/encoding.rs
@@ -1,9 +1,13 @@
+//! Module containing various helpers for encoding
+
+/// Escape &str for xml inclusion
pub fn xml_escape(s: &str) -> String {
s.replace("<", "&lt;")
.replace(">", "&gt;")
.replace("\"", "&quot;")
}
+/// Encode &str for use in a URI
pub fn uri_encode(string: &str, encode_slash: bool) -> String {
let mut result = String::with_capacity(string.len() * 2);
for c in string.chars() {
@@ -24,6 +28,7 @@ pub fn uri_encode(string: &str, encode_slash: bool) -> String {
result
}
+/// Encode &str either as an uri, or a valid string for xml inclusion
pub fn xml_encode_key(k: &str, urlencode: bool) -> String {
if urlencode {
uri_encode(k, true)
diff --git a/src/api/error.rs b/src/api/error.rs
index 42a7ab10..ad0174ad 100644
--- a/src/api/error.rs
+++ b/src/api/error.rs
@@ -3,44 +3,57 @@ use hyper::StatusCode;
use garage_util::error::Error as GarageError;
+/// Errors of this crate
#[derive(Debug, Error)]
pub enum Error {
// Category: internal error
+ /// Error related to deeper parts of Garage
#[error(display = "Internal error: {}", _0)]
InternalError(#[error(source)] GarageError),
+ /// Error related to Hyper
#[error(display = "Internal error (Hyper error): {}", _0)]
Hyper(#[error(source)] hyper::Error),
+ /// Error related to HTTP
#[error(display = "Internal error (HTTP error): {}", _0)]
HTTP(#[error(source)] http::Error),
// Category: cannot process
+ /// No proper api key was used, or the signature was invalid
#[error(display = "Forbidden: {}", _0)]
Forbidden(String),
+ /// The requested object doesn't exist
#[error(display = "Not found")]
NotFound,
// Category: bad request
+ /// The request contained an invalid UTF-8 sequence in its path or in other parameters
#[error(display = "Invalid UTF-8: {}", _0)]
InvalidUTF8Str(#[error(source)] std::str::Utf8Error),
+ /// The request used an invalid path
#[error(display = "Invalid UTF-8: {}", _0)]
InvalidUTF8String(#[error(source)] std::string::FromUtf8Error),
+ /// Some base64 encoded data was badly encoded
#[error(display = "Invalid base64: {}", _0)]
InvalidBase64(#[error(source)] base64::DecodeError),
+ /// The client sent invalid XML data
#[error(display = "Invalid XML: {}", _0)]
InvalidXML(String),
+ /// The client sent a header with invalid value
#[error(display = "Invalid header value: {}", _0)]
InvalidHeader(#[error(source)] hyper::header::ToStrError),
+ /// The client sent a range header with invalid value
#[error(display = "Invalid HTTP range: {:?}", _0)]
InvalidRange(#[error(from)] http_range::HttpRangeParseError),
+ /// The client sent an invalid request
#[error(display = "Bad request: {}", _0)]
BadRequest(String),
}
@@ -52,6 +65,7 @@ impl From<roxmltree::Error> for Error {
}
impl Error {
+ /// Get the HTTP status code that best represents the meaning of the error for the client
pub fn http_status_code(&self) -> StatusCode {
match self {
Error::NotFound => StatusCode::NOT_FOUND,
@@ -65,6 +79,7 @@ impl Error {
}
}
+/// Trait to map error to the Bad Request error code
pub trait OkOrBadRequest {
type S2;
fn ok_or_bad_request(self, reason: &'static str) -> Self::S2;
@@ -93,6 +108,7 @@ impl<T> OkOrBadRequest for Option<T> {
}
}
+/// Trait to map an error to an Internal Error code
pub trait OkOrInternalError {
type S2;
fn ok_or_internal_error(self, reason: &'static str) -> Self::S2;
diff --git a/src/api/lib.rs b/src/api/lib.rs
index 9bb07925..be7e37c8 100644
--- a/src/api/lib.rs
+++ b/src/api/lib.rs
@@ -1,15 +1,19 @@
+//! Crate for serving a S3 compatible API
#[macro_use]
extern crate log;
-pub mod error;
+mod error;
+pub use error::Error;
-pub mod encoding;
+mod encoding;
-pub mod api_server;
-pub mod signature;
+mod api_server;
+pub use api_server::run_api_server;
-pub mod s3_copy;
-pub mod s3_delete;
+mod signature;
+
+mod s3_copy;
+mod s3_delete;
pub mod s3_get;
-pub mod s3_list;
-pub mod s3_put;
+mod s3_list;
+mod s3_put;
diff --git a/src/api/s3_get.rs b/src/api/s3_get.rs
index 2590c9bd..15c0ed0a 100644
--- a/src/api/s3_get.rs
+++ b/src/api/s3_get.rs
@@ -1,3 +1,4 @@
+//! Functions related to GET and HEAD requests
use std::sync::Arc;
use std::time::{Duration, UNIX_EPOCH};
@@ -79,6 +80,7 @@ fn try_answer_cached(
}
}
+/// Handle HEAD request
pub async fn handle_head(
garage: Arc<Garage>,
req: &Request<Body>,
@@ -118,6 +120,7 @@ pub async fn handle_head(
Ok(response)
}
+/// Handle GET request
pub async fn handle_get(
garage: Arc<Garage>,
req: &Request<Body>,
@@ -224,7 +227,7 @@ pub async fn handle_get(
}
}
-pub async fn handle_get_range(
+async fn handle_get_range(
garage: Arc<Garage>,
version: &ObjectVersion,
version_data: &ObjectVersionData,
diff --git a/src/garage/cli.rs b/src/garage/cli.rs
index eb8275a9..55cd222b 100644
--- a/src/garage/cli.rs
+++ b/src/garage/cli.rs
@@ -80,7 +80,7 @@ pub struct ConfigureNodeOpt {
#[structopt(short = "c", long = "capacity")]
capacity: Option<u32>,
- /// Optionnal node tag
+ /// Optional node tag
#[structopt(short = "t", long = "tag")]
tag: Option<String>,
diff --git a/src/garage/main.rs b/src/garage/main.rs
index 6c86d0fb..a78e0f03 100644
--- a/src/garage/main.rs
+++ b/src/garage/main.rs
@@ -1,4 +1,5 @@
#![recursion_limit = "1024"]
+//! Garage CLI, used to interact with a running Garage instance, and to launch a Garage instance
#[macro_use]
extern crate log;
@@ -25,7 +26,7 @@ use cli::*;
#[derive(StructOpt, Debug)]
#[structopt(name = "garage")]
-pub struct Opt {
+struct Opt {
/// RPC connect to this host to execute client operations
#[structopt(short = "h", long = "rpc-host", default_value = "127.0.0.1:3901")]
pub rpc_host: SocketAddr,
diff --git a/src/garage/server.rs b/src/garage/server.rs
index feb858e4..97a9bec2 100644
--- a/src/garage/server.rs
+++ b/src/garage/server.rs
@@ -8,10 +8,10 @@ use garage_util::background::*;
use garage_util::config::*;
use garage_util::error::Error;
-use garage_api::api_server;
+use garage_api::run_api_server;
use garage_model::garage::Garage;
use garage_rpc::rpc_server::RpcServer;
-use garage_web::web_server;
+use garage_web::run_web_server;
use crate::admin_rpc::*;
@@ -62,8 +62,8 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
info!("Initializing RPC and API servers...");
let run_rpc_server = Arc::new(rpc_server).run(wait_from(watch_cancel.clone()));
- let api_server = api_server::run_api_server(garage.clone(), wait_from(watch_cancel.clone()));
- let web_server = web_server::run_web_server(garage, wait_from(watch_cancel.clone()));
+ let api_server = run_api_server(garage.clone(), wait_from(watch_cancel.clone()));
+ let web_server = run_web_server(garage, wait_from(watch_cancel.clone()));
futures::try_join!(
bootstrap.map(|rv| {
diff --git a/src/model/block.rs b/src/model/block.rs
index 0d9af38f..5f428fe1 100644
--- a/src/model/block.rs
+++ b/src/model/block.rs
@@ -18,12 +18,13 @@ use garage_rpc::membership::System;
use garage_rpc::rpc_client::*;
use garage_rpc::rpc_server::*;
-use garage_table::replication::{sharded::TableShardedReplication, TableReplication};
+use garage_table::replication::{TableReplication, TableShardedReplication};
use crate::block_ref_table::*;
use crate::garage::Garage;
+/// Size under which data will be stored inlined in database instead of as files
pub const INLINE_THRESHOLD: usize = 3072;
pub const BACKGROUND_WORKERS: u64 = 1;
@@ -33,28 +34,41 @@ const BLOCK_GC_TIMEOUT: Duration = Duration::from_secs(60);
const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(5);
const RESYNC_RETRY_TIMEOUT: Duration = Duration::from_secs(10);
+/// RPC messages used to share blocks of data between nodes
#[derive(Debug, Serialize, Deserialize)]
pub enum Message {
Ok,
+ /// Message to ask for a block of data, by hash
GetBlock(Hash),
+ /// Message to send a block of data, either because requested, or for first delivery of a new
+ /// block
PutBlock(PutBlockMessage),
+ /// Ask other node if they should have this block, but don't actually have it
NeedBlockQuery(Hash),
+ /// Response: whether the node does require that block
NeedBlockReply(bool),
}
+/// Structure used to send a block
#[derive(Debug, Serialize, Deserialize)]
pub struct PutBlockMessage {
+ /// Hash of the block
pub hash: Hash,
+ /// Content of the block
#[serde(with = "serde_bytes")]
pub data: Vec<u8>,
}
impl RpcMessage for Message {}
+/// The block manager, handling block exchange between nodes, and block storage on local node
pub struct BlockManager {
+ /// Replication strategy, allowing to find on which node blocks should be located
pub replication: TableShardedReplication,
+ /// Directory in which blocks are stored
pub data_dir: PathBuf,
+ /// Lock to prevent concurrent modification of the directory
pub data_dir_lock: Mutex<()>,
rc: sled::Tree,
@@ -128,7 +142,8 @@ impl BlockManager {
}
pub fn spawn_background_worker(self: Arc<Self>) {
- // Launch 2 simultaneous workers for background resync loop preprocessing
+ // Launch 2 simultaneous workers for background resync loop preprocessing <= TODO actually this
+ // launches only one worker with current value of BACKGROUND_WORKERS
for i in 0..BACKGROUND_WORKERS {
let bm2 = self.clone();
let background = self.system.background.clone();
@@ -141,7 +156,8 @@ impl BlockManager {
}
}
- pub async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result<Message, Error> {
+ /// Write a block to disk
+ async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result<Message, Error> {
let _lock = self.data_dir_lock.lock().await;
let mut path = self.block_dir(hash);
@@ -159,7 +175,8 @@ impl BlockManager {
Ok(Message::Ok)
}
- pub async fn read_block(&self, hash: &Hash) -> Result<Message, Error> {
+ /// Read block from disk, verifying its integrity
+ async fn read_block(&self, hash: &Hash) -> Result<Message, Error> {
let path = self.block_path(hash);
let mut f = match fs::File::open(&path).await {
@@ -190,7 +207,8 @@ impl BlockManager {
Ok(Message::PutBlock(PutBlockMessage { hash: *hash, data }))
}
- pub async fn need_block(&self, hash: &Hash) -> Result<bool, Error> {
+ /// Check if this node should have a block, but doesn't actually have it
+ async fn need_block(&self, hash: &Hash) -> Result<bool, Error> {
let needed = self
.rc
.get(hash.as_ref())?
@@ -217,6 +235,8 @@ impl BlockManager {
path
}
+ /// Increment the number of times a block is used, putting it to resynchronization if it is
+ /// required, but not known
pub fn block_incref(&self, hash: &Hash) -> Result<(), Error> {
let old_rc = self.rc.fetch_and_update(&hash, |old| {
let old_v = old.map(u64_from_be_bytes).unwrap_or(0);
@@ -229,6 +249,7 @@ impl BlockManager {
Ok(())
}
+ /// Decrement the number of times a block is used
pub fn block_decref(&self, hash: &Hash) -> Result<(), Error> {
let new_rc = self.rc.update_and_fetch(&hash, |old| {
let old_v = old.map(u64_from_be_bytes).unwrap_or(0);
@@ -388,6 +409,7 @@ impl BlockManager {
Ok(())
}
+ /// Ask nodes that might have a block for it
pub async fn rpc_get_block(&self, hash: &Hash) -> Result<Vec<u8>, Error> {
let who = self.replication.read_nodes(&hash);
let resps = self
@@ -412,6 +434,7 @@ impl BlockManager {
)))
}
+ /// Send block to nodes that should have it
pub async fn rpc_put_block(&self, hash: Hash, data: Vec<u8>) -> Result<(), Error> {
let who = self.replication.write_nodes(&hash);
self.rpc_client
@@ -498,6 +521,7 @@ impl BlockManager {
.boxed()
}
+ /// Get length of resync queue
pub fn resync_queue_len(&self) -> usize {
self.resync_queue.len()
}
diff --git a/src/model/block_ref_table.rs b/src/model/block_ref_table.rs
index e4372717..1f0c7bb0 100644
--- a/src/model/block_ref_table.rs
+++ b/src/model/block_ref_table.rs
@@ -10,13 +10,14 @@ use crate::block::*;
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct BlockRef {
- // Primary key
+ /// Hash of the block, used as partition key
pub block: Hash,
- // Sort key
+ /// Id of the Version for the object containing this block, used as sorting key
pub version: UUID,
// Keep track of deleted status
+ /// Is the Version that contains this block deleted
pub deleted: crdt::Bool,
}
diff --git a/src/model/bucket_table.rs b/src/model/bucket_table.rs
index 6330dced..6a4b021d 100644
--- a/src/model/bucket_table.rs
+++ b/src/model/bucket_table.rs
@@ -12,15 +12,18 @@ use crate::key_table::PermissionSet;
/// - A bucket has 2 states, Present or Deleted and parameters make sense only if present.
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct Bucket {
- // Primary key
+ /// Name of the bucket
pub name: String,
-
+ /// State, and configuration if not deleted, of the bucket
pub state: crdt::LWW<BucketState>,
}
+/// State of a bucket
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub enum BucketState {
+ /// The bucket is deleted
Deleted,
+ /// The bucket exists
Present(BucketParams),
}
@@ -37,9 +40,12 @@ impl CRDT for BucketState {
}
}
+/// Configuration for a bucket
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct BucketParams {
+ /// Map of keys with access to the bucket, and what kind of access they give
pub authorized_keys: crdt::LWWMap<String, PermissionSet>,
+ /// Is the bucket served as http
pub website: crdt::LWW<bool>,
}
@@ -51,6 +57,7 @@ impl CRDT for BucketParams {
}
impl BucketParams {
+ /// Initializes a new instance of the BucketParams struct
pub fn new() -> Self {
BucketParams {
authorized_keys: crdt::LWWMap::new(),
@@ -60,15 +67,21 @@ impl BucketParams {
}
impl Bucket {
+ /// Create a new bucket
pub fn new(name: String) -> Self {
Bucket {
name,
state: crdt::LWW::new(BucketState::Present(BucketParams::new())),
}
}
+
+ /// Returns true if this represents a deleted bucket
pub fn is_deleted(&self) -> bool {
*self.state.get() == BucketState::Deleted
}
+
+ /// Return the list of authorized keys, when each was updated, and the permission associated to
+ /// the key
pub fn authorized_keys(&self) -> &[(String, u64, PermissionSet)] {
match self.state.get() {
BucketState::Deleted => &[],
diff --git a/src/model/garage.rs b/src/model/garage.rs
index 5f7a67c9..797a91e5 100644
--- a/src/model/garage.rs
+++ b/src/model/garage.rs
@@ -7,8 +7,8 @@ use garage_rpc::membership::System;
use garage_rpc::rpc_client::RpcHttpClient;
use garage_rpc::rpc_server::RpcServer;
-use garage_table::replication::fullcopy::*;
-use garage_table::replication::sharded::*;
+use garage_table::replication::TableFullReplication;
+use garage_table::replication::TableShardedReplication;
use garage_table::*;
use crate::block::*;
@@ -18,15 +18,23 @@ use crate::key_table::*;
use crate::object_table::*;
use crate::version_table::*;
+/// An entire Garage full of data
pub struct Garage {
+ /// The parsed configuration Garage is running
pub config: Config,
+ /// The local database
pub db: sled::Db,
+ /// A background job runner
pub background: Arc<BackgroundRunner>,
+ /// The membership manager
pub system: Arc<System>,
+ /// The block manager
pub block_manager: Arc<BlockManager>,
+ /// Table containing information about buckets
pub bucket_table: Arc<Table<BucketTable, TableFullReplication>>,
+ /// Table containing information about api keys
pub key_table: Arc<Table<KeyTable, TableFullReplication>>,
pub object_table: Arc<Table<ObjectTable, TableShardedReplication>>,
@@ -35,6 +43,7 @@ pub struct Garage {
}
impl Garage {
+ /// Create and run garage
pub fn new(
config: Config,
db: sled::Db,
diff --git a/src/model/key_table.rs b/src/model/key_table.rs
index fcca3835..578f8683 100644
--- a/src/model/key_table.rs
+++ b/src/model/key_table.rs
@@ -3,26 +3,28 @@ use serde::{Deserialize, Serialize};
use garage_table::crdt::*;
use garage_table::*;
+/// An api key
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct Key {
- // Primary key
+ /// The id of the key (immutable), used as partition key
pub key_id: String,
- // Associated secret key (immutable)
+ /// The secret_key associated
pub secret_key: String,
- // Name
+ /// Name for the key
pub name: crdt::LWW<String>,
- // Deletion
+ /// Is the key deleted
pub deleted: crdt::Bool,
- // Authorized keys
- pub authorized_buckets: crdt::LWWMap<String, PermissionSet>,
+ /// Buckets in which the key is authorized. Empty if `Key` is deleted
// CRDT interaction: deleted implies authorized_buckets is empty
+ pub authorized_buckets: crdt::LWWMap<String, PermissionSet>,
}
impl Key {
+ /// Create a new key
pub fn new(name: String) -> Self {
let key_id = format!("GK{}", hex::encode(&rand::random::<[u8; 12]>()[..]));
let secret_key = hex::encode(&rand::random::<[u8; 32]>()[..]);
@@ -34,6 +36,8 @@ impl Key {
authorized_buckets: crdt::LWWMap::new(),
}
}
+
+ /// Import a key from its parts
pub fn import(key_id: &str, secret_key: &str, name: &str) -> Self {
Self {
key_id: key_id.to_string(),
@@ -43,6 +47,8 @@ impl Key {
authorized_buckets: crdt::LWWMap::new(),
}
}
+
+ /// Create a new Key which can be merged to mark an existing key deleted
pub fn delete(key_id: String) -> Self {
Self {
key_id,
@@ -52,13 +58,16 @@ impl Key {
authorized_buckets: crdt::LWWMap::new(),
}
}
- /// Add an authorized bucket, only if it wasn't there before
+
+ /// Check if `Key` is allowed to read in bucket
pub fn allow_read(&self, bucket: &str) -> bool {
self.authorized_buckets
.get(&bucket.to_string())
.map(|x| x.allow_read)
.unwrap_or(false)
}
+
+ /// Check if `Key` is allowed to write in bucket
pub fn allow_write(&self, bucket: &str) -> bool {
self.authorized_buckets
.get(&bucket.to_string())
@@ -67,9 +76,12 @@ impl Key {
}
}
+/// Permission given to a key in a bucket
#[derive(PartialOrd, Ord, PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct PermissionSet {
+ /// The key can be used to read the bucket
pub allow_read: bool,
+ /// The key can be used to write in the bucket
pub allow_write: bool,
}
diff --git a/src/model/object_table.rs b/src/model/object_table.rs
index 34ac798a..ff42d065 100644
--- a/src/model/object_table.rs
+++ b/src/model/object_table.rs
@@ -6,24 +6,26 @@ use garage_util::background::BackgroundRunner;
use garage_util::data::*;
use garage_table::crdt::*;
-use garage_table::replication::sharded::*;
+use garage_table::replication::TableShardedReplication;
use garage_table::*;
use crate::version_table::*;
+/// An object
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct Object {
- // Primary key
+ /// The bucket in which the object is stored, used as partition key
pub bucket: String,
- // Sort key
+ /// The key at which the object is stored in its bucket, used as sorting key
pub key: String,
- // Data
+ /// The list of currently stored versions of the object
versions: Vec<ObjectVersion>,
}
impl Object {
+ /// Create an object from parts
pub fn new(bucket: String, key: String, versions: Vec<ObjectVersion>) -> Self {
let mut ret = Self {
bucket,
@@ -36,6 +38,7 @@ impl Object {
}
ret
}
+
/// Adds a version if it wasn't already present
pub fn add_version(&mut self, new: ObjectVersion) -> Result<(), ()> {
match self
@@ -49,23 +52,32 @@ impl Object {
Ok(_) => Err(()),
}
}
+
+ /// Get a list of currently stored versions of `Object`
pub fn versions(&self) -> &[ObjectVersion] {
&self.versions[..]
}
}
+/// Information about a version of an object
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct ObjectVersion {
+ /// Id of the version
pub uuid: UUID,
+ /// Timestamp of when the object was created
pub timestamp: u64,
-
+ /// State of the version
pub state: ObjectVersionState,
}
+/// State of an object version
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub enum ObjectVersionState {
+ /// The version is being received
Uploading(ObjectVersionHeaders),
+ /// The version is fully received
Complete(ObjectVersionData),
+ /// The uploaded version contained errors or the upload was explicitly aborted
Aborted,
}
@@ -90,10 +102,15 @@ impl CRDT for ObjectVersionState {
}
}
+/// Data about an object version
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub enum ObjectVersionData {
+ /// The object was deleted, this Version is a tombstone to mark it as such
DeleteMarker,
+ /// The object is short, it's stored inlined
Inline(ObjectVersionMeta, #[serde(with = "serde_bytes")] Vec<u8>),
+ /// The object is not short, Hash of first block is stored here, next segments hashes are
+ /// stored in the version table
FirstBlock(ObjectVersionMeta, Hash),
}
@@ -101,16 +118,23 @@ impl AutoCRDT for ObjectVersionData {
const WARN_IF_DIFFERENT: bool = true;
}
+/// Metadata about the object version
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct ObjectVersionMeta {
+ /// Headers to send to the client
pub headers: ObjectVersionHeaders,
+ /// Size of the object
pub size: u64,
+ /// etag of the object
pub etag: String,
}
+/// Additional headers for an object
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct ObjectVersionHeaders {
+ /// Content type of the object
pub content_type: String,
+ /// Any other http headers to send
pub other: BTreeMap<String, String>,
}
@@ -118,18 +142,24 @@ impl ObjectVersion {
fn cmp_key(&self) -> (u64, UUID) {
(self.timestamp, self.uuid)
}
+
+ /// Is the object version currently being uploaded
pub fn is_uploading(&self) -> bool {
match self.state {
ObjectVersionState::Uploading(_) => true,
_ => false,
}
}
+
+ /// Is the object version completely received
pub fn is_complete(&self) -> bool {
match self.state {
ObjectVersionState::Complete(_) => true,
_ => false,
}
}
+
+ /// Is the object version available (received and not a tombstone)
pub fn is_data(&self) -> bool {
match self.state {
ObjectVersionState::Complete(ObjectVersionData::DeleteMarker) => false,
diff --git a/src/model/version_table.rs b/src/model/version_table.rs
index 841fbfea..bb836868 100644
--- a/src/model/version_table.rs
+++ b/src/model/version_table.rs
@@ -5,26 +5,32 @@ use garage_util::background::BackgroundRunner;
use garage_util::data::*;
use garage_table::crdt::*;
-use garage_table::replication::sharded::*;
+use garage_table::replication::TableShardedReplication;
use garage_table::*;
use crate::block_ref_table::*;
+/// A version of an object
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct Version {
- // Primary key
+ /// UUID of the version, used as partition key
pub uuid: UUID,
// Actual data: the blocks for this version
// In the case of a multipart upload, also store the etags
// of individual parts and check them when doing CompleteMultipartUpload
+ /// Is this version deleted
pub deleted: crdt::Bool,
+ /// list of blocks of data composing the version
pub blocks: crdt::Map<VersionBlockKey, VersionBlock>,
+ /// Etag of each part in case of a multipart upload, empty otherwise
pub parts_etags: crdt::Map<u64, String>,
// Back link to bucket+key so that we can figure if
// this was deleted later on
+ /// Bucket in which the related object is stored
pub bucket: String,
+ /// Key in which the related object is stored
pub key: String,
}
@@ -43,7 +49,9 @@ impl Version {
#[derive(PartialEq, Eq, Clone, Copy, Debug, Serialize, Deserialize)]
pub struct VersionBlockKey {
+ /// Number of the part
pub part_number: u64,
+ /// Offset of this sub-segment in its part
pub offset: u64,
}
@@ -61,9 +69,12 @@ impl PartialOrd for VersionBlockKey {
}
}
+/// Information about a single block
#[derive(PartialEq, Eq, Ord, PartialOrd, Clone, Copy, Debug, Serialize, Deserialize)]
pub struct VersionBlock {
+ /// Hash of the block
pub hash: Hash,
+ /// Size of the block
pub size: u64,
}
diff --git a/src/rpc/lib.rs b/src/rpc/lib.rs
index 00e31f57..96561d0e 100644
--- a/src/rpc/lib.rs
+++ b/src/rpc/lib.rs
@@ -1,7 +1,9 @@
+//! Crate containing rpc related functions and types used in Garage
+
#[macro_use]
extern crate log;
-pub mod consul;
+mod consul;
pub(crate) mod tls_util;
pub mod membership;
diff --git a/src/rpc/membership.rs b/src/rpc/membership.rs
index 9fb24ad4..5f7bbc96 100644
--- a/src/rpc/membership.rs
+++ b/src/rpc/membership.rs
@@ -1,3 +1,4 @@
+//! Module containing structs related to membership management
use std::collections::HashMap;
use std::fmt::Write as FmtWrite;
use std::io::{Read, Write};
@@ -30,20 +31,29 @@ const DISCOVERY_INTERVAL: Duration = Duration::from_secs(60);
const PING_TIMEOUT: Duration = Duration::from_secs(2);
const MAX_FAILURES_BEFORE_CONSIDERED_DOWN: usize = 5;
+/// RPC endpoint used for calls related to membership
pub const MEMBERSHIP_RPC_PATH: &str = "_membership";
+/// RPC messages related to membership
#[derive(Debug, Serialize, Deserialize)]
pub enum Message {
+ /// Response to successful advertisements
Ok,
+ /// Message sent to detect other nodes status
Ping(PingMessage),
+ /// Ask other node for the nodes it knows. Answered with AdvertiseNodesUp
PullStatus,
+ /// Ask other node its config. Answered with AdvertiseConfig
PullConfig,
+ /// Advertisement of nodes the host knows up. Sent spontaneously or in response to PullStatus
AdvertiseNodesUp(Vec<AdvertisedNode>),
+ /// Advertisement of nodes config. Sent spontaneously or in response to PullConfig
AdvertiseConfig(NetworkConfig),
}
impl RpcMessage for Message {}
+/// A ping, containing information about status and config
#[derive(Debug, Serialize, Deserialize)]
pub struct PingMessage {
id: UUID,
@@ -55,18 +65,25 @@ pub struct PingMessage {
state_info: StateInfo,
}
+/// A node advertisement
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct AdvertisedNode {
+ /// Id of the node this advertisement relates to
pub id: UUID,
+ /// IP and port of the node
pub addr: SocketAddr,
+ /// Is the node considered up
pub is_up: bool,
+ /// When was the node last seen up, in milliseconds since UNIX epoch
pub last_seen: u64,
pub state_info: StateInfo,
}
+/// This node's membership manager
pub struct System {
+ /// The id of this node
pub id: UUID,
persist_config: Persister<NetworkConfig>,
@@ -79,10 +96,12 @@ pub struct System {
rpc_client: Arc<RpcClient<Message>>,
pub(crate) status: watch::Receiver<Arc<Status>>,
+ /// The ring
pub ring: watch::Receiver<Arc<Ring>>,
update_lock: Mutex<Updaters>,
+ /// The job runner of this node
pub background: Arc<BackgroundRunner>,
}
@@ -91,21 +110,29 @@ struct Updaters {
update_ring: watch::Sender<Arc<Ring>>,
}
+/// The status of each node, as viewed by this node
#[derive(Debug, Clone)]
pub struct Status {
+ /// Mapping of each node id to its known status
pub nodes: HashMap<UUID, Arc<StatusEntry>>,
+ /// Hash of `nodes`, used to detect when nodes have different views of the cluster
pub hash: Hash,
}
+/// The status of a single node
#[derive(Debug)]
pub struct StatusEntry {
+ /// The IP and port used to connect to this node
pub addr: SocketAddr,
+ /// Last time this node was seen
pub last_seen: u64,
+ /// Number of consecutive pings sent without reply to this node
pub num_failures: AtomicUsize,
pub state_info: StateInfo,
}
impl StatusEntry {
+ /// is the node associated to this entry considered up
pub fn is_up(&self) -> bool {
self.num_failures.load(Ordering::SeqCst) < MAX_FAILURES_BEFORE_CONSIDERED_DOWN
}
@@ -195,6 +222,7 @@ fn gen_node_id(metadata_dir: &PathBuf) -> Result<UUID, Error> {
}
impl System {
+ /// Create this node's membership manager
pub fn new(
metadata_dir: PathBuf,
rpc_http_client: Arc<RpcHttpClient>,
@@ -279,6 +307,7 @@ impl System {
});
}
+ /// Get an RPC client
pub fn rpc_client<M: RpcMessage + 'static>(self: &Arc<Self>, path: &str) -> Arc<RpcClient<M>> {
RpcClient::new(
RpcAddrClient::new(self.rpc_http_client.clone(), path.to_string()),
@@ -287,6 +316,7 @@ impl System {
)
}
+ /// Save network configuration to disc
async fn save_network_config(self: Arc<Self>) -> Result<(), Error> {
let ring = self.ring.borrow().clone();
self.persist_config
@@ -319,6 +349,7 @@ impl System {
self.rpc_client.call_many(&to[..], msg, timeout).await;
}
+ /// Perform bootstrapping, starting the ping loop
pub async fn bootstrap(
self: Arc<Self>,
peers: Vec<SocketAddr>,
@@ -386,6 +417,8 @@ impl System {
}
} else if let Some(id) = id_option {
if let Some(st) = status.nodes.get_mut(id) {
+ // we need to increment failure counter as call was done using by_addr so the
+ // counter was not auto-incremented
st.num_failures.fetch_add(1, Ordering::SeqCst);
if !st.is_up() {
warn!("Node {:?} seems to be down.", id);
diff --git a/src/rpc/ring.rs b/src/rpc/ring.rs
index 2e997523..bffd7f1f 100644
--- a/src/rpc/ring.rs
+++ b/src/rpc/ring.rs
@@ -1,3 +1,5 @@
+//! Module containing types related to computing nodes which should receive a copy of data blocks
+//! and metadata
use std::collections::{HashMap, HashSet};
use std::convert::TryInto;
@@ -8,23 +10,30 @@ use garage_util::data::*;
// A partition number is encoded on 16 bits,
// i.e. we have up to 2**16 partitions.
// (in practice we have exactly 2**PARTITION_BITS partitions)
+/// A partition id, stored on 16 bits
pub type Partition = u16;
// TODO: make this constant parametrizable in the config file
// For deployments with many nodes it might make sense to bump
// it up to 10.
// Maximum value : 16
+/// How many bits from the hash are used to make partitions. Higher numbers mean more fairness in
+/// presence of numerous nodes, but exponentially bigger ring. Max 16
pub const PARTITION_BITS: usize = 8;
const PARTITION_MASK_U16: u16 = ((1 << PARTITION_BITS) - 1) << (16 - PARTITION_BITS);
// TODO: make this constant paraetrizable in the config file
// (most deployments use a replication factor of 3, so...)
+/// The maximum number of times an object might get replicated
pub const MAX_REPLICATION: usize = 3;
+/// The user-defined configuration of the cluster's nodes
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct NetworkConfig {
+ /// Map of each node's id to its configuration
pub members: HashMap<UUID, NetworkConfigEntry>,
+ /// Version of this config
pub version: u64,
}
@@ -37,26 +46,40 @@ impl NetworkConfig {
}
}
+/// The overall configuration of one (possibly remote) node
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct NetworkConfigEntry {
+ /// Datacenter to which this entry belongs. This information might be used to perform a better
+ /// geodistribution
pub datacenter: String,
+ /// The (relative) capacity of the node
pub capacity: u32,
+ /// A tag to recognize the entry, not used for other things than display
pub tag: String,
}
+/// A ring distributing fairly objects to nodes
#[derive(Clone)]
pub struct Ring {
+ /// The network configuration used to generate this ring
pub config: NetworkConfig,
+ /// The list of entries in the ring
pub ring: Vec<RingEntry>,
}
+/// An entry in the ring
#[derive(Clone, Debug)]
pub struct RingEntry {
+ /// The prefix of the Hash of object which should use this entry
pub location: Hash,
+ /// The nodes in which a matching object should get stored
pub nodes: [UUID; MAX_REPLICATION],
}
impl Ring {
+ // TODO this function MUST be refactored, it's 100 lines long, with a 50 lines loop, going up to 6
+ // levels of imbrication. It is basically impossible to test, maintain, or understand for an
+ // outsider.
pub(crate) fn new(config: NetworkConfig) -> Self {
// Create a vector of partition indices (0 to 2**PARTITION_BITS-1)
let partitions_idx = (0usize..(1usize << PARTITION_BITS)).collect::<Vec<_>>();
@@ -166,20 +189,16 @@ impl Ring {
})
.collect::<Vec<_>>();
- // eprintln!("RING: --");
- // for e in ring.iter() {
- // eprintln!("{:?}", e);
- // }
- // eprintln!("END --");
-
Self { config, ring }
}
+ /// Get the partition in which data would fall on
pub fn partition_of(&self, from: &Hash) -> Partition {
let top = u16::from_be_bytes(from.as_slice()[0..2].try_into().unwrap());
top >> (16 - PARTITION_BITS)
}
+ /// Get the list of partitions and the first hash of a partition key that would fall in it
pub fn partitions(&self) -> Vec<(Partition, Hash)> {
let mut ret = vec![];
@@ -193,6 +212,8 @@ impl Ring {
ret
}
+ // TODO rename this function as it no longer walks the ring
+ /// Walk the ring to find the n servers in which data should be replicated
pub fn walk_ring(&self, from: &Hash, n: usize) -> Vec<UUID> {
if self.ring.len() != 1 << PARTITION_BITS {
warn!("Ring not yet ready, read/writes will be lost!");
@@ -201,12 +222,15 @@ impl Ring {
let top = u16::from_be_bytes(from.as_slice()[0..2].try_into().unwrap());
let partition_idx = (top >> (16 - PARTITION_BITS)) as usize;
+ // TODO why compute this twice in the same way and then assert?
assert_eq!(partition_idx, self.partition_of(from) as usize);
let partition = &self.ring[partition_idx];
let partition_top =
u16::from_be_bytes(partition.location.as_slice()[0..2].try_into().unwrap());
+ // TODO is this an assertion on the validity of PARTITION_MASK_U16? If so, it should
+ // probably be a test more than a runtime assertion
assert_eq!(partition_top & PARTITION_MASK_U16, top & PARTITION_MASK_U16);
assert!(n <= partition.nodes.len());
diff --git a/src/rpc/rpc_client.rs b/src/rpc/rpc_client.rs
index 5e76e215..8a6cc721 100644
--- a/src/rpc/rpc_client.rs
+++ b/src/rpc/rpc_client.rs
@@ -1,3 +1,4 @@
+//! Contains structs related to making RPCs
use std::borrow::Borrow;
use std::marker::PhantomData;
use std::net::SocketAddr;
@@ -26,14 +27,19 @@ use crate::tls_util;
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
+/// Strategy to apply when making RPC
#[derive(Copy, Clone)]
pub struct RequestStrategy {
+ /// Max time to wait for response
pub rs_timeout: Duration,
+ /// Min number of responses to consider the request successful
pub rs_quorum: usize,
+ /// Should requests be dropped after enough responses are received
pub rs_interrupt_after_quorum: bool,
}
impl RequestStrategy {
+ /// Create a RequestStrategy with default timeout and not interrupting when quorum reached
pub fn with_quorum(quorum: usize) -> Self {
RequestStrategy {
rs_timeout: DEFAULT_TIMEOUT,
@@ -41,19 +47,25 @@ impl RequestStrategy {
rs_interrupt_after_quorum: false,
}
}
+ /// Set timeout of the strategy
pub fn with_timeout(mut self, timeout: Duration) -> Self {
self.rs_timeout = timeout;
self
}
+ /// Set if requests can be dropped after quorum has been reached
+ /// In general true for read requests, and false for write
pub fn interrupt_after_quorum(mut self, interrupt: bool) -> Self {
self.rs_interrupt_after_quorum = interrupt;
self
}
}
+/// Shortcut for a boxed async function taking a message, and resolving to another message or an
+/// error
pub type LocalHandlerFn<M> =
Box<dyn Fn(Arc<M>) -> Pin<Box<dyn Future<Output = Result<M, Error>> + Send>> + Send + Sync>;
+/// Client used to send RPC
pub struct RpcClient<M: RpcMessage> {
status: watch::Receiver<Arc<Status>>,
background: Arc<BackgroundRunner>,
@@ -64,6 +76,7 @@ pub struct RpcClient<M: RpcMessage> {
}
impl<M: RpcMessage + 'static> RpcClient<M> {
+ /// Create a new RpcClient from an address, a job runner, and the status of all RPC servers
pub fn new(
rac: RpcAddrClient<M>,
background: Arc<BackgroundRunner>,
@@ -77,6 +90,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
})
}
+ /// Set the local handler, to process RPC to this node without network usage
pub fn set_local_handler<F, Fut>(&self, my_id: UUID, handler: F)
where
F: Fn(Arc<M>) -> Fut + Send + Sync + 'static,
@@ -90,14 +104,17 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
self.local_handler.swap(Some(Arc::new((my_id, handler))));
}
+ /// Get a RPC client to make calls using node's SocketAddr instead of its ID
pub fn by_addr(&self) -> &RpcAddrClient<M> {
&self.rpc_addr_client
}
+ /// Make a RPC call
pub async fn call(&self, to: UUID, msg: M, timeout: Duration) -> Result<M, Error> {
self.call_arc(to, Arc::new(msg), timeout).await
}
+ /// Make a RPC call from a message stored in an Arc
pub async fn call_arc(&self, to: UUID, msg: Arc<M>, timeout: Duration) -> Result<M, Error> {
if let Some(lh) = self.local_handler.load_full() {
let (my_id, local_handler) = lh.as_ref();
@@ -134,6 +151,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
}
}
+ /// Make a RPC call to multiple servers, returning a Vec containing each result
pub async fn call_many(&self, to: &[UUID], msg: M, timeout: Duration) -> Vec<Result<M, Error>> {
let msg = Arc::new(msg);
let mut resp_stream = to
@@ -148,6 +166,8 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
results
}
+ /// Make a RPC call to multiple servers, returning either a Vec of responses, or an error if
+ /// strategy could not be respected due to too many errors
pub async fn try_call_many(
self: &Arc<Self>,
to: &[UUID],
@@ -207,6 +227,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
}
}
+/// Thin wrapper around an `RpcHttpClient` specifying the path of the request
pub struct RpcAddrClient<M: RpcMessage> {
phantom: PhantomData<M>,
@@ -215,6 +236,7 @@ pub struct RpcAddrClient<M: RpcMessage> {
}
impl<M: RpcMessage> RpcAddrClient<M> {
+ /// Create an RpcAddrClient from an HTTP client and the endpoint to reach for RPCs
pub fn new(http_client: Arc<RpcHttpClient>, path: String) -> Self {
Self {
phantom: PhantomData::default(),
@@ -223,6 +245,7 @@ impl<M: RpcMessage> RpcAddrClient<M> {
}
}
+ /// Make a RPC
pub async fn call<MB>(
&self,
to_addr: &SocketAddr,
@@ -238,6 +261,7 @@ impl<M: RpcMessage> RpcAddrClient<M> {
}
}
+/// HTTP client used to make RPCs
pub struct RpcHttpClient {
request_limiter: Semaphore,
method: ClientMethod,
@@ -249,6 +273,7 @@ enum ClientMethod {
}
impl RpcHttpClient {
+ /// Create a new RpcHttpClient
pub fn new(
max_concurrent_requests: usize,
tls_config: &Option<TlsConfig>,
@@ -279,6 +304,7 @@ impl RpcHttpClient {
})
}
+ /// Make a RPC
async fn call<M, MB>(
&self,
path: &str,
diff --git a/src/rpc/rpc_server.rs b/src/rpc/rpc_server.rs
index 0d82d796..4419a6f0 100644
--- a/src/rpc/rpc_server.rs
+++ b/src/rpc/rpc_server.rs
@@ -1,3 +1,4 @@
+//! Contains structs related to receiving RPCs
use std::collections::HashMap;
use std::net::SocketAddr;
use std::pin::Pin;
@@ -22,13 +23,17 @@ use garage_util::error::Error;
use crate::tls_util;
+/// Trait for messages that can be sent as RPC
pub trait RpcMessage: Serialize + for<'de> Deserialize<'de> + Send + Sync {}
type ResponseFuture = Pin<Box<dyn Future<Output = Result<Response<Body>, Error>> + Send>>;
type Handler = Box<dyn Fn(Request<Body>, SocketAddr) -> ResponseFuture + Send + Sync>;
+/// Structure handling RPCs
pub struct RpcServer {
+ /// The address the RpcServer will bind
pub bind_addr: SocketAddr,
+ /// The tls configuration used for RPC
pub tls_config: Option<TlsConfig>,
handlers: HashMap<String, Handler>,
@@ -87,6 +92,7 @@ where
}
impl RpcServer {
+ /// Create a new RpcServer
pub fn new(bind_addr: SocketAddr, tls_config: Option<TlsConfig>) -> Self {
Self {
bind_addr,
@@ -95,6 +101,7 @@ impl RpcServer {
}
}
+ /// Add handler handling request made to `name`
pub fn add_handler<M, F, Fut>(&mut self, name: String, handler: F)
where
M: RpcMessage + 'static,
@@ -156,6 +163,7 @@ impl RpcServer {
}
}
+ /// Run the RpcServer
pub async fn run(
self: Arc<Self>,
shutdown_signal: impl Future<Output = ()>,
diff --git a/src/table/crdt/lww.rs b/src/table/crdt/lww.rs
index 25ecdb07..3b1b2406 100644
--- a/src/table/crdt/lww.rs
+++ b/src/table/crdt/lww.rs
@@ -34,7 +34,7 @@ use crate::crdt::crdt::*;
/// and may differ from what you observed with your atomic clock!
///
/// This scheme is used by AWS S3 or Soundcloud and often without knowing
-/// in entreprise when reconciliating databases with ad-hoc scripts.
+/// in enterprise when reconciliating databases with ad-hoc scripts.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct LWW<T> {
ts: u64,
diff --git a/src/table/crdt/map.rs b/src/table/crdt/map.rs
index 1193e6db..c4a30a26 100644
--- a/src/table/crdt/map.rs
+++ b/src/table/crdt/map.rs
@@ -37,6 +37,7 @@ where
Self { vals: vec![(k, v)] }
}
+ /// Add a value to the map
pub fn put(&mut self, k: K, v: V) {
self.merge(&Self::put_mutator(k, v));
}
diff --git a/src/table/gc.rs b/src/table/gc.rs
index e52bf599..694a3789 100644
--- a/src/table/gc.rs
+++ b/src/table/gc.rs
@@ -74,7 +74,7 @@ where
while !*must_exit.borrow() {
match self.gc_loop_iter().await {
Ok(true) => {
- // Stuff was done, loop imediately
+ // Stuff was done, loop immediately
continue;
}
Ok(false) => {
diff --git a/src/table/lib.rs b/src/table/lib.rs
index 3b73163b..c3e14ab8 100644
--- a/src/table/lib.rs
+++ b/src/table/lib.rs
@@ -8,10 +8,10 @@ pub mod schema;
pub mod util;
pub mod data;
-pub mod gc;
-pub mod merkle;
+mod gc;
+mod merkle;
pub mod replication;
-pub mod sync;
+mod sync;
pub mod table;
pub use schema::*;
diff --git a/src/table/replication/fullcopy.rs b/src/table/replication/fullcopy.rs
index bd658f63..a6b4c98c 100644
--- a/src/table/replication/fullcopy.rs
+++ b/src/table/replication/fullcopy.rs
@@ -6,19 +6,19 @@ use garage_util::data::*;
use crate::replication::*;
+/// Full replication schema: all nodes store everything
+/// Writes are disseminated in an epidemic manner in the network
+/// Advantage: do all reads locally, extremely fast
+/// Inconvenient: only suitable to reasonably small tables
#[derive(Clone)]
pub struct TableFullReplication {
+ /// The membership manager of this node
pub system: Arc<System>,
+ /// Max number of faults allowed while replicating a record
pub max_faults: usize,
}
impl TableReplication for TableFullReplication {
- // Full replication schema: all nodes store everything
- // Writes are disseminated in an epidemic manner in the network
-
- // Advantage: do all reads locally, extremely fast
- // Inconvenient: only suitable to reasonably small tables
-
fn read_nodes(&self, _hash: &Hash) -> Vec<UUID> {
vec![self.system.id]
}
diff --git a/src/table/replication/mod.rs b/src/table/replication/mod.rs
index d43d7f19..dfcb026a 100644
--- a/src/table/replication/mod.rs
+++ b/src/table/replication/mod.rs
@@ -1,6 +1,8 @@
mod parameters;
-pub mod fullcopy;
-pub mod sharded;
+mod fullcopy;
+mod sharded;
+pub use fullcopy::TableFullReplication;
pub use parameters::*;
+pub use sharded::TableShardedReplication;
diff --git a/src/table/replication/parameters.rs b/src/table/replication/parameters.rs
index e46bd172..c2c78c8b 100644
--- a/src/table/replication/parameters.rs
+++ b/src/table/replication/parameters.rs
@@ -2,20 +2,25 @@ use garage_rpc::ring::*;
use garage_util::data::*;
+/// Trait to describe how a table shall be replicated
pub trait TableReplication: Send + Sync {
// See examples in table_sharded.rs and table_fullcopy.rs
// To understand various replication methods
- // Which nodes to send reads from
+ /// Which nodes to send read requests to
fn read_nodes(&self, hash: &Hash) -> Vec<UUID>;
+ /// Responses needed to consider a read successful
fn read_quorum(&self) -> usize;
- // Which nodes to send writes to
+ /// Which nodes to send writes to
fn write_nodes(&self, hash: &Hash) -> Vec<UUID>;
+ /// Responses needed to consider a write successful
fn write_quorum(&self) -> usize;
fn max_write_errors(&self) -> usize;
// Accessing partitions, for Merkle tree & sync
+ /// Get partition for data with given hash
fn partition_of(&self, hash: &Hash) -> Partition;
+ /// List of existing partitions
fn partitions(&self) -> Vec<(Partition, Hash)>;
}
diff --git a/src/table/replication/sharded.rs b/src/table/replication/sharded.rs
index dce74b03..f2d89729 100644
--- a/src/table/replication/sharded.rs
+++ b/src/table/replication/sharded.rs
@@ -6,22 +6,25 @@ use garage_util::data::*;
use crate::replication::*;
+/// Sharded replication schema:
+/// - based on the ring of nodes, a certain set of neighbors
+/// store entries, given as a function of the position of the
+/// entry's hash in the ring
+/// - reads are done on all of the nodes that replicate the data
+/// - writes as well
#[derive(Clone)]
pub struct TableShardedReplication {
+ /// The membership manager of this node
pub system: Arc<System>,
+ /// How many times each piece of data should be replicated
pub replication_factor: usize,
+ /// How many nodes to contact for a read, should be at most `replication_factor`
pub read_quorum: usize,
+ /// How many nodes to contact for a write, should be at most `replication_factor`
pub write_quorum: usize,
}
impl TableReplication for TableShardedReplication {
- // Sharded replication schema:
- // - based on the ring of nodes, a certain set of neighbors
- // store entries, given as a function of the position of the
- // entry's hash in the ring
- // - reads are done on all of the nodes that replicate the data
- // - writes as well
-
fn read_nodes(&self, hash: &Hash) -> Vec<UUID> {
let ring = self.system.ring.borrow().clone();
ring.walk_ring(&hash, self.replication_factor)
diff --git a/src/table/schema.rs b/src/table/schema.rs
index 4d754664..13517271 100644
--- a/src/table/schema.rs
+++ b/src/table/schema.rs
@@ -4,7 +4,9 @@ use garage_util::data::*;
use crate::crdt::CRDT;
+/// Trait for field used to partition data
pub trait PartitionKey {
+ /// Get the key used to partition
fn hash(&self) -> Hash;
}
@@ -20,7 +22,9 @@ impl PartitionKey for Hash {
}
}
+/// Trait for field used to sort data
pub trait SortKey {
+ /// Get the key used to sort
fn sort_key(&self) -> &[u8];
}
@@ -36,25 +40,34 @@ impl SortKey for Hash {
}
}
+/// Trait for an entry in a table. It must be sortable and partitionable.
pub trait Entry<P: PartitionKey, S: SortKey>:
CRDT + PartialEq + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync
{
+ /// Get the key used to partition
fn partition_key(&self) -> &P;
+ /// Get the key used to sort
fn sort_key(&self) -> &S;
+ /// Is the entry a tombstone? Default implementation always returns false
fn is_tombstone(&self) -> bool {
false
}
}
+/// Trait for the schema used in a table
pub trait TableSchema: Send + Sync {
+ /// The partition key used in that table
type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
+ /// The sort key used in that table
type S: SortKey + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync;
+ /// The type for an entry in that table
type E: Entry<Self::P, Self::S>;
type Filter: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync;
// Action to take if not able to decode current version:
// try loading from an older version
+ /// Try migrating an entry from an older version
fn try_migrate(_bytes: &[u8]) -> Option<Self::E> {
None
}
@@ -65,7 +78,5 @@ pub trait TableSchema: Send + Sync {
// to stderr.
fn updated(&self, _old: Option<Self::E>, _new: Option<Self::E>) {}
- fn matches_filter(_entry: &Self::E, _filter: &Self::Filter) -> bool {
- true
- }
+ fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool;
}
diff --git a/src/util/background.rs b/src/util/background.rs
index b5eb8bc8..bfdaaf1e 100644
--- a/src/util/background.rs
+++ b/src/util/background.rs
@@ -1,3 +1,4 @@
+//! Job runner for futures and async functions
use core::future::Future;
use std::pin::Pin;
use std::sync::Arc;
@@ -12,14 +13,15 @@ use crate::error::Error;
type JobOutput = Result<(), Error>;
type Job = Pin<Box<dyn Future<Output = JobOutput> + Send>>;
+/// Job runner for futures and async functions
pub struct BackgroundRunner {
- pub stop_signal: watch::Receiver<bool>,
-
+ stop_signal: watch::Receiver<bool>,
queue_in: mpsc::UnboundedSender<(Job, bool)>,
worker_in: mpsc::UnboundedSender<tokio::task::JoinHandle<()>>,
}
impl BackgroundRunner {
+ /// Create a new BackgroundRunner
pub fn new(
n_runners: usize,
stop_signal: watch::Receiver<bool>,
@@ -103,7 +105,7 @@ impl BackgroundRunner {
(bgrunner, await_all_done)
}
- // Spawn a task to be run in background
+ /// Spawn a task to be run in background
pub fn spawn<T>(&self, job: T)
where
T: Future<Output = JobOutput> + Send + 'static,
@@ -115,6 +117,8 @@ impl BackgroundRunner {
.unwrap();
}
+ /// Spawn a task to be run in background. It may get discarded before running if spawned while
+ /// the runner is stopping
pub fn spawn_cancellable<T>(&self, job: T)
where
T: Future<Output = JobOutput> + Send + 'static,
diff --git a/src/util/config.rs b/src/util/config.rs
index 9ff67711..bb70467b 100644
--- a/src/util/config.rs
+++ b/src/util/config.rs
@@ -1,3 +1,4 @@
+//! Contains type and functions related to Garage configuration file
use std::io::Read;
use std::net::SocketAddr;
use std::path::PathBuf;
@@ -6,57 +7,82 @@ use serde::{de, Deserialize};
use crate::error::Error;
+/// Represent the whole configuration
#[derive(Deserialize, Debug, Clone)]
pub struct Config {
+ /// Path where to store metadata. Should be fast, but low volume
pub metadata_dir: PathBuf,
+ /// Path where to store data. Can be slower, but need higher volume
pub data_dir: PathBuf,
+ /// Address to bind for RPC
pub rpc_bind_addr: SocketAddr,
+ /// Bootstrap peers RPC address
#[serde(deserialize_with = "deserialize_vec_addr")]
pub bootstrap_peers: Vec<SocketAddr>,
+ /// Consul host to connect to in order to discover more peers
pub consul_host: Option<String>,
+ /// Consul service name to use
pub consul_service_name: Option<String>,
+ /// Max number of concurrent RPC request
#[serde(default = "default_max_concurrent_rpc_requests")]
pub max_concurrent_rpc_requests: usize,
+ /// Size of data blocks to save to disk
#[serde(default = "default_block_size")]
pub block_size: usize,
#[serde(default = "default_control_write_max_faults")]
pub control_write_max_faults: usize,
+ /// How many nodes should hold a copy of meta data
#[serde(default = "default_replication_factor")]
pub meta_replication_factor: usize,
+ /// How many nodes should hold a copy of data
#[serde(default = "default_replication_factor")]
pub data_replication_factor: usize,
+ /// Configuration for RPC TLS
pub rpc_tls: Option<TlsConfig>,
+ /// Configuration for S3 api
pub s3_api: ApiConfig,
+ /// Configuration for serving files as normal web server
pub s3_web: WebConfig,
}
+/// Configuration for RPC TLS
#[derive(Deserialize, Debug, Clone)]
pub struct TlsConfig {
+ /// Path to certificate authority used for all nodes
pub ca_cert: String,
+ /// Path to public certificate for this node
pub node_cert: String,
+ /// Path to private key for this node
pub node_key: String,
}
+/// Configuration for S3 api
#[derive(Deserialize, Debug, Clone)]
pub struct ApiConfig {
+ /// Address and port to bind for api serving
pub api_bind_addr: SocketAddr,
+ /// S3 region to use
pub s3_region: String,
}
+/// Configuration for serving files as normal web server
#[derive(Deserialize, Debug, Clone)]
pub struct WebConfig {
+ /// Address and port to bind for web serving
pub bind_addr: SocketAddr,
+ /// Suffix to remove from domain name to find bucket
pub root_domain: String,
+ /// Suffix to add when user-agent request path ends with "/"
pub index: String,
}
@@ -73,6 +99,7 @@ fn default_control_write_max_faults() -> usize {
1
}
+/// Read and parse configuration
pub fn read_config(config_file: PathBuf) -> Result<Config, Error> {
let mut file = std::fs::OpenOptions::new()
.read(true)
diff --git a/src/util/data.rs b/src/util/data.rs
index 8cd6dd96..34ee8a18 100644
--- a/src/util/data.rs
+++ b/src/util/data.rs
@@ -1,8 +1,10 @@
+//! Contains common types and functions related to serialization and integrity
use rand::Rng;
use serde::de::{self, Visitor};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::fmt;
+/// An array of 32 bytes
#[derive(Default, PartialOrd, Ord, Clone, Hash, PartialEq, Copy)]
pub struct FixedBytes32([u8; 32]);
@@ -61,15 +63,20 @@ impl Serialize for FixedBytes32 {
}
impl FixedBytes32 {
+ /// Access the content as a slice
pub fn as_slice(&self) -> &[u8] {
&self.0[..]
}
+ /// Access the content as a mutable slice
pub fn as_slice_mut(&mut self) -> &mut [u8] {
&mut self.0[..]
}
+ /// Copy to a slice
pub fn to_vec(&self) -> Vec<u8> {
self.0.to_vec()
}
+ /// Try building a FixedBytes32 from a slice
+ /// Return None if the slice is not 32 bytes long
pub fn try_from(by: &[u8]) -> Option<Self> {
if by.len() != 32 {
return None;
@@ -80,9 +87,12 @@ impl FixedBytes32 {
}
}
+/// A 32 bytes UUID
pub type UUID = FixedBytes32;
+/// A 256 bit cryptographic hash, can be sha256 or blake2 depending on provenance
pub type Hash = FixedBytes32;
+/// Compute the sha256 of a slice
pub fn sha256sum(data: &[u8]) -> Hash {
use sha2::{Digest, Sha256};
@@ -93,6 +103,7 @@ pub fn sha256sum(data: &[u8]) -> Hash {
hash.into()
}
+/// Compute the blake2 of a slice
pub fn blake2sum(data: &[u8]) -> Hash {
use blake2::{Blake2b, Digest};
@@ -103,8 +114,10 @@ pub fn blake2sum(data: &[u8]) -> Hash {
hash.into()
}
+/// A 64 bit non cryptographic hash
pub type FastHash = u64;
+/// Compute a (non cryptographic) hash of a slice
pub fn fasthash(data: &[u8]) -> FastHash {
use xxhash_rust::xxh3::Xxh3;
@@ -113,12 +126,14 @@ pub fn fasthash(data: &[u8]) -> FastHash {
h.digest()
}
+/// Generate a random 32 bytes UUID
pub fn gen_uuid() -> UUID {
rand::thread_rng().gen::<[u8; 32]>().into()
}
// RMP serialization with names of fields and variants
+/// Serialize to MessagePack
pub fn rmp_to_vec_all_named<T>(val: &T) -> Result<Vec<u8>, rmp_serde::encode::Error>
where
T: Serialize + ?Sized,
@@ -131,10 +146,13 @@ where
Ok(wr)
}
+/// Serialize to JSON, truncating long result
pub fn debug_serialize<T: Serialize>(x: T) -> String {
match serde_json::to_string(&x) {
Ok(ss) => {
if ss.len() > 100 {
+ // TODO this can panic if 100 is not a codepoint boundary, but inside a 2 Bytes
+ // (or more) codepoint
ss[..100].to_string()
} else {
ss
diff --git a/src/util/error.rs b/src/util/error.rs
index a9bf0824..32dccbe6 100644
--- a/src/util/error.rs
+++ b/src/util/error.rs
@@ -1,9 +1,11 @@
+//! Module containing error types used in Garage
use err_derive::Error;
use hyper::StatusCode;
use std::io;
use crate::data::*;
+/// RPC related errors
#[derive(Debug, Error)]
pub enum RPCError {
#[error(display = "Node is down: {:?}.", _0)]
@@ -28,6 +30,7 @@ pub enum RPCError {
TooManyErrors(Vec<String>),
}
+/// Regroup all Garage errors
#[derive(Debug, Error)]
pub enum Error {
#[error(display = "IO error: {}", _0)]
diff --git a/src/util/lib.rs b/src/util/lib.rs
index 055e9ab0..c080e3a3 100644
--- a/src/util/lib.rs
+++ b/src/util/lib.rs
@@ -1,3 +1,5 @@
+//! Crate containing common functions and types used in Garage
+
#[macro_use]
extern crate log;
diff --git a/src/util/time.rs b/src/util/time.rs
index 148860e0..dfedcb26 100644
--- a/src/util/time.rs
+++ b/src/util/time.rs
@@ -1,6 +1,8 @@
+//! Module containing helper functions to manipulate time
use chrono::{SecondsFormat, TimeZone, Utc};
use std::time::{SystemTime, UNIX_EPOCH};
+/// Returns milliseconds since UNIX Epoch
pub fn now_msec() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
@@ -8,6 +10,8 @@ pub fn now_msec() -> u64 {
.as_millis() as u64
}
+/// Convert a timestamp represented as milliseconds since UNIX Epoch to
+/// its RFC3339 representation, such as "2021-01-01T12:30:00Z"
pub fn msec_to_rfc3339(msecs: u64) -> String {
let secs = msecs as i64 / 1000;
let nanos = (msecs as i64 % 1000) as u32 * 1_000_000;
diff --git a/src/web/error.rs b/src/web/error.rs
index 14bc3b75..f6afbb42 100644
--- a/src/web/error.rs
+++ b/src/web/error.rs
@@ -3,30 +3,37 @@ use hyper::StatusCode;
use garage_util::error::Error as GarageError;
+/// Errors of this crate
#[derive(Debug, Error)]
pub enum Error {
+ /// An error received from the API crate
#[error(display = "API error: {}", _0)]
- ApiError(#[error(source)] garage_api::error::Error),
+ ApiError(#[error(source)] garage_api::Error),
// Category: internal error
+ /// Error internal to garage
#[error(display = "Internal error: {}", _0)]
InternalError(#[error(source)] GarageError),
+ /// The file does not exist
#[error(display = "Not found")]
NotFound,
- // Category: bad request
+ /// The request contained an invalid UTF-8 sequence in its path or in other parameters
#[error(display = "Invalid UTF-8: {}", _0)]
InvalidUTF8(#[error(source)] std::str::Utf8Error),
+ /// The client sent a header with an invalid value
#[error(display = "Invalid header value: {}", _0)]
InvalidHeader(#[error(source)] hyper::header::ToStrError),
+ /// The client sent a request without host, or with unsupported method
#[error(display = "Bad request: {}", _0)]
BadRequest(String),
}
impl Error {
+ /// Transform errors into http status code
pub fn http_status_code(&self) -> StatusCode {
match self {
Error::NotFound => StatusCode::NOT_FOUND,
diff --git a/src/web/lib.rs b/src/web/lib.rs
index f28937b9..c06492a3 100644
--- a/src/web/lib.rs
+++ b/src/web/lib.rs
@@ -1,6 +1,9 @@
+//! Crate for handling web serving of s3 bucket
#[macro_use]
extern crate log;
-pub mod error;
+mod error;
+pub use error::Error;
-pub mod web_server;
+mod web_server;
+pub use web_server::run_web_server;
diff --git a/src/web/web_server.rs b/src/web/web_server.rs
index cfde2bcc..9635eca6 100644
--- a/src/web/web_server.rs
+++ b/src/web/web_server.rs
@@ -18,6 +18,7 @@ use garage_model::garage::Garage;
use garage_table::*;
use garage_util::error::Error as GarageError;
+/// Run a web server
pub async fn run_web_server(
garage: Arc<Garage>,
shutdown_signal: impl Future<Output = ()>,