aboutsummaryrefslogtreecommitdiff
path: root/src/model
diff options
context:
space:
mode:
Diffstat (limited to 'src/model')
-rw-r--r--src/model/block.rs54
1 files changed, 21 insertions, 33 deletions
diff --git a/src/model/block.rs b/src/model/block.rs
index 36ad867a..8523474a 100644
--- a/src/model/block.rs
+++ b/src/model/block.rs
@@ -77,7 +77,6 @@ impl BlockManager {
let rc = db
.open_tree("block_local_rc")
.expect("Unable to open block_local_rc tree");
- rc.set_merge_operator(rc_merge);
let resync_queue = db
.open_tree("block_local_resync_queue")
@@ -194,7 +193,7 @@ impl BlockManager {
let needed = self
.rc
.get(hash.as_ref())?
- .map(|x| u64_from_bytes(x.as_ref()) > 0)
+ .map(|x| u64_from_be_bytes(x) > 0)
.unwrap_or(false);
if needed {
let path = self.block_path(hash);
@@ -218,17 +217,27 @@ impl BlockManager {
}
pub fn block_incref(&self, hash: &Hash) -> Result<(), Error> {
- let old_rc = self.rc.get(&hash)?;
- self.rc.merge(&hash, vec![1])?;
- if old_rc.map(|x| u64_from_bytes(&x[..]) == 0).unwrap_or(true) {
+ let old_rc = self.rc.fetch_and_update(&hash, |old| {
+ let old_v = old.map(u64_from_be_bytes).unwrap_or(0);
+ Some(u64::to_be_bytes(old_v + 1).to_vec())
+ })?;
+ let old_rc = old_rc.map(u64_from_be_bytes).unwrap_or(0);
+ if old_rc == 0 {
self.put_to_resync(&hash, BLOCK_RW_TIMEOUT)?;
}
Ok(())
}
pub fn block_decref(&self, hash: &Hash) -> Result<(), Error> {
- let new_rc = self.rc.merge(&hash, vec![0])?;
- if new_rc.map(|x| u64_from_bytes(&x[..]) == 0).unwrap_or(true) {
+ let new_rc = self.rc.update_and_fetch(&hash, |old| {
+ let old_v = old.map(u64_from_be_bytes).unwrap_or(0);
+ if old_v > 1 {
+ Some(u64::to_be_bytes(old_v - 1).to_vec())
+ } else {
+ None
+ }
+ })?;
+ if new_rc.is_none() {
self.put_to_resync(&hash, BLOCK_GC_TIMEOUT)?;
}
Ok(())
@@ -251,7 +260,7 @@ impl BlockManager {
let mut n_failures = 0usize;
while !*must_exit.borrow() {
if let Some((time_bytes, hash_bytes)) = self.resync_queue.pop_min()? {
- let time_msec = u64_from_bytes(&time_bytes[0..8]);
+ let time_msec = u64_from_be_bytes(&time_bytes[0..8]);
let now = now_msec();
if now >= time_msec {
let hash = Hash::try_from(&hash_bytes[..]).unwrap();
@@ -295,7 +304,7 @@ impl BlockManager {
let needed = self
.rc
.get(hash.as_ref())?
- .map(|x| u64_from_bytes(x.as_ref()) > 0)
+ .map(|x| u64_from_be_bytes(x) > 0)
.unwrap_or(false);
if exists != needed {
@@ -487,30 +496,9 @@ impl BlockManager {
}
}
-fn u64_from_bytes(bytes: &[u8]) -> u64 {
- assert!(bytes.len() == 8);
+fn u64_from_be_bytes<T: AsRef<[u8]>>(bytes: T) -> u64 {
+ assert!(bytes.as_ref().len() == 8);
let mut x8 = [0u8; 8];
- x8.copy_from_slice(bytes);
+ x8.copy_from_slice(bytes.as_ref());
u64::from_be_bytes(x8)
}
-
-fn rc_merge(_key: &[u8], old: Option<&[u8]>, new: &[u8]) -> Option<Vec<u8>> {
- let old = old.map(u64_from_bytes).unwrap_or(0);
- assert!(new.len() == 1);
- let new = match new[0] {
- 0 => {
- if old > 0 {
- old - 1
- } else {
- 0
- }
- }
- 1 => old + 1,
- _ => unreachable!(),
- };
- if new == 0 {
- None
- } else {
- Some(u64::to_be_bytes(new).to_vec())
- }
-}