diff options
Diffstat (limited to 'src/table_sync.rs')
-rw-r--r-- | src/table_sync.rs | 88 |
1 files changed, 81 insertions, 7 deletions
diff --git a/src/table_sync.rs b/src/table_sync.rs index 3dd9df33..92aa8c2a 100644 --- a/src/table_sync.rs +++ b/src/table_sync.rs @@ -1,5 +1,5 @@ use rand::Rng; -use std::collections::{BTreeSet, HashMap, VecDeque}; +use std::collections::{BTreeSet, BTreeMap, VecDeque}; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -10,19 +10,21 @@ use futures_util::future::*; use tokio::sync::watch; use tokio::sync::Mutex; use serde::{Serialize, Deserialize}; +use serde_bytes::ByteBuf; use crate::data::*; use crate::error::Error; use crate::membership::Ring; use crate::table::*; +const MAX_DEPTH: usize = 16; const SCAN_INTERVAL: Duration = Duration::from_secs(3600); const CHECKSUM_CACHE_TIMEOUT: Duration = Duration::from_secs(1800); pub struct TableSyncer<F: TableSchema> { pub table: Arc<Table<F>>, pub todo: Mutex<SyncTodo>, - pub cache: Vec<Mutex<HashMap<SyncRange, RangeChecksum>>>, + pub cache: Vec<Mutex<BTreeMap<SyncRange, RangeChecksum>>>, } pub struct SyncTodo { @@ -43,6 +45,17 @@ pub struct SyncRange { pub level: usize, } +impl std::cmp::PartialOrd for SyncRange { + fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { + Some(self.cmp(other)) + } +} +impl std::cmp::Ord for SyncRange { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.begin.cmp(&other.begin) + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct RangeChecksum { pub bounds: SyncRange, @@ -59,7 +72,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> { let syncer = Arc::new(TableSyncer { table: table.clone(), todo: Mutex::new(todo), - cache: (0..32).map(|_| Mutex::new(HashMap::new())).collect::<Vec<_>>(), + cache: (0..MAX_DEPTH).map(|_| Mutex::new(BTreeMap::new())).collect::<Vec<_>>(), }); let s1 = syncer.clone(); @@ -83,12 +96,14 @@ impl<F: TableSchema + 'static> TableSyncer<F> { self: Arc<Self>, mut must_exit: watch::Receiver<bool>, ) -> Result<(), Error> { + tokio::time::delay_for(Duration::from_secs(10)); + self.todo.lock().await.add_full_scan(&self.table); let mut next_full_scan = tokio::time::delay_for(SCAN_INTERVAL).fuse(); let mut prev_ring: Arc<Ring> = self.table.system.ring.borrow().clone(); let mut ring_recv: watch::Receiver<Arc<Ring>> = self.table.system.ring.clone(); - loop { + while !*must_exit.borrow() { let s_ring_recv = ring_recv.recv().fuse(); let s_must_exit = must_exit.recv().fuse(); pin_mut!(s_ring_recv, s_must_exit); @@ -96,21 +111,24 @@ impl<F: TableSchema + 'static> TableSyncer<F> { select! { _ = next_full_scan => { next_full_scan = tokio::time::delay_for(SCAN_INTERVAL).fuse(); + eprintln!("Adding full scan to syncer todo list"); self.todo.lock().await.add_full_scan(&self.table); } new_ring_r = s_ring_recv => { if let Some(new_ring) = new_ring_r { + eprintln!("Adding ring difference to syncer todo list"); self.todo.lock().await.add_ring_difference(&self.table, &prev_ring, &new_ring); prev_ring = new_ring; } } must_exit_v = s_must_exit => { if must_exit_v.unwrap_or(false) { - return Ok(()) + break; } } } } + Ok(()) } async fn syncer_task( @@ -131,6 +149,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> { } async fn sync_partition(self: Arc<Self>, partition: &Partition, must_exit: &mut watch::Receiver<bool>) -> Result<(), Error> { + eprintln!("Calculating root checksum for {:?}...", partition); let root_cks = self.root_checksum(&partition.begin, &partition.end, must_exit).await?; eprintln!("Root checksum for {:?}: {:?}", partition, root_cks); @@ -152,7 +171,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> { } async fn root_checksum(self: &Arc<Self>, begin: &Hash, end: &Hash, must_exit: &mut watch::Receiver<bool>) -> Result<RangeChecksum, Error> { - for i in 1..32 { + for i in 1..MAX_DEPTH { let rc = self.range_checksum(&SyncRange{ begin: begin.to_vec(), end: end.to_vec(), @@ -262,13 +281,49 @@ impl<F: TableSchema + 'static> TableSyncer<F> { todo.push_back(root_ck); while !todo.is_empty() && !*must_exit.borrow() { + eprintln!("Sync with {:?}: {} remaining", who, todo.len()); + let end = std::cmp::min(16, todo.len()); let step = todo.drain(..end).collect::<Vec<_>>(); - unimplemented!() + + let rpc_resp = self.table.rpc_call(&who, &TableRPC::<F>::SyncChecksums(step)).await?; + if let TableRPC::<F>::SyncDifferentSet(mut s) = rpc_resp { + let mut items = vec![]; + for differing in s.drain(..) { + if differing.level == 0 { + items.push(differing.begin); + } else { + let checksum = self.range_checksum(&differing, &mut must_exit).await?; + todo.push_back(checksum); + } + } + if items.len() > 0 { + self.table.system.background.spawn(self.clone().send_items(who.clone(), items)); + } + } else { + return Err(Error::Message(format!("Unexpected response to RPC SyncChecksums: {}", debug_serialize(&rpc_resp)))); + } } Ok(()) } + async fn send_items(self: Arc<Self>, who: UUID, item_list: Vec<Vec<u8>>) -> Result<(), Error> { + eprintln!("Sending {} items to {:?}", item_list.len(), who); + + let mut values = vec![]; + for item in item_list.iter() { + if let Some(v) = self.table.store.get(&item[..])? { + values.push(Arc::new(ByteBuf::from(v.as_ref()))); + } + } + let rpc_resp = self.table.rpc_call(&who, &TableRPC::<F>::Update(values)).await?; + if let TableRPC::<F>::Ok = rpc_resp { + Ok(()) + } else { + Err(Error::Message(format!("Unexpected response to RPC Update: {}", debug_serialize(&rpc_resp)))) + } + } + pub async fn handle_checksum_rpc(self: &Arc<Self>, checksums: &[RangeChecksum], mut must_exit: watch::Receiver<bool>) -> Result<Vec<SyncRange>, Error> { let mut ret = vec![]; for ckr in checksums.iter() { @@ -288,6 +343,25 @@ impl<F: TableSchema + 'static> TableSyncer<F> { } Ok(ret) } + + pub async fn invalidate(self: Arc<Self>, item_key: Vec<u8>) -> Result<(), Error> { + for i in 1..MAX_DEPTH { + let needle = SyncRange{ + begin: item_key.to_vec(), + end: vec![], + level: i, + }; + let mut cache = self.cache[i].lock().await; + if let Some(cache_entry) = cache.range(..=needle).rev().next() { + if cache_entry.0.begin <= item_key && cache_entry.0.end > item_key { + let index = cache_entry.0.clone(); + drop(cache_entry); + cache.remove(&index); + } + } + } + Ok(()) + } } impl SyncTodo { |