aboutsummaryrefslogtreecommitdiff
path: root/src/table/sync.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/table/sync.rs')
-rw-r--r--src/table/sync.rs29
1 files changed, 11 insertions, 18 deletions
diff --git a/src/table/sync.rs b/src/table/sync.rs
index d6d272ab..92a353c6 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -14,6 +14,7 @@ use tokio::sync::{mpsc, watch};
use garage_util::background::*;
use garage_util::data::*;
+use garage_util::encode::{debug_serialize, nonversioned_encode};
use garage_util::error::{Error, OkOrMessage};
use garage_rpc::ring::*;
@@ -28,7 +29,7 @@ use crate::*;
// Do anti-entropy every 10 minutes
const ANTI_ENTROPY_INTERVAL: Duration = Duration::from_secs(10 * 60);
-pub struct TableSyncer<F: TableSchema + 'static, R: TableReplication + 'static> {
+pub struct TableSyncer<F: TableSchema, R: TableReplication> {
system: Arc<System>,
data: Arc<TableData<F, R>>,
merkle: Arc<MerkleUpdater<F, R>>,
@@ -61,11 +62,7 @@ struct TodoPartition {
retain: bool,
}
-impl<F, R> TableSyncer<F, R>
-where
- F: TableSchema + 'static,
- R: TableReplication + 'static,
-{
+impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
pub(crate) fn new(
system: Arc<System>,
data: Arc<TableData<F, R>>,
@@ -302,7 +299,7 @@ where
);
return Ok(());
}
- let root_ck_hash = hash_of::<MerkleNode>(&root_ck)?;
+ let root_ck_hash = hash_of_merkle_node(&root_ck)?;
// Check if they have the same root checksum
// If so, do nothing.
@@ -459,16 +456,12 @@ where
// ======= SYNCHRONIZATION PROCEDURE -- RECEIVER SIDE ======
#[async_trait]
-impl<F, R> EndpointHandler<SyncRpc> for TableSyncer<F, R>
-where
- F: TableSchema + 'static,
- R: TableReplication + 'static,
-{
+impl<F: TableSchema, R: TableReplication> EndpointHandler<SyncRpc> for TableSyncer<F, R> {
async fn handle(self: &Arc<Self>, message: &SyncRpc, from: NodeID) -> Result<SyncRpc, Error> {
match message {
SyncRpc::RootCkHash(range, h) => {
let (_root_ck_key, root_ck) = self.get_root_ck(*range)?;
- let hash = hash_of::<MerkleNode>(&root_ck)?;
+ let hash = hash_of_merkle_node(&root_ck)?;
Ok(SyncRpc::RootCkDifferent(hash != *h))
}
SyncRpc::GetNode(k) => {
@@ -497,7 +490,7 @@ where
// -------- Sync Worker ---------
-struct SyncWorker<F: TableSchema + 'static, R: TableReplication + 'static> {
+struct SyncWorker<F: TableSchema, R: TableReplication> {
syncer: Arc<TableSyncer<F, R>>,
ring_recv: watch::Receiver<Arc<Ring>>,
ring: Arc<Ring>,
@@ -506,7 +499,7 @@ struct SyncWorker<F: TableSchema + 'static, R: TableReplication + 'static> {
next_full_sync: Instant,
}
-impl<F: TableSchema + 'static, R: TableReplication + 'static> SyncWorker<F, R> {
+impl<F: TableSchema, R: TableReplication> SyncWorker<F, R> {
fn add_full_sync(&mut self) {
let system = &self.syncer.system;
let data = &self.syncer.data;
@@ -572,7 +565,7 @@ impl<F: TableSchema + 'static, R: TableReplication + 'static> SyncWorker<F, R> {
}
#[async_trait]
-impl<F: TableSchema + 'static, R: TableReplication + 'static> Worker for SyncWorker<F, R> {
+impl<F: TableSchema, R: TableReplication> Worker for SyncWorker<F, R> {
fn name(&self) -> String {
format!("{} sync", F::TABLE_NAME)
}
@@ -622,8 +615,8 @@ impl<F: TableSchema + 'static, R: TableReplication + 'static> Worker for SyncWor
// ---- UTIL ----
-fn hash_of<T: Serialize>(x: &T) -> Result<Hash, Error> {
- Ok(blake2sum(&rmp_to_vec_all_named(x)?[..]))
+fn hash_of_merkle_node(x: &MerkleNode) -> Result<Hash, Error> {
+ Ok(blake2sum(&nonversioned_encode(x)?[..]))
}
fn join_ordered<'a, K: Ord + Eq, V1, V2>(