diff options
Diffstat (limited to 'src/block.rs')
-rw-r--r-- | src/block.rs | 100 |
1 files changed, 63 insertions, 37 deletions
diff --git a/src/block.rs b/src/block.rs index e898ad19..25a10910 100644 --- a/src/block.rs +++ b/src/block.rs @@ -6,6 +6,7 @@ use futures::stream::*; use tokio::fs; use tokio::prelude::*; use tokio::sync::{watch, Mutex}; +use arc_swap::ArcSwapOption; use crate::data; use crate::data::*; @@ -13,6 +14,7 @@ use crate::error::Error; use crate::membership::System; use crate::proto::*; use crate::rpc_client::*; +use crate::server::Garage; pub struct BlockManager { pub data_dir: PathBuf, @@ -20,10 +22,11 @@ pub struct BlockManager { pub resync_queue: sled::Tree, pub lock: Mutex<()>, pub system: Arc<System>, + pub garage: ArcSwapOption<Garage>, } impl BlockManager { - pub async fn new(db: &sled::Db, data_dir: PathBuf, system: Arc<System>) -> Arc<Self> { + pub fn new(db: &sled::Db, data_dir: PathBuf, system: Arc<System>) -> Arc<Self> { let rc = db .open_tree("block_local_rc") .expect("Unable to open block_local_rc tree"); @@ -33,20 +36,23 @@ impl BlockManager { .open_tree("block_local_resync_queue") .expect("Unable to open block_local_resync_queue tree"); - let block_manager = Arc::new(Self { + Arc::new(Self { rc, resync_queue, data_dir, lock: Mutex::new(()), system, - }); - let bm2 = block_manager.clone(); - block_manager + garage: ArcSwapOption::from(None), + }) + } + + pub async fn spawn_background_worker(self: Arc<Self>) { + let bm2 = self.clone(); + self .system .background .spawn_worker(move |must_exit| bm2.resync_loop(must_exit)) .await; - block_manager } pub async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result<Message, Error> { @@ -80,7 +86,7 @@ impl BlockManager { let _lock = self.lock.lock().await; eprintln!("Block {:?} is corrupted. Deleting and resyncing.", hash); fs::remove_file(path).await?; - self.resync_queue.insert(hash.to_vec(), vec![1u8])?; + self.put_to_resync(&hash, 0)?; return Err(Error::CorruptData(hash.clone())); } @@ -98,38 +104,55 @@ impl BlockManager { } pub fn block_incref(&self, hash: &Hash) -> Result<(), Error> { - self.rc.merge(&hash, vec![1])?; + let new_rc = self.rc.merge(&hash, vec![1])?; + if new_rc.map(|x| u64_from_bytes(&x[..]) == 0).unwrap_or(true) { + self.put_to_resync(&hash, BLOCK_RW_TIMEOUT.as_millis() as u64)?; + } Ok(()) } pub fn block_decref(&self, hash: &Hash) -> Result<(), Error> { - if self.rc.merge(&hash, vec![0])?.is_none() { - self.resync_queue.insert(hash.to_vec(), vec![1u8])?; + let new_rc = self.rc.merge(&hash, vec![0])?; + if new_rc.is_none() { + self.put_to_resync(&hash, 2 * BLOCK_RW_TIMEOUT.as_millis() as u64)?; } Ok(()) } + fn put_to_resync(&self, hash: &Hash, delay_millis: u64) -> Result<(), Error> { + let when = now_msec() + delay_millis; + eprintln!("Put resync_queue: {} {:?}", when, hash); + let mut key = u64::to_be_bytes(when).to_vec(); + key.extend(hash.as_ref()); + self.resync_queue.insert(key, hash.as_ref())?; + Ok(()) + } + async fn resync_loop(self: Arc<Self>, must_exit: watch::Receiver<bool>) -> Result<(), Error> { while !*must_exit.borrow() { - if let Some((hash_bytes, _v)) = self.resync_queue.get_gt(&[])? { - let mut hash = [0u8; 32]; - hash.copy_from_slice(hash_bytes.as_ref()); - let hash = Hash::from(hash); - - match self.resync_iter(&hash).await { - Ok(_) => { - self.resync_queue.remove(&hash_bytes)?; - } - Err(e) => { - eprintln!( - "Failed to resync hash {:?}, leaving it in queue: {}", - hash, e - ); + if let Some((time_bytes, hash_bytes)) = self.resync_queue.get_gt(&[])? { + let time_msec = u64_from_bytes(&time_bytes[0..8]); + eprintln!("First in resync queue: {} (now = {})", time_msec, now_msec()); + if now_msec() >= time_msec { + let mut hash = [0u8; 32]; + hash.copy_from_slice(hash_bytes.as_ref()); + let hash = Hash::from(hash); + + match self.resync_iter(&hash).await { + Ok(_) => { + self.resync_queue.remove(&hash_bytes)?; + } + Err(e) => { + eprintln!( + "Failed to resync hash {:?}, leaving it in queue: {}", + hash, e + ); + } } + continue; } - } else { - tokio::time::delay_for(Duration::from_secs(1)).await; } + tokio::time::delay_for(Duration::from_secs(1)).await; } Ok(()) } @@ -145,14 +168,23 @@ impl BlockManager { .map(|x| u64_from_bytes(x.as_ref()) > 0) .unwrap_or(false); + eprintln!("Resync block {:?}: exists {}, needed {}", hash, exists, needed); + if exists && !needed { - // TODO: verify that other nodes that might need it have it ? + let garage = self.garage.load_full().unwrap(); + let active_refs = garage.block_ref_table.get_range(&hash, &[0u8; 32].into(), Some(()), 1).await?; + let needed_by_others = !active_refs.is_empty(); + if needed_by_others { + // TODO check they have it and send it if not + } fs::remove_file(path).await?; self.resync_queue.remove(&hash)?; } if needed && !exists { // TODO find a way to not do this if they are sending it to us + // Let's suppose this isn't an issue for now with the BLOCK_RW_TIMEOUT delay + // between the RC being incremented and this part being called. let block_data = rpc_get_block(&self.system, &hash).await?; self.write_block(hash, &block_data[..]).await?; } @@ -190,11 +222,8 @@ fn rc_merge(_key: &[u8], old: Option<&[u8]>, new: &[u8]) -> Option<Vec<u8>> { } pub async fn rpc_get_block(system: &Arc<System>, hash: &Hash) -> Result<Vec<u8>, Error> { - let who = system - .ring - .borrow() - .clone() - .walk_ring(&hash, system.config.data_replication_factor); + let ring = system.ring.borrow().clone(); + let who = ring.walk_ring(&hash, system.config.data_replication_factor); let msg = Message::GetBlock(hash.clone()); let mut resp_stream = who .iter() @@ -215,11 +244,8 @@ pub async fn rpc_get_block(system: &Arc<System>, hash: &Hash) -> Result<Vec<u8>, } pub async fn rpc_put_block(system: &Arc<System>, hash: Hash, data: Vec<u8>) -> Result<(), Error> { - let who = system - .ring - .borrow() - .clone() - .walk_ring(&hash, system.config.data_replication_factor); + let ring = system.ring.borrow().clone(); + let who = ring.walk_ring(&hash, system.config.data_replication_factor); rpc_try_call_many( system.clone(), &who[..], |