diff options
Diffstat (limited to 'src/table_sync.rs')
-rw-r--r-- | src/table_sync.rs | 62 |
1 files changed, 47 insertions, 15 deletions
diff --git a/src/table_sync.rs b/src/table_sync.rs index f96e45ff..703dd750 100644 --- a/src/table_sync.rs +++ b/src/table_sync.rs @@ -29,7 +29,7 @@ pub struct TableSyncer<F: TableSchema> { #[derive(Serialize, Deserialize)] pub enum SyncRPC { - Checksums(Vec<RangeChecksum>), + Checksums(Vec<RangeChecksum>, bool), Difference(Vec<SyncRange>, Vec<Arc<ByteBuf>>), } @@ -179,8 +179,12 @@ impl<F: TableSchema + 'static> TableSyncer<F> { .iter() .filter(|node| **node != my_id) .map(|node| { - self.clone() - .do_sync_with(root_cks.clone(), node.clone(), must_exit.clone()) + self.clone().do_sync_with( + root_cks.clone(), + node.clone(), + partition.retain, + must_exit.clone(), + ) }) .collect::<FuturesUnordered<_>>(); @@ -344,6 +348,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> { self: Arc<Self>, root_ck: RangeChecksum, who: UUID, + retain: bool, mut must_exit: watch::Receiver<bool>, ) -> Result<(), Error> { let mut todo = VecDeque::new(); @@ -364,10 +369,21 @@ impl<F: TableSchema + 'static> TableSyncer<F> { let rpc_resp = self .table - .rpc_call(&who, &TableRPC::<F>::SyncRPC(SyncRPC::Checksums(step))) + .rpc_call( + &who, + &TableRPC::<F>::SyncRPC(SyncRPC::Checksums(step, retain)), + ) .await?; - if let TableRPC::<F>::SyncRPC(SyncRPC::Difference(mut diff_ranges, diff_items)) = rpc_resp { - eprintln!("({}) Sync with {:?}: difference {} ranges, {} items", self.table.name, who, diff_ranges.len(), diff_items.len()); + if let TableRPC::<F>::SyncRPC(SyncRPC::Difference(mut diff_ranges, diff_items)) = + rpc_resp + { + eprintln!( + "({}) Sync with {:?}: difference {} ranges, {} items", + self.table.name, + who, + diff_ranges.len(), + diff_items.len() + ); let mut items_to_send = vec![]; for differing in diff_ranges.drain(..) { if differing.level == 0 { @@ -377,7 +393,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> { todo.push_back(checksum); } } - if diff_items.len() > 0 { + if retain && diff_items.len() > 0 { self.table.handle_update(diff_items).await?; } if items_to_send.len() > 0 { @@ -429,7 +445,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> { message: &SyncRPC, mut must_exit: watch::Receiver<bool>, ) -> Result<SyncRPC, Error> { - if let SyncRPC::Checksums(checksums) = message { + if let SyncRPC::Checksums(checksums, retain) = message { let mut ret_ranges = vec![]; let mut ret_items = vec![]; for ckr in checksums.iter() { @@ -437,7 +453,12 @@ impl<F: TableSchema + 'static> TableSyncer<F> { for (range, hash) in ckr.children.iter() { // Only consider items that are in the intersection of the two ranges // (other ranges will be exchanged at some point) - if our_ckr.found_limit.as_ref().map(|x| range.begin.as_slice() >= x.as_slice()).unwrap_or(false) { + if our_ckr + .found_limit + .as_ref() + .map(|x| range.begin.as_slice() >= x.as_slice()) + .unwrap_or(false) + { break; } @@ -450,15 +471,22 @@ impl<F: TableSchema + 'static> TableSyncer<F> { }; if differs { ret_ranges.push(range.clone()); - if range.level == 0 { - if let Some(item_bytes) = self.table.store.get(range.begin.as_slice())? { + if *retain && range.level == 0 { + if let Some(item_bytes) = + self.table.store.get(range.begin.as_slice())? + { ret_items.push(Arc::new(ByteBuf::from(item_bytes.to_vec()))); } } } } for (range, _hash) in our_ckr.children.iter() { - if ckr.found_limit.as_ref().map(|x| range.begin.as_slice() >= x.as_slice()).unwrap_or(false) { + if ckr + .found_limit + .as_ref() + .map(|x| range.begin.as_slice() >= x.as_slice()) + .unwrap_or(false) + { break; } @@ -467,9 +495,13 @@ impl<F: TableSchema + 'static> TableSyncer<F> { .binary_search_by(|(their_range, _)| their_range.begin.cmp(&range.begin)) .is_err(); if not_present { - ret_ranges.push(range.clone()); - if range.level == 0 { - if let Some(item_bytes) = self.table.store.get(range.begin.as_slice())? { + if range.level > 0 { + ret_ranges.push(range.clone()); + } + if *retain && range.level == 0 { + if let Some(item_bytes) = + self.table.store.get(range.begin.as_slice())? + { ret_items.push(Arc::new(ByteBuf::from(item_bytes.to_vec()))); } } |