aboutsummaryrefslogtreecommitdiff
path: root/src/table/sync.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/table/sync.rs')
-rw-r--r--src/table/sync.rs87
1 files changed, 41 insertions, 46 deletions
diff --git a/src/table/sync.rs b/src/table/sync.rs
index 9d79d856..92a353c6 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -2,6 +2,7 @@ use std::collections::VecDeque;
use std::sync::Arc;
use std::time::{Duration, Instant};
+use arc_swap::ArcSwapOption;
use async_trait::async_trait;
use futures_util::stream::*;
use opentelemetry::KeyValue;
@@ -13,7 +14,8 @@ use tokio::sync::{mpsc, watch};
use garage_util::background::*;
use garage_util::data::*;
-use garage_util::error::Error;
+use garage_util::encode::{debug_serialize, nonversioned_encode};
+use garage_util::error::{Error, OkOrMessage};
use garage_rpc::ring::*;
use garage_rpc::system::System;
@@ -27,12 +29,12 @@ use crate::*;
// Do anti-entropy every 10 minutes
const ANTI_ENTROPY_INTERVAL: Duration = Duration::from_secs(10 * 60);
-pub struct TableSyncer<F: TableSchema + 'static, R: TableReplication + 'static> {
+pub struct TableSyncer<F: TableSchema, R: TableReplication> {
system: Arc<System>,
data: Arc<TableData<F, R>>,
merkle: Arc<MerkleUpdater<F, R>>,
- add_full_sync_tx: mpsc::UnboundedSender<()>,
+ add_full_sync_tx: ArcSwapOption<mpsc::UnboundedSender<()>>,
endpoint: Arc<Endpoint<SyncRpc, Self>>,
}
@@ -60,12 +62,8 @@ struct TodoPartition {
retain: bool,
}
-impl<F, R> TableSyncer<F, R>
-where
- F: TableSchema + 'static,
- R: TableReplication + 'static,
-{
- pub(crate) fn launch(
+impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
+ pub(crate) fn new(
system: Arc<System>,
data: Arc<TableData<F, R>>,
merkle: Arc<MerkleUpdater<F, R>>,
@@ -74,34 +72,40 @@ where
.netapp
.endpoint(format!("garage_table/sync.rs/Rpc:{}", F::TABLE_NAME));
- let (add_full_sync_tx, add_full_sync_rx) = mpsc::unbounded_channel();
-
let syncer = Arc::new(Self {
- system: system.clone(),
+ system,
data,
merkle,
- add_full_sync_tx,
+ add_full_sync_tx: ArcSwapOption::new(None),
endpoint,
});
-
syncer.endpoint.set_handler(syncer.clone());
- system.background.spawn_worker(SyncWorker {
- syncer: syncer.clone(),
- ring_recv: system.ring.clone(),
- ring: system.ring.borrow().clone(),
+ syncer
+ }
+
+ pub(crate) fn spawn_workers(self: &Arc<Self>, bg: &BackgroundRunner) {
+ let (add_full_sync_tx, add_full_sync_rx) = mpsc::unbounded_channel();
+ self.add_full_sync_tx
+ .store(Some(Arc::new(add_full_sync_tx)));
+
+ bg.spawn_worker(SyncWorker {
+ syncer: self.clone(),
+ ring_recv: self.system.ring.clone(),
+ ring: self.system.ring.borrow().clone(),
add_full_sync_rx,
todo: vec![],
next_full_sync: Instant::now() + Duration::from_secs(20),
});
-
- syncer
}
- pub fn add_full_sync(&self) {
- if self.add_full_sync_tx.send(()).is_err() {
- error!("({}) Could not add full sync", F::TABLE_NAME);
- }
+ pub fn add_full_sync(&self) -> Result<(), Error> {
+ let tx = self.add_full_sync_tx.load();
+ let tx = tx
+ .as_ref()
+ .ok_or_message("table sync worker is not running")?;
+ tx.send(()).ok_or_message("send error")?;
+ Ok(())
}
// ----
@@ -295,7 +299,7 @@ where
);
return Ok(());
}
- let root_ck_hash = hash_of::<MerkleNode>(&root_ck)?;
+ let root_ck_hash = hash_of_merkle_node(&root_ck)?;
// Check if they have the same root checksum
// If so, do nothing.
@@ -452,16 +456,12 @@ where
// ======= SYNCHRONIZATION PROCEDURE -- RECEIVER SIDE ======
#[async_trait]
-impl<F, R> EndpointHandler<SyncRpc> for TableSyncer<F, R>
-where
- F: TableSchema + 'static,
- R: TableReplication + 'static,
-{
+impl<F: TableSchema, R: TableReplication> EndpointHandler<SyncRpc> for TableSyncer<F, R> {
async fn handle(self: &Arc<Self>, message: &SyncRpc, from: NodeID) -> Result<SyncRpc, Error> {
match message {
SyncRpc::RootCkHash(range, h) => {
let (_root_ck_key, root_ck) = self.get_root_ck(*range)?;
- let hash = hash_of::<MerkleNode>(&root_ck)?;
+ let hash = hash_of_merkle_node(&root_ck)?;
Ok(SyncRpc::RootCkDifferent(hash != *h))
}
SyncRpc::GetNode(k) => {
@@ -490,7 +490,7 @@ where
// -------- Sync Worker ---------
-struct SyncWorker<F: TableSchema + 'static, R: TableReplication + 'static> {
+struct SyncWorker<F: TableSchema, R: TableReplication> {
syncer: Arc<TableSyncer<F, R>>,
ring_recv: watch::Receiver<Arc<Ring>>,
ring: Arc<Ring>,
@@ -499,7 +499,7 @@ struct SyncWorker<F: TableSchema + 'static, R: TableReplication + 'static> {
next_full_sync: Instant,
}
-impl<F: TableSchema + 'static, R: TableReplication + 'static> SyncWorker<F, R> {
+impl<F: TableSchema, R: TableReplication> SyncWorker<F, R> {
fn add_full_sync(&mut self) {
let system = &self.syncer.system;
let data = &self.syncer.data;
@@ -565,17 +565,15 @@ impl<F: TableSchema + 'static, R: TableReplication + 'static> SyncWorker<F, R> {
}
#[async_trait]
-impl<F: TableSchema + 'static, R: TableReplication + 'static> Worker for SyncWorker<F, R> {
+impl<F: TableSchema, R: TableReplication> Worker for SyncWorker<F, R> {
fn name(&self) -> String {
format!("{} sync", F::TABLE_NAME)
}
- fn info(&self) -> Option<String> {
- let l = self.todo.len();
- if l > 0 {
- Some(format!("{} partitions remaining", l))
- } else {
- None
+ fn status(&self) -> WorkerStatus {
+ WorkerStatus {
+ queue_length: Some(self.todo.len() as u64),
+ ..Default::default()
}
}
@@ -588,10 +586,7 @@ impl<F: TableSchema + 'static, R: TableReplication + 'static> Worker for SyncWor
}
}
- async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState {
- if *must_exit.borrow() {
- return WorkerState::Done;
- }
+ async fn wait_for_work(&mut self) -> WorkerState {
select! {
s = self.add_full_sync_rx.recv() => {
if let Some(()) = s {
@@ -620,8 +615,8 @@ impl<F: TableSchema + 'static, R: TableReplication + 'static> Worker for SyncWor
// ---- UTIL ----
-fn hash_of<T: Serialize>(x: &T) -> Result<Hash, Error> {
- Ok(blake2sum(&rmp_to_vec_all_named(x)?[..]))
+fn hash_of_merkle_node(x: &MerkleNode) -> Result<Hash, Error> {
+ Ok(blake2sum(&nonversioned_encode(x)?[..]))
}
fn join_ordered<'a, K: Ord + Eq, V1, V2>(