diff options
Diffstat (limited to 'src/model/block.rs')
-rw-r--r-- | src/model/block.rs | 27 |
1 files changed, 26 insertions, 1 deletions
diff --git a/src/model/block.rs b/src/model/block.rs index 2b145615..38b2325c 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -24,6 +24,7 @@ use crate::block_ref_table::*; use crate::garage::Garage; +/// Size under which data will be stored inlined in database instead of as files pub const INLINE_THRESHOLD: usize = 3072; pub const BACKGROUND_WORKERS: u64 = 1; @@ -33,28 +34,41 @@ const BLOCK_GC_TIMEOUT: Duration = Duration::from_secs(60); const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(5); const RESYNC_RETRY_TIMEOUT: Duration = Duration::from_secs(10); +/// RPC messages used to share blocks of data between nodes #[derive(Debug, Serialize, Deserialize)] pub enum Message { Ok, + /// Message to ask for a block of data, by hash GetBlock(Hash), + /// Message to send a block of data, either because requested, of for first delivery of new + /// block PutBlock(PutBlockMessage), + /// Ask other node if they should have this block, but don't actually have it NeedBlockQuery(Hash), + /// Response : whether the node do require that block NeedBlockReply(bool), } +/// Structure used to send a block #[derive(Debug, Serialize, Deserialize)] pub struct PutBlockMessage { + /// Hash of the block pub hash: Hash, + /// Content of the block #[serde(with = "serde_bytes")] pub data: Vec<u8>, } impl RpcMessage for Message {} +/// The block manager, handling block exchange between nodes, and block storage on local node pub struct BlockManager { + /// Replication strategy, allowing to find on which node blocks should be located pub replication: TableShardedReplication, + /// Directory in which block are stored pub data_dir: PathBuf, + /// Lock to prevent concurrent edition of the directory pub data_dir_lock: Mutex<()>, rc: sled::Tree, @@ -128,7 +142,8 @@ impl BlockManager { } pub fn spawn_background_worker(self: Arc<Self>) { - // Launch 2 simultaneous workers for background resync loop preprocessing + // Launch 2 simultaneous workers for background resync loop preprocessing <= TODO actually this + // launches only one worker with current value of BACKGROUND_WORKERS for i in 0..BACKGROUND_WORKERS { let bm2 = self.clone(); let background = self.system.background.clone(); @@ -141,6 +156,7 @@ impl BlockManager { } } + /// Write a block to disk pub async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result<Message, Error> { let _lock = self.data_dir_lock.lock().await; @@ -159,6 +175,7 @@ impl BlockManager { Ok(Message::Ok) } + /// Read block from disk, verifying it's integrity pub async fn read_block(&self, hash: &Hash) -> Result<Message, Error> { let path = self.block_path(hash); @@ -190,6 +207,7 @@ impl BlockManager { Ok(Message::PutBlock(PutBlockMessage { hash: *hash, data })) } + /// Check if this node should have a block, but don't actually have it pub async fn need_block(&self, hash: &Hash) -> Result<bool, Error> { let needed = self .rc @@ -217,6 +235,8 @@ impl BlockManager { path } + /// Increment the number of time a block is used, putting it to resynchronization if it is + /// required, but not known pub fn block_incref(&self, hash: &Hash) -> Result<(), Error> { let old_rc = self.rc.fetch_and_update(&hash, |old| { let old_v = old.map(u64_from_be_bytes).unwrap_or(0); @@ -229,6 +249,8 @@ impl BlockManager { Ok(()) } + /// Decrement the number of time a block is used + // when counter reach 0, it seems not put to resync which I assume put it to gc? pub fn block_decref(&self, hash: &Hash) -> Result<(), Error> { let new_rc = self.rc.update_and_fetch(&hash, |old| { let old_v = old.map(u64_from_be_bytes).unwrap_or(0); @@ -388,6 +410,7 @@ impl BlockManager { Ok(()) } + /// Ask nodes that might have a block for it pub async fn rpc_get_block(&self, hash: &Hash) -> Result<Vec<u8>, Error> { let who = self.replication.read_nodes(&hash); let resps = self @@ -412,6 +435,7 @@ impl BlockManager { ))) } + /// Send block to nodes that should have it pub async fn rpc_put_block(&self, hash: Hash, data: Vec<u8>) -> Result<(), Error> { let who = self.replication.write_nodes(&hash); self.rpc_client @@ -498,6 +522,7 @@ impl BlockManager { .boxed() } + /// Get lenght of resync queue pub fn resync_queue_len(&self) -> usize { self.resync_queue.len() } |