diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/api/s3_list.rs | 2 | ||||
-rw-r--r-- | src/model/block.rs | 28 |
2 files changed, 18 insertions, 12 deletions
diff --git a/src/api/s3_list.rs b/src/api/s3_list.rs index 98d774db..16d96a49 100644 --- a/src/api/s3_list.rs +++ b/src/api/s3_list.rs @@ -42,7 +42,7 @@ pub fn parse_list_objects_query( Ok(ListObjectsQuery { is_v2: params.get("list-type").map(|x| x == "2").unwrap_or(false), bucket: bucket.to_string(), - delimiter: params.get("delimiter").cloned(), + delimiter: params.get("delimiter").filter(|x| !x.is_empty()).cloned(), max_keys: params .get("max-keys") .map(|x| { diff --git a/src/model/block.rs b/src/model/block.rs index eccc2cbd..9426f683 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -165,7 +165,7 @@ impl BlockManager { Ok(f) => f, Err(e) => { // Not found but maybe we should have had it ?? - self.put_to_resync(hash, 0)?; + self.put_to_resync(hash, Duration::from_millis(0))?; return Err(Into::into(e)); } }; @@ -175,9 +175,11 @@ impl BlockManager { if data::blake2sum(&data[..]) != *hash { let _lock = self.data_dir_lock.lock().await; - warn!("Block {:?} is corrupted. Deleting and resyncing.", hash); - fs::remove_file(path).await?; - self.put_to_resync(&hash, 0)?; + warn!("Block {:?} is corrupted. Renaming to .corrupted and resyncing.", hash); + let mut path2 = path.clone(); + path2.set_extension(".corrupted"); + fs::rename(path, path2).await?; + self.put_to_resync(&hash, Duration::from_millis(0))?; return Err(Error::CorruptData(*hash)); } @@ -215,7 +217,7 @@ impl BlockManager { let old_rc = self.rc.get(&hash)?; self.rc.merge(&hash, vec![1])?; if old_rc.map(|x| u64_from_bytes(&x[..]) == 0).unwrap_or(true) { - self.put_to_resync(&hash, BLOCK_RW_TIMEOUT.as_millis() as u64)?; + self.put_to_resync(&hash, BLOCK_RW_TIMEOUT)?; } Ok(()) } @@ -223,13 +225,13 @@ impl BlockManager { pub fn block_decref(&self, hash: &Hash) -> Result<(), Error> { let new_rc = self.rc.merge(&hash, vec![0])?; if new_rc.map(|x| u64_from_bytes(&x[..]) == 0).unwrap_or(true) { - self.put_to_resync(&hash, 0)?; + self.put_to_resync(&hash, Duration::from_secs(0))?; } Ok(()) } - fn put_to_resync(&self, hash: &Hash, delay_millis: u64) -> Result<(), Error> { - let when = now_msec() + delay_millis; + fn put_to_resync(&self, hash: &Hash, delay: Duration) -> Result<(), Error> { + let when = now_msec() + delay.as_millis() as u64; trace!("Put resync_queue: {} {:?}", when, hash); let mut key = u64::to_be_bytes(when).to_vec(); key.extend(hash.as_ref()); @@ -252,7 +254,7 @@ impl BlockManager { if let Err(e) = self.resync_iter(&hash).await { warn!("Failed to resync block {:?}, retrying later: {}", hash, e); - self.put_to_resync(&hash, RESYNC_RETRY_TIMEOUT.as_millis() as u64)?; + self.put_to_resync(&hash, RESYNC_RETRY_TIMEOUT)?; n_failures += 1; if n_failures >= 10 { warn!("Too many resync failures, throttling."); @@ -281,6 +283,8 @@ impl BlockManager { } async fn resync_iter(&self, hash: &Hash) -> Result<(), Error> { + let lock = self.data_dir_lock.lock().await; + let path = self.block_path(hash); let exists = fs::metadata(&path).await.is_ok(); @@ -360,6 +364,8 @@ impl BlockManager { } if needed && !exists { + drop(lock); + // TODO find a way to not do this if they are sending it to us // Let's suppose this isn't an issue for now with the BLOCK_RW_TIMEOUT delay // between the RC being incremented and this part being called. @@ -420,7 +426,7 @@ impl BlockManager { } if !block_ref.deleted.get() { last_hash = Some(block_ref.block); - self.put_to_resync(&block_ref.block, 0)?; + self.put_to_resync(&block_ref.block, Duration::from_secs(0))?; } i += 1; if i & 0xFF == 0 && *must_exit.borrow() { @@ -464,7 +470,7 @@ impl BlockManager { }; let mut hash = [0u8; 32]; hash.copy_from_slice(&hash_bytes[..]); - self.put_to_resync(&hash.into(), 0)?; + self.put_to_resync(&hash.into(),Duration::from_secs(0))?; } if *must_exit.borrow() { |