aboutsummaryrefslogtreecommitdiff
path: root/src/model/block.rs
diff options
context:
space:
mode:
authorTrinity Pointard <trinity.pointard@gmail.com>2021-03-26 21:53:28 +0100
committerAlex Auvolat <alex@adnab.me>2021-04-27 16:37:10 +0200
commit67585a4ffab14ba7c4b7c6dca530c177059a91d9 (patch)
tree6fd0339f8d7f651553145a3c29decc56e29c23bd /src/model/block.rs
parentb4376108122bb09bd8cff562b718967d4332ffbe (diff)
downloadgarage-67585a4ffab14ba7c4b7c6dca530c177059a91d9.tar.gz
garage-67585a4ffab14ba7c4b7c6dca530c177059a91d9.zip
attempt at documenting model crate
Diffstat (limited to 'src/model/block.rs')
-rw-r--r--src/model/block.rs27
1 files changed, 26 insertions, 1 deletions
diff --git a/src/model/block.rs b/src/model/block.rs
index 2b145615..38b2325c 100644
--- a/src/model/block.rs
+++ b/src/model/block.rs
@@ -24,6 +24,7 @@ use crate::block_ref_table::*;
use crate::garage::Garage;
+/// Size under which data will be stored inlined in database instead of as files
pub const INLINE_THRESHOLD: usize = 3072;
pub const BACKGROUND_WORKERS: u64 = 1;
@@ -33,28 +34,41 @@ const BLOCK_GC_TIMEOUT: Duration = Duration::from_secs(60);
const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(5);
const RESYNC_RETRY_TIMEOUT: Duration = Duration::from_secs(10);
+/// RPC messages used to share blocks of data between nodes
#[derive(Debug, Serialize, Deserialize)]
pub enum Message {
Ok,
+ /// Message to ask for a block of data, by hash
GetBlock(Hash),
+ /// Message to send a block of data, either because requested, of for first delivery of new
+ /// block
PutBlock(PutBlockMessage),
+ /// Ask other node if they should have this block, but don't actually have it
NeedBlockQuery(Hash),
+ /// Response : whether the node do require that block
NeedBlockReply(bool),
}
+/// Structure used to send a block
#[derive(Debug, Serialize, Deserialize)]
pub struct PutBlockMessage {
+ /// Hash of the block
pub hash: Hash,
+ /// Content of the block
#[serde(with = "serde_bytes")]
pub data: Vec<u8>,
}
impl RpcMessage for Message {}
+/// The block manager, handling block exchange between nodes, and block storage on local node
pub struct BlockManager {
+ /// Replication strategy, allowing to find on which node blocks should be located
pub replication: TableShardedReplication,
+ /// Directory in which block are stored
pub data_dir: PathBuf,
+ /// Lock to prevent concurrent edition of the directory
pub data_dir_lock: Mutex<()>,
rc: sled::Tree,
@@ -128,7 +142,8 @@ impl BlockManager {
}
pub fn spawn_background_worker(self: Arc<Self>) {
- // Launch 2 simultaneous workers for background resync loop preprocessing
+ // Launch 2 simultaneous workers for background resync loop preprocessing <= TODO actually this
+ // launches only one worker with current value of BACKGROUND_WORKERS
for i in 0..BACKGROUND_WORKERS {
let bm2 = self.clone();
let background = self.system.background.clone();
@@ -141,6 +156,7 @@ impl BlockManager {
}
}
+ /// Write a block to disk
pub async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result<Message, Error> {
let _lock = self.data_dir_lock.lock().await;
@@ -159,6 +175,7 @@ impl BlockManager {
Ok(Message::Ok)
}
+ /// Read block from disk, verifying it's integrity
pub async fn read_block(&self, hash: &Hash) -> Result<Message, Error> {
let path = self.block_path(hash);
@@ -190,6 +207,7 @@ impl BlockManager {
Ok(Message::PutBlock(PutBlockMessage { hash: *hash, data }))
}
+ /// Check if this node should have a block, but don't actually have it
pub async fn need_block(&self, hash: &Hash) -> Result<bool, Error> {
let needed = self
.rc
@@ -217,6 +235,8 @@ impl BlockManager {
path
}
+ /// Increment the number of time a block is used, putting it to resynchronization if it is
+ /// required, but not known
pub fn block_incref(&self, hash: &Hash) -> Result<(), Error> {
let old_rc = self.rc.fetch_and_update(&hash, |old| {
let old_v = old.map(u64_from_be_bytes).unwrap_or(0);
@@ -229,6 +249,8 @@ impl BlockManager {
Ok(())
}
+ /// Decrement the number of time a block is used
+ // when counter reach 0, it seems not put to resync which I assume put it to gc?
pub fn block_decref(&self, hash: &Hash) -> Result<(), Error> {
let new_rc = self.rc.update_and_fetch(&hash, |old| {
let old_v = old.map(u64_from_be_bytes).unwrap_or(0);
@@ -388,6 +410,7 @@ impl BlockManager {
Ok(())
}
+ /// Ask nodes that might have a block for it
pub async fn rpc_get_block(&self, hash: &Hash) -> Result<Vec<u8>, Error> {
let who = self.replication.read_nodes(&hash);
let resps = self
@@ -412,6 +435,7 @@ impl BlockManager {
)))
}
+ /// Send block to nodes that should have it
pub async fn rpc_put_block(&self, hash: Hash, data: Vec<u8>) -> Result<(), Error> {
let who = self.replication.write_nodes(&hash);
self.rpc_client
@@ -498,6 +522,7 @@ impl BlockManager {
.boxed()
}
+ /// Get lenght of resync queue
pub fn resync_queue_len(&self) -> usize {
self.resync_queue.len()
}