aboutsummaryrefslogtreecommitdiff
path: root/src/model
diff options
context:
space:
mode:
Diffstat (limited to 'src/model')
-rw-r--r--src/model/block.rs28
1 files changed, 17 insertions, 11 deletions
diff --git a/src/model/block.rs b/src/model/block.rs
index eccc2cbd..9426f683 100644
--- a/src/model/block.rs
+++ b/src/model/block.rs
@@ -165,7 +165,7 @@ impl BlockManager {
Ok(f) => f,
Err(e) => {
// Not found but maybe we should have had it ??
- self.put_to_resync(hash, 0)?;
+ self.put_to_resync(hash, Duration::from_millis(0))?;
return Err(Into::into(e));
}
};
@@ -175,9 +175,11 @@ impl BlockManager {
if data::blake2sum(&data[..]) != *hash {
let _lock = self.data_dir_lock.lock().await;
- warn!("Block {:?} is corrupted. Deleting and resyncing.", hash);
- fs::remove_file(path).await?;
- self.put_to_resync(&hash, 0)?;
+ warn!("Block {:?} is corrupted. Renaming to .corrupted and resyncing.", hash);
+ let mut path2 = path.clone();
+ path2.set_extension(".corrupted");
+ fs::rename(path, path2).await?;
+ self.put_to_resync(&hash, Duration::from_millis(0))?;
return Err(Error::CorruptData(*hash));
}
@@ -215,7 +217,7 @@ impl BlockManager {
let old_rc = self.rc.get(&hash)?;
self.rc.merge(&hash, vec![1])?;
if old_rc.map(|x| u64_from_bytes(&x[..]) == 0).unwrap_or(true) {
- self.put_to_resync(&hash, BLOCK_RW_TIMEOUT.as_millis() as u64)?;
+ self.put_to_resync(&hash, BLOCK_RW_TIMEOUT)?;
}
Ok(())
}
@@ -223,13 +225,13 @@ impl BlockManager {
pub fn block_decref(&self, hash: &Hash) -> Result<(), Error> {
let new_rc = self.rc.merge(&hash, vec![0])?;
if new_rc.map(|x| u64_from_bytes(&x[..]) == 0).unwrap_or(true) {
- self.put_to_resync(&hash, 0)?;
+ self.put_to_resync(&hash, Duration::from_secs(0))?;
}
Ok(())
}
- fn put_to_resync(&self, hash: &Hash, delay_millis: u64) -> Result<(), Error> {
- let when = now_msec() + delay_millis;
+ fn put_to_resync(&self, hash: &Hash, delay: Duration) -> Result<(), Error> {
+ let when = now_msec() + delay.as_millis() as u64;
trace!("Put resync_queue: {} {:?}", when, hash);
let mut key = u64::to_be_bytes(when).to_vec();
key.extend(hash.as_ref());
@@ -252,7 +254,7 @@ impl BlockManager {
if let Err(e) = self.resync_iter(&hash).await {
warn!("Failed to resync block {:?}, retrying later: {}", hash, e);
- self.put_to_resync(&hash, RESYNC_RETRY_TIMEOUT.as_millis() as u64)?;
+ self.put_to_resync(&hash, RESYNC_RETRY_TIMEOUT)?;
n_failures += 1;
if n_failures >= 10 {
warn!("Too many resync failures, throttling.");
@@ -281,6 +283,8 @@ impl BlockManager {
}
async fn resync_iter(&self, hash: &Hash) -> Result<(), Error> {
+ let lock = self.data_dir_lock.lock().await;
+
let path = self.block_path(hash);
let exists = fs::metadata(&path).await.is_ok();
@@ -360,6 +364,8 @@ impl BlockManager {
}
if needed && !exists {
+ drop(lock);
+
// TODO find a way to not do this if they are sending it to us
// Let's suppose this isn't an issue for now with the BLOCK_RW_TIMEOUT delay
// between the RC being incremented and this part being called.
@@ -420,7 +426,7 @@ impl BlockManager {
}
if !block_ref.deleted.get() {
last_hash = Some(block_ref.block);
- self.put_to_resync(&block_ref.block, 0)?;
+ self.put_to_resync(&block_ref.block, Duration::from_secs(0))?;
}
i += 1;
if i & 0xFF == 0 && *must_exit.borrow() {
@@ -464,7 +470,7 @@ impl BlockManager {
};
let mut hash = [0u8; 32];
hash.copy_from_slice(&hash_bytes[..]);
- self.put_to_resync(&hash.into(), 0)?;
+ self.put_to_resync(&hash.into(),Duration::from_secs(0))?;
}
if *must_exit.borrow() {