diff options
Diffstat (limited to 'src/table_sync.rs')
-rw-r--r-- | src/table_sync.rs | 203 |
1 files changed, 153 insertions, 50 deletions
diff --git a/src/table_sync.rs b/src/table_sync.rs index c1d3bea8..8eb08074 100644 --- a/src/table_sync.rs +++ b/src/table_sync.rs @@ -1,16 +1,16 @@ use rand::Rng; -use std::collections::{BTreeSet, BTreeMap, VecDeque}; +use std::collections::{BTreeMap, BTreeSet, VecDeque}; use std::sync::Arc; use std::time::{Duration, Instant}; -use futures::{pin_mut, select}; use futures::future::BoxFuture; -use futures_util::stream::*; +use futures::{pin_mut, select}; use futures_util::future::*; +use futures_util::stream::*; +use serde::{Deserialize, Serialize}; +use serde_bytes::ByteBuf; use tokio::sync::watch; use tokio::sync::Mutex; -use serde::{Serialize, Deserialize}; -use serde_bytes::ByteBuf; use crate::data::*; use crate::error::Error; @@ -62,7 +62,7 @@ pub struct RangeChecksum { pub children: Vec<(SyncRange, Hash)>, pub found_limit: Option<Vec<u8>>, - #[serde(skip, default="std::time::Instant::now")] + #[serde(skip, default = "std::time::Instant::now")] pub time: Instant, } @@ -72,7 +72,9 @@ impl<F: TableSchema + 'static> TableSyncer<F> { let syncer = Arc::new(TableSyncer { table: table.clone(), todo: Mutex::new(todo), - cache: (0..MAX_DEPTH).map(|_| Mutex::new(BTreeMap::new())).collect::<Vec<_>>(), + cache: (0..MAX_DEPTH) + .map(|_| Mutex::new(BTreeMap::new())) + .collect::<Vec<_>>(), }); let s1 = syncer.clone(); @@ -137,9 +139,15 @@ impl<F: TableSchema + 'static> TableSyncer<F> { ) -> Result<(), Error> { while !*must_exit.borrow() { if let Some(partition) = self.todo.lock().await.pop_task() { - let res = self.clone().sync_partition(&partition, &mut must_exit).await; + let res = self + .clone() + .sync_partition(&partition, &mut must_exit) + .await; if let Err(e) = res { - eprintln!("({}) Error while syncing {:?}: {}", self.table.name, partition, e); + eprintln!( + "({}) Error while syncing {:?}: {}", + self.table.name, partition, e + ); } } else { tokio::time::delay_for(Duration::from_secs(1)).await; @@ -148,13 +156,29 @@ impl<F: TableSchema + 'static> TableSyncer<F> { Ok(()) } - async fn sync_partition(self: Arc<Self>, partition: &Partition, must_exit: &mut watch::Receiver<bool>) -> Result<(), Error> { + async fn sync_partition( + self: Arc<Self>, + partition: &Partition, + must_exit: &mut watch::Receiver<bool>, + ) -> Result<(), Error> { eprintln!("({}) Preparing to sync {:?}...", self.table.name, partition); - let root_cks = self.root_checksum(&partition.begin, &partition.end, must_exit).await?; + let root_cks = self + .root_checksum(&partition.begin, &partition.end, must_exit) + .await?; - let nodes = self.table.system.ring.borrow().clone().walk_ring(&partition.begin, self.table.param.replication_factor); - let mut sync_futures = nodes.iter() - .map(|node| self.clone().do_sync_with(root_cks.clone(), node.clone(), must_exit.clone())) + let nodes = self + .table + .system + .ring + .borrow() + .clone() + .walk_ring(&partition.begin, self.table.param.replication_factor); + let mut sync_futures = nodes + .iter() + .map(|node| { + self.clone() + .do_sync_with(root_cks.clone(), node.clone(), must_exit.clone()) + }) .collect::<FuturesUnordered<_>>(); while let Some(r) = sync_futures.next().await { @@ -163,27 +187,45 @@ impl<F: TableSchema + 'static> TableSyncer<F> { } } if !partition.retain { - self.table.delete_range(&partition.begin, &partition.end).await?; + self.table + .delete_range(&partition.begin, &partition.end) + .await?; } Ok(()) } - async fn root_checksum(self: &Arc<Self>, begin: &Hash, end: &Hash, must_exit: &mut watch::Receiver<bool>) -> Result<RangeChecksum, Error> { + async fn root_checksum( + self: &Arc<Self>, + begin: &Hash, + end: &Hash, + must_exit: &mut watch::Receiver<bool>, + ) -> Result<RangeChecksum, Error> { for i in 1..MAX_DEPTH { - let rc = self.range_checksum(&SyncRange{ - begin: begin.to_vec(), - end: end.to_vec(), - level: i, - }, must_exit).await?; + let rc = self + .range_checksum( + &SyncRange { + begin: begin.to_vec(), + end: end.to_vec(), + level: i, + }, + must_exit, + ) + .await?; if rc.found_limit.is_none() { return Ok(rc); } } - Err(Error::Message(format!("Unable to compute root checksum (this should never happen"))) + Err(Error::Message(format!( + "Unable to compute root checksum (this should never happen" + ))) } - fn range_checksum<'a>(self: &'a Arc<Self>, range: &'a SyncRange, must_exit: &'a mut watch::Receiver<bool>) -> BoxFuture<'a, Result<RangeChecksum, Error>> { + fn range_checksum<'a>( + self: &'a Arc<Self>, + range: &'a SyncRange, + must_exit: &'a mut watch::Receiver<bool>, + ) -> BoxFuture<'a, Result<RangeChecksum, Error>> { async move { let mut cache = self.cache[range.level].lock().await; if let Some(v) = cache.get(&range) { @@ -195,41 +237,53 @@ impl<F: TableSchema + 'static> TableSyncer<F> { drop(cache); let v = self.range_checksum_inner(&range, must_exit).await?; - eprintln!("({}) New checksum calculated for {}-{}/{}, {} children", + eprintln!( + "({}) New checksum calculated for {}-{}/{}, {} children", self.table.name, hex::encode(&range.begin[..]), hex::encode(&range.end[..]), range.level, - v.children.len()); + v.children.len() + ); let mut cache = self.cache[range.level].lock().await; cache.insert(range.clone(), v.clone()); Ok(v) - }.boxed() + } + .boxed() } - async fn range_checksum_inner(self: &Arc<Self>, range: &SyncRange, must_exit: &mut watch::Receiver<bool>) -> Result<RangeChecksum, Error> { + async fn range_checksum_inner( + self: &Arc<Self>, + range: &SyncRange, + must_exit: &mut watch::Receiver<bool>, + ) -> Result<RangeChecksum, Error> { if range.level == 1 { let mut children = vec![]; - for item in self.table.store.range(range.begin.clone()..range.end.clone()) { + for item in self + .table + .store + .range(range.begin.clone()..range.end.clone()) + { let (key, value) = item?; let key_hash = hash(&key[..]); - if key != range.begin && key_hash.as_slice()[0..range.level].iter().all(|x| *x == 0) { - return Ok(RangeChecksum{ + if key != range.begin && key_hash.as_slice()[0..range.level].iter().all(|x| *x == 0) + { + return Ok(RangeChecksum { bounds: range.clone(), children, found_limit: Some(key.to_vec()), time: Instant::now(), - }) + }); } - let item_range = SyncRange{ + let item_range = SyncRange { begin: key.to_vec(), end: vec![], level: 0, }; children.push((item_range, hash(&value[..]))); } - Ok(RangeChecksum{ + Ok(RangeChecksum { bounds: range.clone(), children, found_limit: None, @@ -237,7 +291,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> { }) } else { let mut children = vec![]; - let mut sub_range = SyncRange{ + let mut sub_range = SyncRange { begin: range.begin.clone(), end: range.end.clone(), level: range.level - 1, @@ -255,7 +309,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> { } if sub_ck.found_limit.is_none() || sub_ck.children.len() == 0 { - return Ok(RangeChecksum{ + return Ok(RangeChecksum { bounds: range.clone(), children, found_limit: None, @@ -265,8 +319,11 @@ impl<F: TableSchema + 'static> TableSyncer<F> { let found_limit = sub_ck.found_limit.unwrap(); let actual_limit_hash = hash(&found_limit[..]); - if actual_limit_hash.as_slice()[0..range.level].iter().all(|x| *x == 0) { - return Ok(RangeChecksum{ + if actual_limit_hash.as_slice()[0..range.level] + .iter() + .all(|x| *x == 0) + { + return Ok(RangeChecksum { bounds: range.clone(), children, found_limit: Some(found_limit.clone()), @@ -280,18 +337,32 @@ impl<F: TableSchema + 'static> TableSyncer<F> { } } - async fn do_sync_with(self: Arc<Self>, root_ck: RangeChecksum, who: UUID, mut must_exit: watch::Receiver<bool>) -> Result<(), Error> { + async fn do_sync_with( + self: Arc<Self>, + root_ck: RangeChecksum, + who: UUID, + mut must_exit: watch::Receiver<bool>, + ) -> Result<(), Error> { let mut todo = VecDeque::new(); todo.push_back(root_ck); while !todo.is_empty() && !*must_exit.borrow() { let total_children = todo.iter().map(|x| x.children.len()).fold(0, |x, y| x + y); - eprintln!("({}) Sync with {:?}: {} ({}) remaining", self.table.name, who, todo.len(), total_children); + eprintln!( + "({}) Sync with {:?}: {} ({}) remaining", + self.table.name, + who, + todo.len(), + total_children + ); let end = std::cmp::min(16, todo.len()); let step = todo.drain(..end).collect::<Vec<_>>(); - let rpc_resp = self.table.rpc_call(&who, &TableRPC::<F>::SyncChecksums(step)).await?; + let rpc_resp = self + .table + .rpc_call(&who, &TableRPC::<F>::SyncChecksums(step)) + .await?; if let TableRPC::<F>::SyncDifferentSet(mut s) = rpc_resp { let mut items = vec![]; for differing in s.drain(..) { @@ -303,17 +374,28 @@ impl<F: TableSchema + 'static> TableSyncer<F> { } } if items.len() > 0 { - self.table.system.background.spawn(self.clone().send_items(who.clone(), items)); + self.table + .system + .background + .spawn(self.clone().send_items(who.clone(), items)); } } else { - return Err(Error::Message(format!("Unexpected response to RPC SyncChecksums: {}", debug_serialize(&rpc_resp)))); + return Err(Error::Message(format!( + "Unexpected response to RPC SyncChecksums: {}", + debug_serialize(&rpc_resp) + ))); } } Ok(()) } async fn send_items(self: Arc<Self>, who: UUID, item_list: Vec<Vec<u8>>) -> Result<(), Error> { - eprintln!("({}) Sending {} items to {:?}", self.table.name, item_list.len(), who); + eprintln!( + "({}) Sending {} items to {:?}", + self.table.name, + item_list.len(), + who + ); let mut values = vec![]; for item in item_list.iter() { @@ -321,20 +403,33 @@ impl<F: TableSchema + 'static> TableSyncer<F> { values.push(Arc::new(ByteBuf::from(v.as_ref()))); } } - let rpc_resp = self.table.rpc_call(&who, &TableRPC::<F>::Update(values)).await?; + let rpc_resp = self + .table + .rpc_call(&who, &TableRPC::<F>::Update(values)) + .await?; if let TableRPC::<F>::Ok = rpc_resp { Ok(()) } else { - Err(Error::Message(format!("Unexpected response to RPC Update: {}", debug_serialize(&rpc_resp)))) + Err(Error::Message(format!( + "Unexpected response to RPC Update: {}", + debug_serialize(&rpc_resp) + ))) } } - pub async fn handle_checksum_rpc(self: &Arc<Self>, checksums: &[RangeChecksum], mut must_exit: watch::Receiver<bool>) -> Result<Vec<SyncRange>, Error> { + pub async fn handle_checksum_rpc( + self: &Arc<Self>, + checksums: &[RangeChecksum], + mut must_exit: watch::Receiver<bool>, + ) -> Result<Vec<SyncRange>, Error> { let mut ret = vec![]; for ckr in checksums.iter() { let our_ckr = self.range_checksum(&ckr.bounds, &mut must_exit).await?; for (range, hash) in ckr.children.iter() { - match our_ckr.children.binary_search_by(|(our_range, _)| our_range.begin.cmp(&range.begin)) { + match our_ckr + .children + .binary_search_by(|(our_range, _)| our_range.begin.cmp(&range.begin)) + { Err(_) => { ret.push(range.clone()); } @@ -346,14 +441,22 @@ impl<F: TableSchema + 'static> TableSyncer<F> { } } } - let n_checksums = checksums.iter().map(|x| x.children.len()).fold(0, |x, y| x + y); - eprintln!("({}) Checksum comparison RPC: {} different out of {}", self.table.name, ret.len(), n_checksums); + let n_checksums = checksums + .iter() + .map(|x| x.children.len()) + .fold(0, |x, y| x + y); + eprintln!( + "({}) Checksum comparison RPC: {} different out of {}", + self.table.name, + ret.len(), + n_checksums + ); Ok(ret) } pub async fn invalidate(self: Arc<Self>, item_key: Vec<u8>) -> Result<(), Error> { for i in 1..MAX_DEPTH { - let needle = SyncRange{ + let needle = SyncRange { begin: item_key.to_vec(), end: vec![], level: i, |