aboutsummaryrefslogtreecommitdiff
path: root/src/table_sync.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/table_sync.rs')
-rw-r--r--src/table_sync.rs78
1 files changed, 41 insertions, 37 deletions
diff --git a/src/table_sync.rs b/src/table_sync.rs
index 8eb08074..5ef13d6d 100644
--- a/src/table_sync.rs
+++ b/src/table_sync.rs
@@ -27,6 +27,12 @@ pub struct TableSyncer<F: TableSchema> {
pub cache: Vec<Mutex<BTreeMap<SyncRange, RangeChecksum>>>,
}
+#[derive(Serialize, Deserialize)]
+pub enum SyncRPC {
+ Checksums(Vec<RangeChecksum>),
+ DifferentSet(Vec<SyncRange>),
+}
+
pub struct SyncTodo {
pub todo: Vec<Partition>,
}
@@ -166,13 +172,8 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
.root_checksum(&partition.begin, &partition.end, must_exit)
.await?;
- let nodes = self
- .table
- .system
- .ring
- .borrow()
- .clone()
- .walk_ring(&partition.begin, self.table.param.replication_factor);
+ let ring = self.table.system.ring.borrow().clone();
+ let nodes = ring.walk_ring(&partition.begin, self.table.param.replication_factor);
let mut sync_futures = nodes
.iter()
.map(|node| {
@@ -361,9 +362,9 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
let rpc_resp = self
.table
- .rpc_call(&who, &TableRPC::<F>::SyncChecksums(step))
+ .rpc_call(&who, &TableRPC::<F>::SyncRPC(SyncRPC::Checksums(step)))
.await?;
- if let TableRPC::<F>::SyncDifferentSet(mut s) = rpc_resp {
+ if let TableRPC::<F>::SyncRPC(SyncRPC::DifferentSet(mut s)) = rpc_resp {
let mut items = vec![];
for differing in s.drain(..) {
if differing.level == 0 {
@@ -381,7 +382,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
}
} else {
return Err(Error::Message(format!(
- "Unexpected response to RPC SyncChecksums: {}",
+ "Unexpected response to sync RPC checksums: {}",
debug_serialize(&rpc_resp)
)));
}
@@ -417,41 +418,44 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
}
}
- pub async fn handle_checksum_rpc(
+ pub async fn handle_rpc(
self: &Arc<Self>,
- checksums: &[RangeChecksum],
+ message: &SyncRPC,
mut must_exit: watch::Receiver<bool>,
- ) -> Result<Vec<SyncRange>, Error> {
- let mut ret = vec![];
- for ckr in checksums.iter() {
- let our_ckr = self.range_checksum(&ckr.bounds, &mut must_exit).await?;
- for (range, hash) in ckr.children.iter() {
- match our_ckr
- .children
- .binary_search_by(|(our_range, _)| our_range.begin.cmp(&range.begin))
- {
- Err(_) => {
- ret.push(range.clone());
- }
- Ok(i) => {
- if our_ckr.children[i].1 != *hash {
+ ) -> Result<SyncRPC, Error> {
+ if let SyncRPC::Checksums(checksums) = message {
+ let mut ret = vec![];
+ for ckr in checksums.iter() {
+ let our_ckr = self.range_checksum(&ckr.bounds, &mut must_exit).await?;
+ for (range, hash) in ckr.children.iter() {
+ match our_ckr
+ .children
+ .binary_search_by(|(our_range, _)| our_range.begin.cmp(&range.begin))
+ {
+ Err(_) => {
ret.push(range.clone());
}
+ Ok(i) => {
+ if our_ckr.children[i].1 != *hash {
+ ret.push(range.clone());
+ }
+ }
}
}
}
+ let n_checksums = checksums
+ .iter()
+ .map(|x| x.children.len())
+ .fold(0, |x, y| x + y);
+ eprintln!(
+ "({}) Checksum comparison RPC: {} different out of {}",
+ self.table.name,
+ ret.len(),
+ n_checksums
+ );
+ return Ok(SyncRPC::DifferentSet(ret));
}
- let n_checksums = checksums
- .iter()
- .map(|x| x.children.len())
- .fold(0, |x, y| x + y);
- eprintln!(
- "({}) Checksum comparison RPC: {} different out of {}",
- self.table.name,
- ret.len(),
- n_checksums
- );
- Ok(ret)
+ Err(Error::Message(format!("Unexpected sync RPC")))
}
pub async fn invalidate(self: Arc<Self>, item_key: Vec<u8>) -> Result<(), Error> {