diff options
Diffstat (limited to 'src/model')
-rw-r--r-- | src/model/block.rs | 34 | ||||
-rw-r--r-- | src/model/block_ref_table.rs | 5 | ||||
-rw-r--r-- | src/model/bucket_table.rs | 17 | ||||
-rw-r--r-- | src/model/garage.rs | 13 | ||||
-rw-r--r-- | src/model/key_table.rs | 26 | ||||
-rw-r--r-- | src/model/object_table.rs | 40 | ||||
-rw-r--r-- | src/model/version_table.rs | 15 |
7 files changed, 125 insertions, 25 deletions
diff --git a/src/model/block.rs b/src/model/block.rs index 0d9af38f..5f428fe1 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -18,12 +18,13 @@ use garage_rpc::membership::System; use garage_rpc::rpc_client::*; use garage_rpc::rpc_server::*; -use garage_table::replication::{sharded::TableShardedReplication, TableReplication}; +use garage_table::replication::{TableReplication, TableShardedReplication}; use crate::block_ref_table::*; use crate::garage::Garage; +/// Size under which data will be stored inlined in database instead of as files pub const INLINE_THRESHOLD: usize = 3072; pub const BACKGROUND_WORKERS: u64 = 1; @@ -33,28 +34,41 @@ const BLOCK_GC_TIMEOUT: Duration = Duration::from_secs(60); const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(5); const RESYNC_RETRY_TIMEOUT: Duration = Duration::from_secs(10); +/// RPC messages used to share blocks of data between nodes #[derive(Debug, Serialize, Deserialize)] pub enum Message { Ok, + /// Message to ask for a block of data, by hash GetBlock(Hash), + /// Message to send a block of data, either because requested, or for first delivery of new + /// block PutBlock(PutBlockMessage), + /// Ask other node if they should have this block, but don't actually have it NeedBlockQuery(Hash), + /// Response : whether the node does require that block NeedBlockReply(bool), } +/// Structure used to send a block #[derive(Debug, Serialize, Deserialize)] pub struct PutBlockMessage { + /// Hash of the block pub hash: Hash, + /// Content of the block #[serde(with = "serde_bytes")] pub data: Vec<u8>, } impl RpcMessage for Message {} +/// The block manager, handling block exchange between nodes, and block storage on local node pub struct BlockManager { + /// Replication strategy, allowing to find on which node blocks should be located pub replication: TableShardedReplication, + /// Directory in which blocks are stored pub data_dir: PathBuf, + /// Lock to prevent concurrent modification of the directory pub data_dir_lock: 
Mutex<()>, rc: sled::Tree, @@ -128,7 +142,8 @@ impl BlockManager { } pub fn spawn_background_worker(self: Arc<Self>) { - // Launch 2 simultaneous workers for background resync loop preprocessing + // Launch 2 simultaneous workers for background resync loop preprocessing <= TODO actually this + // launches only one worker with current value of BACKGROUND_WORKERS for i in 0..BACKGROUND_WORKERS { let bm2 = self.clone(); let background = self.system.background.clone(); @@ -141,7 +156,8 @@ impl BlockManager { } } - pub async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result<Message, Error> { + /// Write a block to disk + async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result<Message, Error> { let _lock = self.data_dir_lock.lock().await; let mut path = self.block_dir(hash); @@ -159,7 +175,8 @@ impl BlockManager { Ok(Message::Ok) } - pub async fn read_block(&self, hash: &Hash) -> Result<Message, Error> { + /// Read block from disk, verifying its integrity + async fn read_block(&self, hash: &Hash) -> Result<Message, Error> { let path = self.block_path(hash); let mut f = match fs::File::open(&path).await { @@ -190,7 +207,8 @@ impl BlockManager { Ok(Message::PutBlock(PutBlockMessage { hash: *hash, data })) } - pub async fn need_block(&self, hash: &Hash) -> Result<bool, Error> { + /// Check if this node should have a block, but doesn't actually have it + async fn need_block(&self, hash: &Hash) -> Result<bool, Error> { let needed = self .rc .get(hash.as_ref())? 
@@ -217,6 +235,8 @@ impl BlockManager { path } + /// Increment the number of times a block is used, putting it to resynchronization if it is + /// required, but not known pub fn block_incref(&self, hash: &Hash) -> Result<(), Error> { let old_rc = self.rc.fetch_and_update(&hash, |old| { let old_v = old.map(u64_from_be_bytes).unwrap_or(0); @@ -229,6 +249,7 @@ impl BlockManager { Ok(()) } + /// Decrement the number of times a block is used pub fn block_decref(&self, hash: &Hash) -> Result<(), Error> { let new_rc = self.rc.update_and_fetch(&hash, |old| { let old_v = old.map(u64_from_be_bytes).unwrap_or(0); @@ -388,6 +409,7 @@ impl BlockManager { Ok(()) } + /// Ask nodes that might have a block for it pub async fn rpc_get_block(&self, hash: &Hash) -> Result<Vec<u8>, Error> { let who = self.replication.read_nodes(&hash); let resps = self @@ -412,6 +434,7 @@ impl BlockManager { ))) } + /// Send block to nodes that should have it pub async fn rpc_put_block(&self, hash: Hash, data: Vec<u8>) -> Result<(), Error> { let who = self.replication.write_nodes(&hash); self.rpc_client @@ -498,6 +521,7 @@ impl BlockManager { .boxed() } + /// Get length of resync queue pub fn resync_queue_len(&self) -> usize { self.resync_queue.len() } diff --git a/src/model/block_ref_table.rs b/src/model/block_ref_table.rs index e4372717..1f0c7bb0 100644 --- a/src/model/block_ref_table.rs +++ b/src/model/block_ref_table.rs @@ -10,13 +10,14 @@ use crate::block::*; #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct BlockRef { - // Primary key + /// Hash of the block, used as partition key pub block: Hash, - // Sort key + /// Id of the Version for the object containing this block, used as sorting key pub version: UUID, // Keep track of deleted status + /// Is the Version that contains this block deleted pub deleted: crdt::Bool, } diff --git a/src/model/bucket_table.rs b/src/model/bucket_table.rs index 6330dced..6a4b021d 100644 --- a/src/model/bucket_table.rs +++ 
b/src/model/bucket_table.rs @@ -12,15 +12,18 @@ use crate::key_table::PermissionSet; /// - A bucket has 2 states, Present or Deleted and parameters make sense only if present. #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct Bucket { - // Primary key + /// Name of the bucket pub name: String, - + /// State, and configuration if not deleted, of the bucket pub state: crdt::LWW<BucketState>, } +/// State of a bucket #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub enum BucketState { + /// The bucket is deleted Deleted, + /// The bucket exists Present(BucketParams), } @@ -37,9 +40,12 @@ impl CRDT for BucketState { } } +/// Configuration for a bucket #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct BucketParams { + /// Map of key with access to the bucket, and what kind of access they give pub authorized_keys: crdt::LWWMap<String, PermissionSet>, + /// Is the bucket served as http pub website: crdt::LWW<bool>, } @@ -51,6 +57,7 @@ impl CRDT for BucketParams { } impl BucketParams { + /// Initializes a new instance of the BucketParams struct pub fn new() -> Self { BucketParams { authorized_keys: crdt::LWWMap::new(), @@ -60,15 +67,21 @@ impl BucketParams { } impl Bucket { + /// Create a new bucket pub fn new(name: String) -> Self { Bucket { name, state: crdt::LWW::new(BucketState::Present(BucketParams::new())), } } + + /// Returns true if this represents a deleted bucket pub fn is_deleted(&self) -> bool { *self.state.get() == BucketState::Deleted } + + /// Return the list of authorized keys, when each was updated, and the permission associated to + /// the key pub fn authorized_keys(&self) -> &[(String, u64, PermissionSet)] { match self.state.get() { BucketState::Deleted => &[], diff --git a/src/model/garage.rs b/src/model/garage.rs index 5f7a67c9..797a91e5 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -7,8 +7,8 @@ use garage_rpc::membership::System; use garage_rpc::rpc_client::RpcHttpClient; use 
garage_rpc::rpc_server::RpcServer; -use garage_table::replication::fullcopy::*; -use garage_table::replication::sharded::*; +use garage_table::replication::TableFullReplication; +use garage_table::replication::TableShardedReplication; use garage_table::*; use crate::block::*; @@ -18,15 +18,23 @@ use crate::key_table::*; use crate::object_table::*; use crate::version_table::*; +/// An entire Garage full of data pub struct Garage { + /// The parsed configuration Garage is running pub config: Config, + /// The local database pub db: sled::Db, + /// A background job runner pub background: Arc<BackgroundRunner>, + /// The membership manager pub system: Arc<System>, + /// The block manager pub block_manager: Arc<BlockManager>, + /// Table containing information about buckets pub bucket_table: Arc<Table<BucketTable, TableFullReplication>>, + /// Table containing information about api keys pub key_table: Arc<Table<KeyTable, TableFullReplication>>, pub object_table: Arc<Table<ObjectTable, TableShardedReplication>>, @@ -35,6 +43,7 @@ pub struct Garage { } impl Garage { + /// Create and run garage pub fn new( config: Config, db: sled::Db, diff --git a/src/model/key_table.rs b/src/model/key_table.rs index fcca3835..578f8683 100644 --- a/src/model/key_table.rs +++ b/src/model/key_table.rs @@ -3,26 +3,28 @@ use serde::{Deserialize, Serialize}; use garage_table::crdt::*; use garage_table::*; +/// An api key #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct Key { - // Primary key + /// The id of the key (immutable), used as partition key pub key_id: String, - // Associated secret key (immutable) + /// The secret_key associated pub secret_key: String, - // Name + /// Name for the key pub name: crdt::LWW<String>, - // Deletion + /// Is the key deleted pub deleted: crdt::Bool, - // Authorized keys - pub authorized_buckets: crdt::LWWMap<String, PermissionSet>, + /// Buckets in which the key is authorized. 
Empty if `Key` is deleted // CRDT interaction: deleted implies authorized_buckets is empty + pub authorized_buckets: crdt::LWWMap<String, PermissionSet>, } impl Key { + /// Create a new key pub fn new(name: String) -> Self { let key_id = format!("GK{}", hex::encode(&rand::random::<[u8; 12]>()[..])); let secret_key = hex::encode(&rand::random::<[u8; 32]>()[..]); @@ -34,6 +36,8 @@ impl Key { authorized_buckets: crdt::LWWMap::new(), } } + + /// Import a key from its parts pub fn import(key_id: &str, secret_key: &str, name: &str) -> Self { Self { key_id: key_id.to_string(), @@ -43,6 +47,8 @@ impl Key { authorized_buckets: crdt::LWWMap::new(), } } + + /// Create a new Key which can be merged to mark an existing key deleted pub fn delete(key_id: String) -> Self { Self { key_id, @@ -52,13 +58,16 @@ impl Key { authorized_buckets: crdt::LWWMap::new(), } } - /// Add an authorized bucket, only if it wasn't there before + + /// Check if `Key` is allowed to read in bucket pub fn allow_read(&self, bucket: &str) -> bool { self.authorized_buckets .get(&bucket.to_string()) .map(|x| x.allow_read) .unwrap_or(false) } + + /// Check if `Key` is allowed to write in bucket pub fn allow_write(&self, bucket: &str) -> bool { self.authorized_buckets .get(&bucket.to_string()) @@ -67,9 +76,12 @@ impl Key { } } +/// Permission given to a key in a bucket #[derive(PartialOrd, Ord, PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] pub struct PermissionSet { + /// The key can be used to read the bucket pub allow_read: bool, + /// The key can be used to write in the bucket pub allow_write: bool, } diff --git a/src/model/object_table.rs b/src/model/object_table.rs index 34ac798a..ff42d065 100644 --- a/src/model/object_table.rs +++ b/src/model/object_table.rs @@ -6,24 +6,26 @@ use garage_util::background::BackgroundRunner; use garage_util::data::*; use garage_table::crdt::*; -use garage_table::replication::sharded::*; +use garage_table::replication::TableShardedReplication; use garage_table::*; 
use crate::version_table::*; +/// An object #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct Object { - // Primary key + /// The bucket in which the object is stored, used as partition key pub bucket: String, - // Sort key + /// The key at which the object is stored in its bucket, used as sorting key pub key: String, - // Data + /// The list of currently stored versions of the object versions: Vec<ObjectVersion>, } impl Object { + /// Create an object from parts pub fn new(bucket: String, key: String, versions: Vec<ObjectVersion>) -> Self { let mut ret = Self { bucket, @@ -36,6 +38,7 @@ impl Object { } ret } + /// Adds a version if it wasn't already present pub fn add_version(&mut self, new: ObjectVersion) -> Result<(), ()> { match self @@ -49,23 +52,32 @@ impl Object { Ok(_) => Err(()), } } + + /// Get a list of currently stored versions of `Object` pub fn versions(&self) -> &[ObjectVersion] { &self.versions[..] } } +/// Information about a version of an object #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct ObjectVersion { + /// Id of the version pub uuid: UUID, + /// Timestamp of when the object was created pub timestamp: u64, - + /// State of the version pub state: ObjectVersionState, } +/// State of an object version #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub enum ObjectVersionState { + /// The version is being received Uploading(ObjectVersionHeaders), + /// The version is fully received Complete(ObjectVersionData), + /// The version uploaded contained errors or the upload was explicitly aborted Aborted, } @@ -90,10 +102,15 @@ impl CRDT for ObjectVersionState { } } +/// Data about an object version #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] pub enum ObjectVersionData { + /// The object was deleted, this Version is a tombstone to mark it as such DeleteMarker, + /// The object is short, it's stored inlined Inline(ObjectVersionMeta, #[serde(with = "serde_bytes")] 
Vec<u8>), + /// The object is not short, Hash of first block is stored here, next segments hashes are + /// stored in the version table FirstBlock(ObjectVersionMeta, Hash), } @@ -101,16 +118,23 @@ impl AutoCRDT for ObjectVersionData { const WARN_IF_DIFFERENT: bool = true; } +/// Metadata about the object version #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] pub struct ObjectVersionMeta { + /// Headers to send to the client pub headers: ObjectVersionHeaders, + /// Size of the object pub size: u64, + /// etag of the object pub etag: String, } +/// Additional headers for an object #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] pub struct ObjectVersionHeaders { + /// Content type of the object pub content_type: String, + /// Any other http headers to send pub other: BTreeMap<String, String>, } @@ -118,18 +142,24 @@ impl ObjectVersion { fn cmp_key(&self) -> (u64, UUID) { (self.timestamp, self.uuid) } + + /// Is the object version currently being uploaded pub fn is_uploading(&self) -> bool { match self.state { ObjectVersionState::Uploading(_) => true, _ => false, } } + + /// Is the object version completely received pub fn is_complete(&self) -> bool { match self.state { ObjectVersionState::Complete(_) => true, _ => false, } } + + /// Is the object version available (received and not a tombstone) pub fn is_data(&self) -> bool { match self.state { ObjectVersionState::Complete(ObjectVersionData::DeleteMarker) => false, diff --git a/src/model/version_table.rs b/src/model/version_table.rs index 841fbfea..bb836868 100644 --- a/src/model/version_table.rs +++ b/src/model/version_table.rs @@ -5,26 +5,32 @@ use garage_util::background::BackgroundRunner; use garage_util::data::*; use garage_table::crdt::*; -use garage_table::replication::sharded::*; +use garage_table::replication::TableShardedReplication; use garage_table::*; use crate::block_ref_table::*; +/// A version of an object #[derive(PartialEq, Clone, 
Debug, Serialize, Deserialize)] pub struct Version { - // Primary key + /// UUID of the version, used as partition key pub uuid: UUID, // Actual data: the blocks for this version // In the case of a multipart upload, also store the etags // of individual parts and check them when doing CompleteMultipartUpload + /// Is this version deleted pub deleted: crdt::Bool, + /// list of blocks of data composing the version pub blocks: crdt::Map<VersionBlockKey, VersionBlock>, + /// Etag of each part in case of a multipart upload, empty otherwise pub parts_etags: crdt::Map<u64, String>, // Back link to bucket+key so that we can figure if // this was deleted later on + /// Bucket in which the related object is stored pub bucket: String, + /// Key in which the related object is stored pub key: String, } @@ -43,7 +49,9 @@ impl Version { #[derive(PartialEq, Eq, Clone, Copy, Debug, Serialize, Deserialize)] pub struct VersionBlockKey { + /// Number of the part pub part_number: u64, + /// Offset of this sub-segment in its part pub offset: u64, } @@ -61,9 +69,12 @@ impl PartialOrd for VersionBlockKey { } } +/// Information about a single block #[derive(PartialEq, Eq, Ord, PartialOrd, Clone, Copy, Debug, Serialize, Deserialize)] pub struct VersionBlock { + /// Hash of the block pub hash: Hash, + /// Size of the block pub size: u64, } |