aboutsummaryrefslogtreecommitdiff
path: root/src/table_sync.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/table_sync.rs')
-rw-r--r--src/table_sync.rs75
1 files changed, 66 insertions, 9 deletions
diff --git a/src/table_sync.rs b/src/table_sync.rs
index 5097c1b0..039dab6d 100644
--- a/src/table_sync.rs
+++ b/src/table_sync.rs
@@ -1,7 +1,7 @@
use rand::Rng;
+use std::collections::BTreeSet;
use std::sync::Arc;
use std::time::Duration;
-use std::collections::BTreeSet;
use futures::{pin_mut, select};
use futures_util::future::*;
@@ -10,7 +10,7 @@ use tokio::sync::Mutex;
use crate::data::*;
use crate::error::Error;
-use crate::membership::{Ring, System};
+use crate::membership::Ring;
use crate::table::*;
const SCAN_INTERVAL: Duration = Duration::from_secs(3600);
@@ -29,6 +29,7 @@ pub struct SyncTodo {
pub struct Partition {
pub begin: Hash,
pub end: Hash,
+ pub retain: bool,
}
impl<F: TableSchema + 'static> TableSyncer<F> {
@@ -124,7 +125,8 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
}
async fn sync_partition(self: &Arc<Self>, partition: &Partition) -> Result<(), Error> {
- unimplemented!()
+ eprintln!("NOT IMPLEMENTED: SYNC PARTITION {:?}", partition);
+ Ok(())
}
}
@@ -135,14 +137,17 @@ impl SyncTodo {
self.todo.clear();
let ring: Arc<Ring> = table.system.ring.borrow().clone();
+
for i in 0..ring.ring.len() {
let nodes = ring.walk_ring_from_pos(i, table.param.replication_factor);
let begin = ring.ring[i].location.clone();
+ if i == 0 {
+ self.add_full_scan_aux(table, [0u8; 32].into(), begin.clone(), &nodes[..], &my_id);
+ }
+
if i == ring.ring.len() - 1 {
- let end = ring.ring[0].location.clone();
self.add_full_scan_aux(table, begin, [0xffu8; 32].into(), &nodes[..], &my_id);
- self.add_full_scan_aux(table, [0u8; 32].into(), end, &nodes[..], &my_id);
} else {
let end = ring.ring[i + 1].location.clone();
self.add_full_scan_aux(table, begin, end, &nodes[..], &my_id);
@@ -158,23 +163,75 @@ impl SyncTodo {
nodes: &[UUID],
my_id: &UUID,
) {
- if !nodes.contains(my_id) {
+ let retain = nodes.contains(my_id);
+ if !retain {
// Check if we have some data to send, otherwise skip
if table
.store
.range(begin.clone()..end.clone())
.next()
.is_none()
- {}
+ {
+ return;
+ }
}
- self.todo.push(Partition { begin, end });
+ self.todo.push(Partition { begin, end, retain });
}
fn add_ring_difference<F: TableSchema>(&mut self, table: &Table<F>, old: &Ring, new: &Ring) {
+ let my_id = table.system.id.clone();
+
let old_ring = ring_points(old);
let new_ring = ring_points(new);
- unimplemented!()
+ let both_ring = old_ring.union(&new_ring).cloned().collect::<BTreeSet<_>>();
+
+ let prev_todo_begin = self
+ .todo
+ .iter()
+ .map(|x| x.begin.clone())
+ .collect::<BTreeSet<_>>();
+ let prev_todo_end = self
+ .todo
+ .iter()
+ .map(|x| x.end.clone())
+ .collect::<BTreeSet<_>>();
+ let prev_todo = prev_todo_begin
+ .union(&prev_todo_end)
+ .cloned()
+ .collect::<BTreeSet<_>>();
+
+ let all_points = both_ring.union(&prev_todo).cloned().collect::<Vec<_>>();
+
+ self.todo.sort_by(|x, y| x.begin.cmp(&y.begin));
+ let mut new_todo = vec![];
+ for i in 0..all_points.len() - 1 {
+ let begin = all_points[i].clone();
+ let end = all_points[i + 1].clone();
+ let was_ours = old
+ .walk_ring(&begin, table.param.replication_factor)
+ .contains(&my_id);
+ let is_ours = new
+ .walk_ring(&begin, table.param.replication_factor)
+ .contains(&my_id);
+ let was_todo = match self.todo.binary_search_by(|x| x.begin.cmp(&begin)) {
+ Ok(_) => true,
+ Err(j) => {
+ (j > 0 && self.todo[j - 1].begin < end && begin < self.todo[j - 1].end)
+ || (j < self.todo.len()
+ && self.todo[j].begin < end && begin < self.todo[j].end)
+ }
+ };
+ if was_todo || (is_ours && !was_ours) || (was_ours && !is_ours) {
+ new_todo.push(Partition {
+ begin,
+ end,
+ retain: is_ours,
+ });
+ }
+ }
+
+ self.todo = new_todo;
}
fn pop_task(&mut self) -> Option<Partition> {