aboutsummaryrefslogtreecommitdiff
path: root/src/table_sync.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/table_sync.rs')
-rw-r--r--src/table_sync.rs171
1 files changed, 103 insertions, 68 deletions
diff --git a/src/table_sync.rs b/src/table_sync.rs
index 703dd750..024e239f 100644
--- a/src/table_sync.rs
+++ b/src/table_sync.rs
@@ -29,6 +29,8 @@ pub struct TableSyncer<F: TableSchema> {
#[derive(Serialize, Deserialize)]
pub enum SyncRPC {
+ GetRootChecksumRange(Hash, Hash),
+ RootChecksumRange(SyncRange),
Checksums(Vec<RangeChecksum>, bool),
Difference(Vec<SyncRange>, Vec<Arc<ByteBuf>>),
}
@@ -180,6 +182,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
.filter(|node| **node != my_id)
.map(|node| {
self.clone().do_sync_with(
+ partition.clone(),
root_cks.clone(),
node.clone(),
partition.retain,
@@ -346,13 +349,33 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
async fn do_sync_with(
self: Arc<Self>,
+ partition: Partition,
root_ck: RangeChecksum,
who: UUID,
retain: bool,
mut must_exit: watch::Receiver<bool>,
) -> Result<(), Error> {
let mut todo = VecDeque::new();
- todo.push_back(root_ck);
+
+ // If their root checksum has level > than us, use that as a reference
+ let root_cks_resp = self
+ .table
+ .rpc_call(
+ &who,
+ &TableRPC::<F>::SyncRPC(SyncRPC::GetRootChecksumRange(
+ partition.begin.clone(),
+ partition.end.clone(),
+ )),
+ )
+ .await?;
+ if let TableRPC::<F>::SyncRPC(SyncRPC::RootChecksumRange(range)) = root_cks_resp {
+ if range.level > root_ck.bounds.level {
+ let their_root_range_ck = self.range_checksum(&range, &mut must_exit).await?;
+ todo.push_back(their_root_range_ck);
+ } else {
+ todo.push_back(root_ck);
+ }
+ }
while !todo.is_empty() && !*must_exit.borrow() {
let total_children = todo.iter().map(|x| x.children.len()).fold(0, |x, y| x + y);
@@ -445,83 +468,95 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
message: &SyncRPC,
mut must_exit: watch::Receiver<bool>,
) -> Result<SyncRPC, Error> {
- if let SyncRPC::Checksums(checksums, retain) = message {
- let mut ret_ranges = vec![];
- let mut ret_items = vec![];
- for ckr in checksums.iter() {
- let our_ckr = self.range_checksum(&ckr.bounds, &mut must_exit).await?;
- for (range, hash) in ckr.children.iter() {
- // Only consider items that are in the intersection of the two ranges
- // (other ranges will be exchanged at some point)
- if our_ckr
- .found_limit
- .as_ref()
- .map(|x| range.begin.as_slice() >= x.as_slice())
- .unwrap_or(false)
- {
- break;
- }
+ match message {
+ SyncRPC::GetRootChecksumRange(begin, end) => {
+ let root_cks = self.root_checksum(&begin, &end, &mut must_exit).await?;
+ Ok(SyncRPC::RootChecksumRange(root_cks.bounds))
+ }
+ SyncRPC::Checksums(checksums, retain) => {
+ self.handle_checksums_rpc(&checksums[..], *retain, &mut must_exit)
+ .await
+ }
+ _ => Err(Error::Message(format!("Unexpected sync RPC"))),
+ }
+ }
- let differs = match our_ckr
- .children
- .binary_search_by(|(our_range, _)| our_range.begin.cmp(&range.begin))
- {
- Err(_) => true,
- Ok(i) => our_ckr.children[i].1 != *hash,
- };
- if differs {
- ret_ranges.push(range.clone());
- if *retain && range.level == 0 {
- if let Some(item_bytes) =
- self.table.store.get(range.begin.as_slice())?
- {
- ret_items.push(Arc::new(ByteBuf::from(item_bytes.to_vec())));
- }
+ pub async fn handle_checksums_rpc(
+ self: &Arc<Self>,
+ checksums: &[RangeChecksum],
+ retain: bool,
+ must_exit: &mut watch::Receiver<bool>,
+ ) -> Result<SyncRPC, Error> {
+ let mut ret_ranges = vec![];
+ let mut ret_items = vec![];
+ for ckr in checksums.iter() {
+ let our_ckr = self.range_checksum(&ckr.bounds, must_exit).await?;
+ for (range, hash) in ckr.children.iter() {
+ // Only consider items that are in the intersection of the two ranges
+ // (other ranges will be exchanged at some point)
+ if our_ckr
+ .found_limit
+ .as_ref()
+ .map(|x| range.begin.as_slice() >= x.as_slice())
+ .unwrap_or(false)
+ {
+ break;
+ }
+
+ let differs = match our_ckr
+ .children
+ .binary_search_by(|(our_range, _)| our_range.begin.cmp(&range.begin))
+ {
+ Err(_) => true,
+ Ok(i) => our_ckr.children[i].1 != *hash,
+ };
+ if differs {
+ ret_ranges.push(range.clone());
+ if retain && range.level == 0 {
+ if let Some(item_bytes) = self.table.store.get(range.begin.as_slice())? {
+ ret_items.push(Arc::new(ByteBuf::from(item_bytes.to_vec())));
}
}
}
- for (range, _hash) in our_ckr.children.iter() {
- if ckr
- .found_limit
- .as_ref()
- .map(|x| range.begin.as_slice() >= x.as_slice())
- .unwrap_or(false)
- {
- break;
- }
+ }
+ for (range, _hash) in our_ckr.children.iter() {
+ if ckr
+ .found_limit
+ .as_ref()
+ .map(|x| range.begin.as_slice() >= x.as_slice())
+ .unwrap_or(false)
+ {
+ break;
+ }
- let not_present = ckr
- .children
- .binary_search_by(|(their_range, _)| their_range.begin.cmp(&range.begin))
- .is_err();
- if not_present {
- if range.level > 0 {
- ret_ranges.push(range.clone());
- }
- if *retain && range.level == 0 {
- if let Some(item_bytes) =
- self.table.store.get(range.begin.as_slice())?
- {
- ret_items.push(Arc::new(ByteBuf::from(item_bytes.to_vec())));
- }
+ let not_present = ckr
+ .children
+ .binary_search_by(|(their_range, _)| their_range.begin.cmp(&range.begin))
+ .is_err();
+ if not_present {
+ if range.level > 0 {
+ ret_ranges.push(range.clone());
+ }
+ if retain && range.level == 0 {
+ if let Some(item_bytes) = self.table.store.get(range.begin.as_slice())? {
+ ret_items.push(Arc::new(ByteBuf::from(item_bytes.to_vec())));
}
}
}
}
- let n_checksums = checksums
- .iter()
- .map(|x| x.children.len())
- .fold(0, |x, y| x + y);
- eprintln!(
- "({}) Checksum comparison RPC: {} different + {} items for {} received",
- self.table.name,
- ret_ranges.len(),
- ret_items.len(),
- n_checksums
- );
- return Ok(SyncRPC::Difference(ret_ranges, ret_items));
}
- Err(Error::Message(format!("Unexpected sync RPC")))
+ let n_checksums = checksums
+ .iter()
+ .map(|x| x.children.len())
+ .fold(0, |x, y| x + y);
+ eprintln!(
+ "({}) Checksum comparison RPC: {} different + {} items for {} received",
+ self.table.name,
+ ret_ranges.len(),
+ ret_items.len(),
+ n_checksums
+ );
+ Ok(SyncRPC::Difference(ret_ranges, ret_items))
}
pub async fn invalidate(self: Arc<Self>, item_key: Vec<u8>) -> Result<(), Error> {