author     Alex <alex@adnab.me>  2021-04-08 15:01:13 +0200
committer  Alex <alex@adnab.me>  2021-04-08 15:01:13 +0200
commit     c35c472dc9b04ed49e28a78bc9fa96baa7282502 (patch)
tree       cb7390fabc55b047b2d75a0af3f3db9ab9d247bc /src/model/block.rs
parent     c3bd672d58d32c8fc3b3225bfc2bfb5330ec726e (diff)
parent     718ae005486baeed358d56cc7cd319fedd1e76eb (diff)
Merge pull request 'add doc comments' (#53) from trinity-1686a/garage:doc-comments into main
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/53
Diffstat (limited to 'src/model/block.rs')
-rw-r--r--  src/model/block.rs  34
1 file changed, 29 insertions(+), 5 deletions(-)
diff --git a/src/model/block.rs b/src/model/block.rs
index 0d9af38f..5f428fe1 100644
--- a/src/model/block.rs
+++ b/src/model/block.rs
@@ -18,12 +18,13 @@ use garage_rpc::membership::System;
use garage_rpc::rpc_client::*;
use garage_rpc::rpc_server::*;
-use garage_table::replication::{sharded::TableShardedReplication, TableReplication};
+use garage_table::replication::{TableReplication, TableShardedReplication};
use crate::block_ref_table::*;
use crate::garage::Garage;
+/// Size under which data will be stored inline in the database instead of as files
pub const INLINE_THRESHOLD: usize = 3072;
pub const BACKGROUND_WORKERS: u64 = 1;
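As an illustration of the INLINE_THRESHOLD constant documented above, here is a minimal sketch of the inline-versus-block decision a caller could make; store_inline and store_as_block are hypothetical helpers, not functions from this file:

fn store_object_data(data: &[u8]) {
    if data.len() < INLINE_THRESHOLD {
        // Small payloads stay inline in the metadata database.
        store_inline(data);
    } else {
        // Larger payloads are handed to the block manager and stored as files.
        store_as_block(data);
    }
}

fn store_inline(_data: &[u8]) { /* hypothetical: write into the metadata DB */ }
fn store_as_block(_data: &[u8]) { /* hypothetical: hand off to the block manager */ }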
@@ -33,28 +34,41 @@ const BLOCK_GC_TIMEOUT: Duration = Duration::from_secs(60);
const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(5);
const RESYNC_RETRY_TIMEOUT: Duration = Duration::from_secs(10);
+/// RPC messages used to share blocks of data between nodes
#[derive(Debug, Serialize, Deserialize)]
pub enum Message {
Ok,
+ /// Message to ask for a block of data, by hash
GetBlock(Hash),
+ /// Message to send a block of data, either because it was requested, or for the first
+ /// delivery of a new block
PutBlock(PutBlockMessage),
+ /// Ask another node whether it should have this block but does not actually have it
NeedBlockQuery(Hash),
+ /// Response: whether the node does require that block
NeedBlockReply(bool),
}
+/// Structure used to send a block
#[derive(Debug, Serialize, Deserialize)]
pub struct PutBlockMessage {
+ /// Hash of the block
pub hash: Hash,
+ /// Content of the block
#[serde(with = "serde_bytes")]
pub data: Vec<u8>,
}
impl RpcMessage for Message {}
+/// The block manager, handling block exchange between nodes, and block storage on the local node
pub struct BlockManager {
+ /// Replication strategy, used to find out on which nodes blocks should be located
pub replication: TableShardedReplication,
+ /// Directory in which blocks are stored
pub data_dir: PathBuf,
+ /// Lock to prevent concurrent modification of the directory
pub data_dir_lock: Mutex<()>,
rc: sled::Tree,
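Taken together, these message variants describe a simple request/reply protocol. Below is a hedged sketch of how a node could dispatch them, assuming the types and methods shown in this file (BlockManager, Message, Error, write_block, read_block, need_block) are in scope; the dispatcher itself is illustrative, not the actual garage_rpc handler, and in the real code these methods are private and called from the manager's own RPC handling.

async fn handle_message(manager: &BlockManager, msg: Message) -> Result<Message, Error> {
    match msg {
        // A peer asks for a block by hash: reply with a PutBlock carrying its data.
        Message::GetBlock(hash) => manager.read_block(&hash).await,
        // A peer pushes a block (first delivery or reply to GetBlock): store it locally.
        Message::PutBlock(m) => manager.write_block(&m.hash, &m.data).await,
        // A peer asks whether we still need this block; answer with NeedBlockReply.
        Message::NeedBlockQuery(hash) => Ok(Message::NeedBlockReply(manager.need_block(&hash).await?)),
        _ => Ok(Message::Ok),
    }
}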
@@ -128,7 +142,8 @@ impl BlockManager {
}
pub fn spawn_background_worker(self: Arc<Self>) {
- // Launch 2 simultaneous workers for background resync loop preprocessing
+ // Launch 2 simultaneous workers for background resync loop preprocessing
+ // TODO: with the current value of BACKGROUND_WORKERS, this actually launches only one worker
for i in 0..BACKGROUND_WORKERS {
let bm2 = self.clone();
let background = self.system.background.clone();
@@ -141,7 +156,8 @@ impl BlockManager {
}
}
- pub async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result<Message, Error> {
+ /// Write a block to disk
+ async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result<Message, Error> {
let _lock = self.data_dir_lock.lock().await;
let mut path = self.block_dir(hash);
@@ -159,7 +175,8 @@ impl BlockManager {
Ok(Message::Ok)
}
- pub async fn read_block(&self, hash: &Hash) -> Result<Message, Error> {
+ /// Read a block from disk, verifying its integrity
+ async fn read_block(&self, hash: &Hash) -> Result<Message, Error> {
let path = self.block_path(hash);
let mut f = match fs::File::open(&path).await {
@@ -190,7 +207,8 @@ impl BlockManager {
Ok(Message::PutBlock(PutBlockMessage { hash: *hash, data }))
}
- pub async fn need_block(&self, hash: &Hash) -> Result<bool, Error> {
+ /// Check whether this node should have a block but does not actually have it
+ async fn need_block(&self, hash: &Hash) -> Result<bool, Error> {
let needed = self
.rc
.get(hash.as_ref())?
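The check documented above boils down to "referenced by at least one object, but absent from disk". A minimal stand-alone sketch of that logic, using only std; the refcount and file path are passed in directly here instead of being read from the sled tree and data_dir this file uses:

fn needed_but_missing(refcount: u64, block_file: &std::path::Path) -> bool {
    // The block is wanted if some object still references it...
    let needed = refcount > 0;
    // ...and it must be fetched again if its data file is not present locally.
    needed && !block_file.exists()
}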
@@ -217,6 +235,8 @@ impl BlockManager {
path
}
+ /// Increment the number of times a block is used; if the block is required but not
+ /// stored locally, queue it for resynchronization
pub fn block_incref(&self, hash: &Hash) -> Result<(), Error> {
let old_rc = self.rc.fetch_and_update(&hash, |old| {
let old_v = old.map(u64_from_be_bytes).unwrap_or(0);
@@ -229,6 +249,7 @@ impl BlockManager {
Ok(())
}
+ /// Decrement the number of times a block is used
pub fn block_decref(&self, hash: &Hash) -> Result<(), Error> {
let new_rc = self.rc.update_and_fetch(&hash, |old| {
let old_v = old.map(u64_from_be_bytes).unwrap_or(0);
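Both counters above are kept as big-endian u64 values in a sled tree, updated with fetch_and_update / update_and_fetch as the surrounding hunks show. Here is a hedged sketch of that pattern in isolation; the tree and key are placeholders, and decoding uses a small local helper rather than this file's u64_from_be_bytes:

fn incref(rc: &sled::Tree, hash: &[u8]) -> sled::Result<()> {
    rc.fetch_and_update(hash, |old| {
        // Decode the previous big-endian counter, defaulting to 0 when absent,
        // then store the incremented value back as big-endian bytes.
        Some((decode_u64(old) + 1).to_be_bytes().to_vec())
    })?;
    Ok(())
}

fn decref(rc: &sled::Tree, hash: &[u8]) -> sled::Result<()> {
    rc.update_and_fetch(hash, |old| {
        let old_v = decode_u64(old);
        // Remove the entry entirely once the count drops to zero.
        if old_v > 1 {
            Some((old_v - 1).to_be_bytes().to_vec())
        } else {
            None
        }
    })?;
    Ok(())
}

fn decode_u64(old: Option<&[u8]>) -> u64 {
    let mut buf = [0u8; 8];
    if let Some(b) = old {
        buf.copy_from_slice(b); // panics if the stored value is not 8 bytes
    }
    u64::from_be_bytes(buf)
}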
@@ -388,6 +409,7 @@ impl BlockManager {
Ok(())
}
+ /// Ask nodes that might have a block for it
pub async fn rpc_get_block(&self, hash: &Hash) -> Result<Vec<u8>, Error> {
let who = self.replication.read_nodes(&hash);
let resps = self
@@ -412,6 +434,7 @@ impl BlockManager {
)))
}
+ /// Send block to nodes that should have it
pub async fn rpc_put_block(&self, hash: Hash, data: Vec<u8>) -> Result<(), Error> {
let who = self.replication.write_nodes(&hash);
self.rpc_client
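These two methods are the public entry points other parts of garage use to move block data between nodes. A small usage sketch, assuming a BlockManager handle, a hash, and the block's bytes are already available (block_manager, hash and data are placeholders), and that Hash is cloneable as elsewhere in this crate:

async fn store_then_fetch(block_manager: &BlockManager, hash: Hash, data: Vec<u8>) -> Result<(), Error> {
    // Send the block to every node that should hold a replica of it...
    block_manager.rpc_put_block(hash.clone(), data).await?;
    // ...and later ask the nodes that might have it for the data back.
    let _bytes: Vec<u8> = block_manager.rpc_get_block(&hash).await?;
    Ok(())
}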
@@ -498,6 +521,7 @@ impl BlockManager {
.boxed()
}
+ /// Get the length of the resync queue
pub fn resync_queue_len(&self) -> usize {
self.resync_queue.len()
}