diff options
author | Alex Auvolat <alex@adnab.me> | 2021-03-15 18:27:26 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2021-03-15 18:27:26 +0100 |
commit | 0290afe1f8eafabf71695d677807e07658d078ab (patch) | |
tree | 882c6b3601b0f9e3037e59cdae834d69a01be5f3 | |
parent | 3bf2df622a070fe8f233bec6d60bd5cca995fbfc (diff) | |
download | garage-0290afe1f8eafabf71695d677807e07658d078ab.tar.gz garage-0290afe1f8eafabf71695d677807e07658d078ab.zip |
Make block rc code more understandable
-rw-r--r-- | src/garage/admin_rpc.rs | 18 | ||||
-rw-r--r-- | src/model/block.rs | 54 | ||||
-rw-r--r-- | src/table/merkle.rs | 2 |
3 files changed, 40 insertions, 34 deletions
diff --git a/src/garage/admin_rpc.rs b/src/garage/admin_rpc.rs index a75c9013..512e00dd 100644 --- a/src/garage/admin_rpc.rs +++ b/src/garage/admin_rpc.rs @@ -425,6 +425,12 @@ impl AdminRpcHandler { writeln!(&mut ret, "\nBlock manager stats:").unwrap(); writeln!( &mut ret, + " number of blocks: {}", + self.garage.block_manager.rc.len() + ) + .unwrap(); + writeln!( + &mut ret, " resync queue length: {}", self.garage.block_manager.resync_queue.len() ) @@ -451,6 +457,18 @@ impl AdminRpcHandler { t.data.merkle_updater.todo.len() ) .unwrap(); + writeln!( + to, + " Merkle tree size: {}", + t.data.merkle_updater.merkle_tree.len() + ) + .unwrap(); + writeln!( + to, + " GC todo queue length: {}", + t.data.gc_todo.len() + ) + .unwrap(); Ok(()) } } diff --git a/src/model/block.rs b/src/model/block.rs index 36ad867a..8523474a 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -77,7 +77,6 @@ impl BlockManager { let rc = db .open_tree("block_local_rc") .expect("Unable to open block_local_rc tree"); - rc.set_merge_operator(rc_merge); let resync_queue = db .open_tree("block_local_resync_queue") @@ -194,7 +193,7 @@ impl BlockManager { let needed = self .rc .get(hash.as_ref())? - .map(|x| u64_from_bytes(x.as_ref()) > 0) + .map(|x| u64_from_be_bytes(x) > 0) .unwrap_or(false); if needed { let path = self.block_path(hash); @@ -218,17 +217,27 @@ impl BlockManager { } pub fn block_incref(&self, hash: &Hash) -> Result<(), Error> { - let old_rc = self.rc.get(&hash)?; - self.rc.merge(&hash, vec![1])?; - if old_rc.map(|x| u64_from_bytes(&x[..]) == 0).unwrap_or(true) { + let old_rc = self.rc.fetch_and_update(&hash, |old| { + let old_v = old.map(u64_from_be_bytes).unwrap_or(0); + Some(u64::to_be_bytes(old_v + 1).to_vec()) + })?; + let old_rc = old_rc.map(u64_from_be_bytes).unwrap_or(0); + if old_rc == 0 { self.put_to_resync(&hash, BLOCK_RW_TIMEOUT)?; } Ok(()) } pub fn block_decref(&self, hash: &Hash) -> Result<(), Error> { - let new_rc = self.rc.merge(&hash, vec![0])?; - if new_rc.map(|x| u64_from_bytes(&x[..]) == 0).unwrap_or(true) { + let new_rc = self.rc.update_and_fetch(&hash, |old| { + let old_v = old.map(u64_from_be_bytes).unwrap_or(0); + if old_v > 1 { + Some(u64::to_be_bytes(old_v - 1).to_vec()) + } else { + None + } + })?; + if new_rc.is_none() { self.put_to_resync(&hash, BLOCK_GC_TIMEOUT)?; } Ok(()) @@ -251,7 +260,7 @@ impl BlockManager { let mut n_failures = 0usize; while !*must_exit.borrow() { if let Some((time_bytes, hash_bytes)) = self.resync_queue.pop_min()? { - let time_msec = u64_from_bytes(&time_bytes[0..8]); + let time_msec = u64_from_be_bytes(&time_bytes[0..8]); let now = now_msec(); if now >= time_msec { let hash = Hash::try_from(&hash_bytes[..]).unwrap(); @@ -295,7 +304,7 @@ impl BlockManager { let needed = self .rc .get(hash.as_ref())? - .map(|x| u64_from_bytes(x.as_ref()) > 0) + .map(|x| u64_from_be_bytes(x) > 0) .unwrap_or(false); if exists != needed { @@ -487,30 +496,9 @@ impl BlockManager { } } -fn u64_from_bytes(bytes: &[u8]) -> u64 { - assert!(bytes.len() == 8); +fn u64_from_be_bytes<T: AsRef<[u8]>>(bytes: T) -> u64 { + assert!(bytes.as_ref().len() == 8); let mut x8 = [0u8; 8]; - x8.copy_from_slice(bytes); + x8.copy_from_slice(bytes.as_ref()); u64::from_be_bytes(x8) } - -fn rc_merge(_key: &[u8], old: Option<&[u8]>, new: &[u8]) -> Option<Vec<u8>> { - let old = old.map(u64_from_bytes).unwrap_or(0); - assert!(new.len() == 1); - let new = match new[0] { - 0 => { - if old > 0 { - old - 1 - } else { - 0 - } - } - 1 => old + 1, - _ => unreachable!(), - }; - if new == 0 { - None - } else { - Some(u64::to_be_bytes(new).to_vec()) - } -} diff --git a/src/table/merkle.rs b/src/table/merkle.rs index 7a0adba1..a917a028 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -46,7 +46,7 @@ pub struct MerkleUpdater { // Content of the merkle tree: items where // - key = .bytes() for MerkleNodeKey // - value = serialization of a MerkleNode, assumed to be MerkleNode::empty if not found - pub(crate) merkle_tree: sled::Tree, + pub merkle_tree: sled::Tree, empty_node_hash: Hash, } |