diff options
Diffstat (limited to 'src/table/sync.rs')
-rw-r--r-- | src/table/sync.rs | 178 |
1 files changed, 72 insertions, 106 deletions
diff --git a/src/table/sync.rs b/src/table/sync.rs index 4355bd9e..43636faa 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -6,7 +6,7 @@ use arc_swap::ArcSwapOption; use async_trait::async_trait; use futures_util::stream::*; use opentelemetry::KeyValue; -use rand::Rng; +use rand::prelude::*; use serde::{Deserialize, Serialize}; use serde_bytes::ByteBuf; use tokio::select; @@ -52,16 +52,6 @@ impl Rpc for SyncRpc { type Response = Result<SyncRpc, Error>; } -#[derive(Debug, Clone)] -struct TodoPartition { - partition: Partition, - begin: Hash, - end: Hash, - - // Are we a node that stores this partition or not? - retain: bool, -} - impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> { pub(crate) fn new( system: Arc<System>, @@ -92,9 +82,9 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> { bg.spawn_worker(SyncWorker { syncer: self.clone(), layout_notify: self.system.layout_notify(), - layout_version: self.system.cluster_layout().current().version, + layout_versions: self.system.cluster_layout().sync_versions(), add_full_sync_rx, - todo: vec![], + todo: None, next_full_sync: Instant::now() + Duration::from_secs(20), }); } @@ -112,31 +102,26 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> { async fn sync_partition( self: &Arc<Self>, - partition: &TodoPartition, + partition: &SyncPartition, must_exit: &mut watch::Receiver<bool>, ) -> Result<(), Error> { - if partition.retain { - let my_id = self.system.id; - - let nodes = self - .data - .replication - .write_nodes(&partition.begin) - .into_iter() - .filter(|node| *node != my_id) - .collect::<Vec<_>>(); + let my_id = self.system.id; + let retain = partition.storage_nodes.contains(&my_id); + if retain { debug!( "({}) Syncing {:?} with {:?}...", F::TABLE_NAME, partition, - nodes + partition.storage_nodes ); - let mut sync_futures = nodes + let mut sync_futures = partition + .storage_nodes .iter() + .filter(|node| **node != my_id) .map(|node| { self.clone() - .do_sync_with(partition.clone(), *node, must_exit.clone()) + .do_sync_with(&partition, *node, must_exit.clone()) }) .collect::<FuturesUnordered<_>>(); @@ -147,14 +132,14 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> { warn!("({}) Sync error: {}", F::TABLE_NAME, e); } } - if n_errors > self.data.replication.max_write_errors() { + if n_errors > 0 { return Err(Error::Message(format!( - "Sync failed with too many nodes (should have been: {:?}).", - nodes + "Sync failed with {} nodes.", + n_errors ))); } } else { - self.offload_partition(&partition.begin, &partition.end, must_exit) + self.offload_partition(&partition.first_hash, &partition.last_hash, must_exit) .await?; } @@ -285,7 +270,7 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> { async fn do_sync_with( self: Arc<Self>, - partition: TodoPartition, + partition: &SyncPartition, who: Uuid, must_exit: watch::Receiver<bool>, ) -> Result<(), Error> { @@ -492,76 +477,23 @@ impl<F: TableSchema, R: TableReplication> EndpointHandler<SyncRpc> for TableSync struct SyncWorker<F: TableSchema, R: TableReplication> { syncer: Arc<TableSyncer<F, R>>, + layout_notify: Arc<Notify>, - layout_version: u64, + layout_versions: (u64, u64, u64), + add_full_sync_rx: mpsc::UnboundedReceiver<()>, - todo: Vec<TodoPartition>, next_full_sync: Instant, + + todo: Option<SyncPartitions>, } impl<F: TableSchema, R: TableReplication> SyncWorker<F, R> { fn add_full_sync(&mut self) { - let system = &self.syncer.system; - let data = &self.syncer.data; - - let my_id = system.id; - - self.todo.clear(); - - let partitions = data.replication.partitions(); - - for i in 0..partitions.len() { - let begin = partitions[i].1; - - let end = if i + 1 < partitions.len() { - partitions[i + 1].1 - } else { - [0xFFu8; 32].into() - }; - - let nodes = data.replication.write_nodes(&begin); - - let retain = nodes.contains(&my_id); - if !retain { - // Check if we have some data to send, otherwise skip - match data.store.range(begin..end) { - Ok(mut iter) => { - if iter.next().is_none() { - continue; - } - } - Err(e) => { - warn!("DB error in add_full_sync: {}", e); - continue; - } - } - } - - self.todo.push(TodoPartition { - partition: partitions[i].0, - begin, - end, - retain, - }); - } - + let mut partitions = self.syncer.data.replication.sync_partitions(); + partitions.partitions.shuffle(&mut thread_rng()); + self.todo = Some(partitions); self.next_full_sync = Instant::now() + ANTI_ENTROPY_INTERVAL; } - - fn pop_task(&mut self) -> Option<TodoPartition> { - if self.todo.is_empty() { - return None; - } - - let i = rand::thread_rng().gen_range(0..self.todo.len()); - if i == self.todo.len() - 1 { - self.todo.pop() - } else { - let replacement = self.todo.pop().unwrap(); - let ret = std::mem::replace(&mut self.todo[i], replacement); - Some(ret) - } - } } #[async_trait] @@ -572,18 +504,46 @@ impl<F: TableSchema, R: TableReplication> Worker for SyncWorker<F, R> { fn status(&self) -> WorkerStatus { WorkerStatus { - queue_length: Some(self.todo.len() as u64), + queue_length: Some(self.todo.as_ref().map(|x| x.partitions.len()).unwrap_or(0) as u64), ..Default::default() } } async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> { - if let Some(partition) = self.pop_task() { - self.syncer.sync_partition(&partition, must_exit).await?; - Ok(WorkerState::Busy) - } else { - Ok(WorkerState::Idle) + if let Some(todo) = &mut self.todo { + let partition = todo.partitions.pop().unwrap(); + + // process partition + if let Err(e) = self.syncer.sync_partition(&partition, must_exit).await { + error!( + "{}: Failed to sync partition {:?}: {}", + F::TABLE_NAME, + partition, + e + ); + // if error, put partition back at the other side of the queue, + // so that other partitions will be tried in the meantime + todo.partitions.insert(0, partition); + // TODO: returning an error here will cause the background job worker + // to delay this task for some time, but maybe we don't want to + // delay it if there are lots of failures from nodes that are gone + // (we also don't want zero delays as that will cause lots of useless retries) + return Err(e); + } + + // done + if !todo.partitions.is_empty() { + return Ok(WorkerState::Busy); + } + + self.syncer + .system + .layout_manager + .sync_table_until(F::TABLE_NAME, todo.layout_version); } + + self.todo = None; + Ok(WorkerState::Idle) } async fn wait_for_work(&mut self) -> WorkerState { @@ -594,10 +554,16 @@ impl<F: TableSchema, R: TableReplication> Worker for SyncWorker<F, R> { } }, _ = self.layout_notify.notified() => { - let new_version = self.syncer.system.cluster_layout().current().version; - if new_version > self.layout_version { - self.layout_version = new_version; - debug!("({}) Layout changed, adding full sync to syncer todo list", F::TABLE_NAME); + let layout_versions = self.syncer.system.cluster_layout().sync_versions(); + if layout_versions != self.layout_versions { + self.layout_versions = layout_versions; + debug!( + "({}) Layout versions changed (max={}, ack={}, min stored={}), adding full sync to syncer todo list", + F::TABLE_NAME, + layout_versions.0, + layout_versions.1, + layout_versions.2 + ); self.add_full_sync(); } }, @@ -605,9 +571,9 @@ impl<F: TableSchema, R: TableReplication> Worker for SyncWorker<F, R> { self.add_full_sync(); } } - match self.todo.is_empty() { - false => WorkerState::Busy, - true => WorkerState::Idle, + match self.todo.is_some() { + true => WorkerState::Busy, + false => WorkerState::Idle, } } } |