From bfb1845fdc981a370539d641a5d80f438f184f07 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 9 Nov 2023 14:12:05 +0100 Subject: layout: refactor to use a RwLock on LayoutHistory --- src/table/sync.rs | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) (limited to 'src/table') diff --git a/src/table/sync.rs b/src/table/sync.rs index 2da1bfe7..4355bd9e 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -10,7 +10,7 @@ use rand::Rng; use serde::{Deserialize, Serialize}; use serde_bytes::ByteBuf; use tokio::select; -use tokio::sync::{mpsc, watch}; +use tokio::sync::{mpsc, watch, Notify}; use garage_util::background::*; use garage_util::data::*; @@ -91,8 +91,8 @@ impl TableSyncer { bg.spawn_worker(SyncWorker { syncer: self.clone(), - layout_watch: self.system.layout_watch(), - layout: self.system.cluster_layout().clone(), + layout_notify: self.system.layout_notify(), + layout_version: self.system.cluster_layout().current().version, add_full_sync_rx, todo: vec![], next_full_sync: Instant::now() + Duration::from_secs(20), @@ -492,8 +492,8 @@ impl EndpointHandler for TableSync struct SyncWorker { syncer: Arc>, - layout_watch: watch::Receiver>, - layout: Arc, + layout_notify: Arc, + layout_version: u64, add_full_sync_rx: mpsc::UnboundedReceiver<()>, todo: Vec, next_full_sync: Instant, @@ -593,12 +593,11 @@ impl Worker for SyncWorker { self.add_full_sync(); } }, - _ = self.layout_watch.changed() => { - let new_layout = self.layout_watch.borrow(); - if !Arc::ptr_eq(&new_layout, &self.layout) { - self.layout = new_layout.clone(); - drop(new_layout); - debug!("({}) Ring changed, adding full sync to syncer todo list", F::TABLE_NAME); + _ = self.layout_notify.notified() => { + let new_version = self.syncer.system.cluster_layout().current().version; + if new_version > self.layout_version { + self.layout_version = new_version; + debug!("({}) Layout changed, adding full sync to syncer todo list", F::TABLE_NAME); self.add_full_sync(); } }, -- cgit v1.2.3