diff options
Diffstat (limited to 'src/table/sync.rs')
-rw-r--r-- | src/table/sync.rs | 60 |
1 files changed, 38 insertions, 22 deletions
diff --git a/src/table/sync.rs b/src/table/sync.rs index 43636faa..8c21db8b 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -488,8 +488,29 @@ struct SyncWorker<F: TableSchema, R: TableReplication> { } impl<F: TableSchema, R: TableReplication> SyncWorker<F, R> { + fn check_add_full_sync(&mut self) { + let layout_versions = self.syncer.system.cluster_layout().sync_versions(); + if layout_versions != self.layout_versions { + self.layout_versions = layout_versions; + info!( + "({}) Layout versions changed (max={}, ack={}, min stored={}), adding full sync to syncer todo list", + F::TABLE_NAME, + layout_versions.0, + layout_versions.1, + layout_versions.2 + ); + self.add_full_sync(); + } + } + fn add_full_sync(&mut self) { let mut partitions = self.syncer.data.replication.sync_partitions(); + info!( + "{}: Adding full sync for ack layout version {}", + F::TABLE_NAME, + partitions.layout_version + ); + partitions.partitions.shuffle(&mut thread_rng()); self.todo = Some(partitions); self.next_full_sync = Instant::now() + ANTI_ENTROPY_INTERVAL; @@ -510,6 +531,8 @@ impl<F: TableSchema, R: TableReplication> Worker for SyncWorker<F, R> { } async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> { + self.check_add_full_sync(); + if let Some(todo) = &mut self.todo { let partition = todo.partitions.pop().unwrap(); @@ -531,19 +554,23 @@ impl<F: TableSchema, R: TableReplication> Worker for SyncWorker<F, R> { return Err(e); } - // done - if !todo.partitions.is_empty() { - return Ok(WorkerState::Busy); + if todo.partitions.is_empty() { + info!( + "{}: Completed full sync for ack layout version {}", + F::TABLE_NAME, + todo.layout_version + ); + self.syncer + .system + .layout_manager + .sync_table_until(F::TABLE_NAME, todo.layout_version); + self.todo = None; } - self.syncer - .system - .layout_manager - .sync_table_until(F::TABLE_NAME, todo.layout_version); + Ok(WorkerState::Busy) + } else { + Ok(WorkerState::Idle) } - - self.todo = None; - Ok(WorkerState::Idle) } async fn wait_for_work(&mut self) -> WorkerState { @@ -554,18 +581,7 @@ impl<F: TableSchema, R: TableReplication> Worker for SyncWorker<F, R> { } }, _ = self.layout_notify.notified() => { - let layout_versions = self.syncer.system.cluster_layout().sync_versions(); - if layout_versions != self.layout_versions { - self.layout_versions = layout_versions; - debug!( - "({}) Layout versions changed (max={}, ack={}, min stored={}), adding full sync to syncer todo list", - F::TABLE_NAME, - layout_versions.0, - layout_versions.1, - layout_versions.2 - ); - self.add_full_sync(); - } + self.check_add_full_sync(); }, _ = tokio::time::sleep_until(self.next_full_sync.into()) => { self.add_full_sync(); |