diff options
Diffstat (limited to 'src/table_sync.rs')
-rw-r--r-- | src/table_sync.rs | 210 |
1 files changed, 185 insertions, 25 deletions
diff --git a/src/table_sync.rs b/src/table_sync.rs index 039dab6d..3dd9df33 100644 --- a/src/table_sync.rs +++ b/src/table_sync.rs @@ -1,12 +1,15 @@ use rand::Rng; -use std::collections::BTreeSet; +use std::collections::{BTreeSet, HashMap, VecDeque}; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, Instant}; use futures::{pin_mut, select}; +use futures::future::BoxFuture; +use futures_util::stream::*; use futures_util::future::*; use tokio::sync::watch; use tokio::sync::Mutex; +use serde::{Serialize, Deserialize}; use crate::data::*; use crate::error::Error; @@ -14,11 +17,12 @@ use crate::membership::Ring; use crate::table::*; const SCAN_INTERVAL: Duration = Duration::from_secs(3600); +const CHECKSUM_CACHE_TIMEOUT: Duration = Duration::from_secs(1800); pub struct TableSyncer<F: TableSchema> { pub table: Arc<Table<F>>, - pub todo: Mutex<SyncTodo>, + pub cache: Vec<Mutex<HashMap<SyncRange, RangeChecksum>>>, } pub struct SyncTodo { @@ -32,12 +36,30 @@ pub struct Partition { pub retain: bool, } +#[derive(Hash, PartialEq, Eq, Debug, Clone, Serialize, Deserialize)] +pub struct SyncRange { + pub begin: Vec<u8>, + pub end: Vec<u8>, + pub level: usize, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RangeChecksum { + pub bounds: SyncRange, + pub children: Vec<(SyncRange, Hash)>, + pub found_limit: Option<Vec<u8>>, + + #[serde(skip, default="std::time::Instant::now")] + pub time: Instant, +} + impl<F: TableSchema + 'static> TableSyncer<F> { pub async fn launch(table: Arc<Table<F>>) -> Arc<Self> { let todo = SyncTodo { todo: Vec::new() }; let syncer = Arc::new(TableSyncer { table: table.clone(), todo: Mutex::new(todo), + cache: (0..32).map(|_| Mutex::new(HashMap::new())).collect::<Vec<_>>(), }); let s1 = syncer.clone(); @@ -95,39 +117,177 @@ impl<F: TableSchema + 'static> TableSyncer<F> { self: Arc<Self>, mut must_exit: watch::Receiver<bool>, ) -> Result<(), Error> { - loop { - let s_pop_task = self.pop_task().fuse(); - let s_must_exit = must_exit.recv().fuse(); - pin_mut!(s_must_exit, s_pop_task); + while !*must_exit.borrow() { + if let Some(partition) = self.todo.lock().await.pop_task() { + let res = self.clone().sync_partition(&partition, &mut must_exit).await; + if let Err(e) = res { + eprintln!("Error while syncing {:?}: {}", partition, e); + } + } else { + tokio::time::delay_for(Duration::from_secs(1)).await; + } + } + Ok(()) + } - select! { - task = s_pop_task => { - if let Some(partition) = task { - let res = self.sync_partition(&partition).await; - if let Err(e) = res { - eprintln!("Error while syncing {:?}: {}", partition, e); - } - } else { - tokio::time::delay_for(Duration::from_secs(1)).await; - } + async fn sync_partition(self: Arc<Self>, partition: &Partition, must_exit: &mut watch::Receiver<bool>) -> Result<(), Error> { + let root_cks = self.root_checksum(&partition.begin, &partition.end, must_exit).await?; + eprintln!("Root checksum for {:?}: {:?}", partition, root_cks); + + let nodes = self.table.system.ring.borrow().clone().walk_ring(&partition.begin, self.table.param.replication_factor); + let mut sync_futures = nodes.iter() + .map(|node| self.clone().do_sync_with(root_cks.clone(), node.clone(), must_exit.clone())) + .collect::<FuturesUnordered<_>>(); + + while let Some(r) = sync_futures.next().await { + if let Err(e) = r { + eprintln!("Sync error: {}", e); + } + } + if !partition.retain { + self.table.delete_range(&partition.begin, &partition.end).await?; + } + + Ok(()) + } + + async fn root_checksum(self: &Arc<Self>, begin: &Hash, end: &Hash, must_exit: &mut watch::Receiver<bool>) -> Result<RangeChecksum, Error> { + for i in 1..32 { + let rc = self.range_checksum(&SyncRange{ + begin: begin.to_vec(), + end: end.to_vec(), + level: i, + }, must_exit).await?; + if rc.found_limit.is_none() { + return Ok(rc); + } + } + Err(Error::Message(format!("Unable to compute root checksum (this should never happen"))) + } + + fn range_checksum<'a>(self: &'a Arc<Self>, range: &'a SyncRange, must_exit: &'a mut watch::Receiver<bool>) -> BoxFuture<'a, Result<RangeChecksum, Error>> { + async move { + let mut cache = self.cache[range.level].lock().await; + if let Some(v) = cache.get(&range) { + if Instant::now() - v.time < CHECKSUM_CACHE_TIMEOUT { + return Ok(v.clone()); } - must_exit_v = s_must_exit => { - if must_exit_v.unwrap_or(false) { - return Ok(()) + } + cache.remove(&range); + drop(cache); + + let v = self.range_checksum_inner(&range, must_exit).await?; + + let mut cache = self.cache[range.level].lock().await; + eprintln!("Checksum for {:?}: {:?}", range, v); + cache.insert(range.clone(), v.clone()); + Ok(v) + }.boxed() + } + + async fn range_checksum_inner(self: &Arc<Self>, range: &SyncRange, must_exit: &mut watch::Receiver<bool>) -> Result<RangeChecksum, Error> { + if range.level == 1 { + let mut children = vec![]; + for item in self.table.store.range(range.begin.clone()..range.end.clone()) { + let (key, value) = item?; + let key_hash = hash(&key[..]); + if key != range.begin && key_hash.as_slice()[0..range.level].iter().all(|x| *x == 0) { + return Ok(RangeChecksum{ + bounds: range.clone(), + children, + found_limit: Some(key.to_vec()), + time: Instant::now(), + }) + } + let item_range = SyncRange{ + begin: key.to_vec(), + end: vec![], + level: 0, + }; + children.push((item_range, hash(&value[..]))); + } + Ok(RangeChecksum{ + bounds: range.clone(), + children, + found_limit: None, + time: Instant::now(), + }) + } else { + let mut children = vec![]; + let mut sub_range = SyncRange{ + begin: range.begin.clone(), + end: range.end.clone(), + level: range.level - 1, + }; + let mut time = Instant::now(); + while !*must_exit.borrow() { + let sub_ck = self.range_checksum(&sub_range, must_exit).await?; + + if sub_ck.children.len() > 0 { + let sub_ck_hash = hash(&rmp_to_vec_all_named(&sub_ck)?[..]); + children.push((sub_range.clone(), sub_ck_hash)); + if sub_ck.time < time { + time = sub_ck.time; } } + + if sub_ck.found_limit.is_none() || sub_ck.children.len() == 0 { + return Ok(RangeChecksum{ + bounds: range.clone(), + children, + found_limit: None, + time, + }); + } + let found_limit = sub_ck.found_limit.unwrap(); + + let actual_limit_hash = hash(&found_limit[..]); + if actual_limit_hash.as_slice()[0..range.level].iter().all(|x| *x == 0) { + return Ok(RangeChecksum{ + bounds: range.clone(), + children, + found_limit: Some(found_limit.clone()), + time, + }); + } + + sub_range.begin = found_limit; } + Err(Error::Message(format!("Exiting."))) } } - async fn pop_task(&self) -> Option<Partition> { - self.todo.lock().await.pop_task() - } + async fn do_sync_with(self: Arc<Self>, root_ck: RangeChecksum, who: UUID, mut must_exit: watch::Receiver<bool>) -> Result<(), Error> { + let mut todo = VecDeque::new(); + todo.push_back(root_ck); - async fn sync_partition(self: &Arc<Self>, partition: &Partition) -> Result<(), Error> { - eprintln!("NOT IMPLEMENTED: SYNC PARTITION {:?}", partition); + while !todo.is_empty() && !*must_exit.borrow() { + let end = std::cmp::min(16, todo.len()); + let step = todo.drain(..end).collect::<Vec<_>>(); + unimplemented!() + } Ok(()) } + + pub async fn handle_checksum_rpc(self: &Arc<Self>, checksums: &[RangeChecksum], mut must_exit: watch::Receiver<bool>) -> Result<Vec<SyncRange>, Error> { + let mut ret = vec![]; + for ckr in checksums.iter() { + let our_ckr = self.range_checksum(&ckr.bounds, &mut must_exit).await?; + for (range, hash) in ckr.children.iter() { + match our_ckr.children.binary_search_by(|(our_range, _)| our_range.begin.cmp(&range.begin)) { + Err(_) => { + ret.push(range.clone()); + } + Ok(i) => { + if our_ckr.children[i].1 != *hash { + ret.push(range.clone()); + } + } + } + } + } + Ok(ret) + } } impl SyncTodo { |