aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2021-03-15 18:27:26 +0100
committerAlex Auvolat <alex@adnab.me>2021-03-15 18:27:26 +0100
commit0290afe1f8eafabf71695d677807e07658d078ab (patch)
tree882c6b3601b0f9e3037e59cdae834d69a01be5f3
parent3bf2df622a070fe8f233bec6d60bd5cca995fbfc (diff)
downloadgarage-0290afe1f8eafabf71695d677807e07658d078ab.tar.gz
garage-0290afe1f8eafabf71695d677807e07658d078ab.zip
Make block rc code more understandable
-rw-r--r--src/garage/admin_rpc.rs18
-rw-r--r--src/model/block.rs54
-rw-r--r--src/table/merkle.rs2
3 files changed, 40 insertions, 34 deletions
diff --git a/src/garage/admin_rpc.rs b/src/garage/admin_rpc.rs
index a75c9013..512e00dd 100644
--- a/src/garage/admin_rpc.rs
+++ b/src/garage/admin_rpc.rs
@@ -425,6 +425,12 @@ impl AdminRpcHandler {
writeln!(&mut ret, "\nBlock manager stats:").unwrap();
writeln!(
&mut ret,
+ " number of blocks: {}",
+ self.garage.block_manager.rc.len()
+ )
+ .unwrap();
+ writeln!(
+ &mut ret,
" resync queue length: {}",
self.garage.block_manager.resync_queue.len()
)
@@ -451,6 +457,18 @@ impl AdminRpcHandler {
t.data.merkle_updater.todo.len()
)
.unwrap();
+ writeln!(
+ to,
+ " Merkle tree size: {}",
+ t.data.merkle_updater.merkle_tree.len()
+ )
+ .unwrap();
+ writeln!(
+ to,
+ " GC todo queue length: {}",
+ t.data.gc_todo.len()
+ )
+ .unwrap();
Ok(())
}
}
diff --git a/src/model/block.rs b/src/model/block.rs
index 36ad867a..8523474a 100644
--- a/src/model/block.rs
+++ b/src/model/block.rs
@@ -77,7 +77,6 @@ impl BlockManager {
let rc = db
.open_tree("block_local_rc")
.expect("Unable to open block_local_rc tree");
- rc.set_merge_operator(rc_merge);
let resync_queue = db
.open_tree("block_local_resync_queue")
@@ -194,7 +193,7 @@ impl BlockManager {
let needed = self
.rc
.get(hash.as_ref())?
- .map(|x| u64_from_bytes(x.as_ref()) > 0)
+ .map(|x| u64_from_be_bytes(x) > 0)
.unwrap_or(false);
if needed {
let path = self.block_path(hash);
@@ -218,17 +217,27 @@ impl BlockManager {
}
pub fn block_incref(&self, hash: &Hash) -> Result<(), Error> {
- let old_rc = self.rc.get(&hash)?;
- self.rc.merge(&hash, vec![1])?;
- if old_rc.map(|x| u64_from_bytes(&x[..]) == 0).unwrap_or(true) {
+ let old_rc = self.rc.fetch_and_update(&hash, |old| {
+ let old_v = old.map(u64_from_be_bytes).unwrap_or(0);
+ Some(u64::to_be_bytes(old_v + 1).to_vec())
+ })?;
+ let old_rc = old_rc.map(u64_from_be_bytes).unwrap_or(0);
+ if old_rc == 0 {
self.put_to_resync(&hash, BLOCK_RW_TIMEOUT)?;
}
Ok(())
}
pub fn block_decref(&self, hash: &Hash) -> Result<(), Error> {
- let new_rc = self.rc.merge(&hash, vec![0])?;
- if new_rc.map(|x| u64_from_bytes(&x[..]) == 0).unwrap_or(true) {
+ let new_rc = self.rc.update_and_fetch(&hash, |old| {
+ let old_v = old.map(u64_from_be_bytes).unwrap_or(0);
+ if old_v > 1 {
+ Some(u64::to_be_bytes(old_v - 1).to_vec())
+ } else {
+ None
+ }
+ })?;
+ if new_rc.is_none() {
self.put_to_resync(&hash, BLOCK_GC_TIMEOUT)?;
}
Ok(())
@@ -251,7 +260,7 @@ impl BlockManager {
let mut n_failures = 0usize;
while !*must_exit.borrow() {
if let Some((time_bytes, hash_bytes)) = self.resync_queue.pop_min()? {
- let time_msec = u64_from_bytes(&time_bytes[0..8]);
+ let time_msec = u64_from_be_bytes(&time_bytes[0..8]);
let now = now_msec();
if now >= time_msec {
let hash = Hash::try_from(&hash_bytes[..]).unwrap();
@@ -295,7 +304,7 @@ impl BlockManager {
let needed = self
.rc
.get(hash.as_ref())?
- .map(|x| u64_from_bytes(x.as_ref()) > 0)
+ .map(|x| u64_from_be_bytes(x) > 0)
.unwrap_or(false);
if exists != needed {
@@ -487,30 +496,9 @@ impl BlockManager {
}
}
-fn u64_from_bytes(bytes: &[u8]) -> u64 {
- assert!(bytes.len() == 8);
+fn u64_from_be_bytes<T: AsRef<[u8]>>(bytes: T) -> u64 {
+ assert!(bytes.as_ref().len() == 8);
let mut x8 = [0u8; 8];
- x8.copy_from_slice(bytes);
+ x8.copy_from_slice(bytes.as_ref());
u64::from_be_bytes(x8)
}
-
-fn rc_merge(_key: &[u8], old: Option<&[u8]>, new: &[u8]) -> Option<Vec<u8>> {
- let old = old.map(u64_from_bytes).unwrap_or(0);
- assert!(new.len() == 1);
- let new = match new[0] {
- 0 => {
- if old > 0 {
- old - 1
- } else {
- 0
- }
- }
- 1 => old + 1,
- _ => unreachable!(),
- };
- if new == 0 {
- None
- } else {
- Some(u64::to_be_bytes(new).to_vec())
- }
-}
diff --git a/src/table/merkle.rs b/src/table/merkle.rs
index 7a0adba1..a917a028 100644
--- a/src/table/merkle.rs
+++ b/src/table/merkle.rs
@@ -46,7 +46,7 @@ pub struct MerkleUpdater {
// Content of the merkle tree: items where
// - key = .bytes() for MerkleNodeKey
// - value = serialization of a MerkleNode, assumed to be MerkleNode::empty if not found
- pub(crate) merkle_tree: sled::Tree,
+ pub merkle_tree: sled::Tree,
empty_node_hash: Hash,
}