From db1c4222cefa99c6a4453da13bdb4f206b4b05a5 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 17 Apr 2020 18:51:29 +0200 Subject: Don't send items... ...if syncer doesn't need them because he's going to delete the partition anyway. Also, fix block resync queue --- src/table_sync.rs | 62 +++++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 47 insertions(+), 15 deletions(-) (limited to 'src/table_sync.rs') diff --git a/src/table_sync.rs b/src/table_sync.rs index f96e45ff..703dd750 100644 --- a/src/table_sync.rs +++ b/src/table_sync.rs @@ -29,7 +29,7 @@ pub struct TableSyncer { #[derive(Serialize, Deserialize)] pub enum SyncRPC { - Checksums(Vec), + Checksums(Vec, bool), Difference(Vec, Vec>), } @@ -179,8 +179,12 @@ impl TableSyncer { .iter() .filter(|node| **node != my_id) .map(|node| { - self.clone() - .do_sync_with(root_cks.clone(), node.clone(), must_exit.clone()) + self.clone().do_sync_with( + root_cks.clone(), + node.clone(), + partition.retain, + must_exit.clone(), + ) }) .collect::>(); @@ -344,6 +348,7 @@ impl TableSyncer { self: Arc, root_ck: RangeChecksum, who: UUID, + retain: bool, mut must_exit: watch::Receiver, ) -> Result<(), Error> { let mut todo = VecDeque::new(); @@ -364,10 +369,21 @@ impl TableSyncer { let rpc_resp = self .table - .rpc_call(&who, &TableRPC::::SyncRPC(SyncRPC::Checksums(step))) + .rpc_call( + &who, + &TableRPC::::SyncRPC(SyncRPC::Checksums(step, retain)), + ) .await?; - if let TableRPC::::SyncRPC(SyncRPC::Difference(mut diff_ranges, diff_items)) = rpc_resp { - eprintln!("({}) Sync with {:?}: difference {} ranges, {} items", self.table.name, who, diff_ranges.len(), diff_items.len()); + if let TableRPC::::SyncRPC(SyncRPC::Difference(mut diff_ranges, diff_items)) = + rpc_resp + { + eprintln!( + "({}) Sync with {:?}: difference {} ranges, {} items", + self.table.name, + who, + diff_ranges.len(), + diff_items.len() + ); let mut items_to_send = vec![]; for differing in diff_ranges.drain(..) { if differing.level == 0 { @@ -377,7 +393,7 @@ impl TableSyncer { todo.push_back(checksum); } } - if diff_items.len() > 0 { + if retain && diff_items.len() > 0 { self.table.handle_update(diff_items).await?; } if items_to_send.len() > 0 { @@ -429,7 +445,7 @@ impl TableSyncer { message: &SyncRPC, mut must_exit: watch::Receiver, ) -> Result { - if let SyncRPC::Checksums(checksums) = message { + if let SyncRPC::Checksums(checksums, retain) = message { let mut ret_ranges = vec![]; let mut ret_items = vec![]; for ckr in checksums.iter() { @@ -437,7 +453,12 @@ impl TableSyncer { for (range, hash) in ckr.children.iter() { // Only consider items that are in the intersection of the two ranges // (other ranges will be exchanged at some point) - if our_ckr.found_limit.as_ref().map(|x| range.begin.as_slice() >= x.as_slice()).unwrap_or(false) { + if our_ckr + .found_limit + .as_ref() + .map(|x| range.begin.as_slice() >= x.as_slice()) + .unwrap_or(false) + { break; } @@ -450,15 +471,22 @@ impl TableSyncer { }; if differs { ret_ranges.push(range.clone()); - if range.level == 0 { - if let Some(item_bytes) = self.table.store.get(range.begin.as_slice())? { + if *retain && range.level == 0 { + if let Some(item_bytes) = + self.table.store.get(range.begin.as_slice())? + { ret_items.push(Arc::new(ByteBuf::from(item_bytes.to_vec()))); } } } } for (range, _hash) in our_ckr.children.iter() { - if ckr.found_limit.as_ref().map(|x| range.begin.as_slice() >= x.as_slice()).unwrap_or(false) { + if ckr + .found_limit + .as_ref() + .map(|x| range.begin.as_slice() >= x.as_slice()) + .unwrap_or(false) + { break; } @@ -467,9 +495,13 @@ impl TableSyncer { .binary_search_by(|(their_range, _)| their_range.begin.cmp(&range.begin)) .is_err(); if not_present { - ret_ranges.push(range.clone()); - if range.level == 0 { - if let Some(item_bytes) = self.table.store.get(range.begin.as_slice())? { + if range.level > 0 { + ret_ranges.push(range.clone()); + } + if *retain && range.level == 0 { + if let Some(item_bytes) = + self.table.store.get(range.begin.as_slice())? + { ret_items.push(Arc::new(ByteBuf::from(item_bytes.to_vec()))); } } -- cgit v1.2.3