aboutsummaryrefslogtreecommitdiff
path: root/src/table/table_sync.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/table/table_sync.rs')
-rw-r--r--src/table/table_sync.rs898
1 files changed, 0 insertions, 898 deletions
diff --git a/src/table/table_sync.rs b/src/table/table_sync.rs
deleted file mode 100644
index 7394be1b..00000000
--- a/src/table/table_sync.rs
+++ /dev/null
@@ -1,898 +0,0 @@
-use rand::Rng;
-use std::collections::{BTreeMap, VecDeque};
-use std::sync::{Arc, Mutex};
-use std::time::{Duration, Instant};
-
-use futures::future::join_all;
-use futures::{pin_mut, select};
-use futures_util::future::*;
-use futures_util::stream::*;
-use serde::{Deserialize, Serialize};
-use serde_bytes::ByteBuf;
-use tokio::sync::{mpsc, watch};
-
-use garage_rpc::ring::Ring;
-use garage_util::data::*;
-use garage_util::error::Error;
-
-use crate::*;
-use crate::data::*;
-use crate::replication::*;
-
-const MAX_DEPTH: usize = 16;
-
-const TABLE_SYNC_RPC_TIMEOUT: Duration = Duration::from_secs(30);
-
-// Do anti-entropy every 10 minutes
-const SCAN_INTERVAL: Duration = Duration::from_secs(10 * 60);
-
-const CHECKSUM_CACHE_TIMEOUT: Duration = Duration::from_secs(10 * 60);
-
-pub struct TableSyncer<F: TableSchema, R: TableReplication> {
- data: Arc<TableData<F>>,
- aux: Arc<TableAux<F, R>>,
-
- todo: Mutex<SyncTodo>,
- cache: Vec<Mutex<BTreeMap<SyncRange, RangeChecksumCache>>>,
-}
-
-#[derive(Serialize, Deserialize)]
-pub(crate) enum SyncRPC {
- GetRootChecksumRange(Hash, Hash),
- RootChecksumRange(SyncRange),
- Checksums(Vec<RangeChecksum>),
- Difference(Vec<SyncRange>, Vec<Arc<ByteBuf>>),
-}
-
-struct SyncTodo {
- todo: Vec<TodoPartition>,
-}
-
-#[derive(Debug, Clone)]
-struct TodoPartition {
- // Partition consists in hashes between begin included and end excluded
- begin: Hash,
- end: Hash,
-
- // Are we a node that stores this partition or not?
- retain: bool,
-}
-
-// A SyncRange defines a query on the dataset stored by a node, in the following way:
-// - all items whose key are >= `begin`
-// - stopping at the first item whose key hash has at least `level` leading zero bytes (excluded)
-// - except if the first item of the range has such many leading zero bytes
-// - and stopping at `end` (excluded) if such an item is not found
-// The checksum itself does not store all of the items in the database, only the hashes of the "sub-ranges"
-// i.e. of ranges of level `level-1` that cover the same range
-// (ranges of level 0 do not exist and their hash is simply the hash of the first item >= begin)
-// See RangeChecksum for the struct that stores this information.
-#[derive(Hash, PartialEq, Eq, Debug, Clone, Serialize, Deserialize)]
-pub(crate) struct SyncRange {
- begin: Vec<u8>,
- end: Vec<u8>,
- level: usize,
-}
-
-impl std::cmp::PartialOrd for SyncRange {
- fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
- Some(self.cmp(other))
- }
-}
-impl std::cmp::Ord for SyncRange {
- fn cmp(&self, other: &Self) -> std::cmp::Ordering {
- self.begin
- .cmp(&other.begin)
- .then(self.level.cmp(&other.level))
- .then(self.end.cmp(&other.end))
- }
-}
-
-#[derive(Debug, Clone, Serialize, Deserialize)]
-pub(crate) struct RangeChecksum {
- bounds: SyncRange,
- children: Vec<(SyncRange, Hash)>,
- found_limit: Option<Vec<u8>>,
-
- #[serde(skip, default = "std::time::Instant::now")]
- time: Instant,
-}
-
-#[derive(Debug, Clone)]
-struct RangeChecksumCache {
- hash: Option<Hash>, // None if no children
- found_limit: Option<Vec<u8>>,
- time: Instant,
-}
-
-impl<F, R> TableSyncer<F, R>
-where
- F: TableSchema + 'static,
- R: TableReplication + 'static,
-{
- pub(crate) fn launch(data: Arc<TableData<F>>,
- aux: Arc<TableAux<F, R>>) -> Arc<Self> {
- let todo = SyncTodo{ todo: vec![] };
-
- let syncer = Arc::new(Self {
- data: data.clone(),
- aux: aux.clone(),
- todo: Mutex::new(todo),
- cache: (0..MAX_DEPTH)
- .map(|_| Mutex::new(BTreeMap::new()))
- .collect::<Vec<_>>(),
- });
-
- let (busy_tx, busy_rx) = mpsc::unbounded_channel();
-
- let s1 = syncer.clone();
- aux.system.background.spawn_worker(
- format!("table sync watcher for {}", data.name),
- move |must_exit: watch::Receiver<bool>| s1.watcher_task(must_exit, busy_rx),
- );
-
- let s2 = syncer.clone();
- aux.system.background.spawn_worker(
- format!("table syncer for {}", data.name),
- move |must_exit: watch::Receiver<bool>| s2.syncer_task(must_exit, busy_tx),
- );
-
- let s3 = syncer.clone();
- tokio::spawn(async move {
- tokio::time::delay_for(Duration::from_secs(20)).await;
- s3.add_full_scan();
- });
-
- syncer
- }
-
- async fn watcher_task(
- self: Arc<Self>,
- mut must_exit: watch::Receiver<bool>,
- mut busy_rx: mpsc::UnboundedReceiver<bool>,
- ) -> Result<(), Error> {
- let mut prev_ring: Arc<Ring> = self.aux.system.ring.borrow().clone();
- let mut ring_recv: watch::Receiver<Arc<Ring>> = self.aux.system.ring.clone();
- let mut nothing_to_do_since = Some(Instant::now());
-
- while !*must_exit.borrow() {
- let s_ring_recv = ring_recv.recv().fuse();
- let s_busy = busy_rx.recv().fuse();
- let s_must_exit = must_exit.recv().fuse();
- let s_timeout = tokio::time::delay_for(Duration::from_secs(1)).fuse();
- pin_mut!(s_ring_recv, s_busy, s_must_exit, s_timeout);
-
- select! {
- new_ring_r = s_ring_recv => {
- if let Some(new_ring) = new_ring_r {
- debug!("({}) Adding ring difference to syncer todo list", self.data.name);
- self.todo.lock().unwrap().add_ring_difference(&prev_ring, &new_ring, &self.data, &self.aux);
- prev_ring = new_ring;
- }
- }
- busy_opt = s_busy => {
- if let Some(busy) = busy_opt {
- if busy {
- nothing_to_do_since = None;
- } else {
- if nothing_to_do_since.is_none() {
- nothing_to_do_since = Some(Instant::now());
- }
- }
- }
- }
- must_exit_v = s_must_exit => {
- if must_exit_v.unwrap_or(false) {
- break;
- }
- }
- _ = s_timeout => {
- if nothing_to_do_since.map(|t| Instant::now() - t >= SCAN_INTERVAL).unwrap_or(false) {
- nothing_to_do_since = None;
- debug!("({}) Adding full scan to syncer todo list", self.data.name);
- self.add_full_scan();
- }
- }
- }
- }
- Ok(())
- }
-
- pub fn add_full_scan(&self) {
- self.todo.lock().unwrap().add_full_scan(&self.data, &self.aux);
- }
-
- async fn syncer_task(
- self: Arc<Self>,
- mut must_exit: watch::Receiver<bool>,
- busy_tx: mpsc::UnboundedSender<bool>,
- ) -> Result<(), Error> {
- while !*must_exit.borrow() {
- let task = self.todo.lock().unwrap().pop_task();
- if let Some(partition) = task {
- busy_tx.send(true)?;
- let res = self
- .clone()
- .sync_partition(&partition, &mut must_exit)
- .await;
- if let Err(e) = res {
- warn!(
- "({}) Error while syncing {:?}: {}",
- self.data.name, partition, e
- );
- }
- } else {
- busy_tx.send(false)?;
- tokio::time::delay_for(Duration::from_secs(1)).await;
- }
- }
- Ok(())
- }
-
- async fn sync_partition(
- self: Arc<Self>,
- partition: &TodoPartition,
- must_exit: &mut watch::Receiver<bool>,
- ) -> Result<(), Error> {
- if partition.retain {
- let my_id = self.aux.system.id;
- let nodes = self
- .aux
- .replication
- .write_nodes(&partition.begin, &self.aux.system)
- .into_iter()
- .filter(|node| *node != my_id)
- .collect::<Vec<_>>();
-
- debug!(
- "({}) Preparing to sync {:?} with {:?}...",
- self.data.name, partition, nodes
- );
- let root_cks = self.root_checksum(&partition.begin, &partition.end, must_exit)?;
-
- let mut sync_futures = nodes
- .iter()
- .map(|node| {
- self.clone().do_sync_with(
- partition.clone(),
- root_cks.clone(),
- *node,
- must_exit.clone(),
- )
- })
- .collect::<FuturesUnordered<_>>();
-
- let mut n_errors = 0;
- while let Some(r) = sync_futures.next().await {
- if let Err(e) = r {
- n_errors += 1;
- warn!("({}) Sync error: {}", self.data.name, e);
- }
- }
- if n_errors > self.aux.replication.max_write_errors() {
- return Err(Error::Message(format!(
- "Sync failed with too many nodes (should have been: {:?}).",
- nodes
- )));
- }
- } else {
- self.offload_partition(&partition.begin, &partition.end, must_exit)
- .await?;
- }
-
- Ok(())
- }
-
- // Offload partition: this partition is not something we are storing,
- // so send it out to all other nodes that store it and delete items locally.
- // We don't bother checking if the remote nodes already have the items,
- // we just batch-send everything. Offloading isn't supposed to happen very often.
- // If any of the nodes that are supposed to store the items is unable to
- // save them, we interrupt the process.
- async fn offload_partition(
- self: &Arc<Self>,
- begin: &Hash,
- end: &Hash,
- must_exit: &mut watch::Receiver<bool>,
- ) -> Result<(), Error> {
- let mut counter: usize = 0;
-
- while !*must_exit.borrow() {
- let mut items = Vec::new();
-
- for item in self.data.store.range(begin.to_vec()..end.to_vec()) {
- let (key, value) = item?;
- items.push((key.to_vec(), Arc::new(ByteBuf::from(value.as_ref()))));
-
- if items.len() >= 1024 {
- break;
- }
- }
-
- if items.len() > 0 {
- let nodes = self
- .aux
- .replication
- .write_nodes(&begin, &self.aux.system)
- .into_iter()
- .collect::<Vec<_>>();
- if nodes.contains(&self.aux.system.id) {
- warn!("Interrupting offload as partitions seem to have changed");
- break;
- }
-
- counter += 1;
- debug!(
- "Offloading {} items from {:?}..{:?} ({})",
- items.len(),
- begin,
- end,
- counter
- );
- self.offload_items(&items, &nodes[..]).await?;
- } else {
- break;
- }
- }
-
- Ok(())
- }
-
- async fn offload_items(
- self: &Arc<Self>,
- items: &Vec<(Vec<u8>, Arc<ByteBuf>)>,
- nodes: &[UUID],
- ) -> Result<(), Error> {
- let values = items.iter().map(|(_k, v)| v.clone()).collect::<Vec<_>>();
- let update_msg = Arc::new(TableRPC::<F>::Update(values));
-
- for res in join_all(nodes.iter().map(|to| {
- self.aux
- .rpc_client
- .call_arc(*to, update_msg.clone(), TABLE_SYNC_RPC_TIMEOUT)
- }))
- .await
- {
- res?;
- }
-
- // All remote nodes have written those items, now we can delete them locally
- let mut not_removed = 0;
- for (k, v) in items.iter() {
- if !self.data.delete_if_equal(&k[..], &v[..])? {
- not_removed += 1;
- }
- }
-
- if not_removed > 0 {
- debug!("{} items not removed during offload because they changed in between (trying again...)", not_removed);
- }
-
- Ok(())
- }
-
- fn root_checksum(
- self: &Arc<Self>,
- begin: &Hash,
- end: &Hash,
- must_exit: &mut watch::Receiver<bool>,
- ) -> Result<RangeChecksum, Error> {
- for i in 1..MAX_DEPTH {
- let rc = self.range_checksum(
- &SyncRange {
- begin: begin.to_vec(),
- end: end.to_vec(),
- level: i,
- },
- must_exit,
- )?;
- if rc.found_limit.is_none() {
- return Ok(rc);
- }
- }
- Err(Error::Message(format!(
- "Unable to compute root checksum (this should never happen)"
- )))
- }
-
- fn range_checksum(
- self: &Arc<Self>,
- range: &SyncRange,
- must_exit: &mut watch::Receiver<bool>,
- ) -> Result<RangeChecksum, Error> {
- assert!(range.level != 0);
- trace!("Call range_checksum {:?}", range);
-
- if range.level == 1 {
- let mut children = vec![];
- for item in self
- .data
- .store
- .range(range.begin.clone()..range.end.clone())
- {
- let (key, value) = item?;
- let key_hash = blake2sum(&key[..]);
- if children.len() > 0
- && key_hash.as_slice()[0..range.level]
- .iter()
- .all(|x| *x == 0u8)
- {
- trace!(
- "range_checksum {:?} returning {} items",
- range,
- children.len()
- );
- return Ok(RangeChecksum {
- bounds: range.clone(),
- children,
- found_limit: Some(key.to_vec()),
- time: Instant::now(),
- });
- }
- let item_range = SyncRange {
- begin: key.to_vec(),
- end: vec![],
- level: 0,
- };
- children.push((item_range, blake2sum(&value[..])));
- }
- trace!(
- "range_checksum {:?} returning {} items",
- range,
- children.len()
- );
- Ok(RangeChecksum {
- bounds: range.clone(),
- children,
- found_limit: None,
- time: Instant::now(),
- })
- } else {
- let mut children = vec![];
- let mut sub_range = SyncRange {
- begin: range.begin.clone(),
- end: range.end.clone(),
- level: range.level - 1,
- };
- let mut time = Instant::now();
- while !*must_exit.borrow() {
- let sub_ck = self.range_checksum_cached_hash(&sub_range, must_exit)?;
-
- if let Some(hash) = sub_ck.hash {
- children.push((sub_range.clone(), hash));
- if sub_ck.time < time {
- time = sub_ck.time;
- }
- }
-
- if sub_ck.found_limit.is_none() || sub_ck.hash.is_none() {
- trace!(
- "range_checksum {:?} returning {} items",
- range,
- children.len()
- );
- return Ok(RangeChecksum {
- bounds: range.clone(),
- children,
- found_limit: None,
- time,
- });
- }
- let found_limit = sub_ck.found_limit.unwrap();
-
- let actual_limit_hash = blake2sum(&found_limit[..]);
- if actual_limit_hash.as_slice()[0..range.level]
- .iter()
- .all(|x| *x == 0u8)
- {
- trace!(
- "range_checksum {:?} returning {} items",
- range,
- children.len()
- );
- return Ok(RangeChecksum {
- bounds: range.clone(),
- children,
- found_limit: Some(found_limit.clone()),
- time,
- });
- }
-
- sub_range.begin = found_limit;
- }
- trace!("range_checksum {:?} exiting due to must_exit", range);
- Err(Error::Message(format!("Exiting.")))
- }
- }
-
- fn range_checksum_cached_hash(
- self: &Arc<Self>,
- range: &SyncRange,
- must_exit: &mut watch::Receiver<bool>,
- ) -> Result<RangeChecksumCache, Error> {
- {
- let mut cache = self.cache[range.level].lock().unwrap();
- if let Some(v) = cache.get(&range) {
- if Instant::now() - v.time < CHECKSUM_CACHE_TIMEOUT {
- return Ok(v.clone());
- }
- }
- cache.remove(&range);
- }
-
- let v = self.range_checksum(&range, must_exit)?;
- trace!(
- "({}) New checksum calculated for {}-{}/{}, {} children",
- self.data.name,
- hex::encode(&range.begin)
- .chars()
- .take(16)
- .collect::<String>(),
- hex::encode(&range.end).chars().take(16).collect::<String>(),
- range.level,
- v.children.len()
- );
-
- let hash = if v.children.len() > 0 {
- Some(blake2sum(&rmp_to_vec_all_named(&v)?[..]))
- } else {
- None
- };
- let cache_entry = RangeChecksumCache {
- hash,
- found_limit: v.found_limit,
- time: v.time,
- };
-
- let mut cache = self.cache[range.level].lock().unwrap();
- cache.insert(range.clone(), cache_entry.clone());
- Ok(cache_entry)
- }
-
- async fn do_sync_with(
- self: Arc<Self>,
- partition: TodoPartition,
- root_ck: RangeChecksum,
- who: UUID,
- mut must_exit: watch::Receiver<bool>,
- ) -> Result<(), Error> {
- let mut todo = VecDeque::new();
-
- // If their root checksum has level > than us, use that as a reference
- let root_cks_resp = self
- .aux
- .rpc_client
- .call(
- who,
- TableRPC::<F>::SyncRPC(SyncRPC::GetRootChecksumRange(
- partition.begin.clone(),
- partition.end.clone(),
- )),
- TABLE_SYNC_RPC_TIMEOUT,
- )
- .await?;
- if let TableRPC::<F>::SyncRPC(SyncRPC::RootChecksumRange(range)) = root_cks_resp {
- if range.level > root_ck.bounds.level {
- let their_root_range_ck = self.range_checksum(&range, &mut must_exit)?;
- todo.push_back(their_root_range_ck);
- } else {
- todo.push_back(root_ck);
- }
- } else {
- return Err(Error::Message(format!(
- "Invalid respone to GetRootChecksumRange RPC: {}",
- debug_serialize(root_cks_resp)
- )));
- }
-
- while !todo.is_empty() && !*must_exit.borrow() {
- let total_children = todo.iter().map(|x| x.children.len()).fold(0, |x, y| x + y);
- trace!(
- "({}) Sync with {:?}: {} ({}) remaining",
- self.data.name,
- who,
- todo.len(),
- total_children
- );
-
- let step_size = std::cmp::min(16, todo.len());
- let step = todo.drain(..step_size).collect::<Vec<_>>();
-
- let rpc_resp = self
- .aux
- .rpc_client
- .call(
- who,
- TableRPC::<F>::SyncRPC(SyncRPC::Checksums(step)),
- TABLE_SYNC_RPC_TIMEOUT,
- )
- .await?;
- if let TableRPC::<F>::SyncRPC(SyncRPC::Difference(mut diff_ranges, diff_items)) =
- rpc_resp
- {
- if diff_ranges.len() > 0 || diff_items.len() > 0 {
- info!(
- "({}) Sync with {:?}: difference {} ranges, {} items",
- self.data.name,
- who,
- diff_ranges.len(),
- diff_items.len()
- );
- }
- let mut items_to_send = vec![];
- for differing in diff_ranges.drain(..) {
- if differing.level == 0 {
- items_to_send.push(differing.begin);
- } else {
- let checksum = self.range_checksum(&differing, &mut must_exit)?;
- todo.push_back(checksum);
- }
- }
- if diff_items.len() > 0 {
- self.data.update_many(&diff_items[..])?;
- }
- if items_to_send.len() > 0 {
- self.send_items(who, items_to_send).await?;
- }
- } else {
- return Err(Error::Message(format!(
- "Unexpected response to sync RPC checksums: {}",
- debug_serialize(&rpc_resp)
- )));
- }
- }
- Ok(())
- }
-
- async fn send_items(&self, who: UUID, item_list: Vec<Vec<u8>>) -> Result<(), Error> {
- info!(
- "({}) Sending {} items to {:?}",
- self.data.name,
- item_list.len(),
- who
- );
-
- let mut values = vec![];
- for item in item_list.iter() {
- if let Some(v) = self.data.store.get(&item[..])? {
- values.push(Arc::new(ByteBuf::from(v.as_ref())));
- }
- }
- let rpc_resp = self
- .aux
- .rpc_client
- .call(who, TableRPC::<F>::Update(values), TABLE_SYNC_RPC_TIMEOUT)
- .await?;
- if let TableRPC::<F>::Ok = rpc_resp {
- Ok(())
- } else {
- Err(Error::Message(format!(
- "Unexpected response to RPC Update: {}",
- debug_serialize(&rpc_resp)
- )))
- }
- }
-
- pub(crate) async fn handle_rpc(
- self: &Arc<Self>,
- message: &SyncRPC,
- mut must_exit: watch::Receiver<bool>,
- ) -> Result<SyncRPC, Error> {
- match message {
- SyncRPC::GetRootChecksumRange(begin, end) => {
- let root_cks = self.root_checksum(&begin, &end, &mut must_exit)?;
- Ok(SyncRPC::RootChecksumRange(root_cks.bounds))
- }
- SyncRPC::Checksums(checksums) => {
- self.handle_checksums_rpc(&checksums[..], &mut must_exit)
- .await
- }
- _ => Err(Error::Message(format!("Unexpected sync RPC"))),
- }
- }
-
- async fn handle_checksums_rpc(
- self: &Arc<Self>,
- checksums: &[RangeChecksum],
- must_exit: &mut watch::Receiver<bool>,
- ) -> Result<SyncRPC, Error> {
- let mut ret_ranges = vec![];
- let mut ret_items = vec![];
-
- for their_ckr in checksums.iter() {
- let our_ckr = self.range_checksum(&their_ckr.bounds, must_exit)?;
- for (their_range, their_hash) in their_ckr.children.iter() {
- let differs = match our_ckr
- .children
- .binary_search_by(|(our_range, _)| our_range.cmp(&their_range))
- {
- Err(_) => {
- if their_range.level >= 1 {
- let cached_hash =
- self.range_checksum_cached_hash(&their_range, must_exit)?;
- cached_hash.hash.map(|h| h != *their_hash).unwrap_or(true)
- } else {
- true
- }
- }
- Ok(i) => our_ckr.children[i].1 != *their_hash,
- };
- if differs {
- ret_ranges.push(their_range.clone());
- if their_range.level == 0 {
- if let Some(item_bytes) =
- self.data.store.get(their_range.begin.as_slice())?
- {
- ret_items.push(Arc::new(ByteBuf::from(item_bytes.to_vec())));
- }
- }
- }
- }
- for (our_range, _hash) in our_ckr.children.iter() {
- if let Some(their_found_limit) = &their_ckr.found_limit {
- if our_range.begin.as_slice() > their_found_limit.as_slice() {
- break;
- }
- }
-
- let not_present = our_ckr
- .children
- .binary_search_by(|(their_range, _)| their_range.cmp(&our_range))
- .is_err();
- if not_present {
- if our_range.level > 0 {
- ret_ranges.push(our_range.clone());
- }
- if our_range.level == 0 {
- if let Some(item_bytes) =
- self.data.store.get(our_range.begin.as_slice())?
- {
- ret_items.push(Arc::new(ByteBuf::from(item_bytes.to_vec())));
- }
- }
- }
- }
- }
- let n_checksums = checksums
- .iter()
- .map(|x| x.children.len())
- .fold(0, |x, y| x + y);
- if ret_ranges.len() > 0 || ret_items.len() > 0 {
- trace!(
- "({}) Checksum comparison RPC: {} different + {} items for {} received",
- self.data.name,
- ret_ranges.len(),
- ret_items.len(),
- n_checksums
- );
- }
- Ok(SyncRPC::Difference(ret_ranges, ret_items))
- }
-
- pub(crate) fn invalidate(self: &Arc<Self>, item_key: &[u8]) {
- for i in 1..MAX_DEPTH {
- let needle = SyncRange {
- begin: item_key.to_vec(),
- end: vec![],
- level: i,
- };
- let mut cache = self.cache[i].lock().unwrap();
- if let Some(cache_entry) = cache.range(..=needle).rev().next() {
- if cache_entry.0.begin[..] <= *item_key && cache_entry.0.end[..] > *item_key {
- let index = cache_entry.0.clone();
- drop(cache_entry);
- cache.remove(&index);
- }
- }
- }
- }
-}
-
-impl SyncTodo {
- fn add_full_scan<F: TableSchema, R: TableReplication>(&mut self, data: &TableData<F>, aux: &TableAux<F, R>) {
- let my_id = aux.system.id;
-
- self.todo.clear();
-
- let ring = aux.system.ring.borrow().clone();
- let split_points = aux.replication.split_points(&ring);
-
- for i in 0..split_points.len() - 1 {
- let begin = split_points[i];
- let end = split_points[i + 1];
- if begin == end {
- continue;
- }
-
- let nodes = aux.replication.replication_nodes(&begin, &ring);
-
- let retain = nodes.contains(&my_id);
- if !retain {
- // Check if we have some data to send, otherwise skip
- if data.store.range(begin..end).next().is_none() {
- continue;
- }
- }
-
- self.todo.push(TodoPartition { begin, end, retain });
- }
- }
-
- fn add_ring_difference<F: TableSchema, R: TableReplication>(
- &mut self,
- old_ring: &Ring,
- new_ring: &Ring,
- data: &TableData<F>, aux: &TableAux<F, R>,
- ) {
- let my_id = aux.system.id;
-
- // If it is us who are entering or leaving the system,
- // initiate a full sync instead of incremental sync
- if old_ring.config.members.contains_key(&my_id)
- != new_ring.config.members.contains_key(&my_id)
- {
- self.add_full_scan(data, aux);
- return;
- }
-
- let mut all_points = None
- .into_iter()
- .chain(aux.replication.split_points(old_ring).drain(..))
- .chain(aux.replication.split_points(new_ring).drain(..))
- .chain(self.todo.iter().map(|x| x.begin))
- .chain(self.todo.iter().map(|x| x.end))
- .collect::<Vec<_>>();
- all_points.sort();
- all_points.dedup();
-
- let mut old_todo = std::mem::replace(&mut self.todo, vec![]);
- old_todo.sort_by(|x, y| x.begin.cmp(&y.begin));
- let mut new_todo = vec![];
-
- for i in 0..all_points.len() - 1 {
- let begin = all_points[i];
- let end = all_points[i + 1];
- let was_ours = aux
- .replication
- .replication_nodes(&begin, &old_ring)
- .contains(&my_id);
- let is_ours = aux
- .replication
- .replication_nodes(&begin, &new_ring)
- .contains(&my_id);
-
- let was_todo = match old_todo.binary_search_by(|x| x.begin.cmp(&begin)) {
- Ok(_) => true,
- Err(j) => {
- (j > 0 && old_todo[j - 1].begin < end && begin < old_todo[j - 1].end)
- || (j < old_todo.len()
- && old_todo[j].begin < end && begin < old_todo[j].end)
- }
- };
- if was_todo || (is_ours && !was_ours) || (was_ours && !is_ours) {
- new_todo.push(TodoPartition {
- begin,
- end,
- retain: is_ours,
- });
- }
- }
-
- self.todo = new_todo;
- }
-
- fn pop_task(&mut self) -> Option<TodoPartition> {
- if self.todo.is_empty() {
- return None;
- }
-
- let i = rand::thread_rng().gen_range::<usize, _, _>(0, self.todo.len());
- if i == self.todo.len() - 1 {
- self.todo.pop()
- } else {
- let replacement = self.todo.pop().unwrap();
- let ret = std::mem::replace(&mut self.todo[i], replacement);
- Some(ret)
- }
- }
-}