diff options
author | Alex <alex@adnab.me> | 2021-04-08 15:01:13 +0200 |
---|---|---|
committer | Alex <alex@adnab.me> | 2021-04-08 15:01:13 +0200 |
commit | c35c472dc9b04ed49e28a78bc9fa96baa7282502 (patch) | |
tree | cb7390fabc55b047b2d75a0af3f3db9ab9d247bc /src/model/block.rs | |
parent | c3bd672d58d32c8fc3b3225bfc2bfb5330ec726e (diff) | |
parent | 718ae005486baeed358d56cc7cd319fedd1e76eb (diff) | |
download | garage-c35c472dc9b04ed49e28a78bc9fa96baa7282502.tar.gz garage-c35c472dc9b04ed49e28a78bc9fa96baa7282502.zip |
Merge pull request 'add doc comments' (#53) from trinity-1686a/garage:doc-comments into main
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/53
Diffstat (limited to 'src/model/block.rs')
-rw-r--r-- | src/model/block.rs | 34 |
1 files changed, 29 insertions, 5 deletions
diff --git a/src/model/block.rs b/src/model/block.rs index 0d9af38f..5f428fe1 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -18,12 +18,13 @@ use garage_rpc::membership::System; use garage_rpc::rpc_client::*; use garage_rpc::rpc_server::*; -use garage_table::replication::{sharded::TableShardedReplication, TableReplication}; +use garage_table::replication::{TableReplication, TableShardedReplication}; use crate::block_ref_table::*; use crate::garage::Garage; +/// Size under which data will be stored inlined in database instead of as files pub const INLINE_THRESHOLD: usize = 3072; pub const BACKGROUND_WORKERS: u64 = 1; @@ -33,28 +34,41 @@ const BLOCK_GC_TIMEOUT: Duration = Duration::from_secs(60); const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(5); const RESYNC_RETRY_TIMEOUT: Duration = Duration::from_secs(10); +/// RPC messages used to share blocks of data between nodes #[derive(Debug, Serialize, Deserialize)] pub enum Message { Ok, + /// Message to ask for a block of data, by hash GetBlock(Hash), + /// Message to send a block of data, either because requested, of for first delivery of new + /// block PutBlock(PutBlockMessage), + /// Ask other node if they should have this block, but don't actually have it NeedBlockQuery(Hash), + /// Response : whether the node do require that block NeedBlockReply(bool), } +/// Structure used to send a block #[derive(Debug, Serialize, Deserialize)] pub struct PutBlockMessage { + /// Hash of the block pub hash: Hash, + /// Content of the block #[serde(with = "serde_bytes")] pub data: Vec<u8>, } impl RpcMessage for Message {} +/// The block manager, handling block exchange between nodes, and block storage on local node pub struct BlockManager { + /// Replication strategy, allowing to find on which node blocks should be located pub replication: TableShardedReplication, + /// Directory in which block are stored pub data_dir: PathBuf, + /// Lock to prevent concurrent edition of the directory pub data_dir_lock: Mutex<()>, rc: sled::Tree, @@ -128,7 +142,8 @@ impl BlockManager { } pub fn spawn_background_worker(self: Arc<Self>) { - // Launch 2 simultaneous workers for background resync loop preprocessing + // Launch 2 simultaneous workers for background resync loop preprocessing <= TODO actually this + // launches only one worker with current value of BACKGROUND_WORKERS for i in 0..BACKGROUND_WORKERS { let bm2 = self.clone(); let background = self.system.background.clone(); @@ -141,7 +156,8 @@ impl BlockManager { } } - pub async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result<Message, Error> { + /// Write a block to disk + async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result<Message, Error> { let _lock = self.data_dir_lock.lock().await; let mut path = self.block_dir(hash); @@ -159,7 +175,8 @@ impl BlockManager { Ok(Message::Ok) } - pub async fn read_block(&self, hash: &Hash) -> Result<Message, Error> { + /// Read block from disk, verifying it's integrity + async fn read_block(&self, hash: &Hash) -> Result<Message, Error> { let path = self.block_path(hash); let mut f = match fs::File::open(&path).await { @@ -190,7 +207,8 @@ impl BlockManager { Ok(Message::PutBlock(PutBlockMessage { hash: *hash, data })) } - pub async fn need_block(&self, hash: &Hash) -> Result<bool, Error> { + /// Check if this node should have a block, but don't actually have it + async fn need_block(&self, hash: &Hash) -> Result<bool, Error> { let needed = self .rc .get(hash.as_ref())? @@ -217,6 +235,8 @@ impl BlockManager { path } + /// Increment the number of time a block is used, putting it to resynchronization if it is + /// required, but not known pub fn block_incref(&self, hash: &Hash) -> Result<(), Error> { let old_rc = self.rc.fetch_and_update(&hash, |old| { let old_v = old.map(u64_from_be_bytes).unwrap_or(0); @@ -229,6 +249,7 @@ impl BlockManager { Ok(()) } + /// Decrement the number of time a block is used pub fn block_decref(&self, hash: &Hash) -> Result<(), Error> { let new_rc = self.rc.update_and_fetch(&hash, |old| { let old_v = old.map(u64_from_be_bytes).unwrap_or(0); @@ -388,6 +409,7 @@ impl BlockManager { Ok(()) } + /// Ask nodes that might have a block for it pub async fn rpc_get_block(&self, hash: &Hash) -> Result<Vec<u8>, Error> { let who = self.replication.read_nodes(&hash); let resps = self @@ -412,6 +434,7 @@ impl BlockManager { ))) } + /// Send block to nodes that should have it pub async fn rpc_put_block(&self, hash: Hash, data: Vec<u8>) -> Result<(), Error> { let who = self.replication.write_nodes(&hash); self.rpc_client @@ -498,6 +521,7 @@ impl BlockManager { .boxed() } + /// Get lenght of resync queue pub fn resync_queue_len(&self) -> usize { self.resync_queue.len() } |