aboutsummaryrefslogtreecommitdiff
path: root/src/table_sync.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/table_sync.rs')
-rw-r--r--src/table_sync.rs62
1 files changed, 47 insertions, 15 deletions
diff --git a/src/table_sync.rs b/src/table_sync.rs
index f96e45ff..703dd750 100644
--- a/src/table_sync.rs
+++ b/src/table_sync.rs
@@ -29,7 +29,7 @@ pub struct TableSyncer<F: TableSchema> {
#[derive(Serialize, Deserialize)]
pub enum SyncRPC {
- Checksums(Vec<RangeChecksum>),
+ Checksums(Vec<RangeChecksum>, bool),
Difference(Vec<SyncRange>, Vec<Arc<ByteBuf>>),
}
@@ -179,8 +179,12 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
.iter()
.filter(|node| **node != my_id)
.map(|node| {
- self.clone()
- .do_sync_with(root_cks.clone(), node.clone(), must_exit.clone())
+ self.clone().do_sync_with(
+ root_cks.clone(),
+ node.clone(),
+ partition.retain,
+ must_exit.clone(),
+ )
})
.collect::<FuturesUnordered<_>>();
@@ -344,6 +348,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
self: Arc<Self>,
root_ck: RangeChecksum,
who: UUID,
+ retain: bool,
mut must_exit: watch::Receiver<bool>,
) -> Result<(), Error> {
let mut todo = VecDeque::new();
@@ -364,10 +369,21 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
let rpc_resp = self
.table
- .rpc_call(&who, &TableRPC::<F>::SyncRPC(SyncRPC::Checksums(step)))
+ .rpc_call(
+ &who,
+ &TableRPC::<F>::SyncRPC(SyncRPC::Checksums(step, retain)),
+ )
.await?;
- if let TableRPC::<F>::SyncRPC(SyncRPC::Difference(mut diff_ranges, diff_items)) = rpc_resp {
- eprintln!("({}) Sync with {:?}: difference {} ranges, {} items", self.table.name, who, diff_ranges.len(), diff_items.len());
+ if let TableRPC::<F>::SyncRPC(SyncRPC::Difference(mut diff_ranges, diff_items)) =
+ rpc_resp
+ {
+ eprintln!(
+ "({}) Sync with {:?}: difference {} ranges, {} items",
+ self.table.name,
+ who,
+ diff_ranges.len(),
+ diff_items.len()
+ );
let mut items_to_send = vec![];
for differing in diff_ranges.drain(..) {
if differing.level == 0 {
@@ -377,7 +393,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
todo.push_back(checksum);
}
}
- if diff_items.len() > 0 {
+ if retain && diff_items.len() > 0 {
self.table.handle_update(diff_items).await?;
}
if items_to_send.len() > 0 {
@@ -429,7 +445,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
message: &SyncRPC,
mut must_exit: watch::Receiver<bool>,
) -> Result<SyncRPC, Error> {
- if let SyncRPC::Checksums(checksums) = message {
+ if let SyncRPC::Checksums(checksums, retain) = message {
let mut ret_ranges = vec![];
let mut ret_items = vec![];
for ckr in checksums.iter() {
@@ -437,7 +453,12 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
for (range, hash) in ckr.children.iter() {
// Only consider items that are in the intersection of the two ranges
// (other ranges will be exchanged at some point)
- if our_ckr.found_limit.as_ref().map(|x| range.begin.as_slice() >= x.as_slice()).unwrap_or(false) {
+ if our_ckr
+ .found_limit
+ .as_ref()
+ .map(|x| range.begin.as_slice() >= x.as_slice())
+ .unwrap_or(false)
+ {
break;
}
@@ -450,15 +471,22 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
};
if differs {
ret_ranges.push(range.clone());
- if range.level == 0 {
- if let Some(item_bytes) = self.table.store.get(range.begin.as_slice())? {
+ if *retain && range.level == 0 {
+ if let Some(item_bytes) =
+ self.table.store.get(range.begin.as_slice())?
+ {
ret_items.push(Arc::new(ByteBuf::from(item_bytes.to_vec())));
}
}
}
}
for (range, _hash) in our_ckr.children.iter() {
- if ckr.found_limit.as_ref().map(|x| range.begin.as_slice() >= x.as_slice()).unwrap_or(false) {
+ if ckr
+ .found_limit
+ .as_ref()
+ .map(|x| range.begin.as_slice() >= x.as_slice())
+ .unwrap_or(false)
+ {
break;
}
@@ -467,9 +495,13 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
.binary_search_by(|(their_range, _)| their_range.begin.cmp(&range.begin))
.is_err();
if not_present {
- ret_ranges.push(range.clone());
- if range.level == 0 {
- if let Some(item_bytes) = self.table.store.get(range.begin.as_slice())? {
+ if range.level > 0 {
+ ret_ranges.push(range.clone());
+ }
+ if *retain && range.level == 0 {
+ if let Some(item_bytes) =
+ self.table.store.get(range.begin.as_slice())?
+ {
ret_items.push(Arc::new(ByteBuf::from(item_bytes.to_vec())));
}
}