aboutsummaryrefslogtreecommitdiff
path: root/src/block
diff options
context:
space:
mode:
Diffstat (limited to 'src/block')
-rw-r--r--src/block/manager.rs29
-rw-r--r--src/block/rc.rs42
2 files changed, 39 insertions, 32 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs
index abc5063d..b7dcaf8a 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -289,28 +289,41 @@ impl BlockManager {
/// Increment the number of time a block is used, putting it to resynchronization if it is
/// required, but not known
- pub fn block_incref(&self, hash: &Hash) -> Result<(), Error> {
- if self.rc.block_incref(hash)? {
+ pub fn block_incref(self: &Arc<Self>, tx: &mut db::Transaction, hash: Hash) -> db::Result<()> {
+ if self.rc.block_incref(tx, &hash)? {
// When the reference counter is incremented, there is
// normally a node that is responsible for sending us the
// data of the block. However that operation may fail,
// so in all cases we add the block here to the todo list
// to check later that it arrived correctly, and if not
// we will fecth it from someone.
- self.put_to_resync(hash, 2 * BLOCK_RW_TIMEOUT)?;
+ let this = self.clone();
+ tokio::spawn(async move {
+ tokio::time::sleep(Duration::from_secs(1)).await;
+ if let Err(e) = this.put_to_resync(&hash, 2 * BLOCK_RW_TIMEOUT) {
+ error!("Block {:?} could not be put in resync queue: {}.", hash, e);
+ }
+ });
}
Ok(())
}
/// Decrement the number of time a block is used
- pub fn block_decref(&self, hash: &Hash) -> Result<(), Error> {
- if self.rc.block_decref(hash)? {
+ pub fn block_decref(self: &Arc<Self>, tx: &mut db::Transaction, hash: Hash) -> db::Result<()> {
+ if self.rc.block_decref(tx, &hash)? {
// When the RC is decremented, it might drop to zero,
// indicating that we don't need the block.
// There is a delay before we garbage collect it;
// make sure that it is handled in the resync loop
// after that delay has passed.
- self.put_to_resync(hash, BLOCK_GC_DELAY + Duration::from_secs(10))?;
+ let this = self.clone();
+ tokio::spawn(async move {
+ tokio::time::sleep(Duration::from_secs(1)).await;
+ if let Err(e) = this.put_to_resync(&hash, BLOCK_GC_DELAY + Duration::from_secs(10))
+ {
+ error!("Block {:?} could not be put in resync queue: {}.", hash, e);
+ }
+ });
}
Ok(())
}
@@ -510,12 +523,12 @@ impl BlockManager {
});
}
- fn put_to_resync(&self, hash: &Hash, delay: Duration) -> Result<(), db::Error> {
+ fn put_to_resync(&self, hash: &Hash, delay: Duration) -> db::Result<()> {
let when = now_msec() + delay.as_millis() as u64;
self.put_to_resync_at(hash, when)
}
- fn put_to_resync_at(&self, hash: &Hash, when: u64) -> Result<(), db::Error> {
+ fn put_to_resync_at(&self, hash: &Hash, when: u64) -> db::Result<()> {
trace!("Put resync_queue: {} {:?}", when, hash);
let mut key = u64::to_be_bytes(when).to_vec();
key.extend(hash.as_ref());
diff --git a/src/block/rc.rs b/src/block/rc.rs
index e0b952fd..42cdf241 100644
--- a/src/block/rc.rs
+++ b/src/block/rc.rs
@@ -19,35 +19,29 @@ impl BlockRc {
/// Increment the reference counter associated to a hash.
/// Returns true if the RC goes from zero to nonzero.
- pub(crate) fn block_incref(&self, hash: &Hash) -> Result<bool, Error> {
- let old_rc = self.rc.db().transaction(|mut tx| {
- let old_rc = RcEntry::parse_opt(tx.get(&self.rc, &hash)?);
- match old_rc.increment().serialize() {
- Some(x) => {
- tx.insert(&self.rc, &hash, x)?;
- }
- None => unreachable!(),
- };
- tx.commit(old_rc)
- })?;
+ pub(crate) fn block_incref(&self, tx: &mut db::Transaction, hash: &Hash) -> db::Result<bool> {
+ let old_rc = RcEntry::parse_opt(tx.get(&self.rc, &hash)?);
+ match old_rc.increment().serialize() {
+ Some(x) => {
+ tx.insert(&self.rc, &hash, x)?;
+ }
+ None => unreachable!(),
+ };
Ok(old_rc.is_zero())
}
/// Decrement the reference counter associated to a hash.
/// Returns true if the RC is now zero.
- pub(crate) fn block_decref(&self, hash: &Hash) -> Result<bool, Error> {
- let new_rc = self.rc.db().transaction(|mut tx| {
- let new_rc = RcEntry::parse_opt(tx.get(&self.rc, &hash)?).decrement();
- match new_rc.serialize() {
- Some(x) => {
- tx.insert(&self.rc, &hash, x)?;
- }
- None => {
- tx.remove(&self.rc, &hash)?;
- }
- };
- tx.commit(new_rc)
- })?;
+ pub(crate) fn block_decref(&self, tx: &mut db::Transaction, hash: &Hash) -> db::Result<bool> {
+ let new_rc = RcEntry::parse_opt(tx.get(&self.rc, &hash)?).decrement();
+ match new_rc.serialize() {
+ Some(x) => {
+ tx.insert(&self.rc, &hash, x)?;
+ }
+ None => {
+ tx.remove(&self.rc, &hash)?;
+ }
+ };
Ok(matches!(new_rc, RcEntry::Deletable { .. }))
}