From de9d6cddf709e686ada3d1e71de7b31d7704b8b5 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 12 Dec 2022 17:16:49 +0100 Subject: Prettier worker list table; remove useless CLI log messages --- src/table/sync.rs | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) (limited to 'src/table/sync.rs') diff --git a/src/table/sync.rs b/src/table/sync.rs index 9d79d856..af7aa640 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -570,12 +570,10 @@ impl Worker for SyncWor format!("{} sync", F::TABLE_NAME) } - fn info(&self) -> Option { - let l = self.todo.len(); - if l > 0 { - Some(format!("{} partitions remaining", l)) - } else { - None + fn status(&self) -> WorkerStatus { + WorkerStatus { + queue_length: Some(self.todo.len() as u64), + ..Default::default() } } -- cgit v1.2.3 From 2183518edccadef47cdeaf6476033b52d8832d6e Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 14 Dec 2022 12:28:07 +0100 Subject: Spawn all background workers in a separate step --- src/table/sync.rs | 43 +++++++++++++++++++++++++------------------ 1 file changed, 25 insertions(+), 18 deletions(-) (limited to 'src/table/sync.rs') diff --git a/src/table/sync.rs b/src/table/sync.rs index af7aa640..7008a383 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -2,6 +2,7 @@ use std::collections::VecDeque; use std::sync::Arc; use std::time::{Duration, Instant}; +use arc_swap::ArcSwapOption; use async_trait::async_trait; use futures_util::stream::*; use opentelemetry::KeyValue; @@ -13,7 +14,7 @@ use tokio::sync::{mpsc, watch}; use garage_util::background::*; use garage_util::data::*; -use garage_util::error::Error; +use garage_util::error::{Error, OkOrMessage}; use garage_rpc::ring::*; use garage_rpc::system::System; @@ -32,7 +33,7 @@ pub struct TableSyncer data: Arc>, merkle: Arc>, - add_full_sync_tx: mpsc::UnboundedSender<()>, + add_full_sync_tx: ArcSwapOption>, endpoint: Arc>, } @@ -65,7 +66,7 @@ where F: TableSchema + 'static, R: TableReplication + 'static, { - pub(crate) fn launch( + pub(crate) fn new( system: Arc, data: Arc>, merkle: Arc>, @@ -74,34 +75,40 @@ where .netapp .endpoint(format!("garage_table/sync.rs/Rpc:{}", F::TABLE_NAME)); - let (add_full_sync_tx, add_full_sync_rx) = mpsc::unbounded_channel(); - let syncer = Arc::new(Self { - system: system.clone(), + system, data, merkle, - add_full_sync_tx, + add_full_sync_tx: ArcSwapOption::new(None), endpoint, }); - syncer.endpoint.set_handler(syncer.clone()); - system.background.spawn_worker(SyncWorker { - syncer: syncer.clone(), - ring_recv: system.ring.clone(), - ring: system.ring.borrow().clone(), + syncer + } + + pub(crate) fn spawn_workers(self: &Arc) { + let (add_full_sync_tx, add_full_sync_rx) = mpsc::unbounded_channel(); + self.add_full_sync_tx + .store(Some(Arc::new(add_full_sync_tx))); + + self.system.background.spawn_worker(SyncWorker { + syncer: self.clone(), + ring_recv: self.system.ring.clone(), + ring: self.system.ring.borrow().clone(), add_full_sync_rx, todo: vec![], next_full_sync: Instant::now() + Duration::from_secs(20), }); - - syncer } - pub fn add_full_sync(&self) { - if self.add_full_sync_tx.send(()).is_err() { - error!("({}) Could not add full sync", F::TABLE_NAME); - } + pub fn add_full_sync(&self) -> Result<(), Error> { + let tx = self.add_full_sync_tx.load(); + let tx = tx + .as_ref() + .ok_or_message("table sync worker is not running")?; + tx.send(()).ok_or_message("send error")?; + Ok(()) } // ---- -- cgit v1.2.3 From d56c472712df7c064387429a5af73d3bc0eb438d Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 14 Dec 2022 12:51:16 +0100 Subject: Refactor background runner and get rid of job worker --- src/table/sync.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'src/table/sync.rs') diff --git a/src/table/sync.rs b/src/table/sync.rs index 7008a383..1e7618ca 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -87,12 +87,12 @@ where syncer } - pub(crate) fn spawn_workers(self: &Arc) { + pub(crate) fn spawn_workers(self: &Arc, bg: &BackgroundRunner) { let (add_full_sync_tx, add_full_sync_rx) = mpsc::unbounded_channel(); self.add_full_sync_tx .store(Some(Arc::new(add_full_sync_tx))); - self.system.background.spawn_worker(SyncWorker { + bg.spawn_worker(SyncWorker { syncer: self.clone(), ring_recv: self.system.ring.clone(), ring: self.system.ring.borrow().clone(), -- cgit v1.2.3 From dfc131850a09e7ceacfa98315adbef156e07e9ca Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 14 Dec 2022 15:25:29 +0100 Subject: Simplified and more aggressive worker exit logic --- src/table/sync.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) (limited to 'src/table/sync.rs') diff --git a/src/table/sync.rs b/src/table/sync.rs index 1e7618ca..d6d272ab 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -593,10 +593,7 @@ impl Worker for SyncWor } } - async fn wait_for_work(&mut self, must_exit: &watch::Receiver) -> WorkerState { - if *must_exit.borrow() { - return WorkerState::Done; - } + async fn wait_for_work(&mut self) -> WorkerState { select! { s = self.add_full_sync_rx.recv() => { if let Some(()) = s { -- cgit v1.2.3 From cdb2a591e9d393d24ab5c49bb905b0589b193299 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 3 Jan 2023 14:44:47 +0100 Subject: Refactor how things are migrated --- src/table/sync.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'src/table/sync.rs') diff --git a/src/table/sync.rs b/src/table/sync.rs index d6d272ab..abc034f8 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -302,7 +302,7 @@ where ); return Ok(()); } - let root_ck_hash = hash_of::(&root_ck)?; + let root_ck_hash = hash_of_merkle_node(&root_ck)?; // Check if they have the same root checksum // If so, do nothing. @@ -468,7 +468,7 @@ where match message { SyncRpc::RootCkHash(range, h) => { let (_root_ck_key, root_ck) = self.get_root_ck(*range)?; - let hash = hash_of::(&root_ck)?; + let hash = hash_of_merkle_node(&root_ck)?; Ok(SyncRpc::RootCkDifferent(hash != *h)) } SyncRpc::GetNode(k) => { @@ -622,7 +622,7 @@ impl Worker for SyncWor // ---- UTIL ---- -fn hash_of(x: &T) -> Result { +fn hash_of_merkle_node(x: &MerkleNode) -> Result { Ok(blake2sum(&rmp_to_vec_all_named(x)?[..])) } -- cgit v1.2.3 From 426d8784dac0e39879af52d980887d3692fc907c Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 3 Jan 2023 15:08:37 +0100 Subject: cleanup --- src/table/sync.rs | 20 ++++++-------------- 1 file changed, 6 insertions(+), 14 deletions(-) (limited to 'src/table/sync.rs') diff --git a/src/table/sync.rs b/src/table/sync.rs index abc034f8..29e7aa89 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -28,7 +28,7 @@ use crate::*; // Do anti-entropy every 10 minutes const ANTI_ENTROPY_INTERVAL: Duration = Duration::from_secs(10 * 60); -pub struct TableSyncer { +pub struct TableSyncer { system: Arc, data: Arc>, merkle: Arc>, @@ -61,11 +61,7 @@ struct TodoPartition { retain: bool, } -impl TableSyncer -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +impl TableSyncer { pub(crate) fn new( system: Arc, data: Arc>, @@ -459,11 +455,7 @@ where // ======= SYNCHRONIZATION PROCEDURE -- RECEIVER SIDE ====== #[async_trait] -impl EndpointHandler for TableSyncer -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +impl EndpointHandler for TableSyncer { async fn handle(self: &Arc, message: &SyncRpc, from: NodeID) -> Result { match message { SyncRpc::RootCkHash(range, h) => { @@ -497,7 +489,7 @@ where // -------- Sync Worker --------- -struct SyncWorker { +struct SyncWorker { syncer: Arc>, ring_recv: watch::Receiver>, ring: Arc, @@ -506,7 +498,7 @@ struct SyncWorker { next_full_sync: Instant, } -impl SyncWorker { +impl SyncWorker { fn add_full_sync(&mut self) { let system = &self.syncer.system; let data = &self.syncer.data; @@ -572,7 +564,7 @@ impl SyncWorker { } #[async_trait] -impl Worker for SyncWorker { +impl Worker for SyncWorker { fn name(&self) -> String { format!("{} sync", F::TABLE_NAME) } -- cgit v1.2.3 From 8d5505514f950dc1ca1249a3385c9913b5b5e8e0 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 3 Jan 2023 15:27:36 +0100 Subject: Make it explicit when using nonversioned encoding --- src/table/sync.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'src/table/sync.rs') diff --git a/src/table/sync.rs b/src/table/sync.rs index 29e7aa89..c66c863f 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -14,6 +14,7 @@ use tokio::sync::{mpsc, watch}; use garage_util::background::*; use garage_util::data::*; +use garage_util::encode::nonversioned_encode; use garage_util::error::{Error, OkOrMessage}; use garage_rpc::ring::*; @@ -615,7 +616,7 @@ impl Worker for SyncWorker { // ---- UTIL ---- fn hash_of_merkle_node(x: &MerkleNode) -> Result { - Ok(blake2sum(&rmp_to_vec_all_named(x)?[..])) + Ok(blake2sum(&nonversioned_encode(x)?[..])) } fn join_ordered<'a, K: Ord + Eq, V1, V2>( -- cgit v1.2.3 From a54b67740d08e3fabeb1652a1bed14d78fea4b74 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 3 Jan 2023 15:29:29 +0100 Subject: move debug_serialize to garage_util::encode --- src/table/sync.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/table/sync.rs') diff --git a/src/table/sync.rs b/src/table/sync.rs index c66c863f..1f23d3a1 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -14,7 +14,7 @@ use tokio::sync::{mpsc, watch}; use garage_util::background::*; use garage_util::data::*; -use garage_util::encode::nonversioned_encode; +use garage_util::encode::{nonversioned_encode, debug_serialize}; use garage_util::error::{Error, OkOrMessage}; use garage_rpc::ring::*; -- cgit v1.2.3 From d6d571d51216d2077a41216e067b32736fbd745a Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 3 Jan 2023 15:30:21 +0100 Subject: cargo fmt --- src/table/sync.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/table/sync.rs') diff --git a/src/table/sync.rs b/src/table/sync.rs index 1f23d3a1..92a353c6 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -14,7 +14,7 @@ use tokio::sync::{mpsc, watch}; use garage_util::background::*; use garage_util::data::*; -use garage_util::encode::{nonversioned_encode, debug_serialize}; +use garage_util::encode::{debug_serialize, nonversioned_encode}; use garage_util::error::{Error, OkOrMessage}; use garage_rpc::ring::*; -- cgit v1.2.3