aboutsummaryrefslogtreecommitdiff
path: root/src/model
diff options
context:
space:
mode:
Diffstat (limited to 'src/model')
-rw-r--r--src/model/block.rs34
-rw-r--r--src/model/block_ref_table.rs5
-rw-r--r--src/model/bucket_table.rs17
-rw-r--r--src/model/garage.rs13
-rw-r--r--src/model/key_table.rs26
-rw-r--r--src/model/object_table.rs40
-rw-r--r--src/model/version_table.rs15
7 files changed, 125 insertions, 25 deletions
diff --git a/src/model/block.rs b/src/model/block.rs
index 0d9af38f..5f428fe1 100644
--- a/src/model/block.rs
+++ b/src/model/block.rs
@@ -18,12 +18,13 @@ use garage_rpc::membership::System;
use garage_rpc::rpc_client::*;
use garage_rpc::rpc_server::*;
-use garage_table::replication::{sharded::TableShardedReplication, TableReplication};
+use garage_table::replication::{TableReplication, TableShardedReplication};
use crate::block_ref_table::*;
use crate::garage::Garage;
+/// Size under which data will be stored inlined in database instead of as files
pub const INLINE_THRESHOLD: usize = 3072;
pub const BACKGROUND_WORKERS: u64 = 1;
@@ -33,28 +34,41 @@ const BLOCK_GC_TIMEOUT: Duration = Duration::from_secs(60);
const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(5);
const RESYNC_RETRY_TIMEOUT: Duration = Duration::from_secs(10);
+/// RPC messages used to share blocks of data between nodes
#[derive(Debug, Serialize, Deserialize)]
pub enum Message {
Ok,
+ /// Message to ask for a block of data, by hash
GetBlock(Hash),
+ /// Message to send a block of data, either because requested, of for first delivery of new
+ /// block
PutBlock(PutBlockMessage),
+ /// Ask other node if they should have this block, but don't actually have it
NeedBlockQuery(Hash),
+ /// Response : whether the node do require that block
NeedBlockReply(bool),
}
+/// Structure used to send a block
#[derive(Debug, Serialize, Deserialize)]
pub struct PutBlockMessage {
+ /// Hash of the block
pub hash: Hash,
+ /// Content of the block
#[serde(with = "serde_bytes")]
pub data: Vec<u8>,
}
impl RpcMessage for Message {}
+/// The block manager, handling block exchange between nodes, and block storage on local node
pub struct BlockManager {
+ /// Replication strategy, allowing to find on which node blocks should be located
pub replication: TableShardedReplication,
+ /// Directory in which block are stored
pub data_dir: PathBuf,
+ /// Lock to prevent concurrent edition of the directory
pub data_dir_lock: Mutex<()>,
rc: sled::Tree,
@@ -128,7 +142,8 @@ impl BlockManager {
}
pub fn spawn_background_worker(self: Arc<Self>) {
- // Launch 2 simultaneous workers for background resync loop preprocessing
+ // Launch 2 simultaneous workers for background resync loop preprocessing <= TODO actually this
+ // launches only one worker with current value of BACKGROUND_WORKERS
for i in 0..BACKGROUND_WORKERS {
let bm2 = self.clone();
let background = self.system.background.clone();
@@ -141,7 +156,8 @@ impl BlockManager {
}
}
- pub async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result<Message, Error> {
+ /// Write a block to disk
+ async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result<Message, Error> {
let _lock = self.data_dir_lock.lock().await;
let mut path = self.block_dir(hash);
@@ -159,7 +175,8 @@ impl BlockManager {
Ok(Message::Ok)
}
- pub async fn read_block(&self, hash: &Hash) -> Result<Message, Error> {
+ /// Read block from disk, verifying it's integrity
+ async fn read_block(&self, hash: &Hash) -> Result<Message, Error> {
let path = self.block_path(hash);
let mut f = match fs::File::open(&path).await {
@@ -190,7 +207,8 @@ impl BlockManager {
Ok(Message::PutBlock(PutBlockMessage { hash: *hash, data }))
}
- pub async fn need_block(&self, hash: &Hash) -> Result<bool, Error> {
+ /// Check if this node should have a block, but don't actually have it
+ async fn need_block(&self, hash: &Hash) -> Result<bool, Error> {
let needed = self
.rc
.get(hash.as_ref())?
@@ -217,6 +235,8 @@ impl BlockManager {
path
}
+ /// Increment the number of time a block is used, putting it to resynchronization if it is
+ /// required, but not known
pub fn block_incref(&self, hash: &Hash) -> Result<(), Error> {
let old_rc = self.rc.fetch_and_update(&hash, |old| {
let old_v = old.map(u64_from_be_bytes).unwrap_or(0);
@@ -229,6 +249,7 @@ impl BlockManager {
Ok(())
}
+ /// Decrement the number of time a block is used
pub fn block_decref(&self, hash: &Hash) -> Result<(), Error> {
let new_rc = self.rc.update_and_fetch(&hash, |old| {
let old_v = old.map(u64_from_be_bytes).unwrap_or(0);
@@ -388,6 +409,7 @@ impl BlockManager {
Ok(())
}
+ /// Ask nodes that might have a block for it
pub async fn rpc_get_block(&self, hash: &Hash) -> Result<Vec<u8>, Error> {
let who = self.replication.read_nodes(&hash);
let resps = self
@@ -412,6 +434,7 @@ impl BlockManager {
)))
}
+ /// Send block to nodes that should have it
pub async fn rpc_put_block(&self, hash: Hash, data: Vec<u8>) -> Result<(), Error> {
let who = self.replication.write_nodes(&hash);
self.rpc_client
@@ -498,6 +521,7 @@ impl BlockManager {
.boxed()
}
+ /// Get lenght of resync queue
pub fn resync_queue_len(&self) -> usize {
self.resync_queue.len()
}
diff --git a/src/model/block_ref_table.rs b/src/model/block_ref_table.rs
index e4372717..1f0c7bb0 100644
--- a/src/model/block_ref_table.rs
+++ b/src/model/block_ref_table.rs
@@ -10,13 +10,14 @@ use crate::block::*;
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct BlockRef {
- // Primary key
+ /// Hash of the block, used as partition key
pub block: Hash,
- // Sort key
+ /// Id of the Version for the object containing this block, used as sorting key
pub version: UUID,
// Keep track of deleted status
+ /// Is the Version that contains this block deleted
pub deleted: crdt::Bool,
}
diff --git a/src/model/bucket_table.rs b/src/model/bucket_table.rs
index 6330dced..6a4b021d 100644
--- a/src/model/bucket_table.rs
+++ b/src/model/bucket_table.rs
@@ -12,15 +12,18 @@ use crate::key_table::PermissionSet;
/// - A bucket has 2 states, Present or Deleted and parameters make sense only if present.
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct Bucket {
- // Primary key
+ /// Name of the bucket
pub name: String,
-
+ /// State, and configuration if not deleted, of the bucket
pub state: crdt::LWW<BucketState>,
}
+/// State of a bucket
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub enum BucketState {
+ /// The bucket is deleted
Deleted,
+ /// The bucket exists
Present(BucketParams),
}
@@ -37,9 +40,12 @@ impl CRDT for BucketState {
}
}
+/// Configuration for a bucket
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct BucketParams {
+ /// Map of key with access to the bucket, and what kind of access they give
pub authorized_keys: crdt::LWWMap<String, PermissionSet>,
+ /// Is the bucket served as http
pub website: crdt::LWW<bool>,
}
@@ -51,6 +57,7 @@ impl CRDT for BucketParams {
}
impl BucketParams {
+ /// Initializes a new instance of the Bucket struct
pub fn new() -> Self {
BucketParams {
authorized_keys: crdt::LWWMap::new(),
@@ -60,15 +67,21 @@ impl BucketParams {
}
impl Bucket {
+ /// Create a new bucket
pub fn new(name: String) -> Self {
Bucket {
name,
state: crdt::LWW::new(BucketState::Present(BucketParams::new())),
}
}
+
+ /// Returns true if this represents a deleted bucket
pub fn is_deleted(&self) -> bool {
*self.state.get() == BucketState::Deleted
}
+
+ /// Return the list of authorized keys, when each was updated, and the permission associated to
+ /// the key
pub fn authorized_keys(&self) -> &[(String, u64, PermissionSet)] {
match self.state.get() {
BucketState::Deleted => &[],
diff --git a/src/model/garage.rs b/src/model/garage.rs
index 5f7a67c9..797a91e5 100644
--- a/src/model/garage.rs
+++ b/src/model/garage.rs
@@ -7,8 +7,8 @@ use garage_rpc::membership::System;
use garage_rpc::rpc_client::RpcHttpClient;
use garage_rpc::rpc_server::RpcServer;
-use garage_table::replication::fullcopy::*;
-use garage_table::replication::sharded::*;
+use garage_table::replication::TableFullReplication;
+use garage_table::replication::TableShardedReplication;
use garage_table::*;
use crate::block::*;
@@ -18,15 +18,23 @@ use crate::key_table::*;
use crate::object_table::*;
use crate::version_table::*;
+/// An entire Garage full of data
pub struct Garage {
+ /// The parsed configuration Garage is running
pub config: Config,
+ /// The local database
pub db: sled::Db,
+ /// A background job runner
pub background: Arc<BackgroundRunner>,
+ /// The membership manager
pub system: Arc<System>,
+ /// The block manager
pub block_manager: Arc<BlockManager>,
+ /// Table containing informations about buckets
pub bucket_table: Arc<Table<BucketTable, TableFullReplication>>,
+ /// Table containing informations about api keys
pub key_table: Arc<Table<KeyTable, TableFullReplication>>,
pub object_table: Arc<Table<ObjectTable, TableShardedReplication>>,
@@ -35,6 +43,7 @@ pub struct Garage {
}
impl Garage {
+ /// Create and run garage
pub fn new(
config: Config,
db: sled::Db,
diff --git a/src/model/key_table.rs b/src/model/key_table.rs
index fcca3835..578f8683 100644
--- a/src/model/key_table.rs
+++ b/src/model/key_table.rs
@@ -3,26 +3,28 @@ use serde::{Deserialize, Serialize};
use garage_table::crdt::*;
use garage_table::*;
+/// An api key
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct Key {
- // Primary key
+ /// The id of the key (immutable), used as partition key
pub key_id: String,
- // Associated secret key (immutable)
+ /// The secret_key associated
pub secret_key: String,
- // Name
+ /// Name for the key
pub name: crdt::LWW<String>,
- // Deletion
+ /// Is the key deleted
pub deleted: crdt::Bool,
- // Authorized keys
- pub authorized_buckets: crdt::LWWMap<String, PermissionSet>,
+ /// Buckets in which the key is authorized. Empty if `Key` is deleted
// CRDT interaction: deleted implies authorized_buckets is empty
+ pub authorized_buckets: crdt::LWWMap<String, PermissionSet>,
}
impl Key {
+ /// Create a new key
pub fn new(name: String) -> Self {
let key_id = format!("GK{}", hex::encode(&rand::random::<[u8; 12]>()[..]));
let secret_key = hex::encode(&rand::random::<[u8; 32]>()[..]);
@@ -34,6 +36,8 @@ impl Key {
authorized_buckets: crdt::LWWMap::new(),
}
}
+
+ /// Import a key from it's parts
pub fn import(key_id: &str, secret_key: &str, name: &str) -> Self {
Self {
key_id: key_id.to_string(),
@@ -43,6 +47,8 @@ impl Key {
authorized_buckets: crdt::LWWMap::new(),
}
}
+
+ /// Create a new Key which can me merged to mark an existing key deleted
pub fn delete(key_id: String) -> Self {
Self {
key_id,
@@ -52,13 +58,16 @@ impl Key {
authorized_buckets: crdt::LWWMap::new(),
}
}
- /// Add an authorized bucket, only if it wasn't there before
+
+ /// Check if `Key` is allowed to read in bucket
pub fn allow_read(&self, bucket: &str) -> bool {
self.authorized_buckets
.get(&bucket.to_string())
.map(|x| x.allow_read)
.unwrap_or(false)
}
+
+ /// Check if `Key` is allowed to write in bucket
pub fn allow_write(&self, bucket: &str) -> bool {
self.authorized_buckets
.get(&bucket.to_string())
@@ -67,9 +76,12 @@ impl Key {
}
}
+/// Permission given to a key in a bucket
#[derive(PartialOrd, Ord, PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct PermissionSet {
+ /// The key can be used to read the bucket
pub allow_read: bool,
+ /// The key can be used to write in the bucket
pub allow_write: bool,
}
diff --git a/src/model/object_table.rs b/src/model/object_table.rs
index 34ac798a..ff42d065 100644
--- a/src/model/object_table.rs
+++ b/src/model/object_table.rs
@@ -6,24 +6,26 @@ use garage_util::background::BackgroundRunner;
use garage_util::data::*;
use garage_table::crdt::*;
-use garage_table::replication::sharded::*;
+use garage_table::replication::TableShardedReplication;
use garage_table::*;
use crate::version_table::*;
+/// An object
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct Object {
- // Primary key
+ /// The bucket in which the object is stored, used as partition key
pub bucket: String,
- // Sort key
+ /// The key at which the object is stored in its bucket, used as sorting key
pub key: String,
- // Data
+ /// The list of currenty stored versions of the object
versions: Vec<ObjectVersion>,
}
impl Object {
+ /// Create an object from parts
pub fn new(bucket: String, key: String, versions: Vec<ObjectVersion>) -> Self {
let mut ret = Self {
bucket,
@@ -36,6 +38,7 @@ impl Object {
}
ret
}
+
/// Adds a version if it wasn't already present
pub fn add_version(&mut self, new: ObjectVersion) -> Result<(), ()> {
match self
@@ -49,23 +52,32 @@ impl Object {
Ok(_) => Err(()),
}
}
+
+ /// Get a list of currently stored versions of `Object`
pub fn versions(&self) -> &[ObjectVersion] {
&self.versions[..]
}
}
+/// Informations about a version of an object
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct ObjectVersion {
+ /// Id of the version
pub uuid: UUID,
+ /// Timestamp of when the object was created
pub timestamp: u64,
-
+ /// State of the version
pub state: ObjectVersionState,
}
+/// State of an object version
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub enum ObjectVersionState {
+ /// The version is being received
Uploading(ObjectVersionHeaders),
+ /// The version is fully received
Complete(ObjectVersionData),
+ /// The version uploaded containded errors or the upload was explicitly aborted
Aborted,
}
@@ -90,10 +102,15 @@ impl CRDT for ObjectVersionState {
}
}
+/// Data about an object version
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub enum ObjectVersionData {
+ /// The object was deleted, this Version is a tombstone to mark it as such
DeleteMarker,
+ /// The object is short, it's stored inlined
Inline(ObjectVersionMeta, #[serde(with = "serde_bytes")] Vec<u8>),
+ /// The object is not short, Hash of first block is stored here, next segments hashes are
+ /// stored in the version table
FirstBlock(ObjectVersionMeta, Hash),
}
@@ -101,16 +118,23 @@ impl AutoCRDT for ObjectVersionData {
const WARN_IF_DIFFERENT: bool = true;
}
+/// Metadata about the object version
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct ObjectVersionMeta {
+ /// Headers to send to the client
pub headers: ObjectVersionHeaders,
+ /// Size of the object
pub size: u64,
+ /// etag of the object
pub etag: String,
}
+/// Additional headers for an object
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct ObjectVersionHeaders {
+ /// Content type of the object
pub content_type: String,
+ /// Any other http headers to send
pub other: BTreeMap<String, String>,
}
@@ -118,18 +142,24 @@ impl ObjectVersion {
fn cmp_key(&self) -> (u64, UUID) {
(self.timestamp, self.uuid)
}
+
+ /// Is the object version currently being uploaded
pub fn is_uploading(&self) -> bool {
match self.state {
ObjectVersionState::Uploading(_) => true,
_ => false,
}
}
+
+ /// Is the object version completely received
pub fn is_complete(&self) -> bool {
match self.state {
ObjectVersionState::Complete(_) => true,
_ => false,
}
}
+
+ /// Is the object version available (received and not a tombstone)
pub fn is_data(&self) -> bool {
match self.state {
ObjectVersionState::Complete(ObjectVersionData::DeleteMarker) => false,
diff --git a/src/model/version_table.rs b/src/model/version_table.rs
index 841fbfea..bb836868 100644
--- a/src/model/version_table.rs
+++ b/src/model/version_table.rs
@@ -5,26 +5,32 @@ use garage_util::background::BackgroundRunner;
use garage_util::data::*;
use garage_table::crdt::*;
-use garage_table::replication::sharded::*;
+use garage_table::replication::TableShardedReplication;
use garage_table::*;
use crate::block_ref_table::*;
+/// A version of an object
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct Version {
- // Primary key
+ /// UUID of the version, used as partition key
pub uuid: UUID,
// Actual data: the blocks for this version
// In the case of a multipart upload, also store the etags
// of individual parts and check them when doing CompleteMultipartUpload
+ /// Is this version deleted
pub deleted: crdt::Bool,
+ /// list of blocks of data composing the version
pub blocks: crdt::Map<VersionBlockKey, VersionBlock>,
+ /// Etag of each part in case of a multipart upload, empty otherwise
pub parts_etags: crdt::Map<u64, String>,
// Back link to bucket+key so that we can figure if
// this was deleted later on
+ /// Bucket in which the related object is stored
pub bucket: String,
+ /// Key in which the related object is stored
pub key: String,
}
@@ -43,7 +49,9 @@ impl Version {
#[derive(PartialEq, Eq, Clone, Copy, Debug, Serialize, Deserialize)]
pub struct VersionBlockKey {
+ /// Number of the part
pub part_number: u64,
+ /// Offset of this sub-segment in its part
pub offset: u64,
}
@@ -61,9 +69,12 @@ impl PartialOrd for VersionBlockKey {
}
}
+/// Informations about a single block
#[derive(PartialEq, Eq, Ord, PartialOrd, Clone, Copy, Debug, Serialize, Deserialize)]
pub struct VersionBlock {
+ /// Hash of the block
pub hash: Hash,
+ /// Size of the block
pub size: u64,
}