diff options
Diffstat (limited to 'src/table_sync.rs')
-rw-r--r-- | src/table_sync.rs | 197 |
1 files changed, 86 insertions, 111 deletions
diff --git a/src/table_sync.rs b/src/table_sync.rs index 3ba2fc6a..b4555a77 100644 --- a/src/table_sync.rs +++ b/src/table_sync.rs @@ -1,5 +1,5 @@ use rand::Rng; -use std::collections::{BTreeMap, BTreeSet, VecDeque}; +use std::collections::{BTreeMap, VecDeque}; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -21,10 +21,12 @@ const MAX_DEPTH: usize = 16; const SCAN_INTERVAL: Duration = Duration::from_secs(3600); const CHECKSUM_CACHE_TIMEOUT: Duration = Duration::from_secs(1800); -pub struct TableSyncer<F: TableSchema> { - pub table: Arc<Table<F>>, - pub todo: Mutex<SyncTodo>, - pub cache: Vec<Mutex<BTreeMap<SyncRange, RangeChecksum>>>, +const TABLE_SYNC_RPC_TIMEOUT: Duration = Duration::from_secs(10); + +pub struct TableSyncer<F: TableSchema, R: TableReplication> { + table: Arc<Table<F, R>>, + todo: Mutex<SyncTodo>, + cache: Vec<Mutex<BTreeMap<SyncRange, RangeChecksum>>>, } #[derive(Serialize, Deserialize)] @@ -36,21 +38,21 @@ pub enum SyncRPC { } pub struct SyncTodo { - pub todo: Vec<Partition>, + todo: Vec<TodoPartition>, } #[derive(Debug, Clone)] -pub struct Partition { - pub begin: Hash, - pub end: Hash, - pub retain: bool, +struct TodoPartition { + begin: Hash, + end: Hash, + retain: bool, } #[derive(Hash, PartialEq, Eq, Debug, Clone, Serialize, Deserialize)] pub struct SyncRange { - pub begin: Vec<u8>, - pub end: Vec<u8>, - pub level: usize, + begin: Vec<u8>, + end: Vec<u8>, + level: usize, } impl std::cmp::PartialOrd for SyncRange { @@ -66,16 +68,20 @@ impl std::cmp::Ord for SyncRange { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct RangeChecksum { - pub bounds: SyncRange, - pub children: Vec<(SyncRange, Hash)>, - pub found_limit: Option<Vec<u8>>, + bounds: SyncRange, + children: Vec<(SyncRange, Hash)>, + found_limit: Option<Vec<u8>>, #[serde(skip, default = "std::time::Instant::now")] - pub time: Instant, + time: Instant, } -impl<F: TableSchema + 'static> TableSyncer<F> { - pub async fn launch(table: Arc<Table<F>>) -> Arc<Self> { +impl<F, R> TableSyncer<F, R> +where + F: TableSchema + 'static, + R: TableReplication + 'static, +{ + pub async fn launch(table: Arc<Table<F, R>>) -> Arc<Self> { let todo = SyncTodo { todo: Vec::new() }; let syncer = Arc::new(TableSyncer { table: table.clone(), @@ -166,7 +172,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> { async fn sync_partition( self: Arc<Self>, - partition: &Partition, + partition: &TodoPartition, must_exit: &mut watch::Receiver<bool>, ) -> Result<(), Error> { eprintln!("({}) Preparing to sync {:?}...", self.table.name, partition); @@ -175,8 +181,10 @@ impl<F: TableSchema + 'static> TableSyncer<F> { .await?; let my_id = self.table.system.id.clone(); - let ring = self.table.system.ring.borrow().clone(); - let nodes = ring.walk_ring(&partition.begin, self.table.param.replication_factor); + let nodes = self + .table + .replication + .write_nodes(&partition.begin, &self.table.system); let mut sync_futures = nodes .iter() .filter(|node| **node != my_id) @@ -349,7 +357,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> { async fn do_sync_with( self: Arc<Self>, - partition: Partition, + partition: TodoPartition, root_ck: RangeChecksum, who: UUID, retain: bool, @@ -367,7 +375,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> { partition.begin.clone(), partition.end.clone(), )), - self.table.param.timeout, + TABLE_SYNC_RPC_TIMEOUT, ) .await?; if let TableRPC::<F>::SyncRPC(SyncRPC::RootChecksumRange(range)) = root_cks_resp { @@ -398,7 +406,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> { .call( &who, &TableRPC::<F>::SyncRPC(SyncRPC::Checksums(step, retain)), - self.table.param.timeout, + TABLE_SYNC_RPC_TIMEOUT, ) .await?; if let TableRPC::<F>::SyncRPC(SyncRPC::Difference(mut diff_ranges, diff_items)) = @@ -456,11 +464,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> { let rpc_resp = self .table .rpc_client - .call( - &who, - &TableRPC::<F>::Update(values), - self.table.param.timeout, - ) + .call(&who, &TableRPC::<F>::Update(values), TABLE_SYNC_RPC_TIMEOUT) .await?; if let TableRPC::<F>::Ok = rpc_resp { Ok(()) @@ -490,7 +494,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> { } } - pub async fn handle_checksums_rpc( + async fn handle_checksums_rpc( self: &Arc<Self>, checksums: &[RangeChecksum], retain: bool, @@ -589,99 +593,80 @@ impl<F: TableSchema + 'static> TableSyncer<F> { } impl SyncTodo { - fn add_full_scan<F: TableSchema>(&mut self, table: &Table<F>) { + fn add_full_scan<F: TableSchema, R: TableReplication>(&mut self, table: &Table<F, R>) { let my_id = table.system.id.clone(); self.todo.clear(); - let ring: Arc<Ring> = table.system.ring.borrow().clone(); - - for i in 0..ring.ring.len() { - let nodes = ring.walk_ring_from_pos(i, table.param.replication_factor); - let begin = ring.ring[i].location.clone(); - - if i == 0 { - self.add_full_scan_aux(table, [0u8; 32].into(), begin.clone(), &nodes[..], &my_id); + let ring = table.system.ring.borrow().clone(); + let split_points = table.replication.split_points(&ring); + + for i in 0..split_points.len() - 1 { + let begin = split_points[i].clone(); + let end = split_points[i + 1].clone(); + let nodes = table.replication.write_nodes_from_ring(&begin, &ring); + + let retain = nodes.contains(&my_id); + if !retain { + // Check if we have some data to send, otherwise skip + if table + .store + .range(begin.clone()..end.clone()) + .next() + .is_none() + { + continue; + } } - if i == ring.ring.len() - 1 { - self.add_full_scan_aux(table, begin, [0xffu8; 32].into(), &nodes[..], &my_id); - } else { - let end = ring.ring[i + 1].location.clone(); - self.add_full_scan_aux(table, begin, end, &nodes[..], &my_id); - } + self.todo.push(TodoPartition { begin, end, retain }); } } - fn add_full_scan_aux<F: TableSchema>( + fn add_ring_difference<F: TableSchema, R: TableReplication>( &mut self, - table: &Table<F>, - begin: Hash, - end: Hash, - nodes: &[UUID], - my_id: &UUID, + table: &Table<F, R>, + old_ring: &Ring, + new_ring: &Ring, ) { - let retain = nodes.contains(my_id); - if !retain { - // Check if we have some data to send, otherwise skip - if table - .store - .range(begin.clone()..end.clone()) - .next() - .is_none() - { - return; - } - } - - self.todo.push(Partition { begin, end, retain }); - } - - fn add_ring_difference<F: TableSchema>(&mut self, table: &Table<F>, old: &Ring, new: &Ring) { let my_id = table.system.id.clone(); - let old_ring = ring_points(old); - let new_ring = ring_points(new); - let both_ring = old_ring.union(&new_ring).cloned().collect::<BTreeSet<_>>(); - - let prev_todo_begin = self - .todo - .iter() - .map(|x| x.begin.clone()) - .collect::<BTreeSet<_>>(); - let prev_todo_end = self - .todo - .iter() - .map(|x| x.end.clone()) - .collect::<BTreeSet<_>>(); - let prev_todo = prev_todo_begin - .union(&prev_todo_end) - .cloned() - .collect::<BTreeSet<_>>(); - - let all_points = both_ring.union(&prev_todo).cloned().collect::<Vec<_>>(); - - self.todo.sort_by(|x, y| x.begin.cmp(&y.begin)); + let mut all_points = None + .into_iter() + .chain(table.replication.split_points(old_ring).drain(..)) + .chain(table.replication.split_points(new_ring).drain(..)) + .chain(self.todo.iter().map(|x| x.begin.clone())) + .chain(self.todo.iter().map(|x| x.end.clone())) + .collect::<Vec<_>>(); + all_points.sort(); + all_points.dedup(); + + let mut old_todo = std::mem::replace(&mut self.todo, vec![]); + old_todo.sort_by(|x, y| x.begin.cmp(&y.begin)); let mut new_todo = vec![]; + for i in 0..all_points.len() - 1 { let begin = all_points[i].clone(); let end = all_points[i + 1].clone(); - let was_ours = old - .walk_ring(&begin, table.param.replication_factor) + let was_ours = table + .replication + .write_nodes_from_ring(&begin, &old_ring) .contains(&my_id); - let is_ours = new - .walk_ring(&begin, table.param.replication_factor) + let is_ours = table + .replication + .write_nodes_from_ring(&begin, &new_ring) .contains(&my_id); - let was_todo = match self.todo.binary_search_by(|x| x.begin.cmp(&begin)) { + + let was_todo = match old_todo.binary_search_by(|x| x.begin.cmp(&begin)) { Ok(_) => true, Err(j) => { - (j > 0 && self.todo[j - 1].begin < end && begin < self.todo[j - 1].end) - || (j < self.todo.len() - && self.todo[j].begin < end && begin < self.todo[j].end) + (j > 0 && old_todo[j - 1].begin < end && begin < old_todo[j - 1].end) + || (j < old_todo.len() + && old_todo[j].begin < end && begin < old_todo[j].end) } }; if was_todo || (is_ours && !was_ours) || (was_ours && !is_ours) { - new_todo.push(Partition { + new_todo.push(TodoPartition { begin, end, retain: is_ours, @@ -692,7 +677,7 @@ impl SyncTodo { self.todo = new_todo; } - fn pop_task(&mut self) -> Option<Partition> { + fn pop_task(&mut self) -> Option<TodoPartition> { if self.todo.is_empty() { return None; } @@ -707,13 +692,3 @@ impl SyncTodo { } } } - -fn ring_points(ring: &Ring) -> BTreeSet<Hash> { - let mut ret = BTreeSet::new(); - ret.insert([0u8; 32].into()); - ret.insert([0xFFu8; 32].into()); - for i in 0..ring.ring.len() { - ret.insert(ring.ring[i].location.clone()); - } - ret -} |