diff options
Diffstat (limited to 'src/table')
-rw-r--r-- | src/table/sync.rs | 21 |
1 files changed, 10 insertions, 11 deletions
diff --git a/src/table/sync.rs b/src/table/sync.rs index 2da1bfe7..4355bd9e 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -10,7 +10,7 @@ use rand::Rng; use serde::{Deserialize, Serialize}; use serde_bytes::ByteBuf; use tokio::select; -use tokio::sync::{mpsc, watch}; +use tokio::sync::{mpsc, watch, Notify}; use garage_util::background::*; use garage_util::data::*; @@ -91,8 +91,8 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> { bg.spawn_worker(SyncWorker { syncer: self.clone(), - layout_watch: self.system.layout_watch(), - layout: self.system.cluster_layout().clone(), + layout_notify: self.system.layout_notify(), + layout_version: self.system.cluster_layout().current().version, add_full_sync_rx, todo: vec![], next_full_sync: Instant::now() + Duration::from_secs(20), @@ -492,8 +492,8 @@ impl<F: TableSchema, R: TableReplication> EndpointHandler<SyncRpc> for TableSync struct SyncWorker<F: TableSchema, R: TableReplication> { syncer: Arc<TableSyncer<F, R>>, - layout_watch: watch::Receiver<Arc<LayoutHistory>>, - layout: Arc<LayoutHistory>, + layout_notify: Arc<Notify>, + layout_version: u64, add_full_sync_rx: mpsc::UnboundedReceiver<()>, todo: Vec<TodoPartition>, next_full_sync: Instant, @@ -593,12 +593,11 @@ impl<F: TableSchema, R: TableReplication> Worker for SyncWorker<F, R> { self.add_full_sync(); } }, - _ = self.layout_watch.changed() => { - let new_layout = self.layout_watch.borrow(); - if !Arc::ptr_eq(&new_layout, &self.layout) { - self.layout = new_layout.clone(); - drop(new_layout); - debug!("({}) Ring changed, adding full sync to syncer todo list", F::TABLE_NAME); + _ = self.layout_notify.notified() => { + let new_version = self.syncer.system.cluster_layout().current().version; + if new_version > self.layout_version { + self.layout_version = new_version; + debug!("({}) Layout changed, adding full sync to syncer todo list", F::TABLE_NAME); self.add_full_sync(); } }, |