From e8d750175de3daff0876b63c9ae4dcbd047be793 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 16 Apr 2020 17:04:28 +0200 Subject: Implement ring comparison algorithm --- src/table_sync.rs | 75 ++++++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 66 insertions(+), 9 deletions(-) (limited to 'src/table_sync.rs') diff --git a/src/table_sync.rs b/src/table_sync.rs index 5097c1b0..039dab6d 100644 --- a/src/table_sync.rs +++ b/src/table_sync.rs @@ -1,7 +1,7 @@ use rand::Rng; +use std::collections::BTreeSet; use std::sync::Arc; use std::time::Duration; -use std::collections::BTreeSet; use futures::{pin_mut, select}; use futures_util::future::*; @@ -10,7 +10,7 @@ use tokio::sync::Mutex; use crate::data::*; use crate::error::Error; -use crate::membership::{Ring, System}; +use crate::membership::Ring; use crate::table::*; const SCAN_INTERVAL: Duration = Duration::from_secs(3600); @@ -29,6 +29,7 @@ pub struct SyncTodo { pub struct Partition { pub begin: Hash, pub end: Hash, + pub retain: bool, } impl TableSyncer { @@ -124,7 +125,8 @@ impl TableSyncer { } async fn sync_partition(self: &Arc, partition: &Partition) -> Result<(), Error> { - unimplemented!() + eprintln!("NOT IMPLEMENTED: SYNC PARTITION {:?}", partition); + Ok(()) } } @@ -135,14 +137,17 @@ impl SyncTodo { self.todo.clear(); let ring: Arc = table.system.ring.borrow().clone(); + for i in 0..ring.ring.len() { let nodes = ring.walk_ring_from_pos(i, table.param.replication_factor); let begin = ring.ring[i].location.clone(); + if i == 0 { + self.add_full_scan_aux(table, [0u8; 32].into(), begin.clone(), &nodes[..], &my_id); + } + if i == ring.ring.len() - 1 { - let end = ring.ring[0].location.clone(); self.add_full_scan_aux(table, begin, [0xffu8; 32].into(), &nodes[..], &my_id); - self.add_full_scan_aux(table, [0u8; 32].into(), end, &nodes[..], &my_id); } else { let end = ring.ring[i + 1].location.clone(); self.add_full_scan_aux(table, begin, end, &nodes[..], &my_id); @@ -158,23 +163,75 @@ impl SyncTodo { nodes: &[UUID], my_id: &UUID, ) { - if !nodes.contains(my_id) { + let retain = nodes.contains(my_id); + if !retain { // Check if we have some data to send, otherwise skip if table .store .range(begin.clone()..end.clone()) .next() .is_none() - {} + { + return; + } } - self.todo.push(Partition { begin, end }); + self.todo.push(Partition { begin, end, retain }); } fn add_ring_difference(&mut self, table: &Table, old: &Ring, new: &Ring) { + let my_id = table.system.id.clone(); + let old_ring = ring_points(old); let new_ring = ring_points(new); - unimplemented!() + let both_ring = old_ring.union(&new_ring).cloned().collect::>(); + + let prev_todo_begin = self + .todo + .iter() + .map(|x| x.begin.clone()) + .collect::>(); + let prev_todo_end = self + .todo + .iter() + .map(|x| x.end.clone()) + .collect::>(); + let prev_todo = prev_todo_begin + .union(&prev_todo_end) + .cloned() + .collect::>(); + + let all_points = both_ring.union(&prev_todo).cloned().collect::>(); + + self.todo.sort_by(|x, y| x.begin.cmp(&y.begin)); + let mut new_todo = vec![]; + for i in 0..all_points.len() - 1 { + let begin = all_points[i].clone(); + let end = all_points[i + 1].clone(); + let was_ours = old + .walk_ring(&begin, table.param.replication_factor) + .contains(&my_id); + let is_ours = new + .walk_ring(&begin, table.param.replication_factor) + .contains(&my_id); + let was_todo = match self.todo.binary_search_by(|x| x.begin.cmp(&begin)) { + Ok(_) => true, + Err(j) => { + (j > 0 && self.todo[j - 1].begin < end && begin < self.todo[j - 1].end) + || (j < self.todo.len() + && self.todo[j].begin < end && begin < self.todo[j].end) + } + }; + if was_todo || (is_ours && !was_ours) || (was_ours && !is_ours) { + new_todo.push(Partition { + begin, + end, + retain: is_ours, + }); + } + } + + self.todo = new_todo; } fn pop_task(&mut self) -> Option { -- cgit v1.2.3