aboutsummaryrefslogtreecommitdiff
path: root/src/table_sync.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/table_sync.rs')
-rw-r--r--src/table_sync.rs203
1 files changed, 153 insertions, 50 deletions
diff --git a/src/table_sync.rs b/src/table_sync.rs
index c1d3bea8..8eb08074 100644
--- a/src/table_sync.rs
+++ b/src/table_sync.rs
@@ -1,16 +1,16 @@
use rand::Rng;
-use std::collections::{BTreeSet, BTreeMap, VecDeque};
+use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::sync::Arc;
use std::time::{Duration, Instant};
-use futures::{pin_mut, select};
use futures::future::BoxFuture;
-use futures_util::stream::*;
+use futures::{pin_mut, select};
use futures_util::future::*;
+use futures_util::stream::*;
+use serde::{Deserialize, Serialize};
+use serde_bytes::ByteBuf;
use tokio::sync::watch;
use tokio::sync::Mutex;
-use serde::{Serialize, Deserialize};
-use serde_bytes::ByteBuf;
use crate::data::*;
use crate::error::Error;
@@ -62,7 +62,7 @@ pub struct RangeChecksum {
pub children: Vec<(SyncRange, Hash)>,
pub found_limit: Option<Vec<u8>>,
- #[serde(skip, default="std::time::Instant::now")]
+ #[serde(skip, default = "std::time::Instant::now")]
pub time: Instant,
}
@@ -72,7 +72,9 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
let syncer = Arc::new(TableSyncer {
table: table.clone(),
todo: Mutex::new(todo),
- cache: (0..MAX_DEPTH).map(|_| Mutex::new(BTreeMap::new())).collect::<Vec<_>>(),
+ cache: (0..MAX_DEPTH)
+ .map(|_| Mutex::new(BTreeMap::new()))
+ .collect::<Vec<_>>(),
});
let s1 = syncer.clone();
@@ -137,9 +139,15 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
) -> Result<(), Error> {
while !*must_exit.borrow() {
if let Some(partition) = self.todo.lock().await.pop_task() {
- let res = self.clone().sync_partition(&partition, &mut must_exit).await;
+ let res = self
+ .clone()
+ .sync_partition(&partition, &mut must_exit)
+ .await;
if let Err(e) = res {
- eprintln!("({}) Error while syncing {:?}: {}", self.table.name, partition, e);
+ eprintln!(
+ "({}) Error while syncing {:?}: {}",
+ self.table.name, partition, e
+ );
}
} else {
tokio::time::delay_for(Duration::from_secs(1)).await;
@@ -148,13 +156,29 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
Ok(())
}
- async fn sync_partition(self: Arc<Self>, partition: &Partition, must_exit: &mut watch::Receiver<bool>) -> Result<(), Error> {
+ async fn sync_partition(
+ self: Arc<Self>,
+ partition: &Partition,
+ must_exit: &mut watch::Receiver<bool>,
+ ) -> Result<(), Error> {
eprintln!("({}) Preparing to sync {:?}...", self.table.name, partition);
- let root_cks = self.root_checksum(&partition.begin, &partition.end, must_exit).await?;
+ let root_cks = self
+ .root_checksum(&partition.begin, &partition.end, must_exit)
+ .await?;
- let nodes = self.table.system.ring.borrow().clone().walk_ring(&partition.begin, self.table.param.replication_factor);
- let mut sync_futures = nodes.iter()
- .map(|node| self.clone().do_sync_with(root_cks.clone(), node.clone(), must_exit.clone()))
+ let nodes = self
+ .table
+ .system
+ .ring
+ .borrow()
+ .clone()
+ .walk_ring(&partition.begin, self.table.param.replication_factor);
+ let mut sync_futures = nodes
+ .iter()
+ .map(|node| {
+ self.clone()
+ .do_sync_with(root_cks.clone(), node.clone(), must_exit.clone())
+ })
.collect::<FuturesUnordered<_>>();
while let Some(r) = sync_futures.next().await {
@@ -163,27 +187,45 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
}
}
if !partition.retain {
- self.table.delete_range(&partition.begin, &partition.end).await?;
+ self.table
+ .delete_range(&partition.begin, &partition.end)
+ .await?;
}
Ok(())
}
- async fn root_checksum(self: &Arc<Self>, begin: &Hash, end: &Hash, must_exit: &mut watch::Receiver<bool>) -> Result<RangeChecksum, Error> {
+ async fn root_checksum(
+ self: &Arc<Self>,
+ begin: &Hash,
+ end: &Hash,
+ must_exit: &mut watch::Receiver<bool>,
+ ) -> Result<RangeChecksum, Error> {
for i in 1..MAX_DEPTH {
- let rc = self.range_checksum(&SyncRange{
- begin: begin.to_vec(),
- end: end.to_vec(),
- level: i,
- }, must_exit).await?;
+ let rc = self
+ .range_checksum(
+ &SyncRange {
+ begin: begin.to_vec(),
+ end: end.to_vec(),
+ level: i,
+ },
+ must_exit,
+ )
+ .await?;
if rc.found_limit.is_none() {
return Ok(rc);
}
}
- Err(Error::Message(format!("Unable to compute root checksum (this should never happen")))
+ Err(Error::Message(format!(
+ "Unable to compute root checksum (this should never happen"
+ )))
}
- fn range_checksum<'a>(self: &'a Arc<Self>, range: &'a SyncRange, must_exit: &'a mut watch::Receiver<bool>) -> BoxFuture<'a, Result<RangeChecksum, Error>> {
+ fn range_checksum<'a>(
+ self: &'a Arc<Self>,
+ range: &'a SyncRange,
+ must_exit: &'a mut watch::Receiver<bool>,
+ ) -> BoxFuture<'a, Result<RangeChecksum, Error>> {
async move {
let mut cache = self.cache[range.level].lock().await;
if let Some(v) = cache.get(&range) {
@@ -195,41 +237,53 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
drop(cache);
let v = self.range_checksum_inner(&range, must_exit).await?;
- eprintln!("({}) New checksum calculated for {}-{}/{}, {} children",
+ eprintln!(
+ "({}) New checksum calculated for {}-{}/{}, {} children",
self.table.name,
hex::encode(&range.begin[..]),
hex::encode(&range.end[..]),
range.level,
- v.children.len());
+ v.children.len()
+ );
let mut cache = self.cache[range.level].lock().await;
cache.insert(range.clone(), v.clone());
Ok(v)
- }.boxed()
+ }
+ .boxed()
}
- async fn range_checksum_inner(self: &Arc<Self>, range: &SyncRange, must_exit: &mut watch::Receiver<bool>) -> Result<RangeChecksum, Error> {
+ async fn range_checksum_inner(
+ self: &Arc<Self>,
+ range: &SyncRange,
+ must_exit: &mut watch::Receiver<bool>,
+ ) -> Result<RangeChecksum, Error> {
if range.level == 1 {
let mut children = vec![];
- for item in self.table.store.range(range.begin.clone()..range.end.clone()) {
+ for item in self
+ .table
+ .store
+ .range(range.begin.clone()..range.end.clone())
+ {
let (key, value) = item?;
let key_hash = hash(&key[..]);
- if key != range.begin && key_hash.as_slice()[0..range.level].iter().all(|x| *x == 0) {
- return Ok(RangeChecksum{
+ if key != range.begin && key_hash.as_slice()[0..range.level].iter().all(|x| *x == 0)
+ {
+ return Ok(RangeChecksum {
bounds: range.clone(),
children,
found_limit: Some(key.to_vec()),
time: Instant::now(),
- })
+ });
}
- let item_range = SyncRange{
+ let item_range = SyncRange {
begin: key.to_vec(),
end: vec![],
level: 0,
};
children.push((item_range, hash(&value[..])));
}
- Ok(RangeChecksum{
+ Ok(RangeChecksum {
bounds: range.clone(),
children,
found_limit: None,
@@ -237,7 +291,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
})
} else {
let mut children = vec![];
- let mut sub_range = SyncRange{
+ let mut sub_range = SyncRange {
begin: range.begin.clone(),
end: range.end.clone(),
level: range.level - 1,
@@ -255,7 +309,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
}
if sub_ck.found_limit.is_none() || sub_ck.children.len() == 0 {
- return Ok(RangeChecksum{
+ return Ok(RangeChecksum {
bounds: range.clone(),
children,
found_limit: None,
@@ -265,8 +319,11 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
let found_limit = sub_ck.found_limit.unwrap();
let actual_limit_hash = hash(&found_limit[..]);
- if actual_limit_hash.as_slice()[0..range.level].iter().all(|x| *x == 0) {
- return Ok(RangeChecksum{
+ if actual_limit_hash.as_slice()[0..range.level]
+ .iter()
+ .all(|x| *x == 0)
+ {
+ return Ok(RangeChecksum {
bounds: range.clone(),
children,
found_limit: Some(found_limit.clone()),
@@ -280,18 +337,32 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
}
}
- async fn do_sync_with(self: Arc<Self>, root_ck: RangeChecksum, who: UUID, mut must_exit: watch::Receiver<bool>) -> Result<(), Error> {
+ async fn do_sync_with(
+ self: Arc<Self>,
+ root_ck: RangeChecksum,
+ who: UUID,
+ mut must_exit: watch::Receiver<bool>,
+ ) -> Result<(), Error> {
let mut todo = VecDeque::new();
todo.push_back(root_ck);
while !todo.is_empty() && !*must_exit.borrow() {
let total_children = todo.iter().map(|x| x.children.len()).fold(0, |x, y| x + y);
- eprintln!("({}) Sync with {:?}: {} ({}) remaining", self.table.name, who, todo.len(), total_children);
+ eprintln!(
+ "({}) Sync with {:?}: {} ({}) remaining",
+ self.table.name,
+ who,
+ todo.len(),
+ total_children
+ );
let end = std::cmp::min(16, todo.len());
let step = todo.drain(..end).collect::<Vec<_>>();
- let rpc_resp = self.table.rpc_call(&who, &TableRPC::<F>::SyncChecksums(step)).await?;
+ let rpc_resp = self
+ .table
+ .rpc_call(&who, &TableRPC::<F>::SyncChecksums(step))
+ .await?;
if let TableRPC::<F>::SyncDifferentSet(mut s) = rpc_resp {
let mut items = vec![];
for differing in s.drain(..) {
@@ -303,17 +374,28 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
}
}
if items.len() > 0 {
- self.table.system.background.spawn(self.clone().send_items(who.clone(), items));
+ self.table
+ .system
+ .background
+ .spawn(self.clone().send_items(who.clone(), items));
}
} else {
- return Err(Error::Message(format!("Unexpected response to RPC SyncChecksums: {}", debug_serialize(&rpc_resp))));
+ return Err(Error::Message(format!(
+ "Unexpected response to RPC SyncChecksums: {}",
+ debug_serialize(&rpc_resp)
+ )));
}
}
Ok(())
}
async fn send_items(self: Arc<Self>, who: UUID, item_list: Vec<Vec<u8>>) -> Result<(), Error> {
- eprintln!("({}) Sending {} items to {:?}", self.table.name, item_list.len(), who);
+ eprintln!(
+ "({}) Sending {} items to {:?}",
+ self.table.name,
+ item_list.len(),
+ who
+ );
let mut values = vec![];
for item in item_list.iter() {
@@ -321,20 +403,33 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
values.push(Arc::new(ByteBuf::from(v.as_ref())));
}
}
- let rpc_resp = self.table.rpc_call(&who, &TableRPC::<F>::Update(values)).await?;
+ let rpc_resp = self
+ .table
+ .rpc_call(&who, &TableRPC::<F>::Update(values))
+ .await?;
if let TableRPC::<F>::Ok = rpc_resp {
Ok(())
} else {
- Err(Error::Message(format!("Unexpected response to RPC Update: {}", debug_serialize(&rpc_resp))))
+ Err(Error::Message(format!(
+ "Unexpected response to RPC Update: {}",
+ debug_serialize(&rpc_resp)
+ )))
}
}
- pub async fn handle_checksum_rpc(self: &Arc<Self>, checksums: &[RangeChecksum], mut must_exit: watch::Receiver<bool>) -> Result<Vec<SyncRange>, Error> {
+ pub async fn handle_checksum_rpc(
+ self: &Arc<Self>,
+ checksums: &[RangeChecksum],
+ mut must_exit: watch::Receiver<bool>,
+ ) -> Result<Vec<SyncRange>, Error> {
let mut ret = vec![];
for ckr in checksums.iter() {
let our_ckr = self.range_checksum(&ckr.bounds, &mut must_exit).await?;
for (range, hash) in ckr.children.iter() {
- match our_ckr.children.binary_search_by(|(our_range, _)| our_range.begin.cmp(&range.begin)) {
+ match our_ckr
+ .children
+ .binary_search_by(|(our_range, _)| our_range.begin.cmp(&range.begin))
+ {
Err(_) => {
ret.push(range.clone());
}
@@ -346,14 +441,22 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
}
}
}
- let n_checksums = checksums.iter().map(|x| x.children.len()).fold(0, |x, y| x + y);
- eprintln!("({}) Checksum comparison RPC: {} different out of {}", self.table.name, ret.len(), n_checksums);
+ let n_checksums = checksums
+ .iter()
+ .map(|x| x.children.len())
+ .fold(0, |x, y| x + y);
+ eprintln!(
+ "({}) Checksum comparison RPC: {} different out of {}",
+ self.table.name,
+ ret.len(),
+ n_checksums
+ );
Ok(ret)
}
pub async fn invalidate(self: Arc<Self>, item_key: Vec<u8>) -> Result<(), Error> {
for i in 1..MAX_DEPTH {
- let needle = SyncRange{
+ let needle = SyncRange {
begin: item_key.to_vec(),
end: vec![],
level: i,