diff options
Diffstat (limited to 'src/block')
-rw-r--r-- | src/block/manager.rs | 29 | ||||
-rw-r--r-- | src/block/rc.rs | 42 |
2 files changed, 39 insertions, 32 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs index abc5063d..b7dcaf8a 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -289,28 +289,41 @@ impl BlockManager { /// Increment the number of time a block is used, putting it to resynchronization if it is /// required, but not known - pub fn block_incref(&self, hash: &Hash) -> Result<(), Error> { - if self.rc.block_incref(hash)? { + pub fn block_incref(self: &Arc<Self>, tx: &mut db::Transaction, hash: Hash) -> db::Result<()> { + if self.rc.block_incref(tx, &hash)? { // When the reference counter is incremented, there is // normally a node that is responsible for sending us the // data of the block. However that operation may fail, // so in all cases we add the block here to the todo list // to check later that it arrived correctly, and if not // we will fecth it from someone. - self.put_to_resync(hash, 2 * BLOCK_RW_TIMEOUT)?; + let this = self.clone(); + tokio::spawn(async move { + tokio::time::sleep(Duration::from_secs(1)).await; + if let Err(e) = this.put_to_resync(&hash, 2 * BLOCK_RW_TIMEOUT) { + error!("Block {:?} could not be put in resync queue: {}.", hash, e); + } + }); } Ok(()) } /// Decrement the number of time a block is used - pub fn block_decref(&self, hash: &Hash) -> Result<(), Error> { - if self.rc.block_decref(hash)? { + pub fn block_decref(self: &Arc<Self>, tx: &mut db::Transaction, hash: Hash) -> db::Result<()> { + if self.rc.block_decref(tx, &hash)? { // When the RC is decremented, it might drop to zero, // indicating that we don't need the block. // There is a delay before we garbage collect it; // make sure that it is handled in the resync loop // after that delay has passed. - self.put_to_resync(hash, BLOCK_GC_DELAY + Duration::from_secs(10))?; + let this = self.clone(); + tokio::spawn(async move { + tokio::time::sleep(Duration::from_secs(1)).await; + if let Err(e) = this.put_to_resync(&hash, BLOCK_GC_DELAY + Duration::from_secs(10)) + { + error!("Block {:?} could not be put in resync queue: {}.", hash, e); + } + }); } Ok(()) } @@ -510,12 +523,12 @@ impl BlockManager { }); } - fn put_to_resync(&self, hash: &Hash, delay: Duration) -> Result<(), db::Error> { + fn put_to_resync(&self, hash: &Hash, delay: Duration) -> db::Result<()> { let when = now_msec() + delay.as_millis() as u64; self.put_to_resync_at(hash, when) } - fn put_to_resync_at(&self, hash: &Hash, when: u64) -> Result<(), db::Error> { + fn put_to_resync_at(&self, hash: &Hash, when: u64) -> db::Result<()> { trace!("Put resync_queue: {} {:?}", when, hash); let mut key = u64::to_be_bytes(when).to_vec(); key.extend(hash.as_ref()); diff --git a/src/block/rc.rs b/src/block/rc.rs index e0b952fd..42cdf241 100644 --- a/src/block/rc.rs +++ b/src/block/rc.rs @@ -19,35 +19,29 @@ impl BlockRc { /// Increment the reference counter associated to a hash. /// Returns true if the RC goes from zero to nonzero. - pub(crate) fn block_incref(&self, hash: &Hash) -> Result<bool, Error> { - let old_rc = self.rc.db().transaction(|mut tx| { - let old_rc = RcEntry::parse_opt(tx.get(&self.rc, &hash)?); - match old_rc.increment().serialize() { - Some(x) => { - tx.insert(&self.rc, &hash, x)?; - } - None => unreachable!(), - }; - tx.commit(old_rc) - })?; + pub(crate) fn block_incref(&self, tx: &mut db::Transaction, hash: &Hash) -> db::Result<bool> { + let old_rc = RcEntry::parse_opt(tx.get(&self.rc, &hash)?); + match old_rc.increment().serialize() { + Some(x) => { + tx.insert(&self.rc, &hash, x)?; + } + None => unreachable!(), + }; Ok(old_rc.is_zero()) } /// Decrement the reference counter associated to a hash. /// Returns true if the RC is now zero. - pub(crate) fn block_decref(&self, hash: &Hash) -> Result<bool, Error> { - let new_rc = self.rc.db().transaction(|mut tx| { - let new_rc = RcEntry::parse_opt(tx.get(&self.rc, &hash)?).decrement(); - match new_rc.serialize() { - Some(x) => { - tx.insert(&self.rc, &hash, x)?; - } - None => { - tx.remove(&self.rc, &hash)?; - } - }; - tx.commit(new_rc) - })?; + pub(crate) fn block_decref(&self, tx: &mut db::Transaction, hash: &Hash) -> db::Result<bool> { + let new_rc = RcEntry::parse_opt(tx.get(&self.rc, &hash)?).decrement(); + match new_rc.serialize() { + Some(x) => { + tx.insert(&self.rc, &hash, x)?; + } + None => { + tx.remove(&self.rc, &hash)?; + } + }; Ok(matches!(new_rc, RcEntry::Deletable { .. })) } |