diff options
Diffstat (limited to 'src/table_sync.rs')
-rw-r--r-- | src/table_sync.rs | 75 |
1 files changed, 66 insertions, 9 deletions
diff --git a/src/table_sync.rs b/src/table_sync.rs index 5097c1b0..039dab6d 100644 --- a/src/table_sync.rs +++ b/src/table_sync.rs @@ -1,7 +1,7 @@ use rand::Rng; +use std::collections::BTreeSet; use std::sync::Arc; use std::time::Duration; -use std::collections::BTreeSet; use futures::{pin_mut, select}; use futures_util::future::*; @@ -10,7 +10,7 @@ use tokio::sync::Mutex; use crate::data::*; use crate::error::Error; -use crate::membership::{Ring, System}; +use crate::membership::Ring; use crate::table::*; const SCAN_INTERVAL: Duration = Duration::from_secs(3600); @@ -29,6 +29,7 @@ pub struct SyncTodo { pub struct Partition { pub begin: Hash, pub end: Hash, + pub retain: bool, } impl<F: TableSchema + 'static> TableSyncer<F> { @@ -124,7 +125,8 @@ impl<F: TableSchema + 'static> TableSyncer<F> { } async fn sync_partition(self: &Arc<Self>, partition: &Partition) -> Result<(), Error> { - unimplemented!() + eprintln!("NOT IMPLEMENTED: SYNC PARTITION {:?}", partition); + Ok(()) } } @@ -135,14 +137,17 @@ impl SyncTodo { self.todo.clear(); let ring: Arc<Ring> = table.system.ring.borrow().clone(); + for i in 0..ring.ring.len() { let nodes = ring.walk_ring_from_pos(i, table.param.replication_factor); let begin = ring.ring[i].location.clone(); + if i == 0 { + self.add_full_scan_aux(table, [0u8; 32].into(), begin.clone(), &nodes[..], &my_id); + } + if i == ring.ring.len() - 1 { - let end = ring.ring[0].location.clone(); self.add_full_scan_aux(table, begin, [0xffu8; 32].into(), &nodes[..], &my_id); - self.add_full_scan_aux(table, [0u8; 32].into(), end, &nodes[..], &my_id); } else { let end = ring.ring[i + 1].location.clone(); self.add_full_scan_aux(table, begin, end, &nodes[..], &my_id); @@ -158,23 +163,75 @@ impl SyncTodo { nodes: &[UUID], my_id: &UUID, ) { - if !nodes.contains(my_id) { + let retain = nodes.contains(my_id); + if !retain { // Check if we have some data to send, otherwise skip if table .store .range(begin.clone()..end.clone()) .next() .is_none() - {} + { + return; + } } - self.todo.push(Partition { begin, end }); + self.todo.push(Partition { begin, end, retain }); } fn add_ring_difference<F: TableSchema>(&mut self, table: &Table<F>, old: &Ring, new: &Ring) { + let my_id = table.system.id.clone(); + let old_ring = ring_points(old); let new_ring = ring_points(new); - unimplemented!() + let both_ring = old_ring.union(&new_ring).cloned().collect::<BTreeSet<_>>(); + + let prev_todo_begin = self + .todo + .iter() + .map(|x| x.begin.clone()) + .collect::<BTreeSet<_>>(); + let prev_todo_end = self + .todo + .iter() + .map(|x| x.end.clone()) + .collect::<BTreeSet<_>>(); + let prev_todo = prev_todo_begin + .union(&prev_todo_end) + .cloned() + .collect::<BTreeSet<_>>(); + + let all_points = both_ring.union(&prev_todo).cloned().collect::<Vec<_>>(); + + self.todo.sort_by(|x, y| x.begin.cmp(&y.begin)); + let mut new_todo = vec![]; + for i in 0..all_points.len() - 1 { + let begin = all_points[i].clone(); + let end = all_points[i + 1].clone(); + let was_ours = old + .walk_ring(&begin, table.param.replication_factor) + .contains(&my_id); + let is_ours = new + .walk_ring(&begin, table.param.replication_factor) + .contains(&my_id); + let was_todo = match self.todo.binary_search_by(|x| x.begin.cmp(&begin)) { + Ok(_) => true, + Err(j) => { + (j > 0 && self.todo[j - 1].begin < end && begin < self.todo[j - 1].end) + || (j < self.todo.len() + && self.todo[j].begin < end && begin < self.todo[j].end) + } + }; + if was_todo || (is_ours && !was_ours) || (was_ours && !is_ours) { + new_todo.push(Partition { + begin, + end, + retain: is_ours, + }); + } + } + + self.todo = new_todo; } fn pop_task(&mut self) -> Option<Partition> { |