diff options
Diffstat (limited to 'src/table/sync.rs')
-rw-r--r-- | src/table/sync.rs | 29 |
1 files changed, 11 insertions, 18 deletions
diff --git a/src/table/sync.rs b/src/table/sync.rs index d6d272ab..92a353c6 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -14,6 +14,7 @@ use tokio::sync::{mpsc, watch}; use garage_util::background::*; use garage_util::data::*; +use garage_util::encode::{debug_serialize, nonversioned_encode}; use garage_util::error::{Error, OkOrMessage}; use garage_rpc::ring::*; @@ -28,7 +29,7 @@ use crate::*; // Do anti-entropy every 10 minutes const ANTI_ENTROPY_INTERVAL: Duration = Duration::from_secs(10 * 60); -pub struct TableSyncer<F: TableSchema + 'static, R: TableReplication + 'static> { +pub struct TableSyncer<F: TableSchema, R: TableReplication> { system: Arc<System>, data: Arc<TableData<F, R>>, merkle: Arc<MerkleUpdater<F, R>>, @@ -61,11 +62,7 @@ struct TodoPartition { retain: bool, } -impl<F, R> TableSyncer<F, R> -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> { pub(crate) fn new( system: Arc<System>, data: Arc<TableData<F, R>>, @@ -302,7 +299,7 @@ where ); return Ok(()); } - let root_ck_hash = hash_of::<MerkleNode>(&root_ck)?; + let root_ck_hash = hash_of_merkle_node(&root_ck)?; // Check if they have the same root checksum // If so, do nothing. @@ -459,16 +456,12 @@ where // ======= SYNCHRONIZATION PROCEDURE -- RECEIVER SIDE ====== #[async_trait] -impl<F, R> EndpointHandler<SyncRpc> for TableSyncer<F, R> -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +impl<F: TableSchema, R: TableReplication> EndpointHandler<SyncRpc> for TableSyncer<F, R> { async fn handle(self: &Arc<Self>, message: &SyncRpc, from: NodeID) -> Result<SyncRpc, Error> { match message { SyncRpc::RootCkHash(range, h) => { let (_root_ck_key, root_ck) = self.get_root_ck(*range)?; - let hash = hash_of::<MerkleNode>(&root_ck)?; + let hash = hash_of_merkle_node(&root_ck)?; Ok(SyncRpc::RootCkDifferent(hash != *h)) } SyncRpc::GetNode(k) => { @@ -497,7 +490,7 @@ where // -------- Sync Worker --------- -struct SyncWorker<F: TableSchema + 'static, R: TableReplication + 'static> { +struct SyncWorker<F: TableSchema, R: TableReplication> { syncer: Arc<TableSyncer<F, R>>, ring_recv: watch::Receiver<Arc<Ring>>, ring: Arc<Ring>, @@ -506,7 +499,7 @@ struct SyncWorker<F: TableSchema + 'static, R: TableReplication + 'static> { next_full_sync: Instant, } -impl<F: TableSchema + 'static, R: TableReplication + 'static> SyncWorker<F, R> { +impl<F: TableSchema, R: TableReplication> SyncWorker<F, R> { fn add_full_sync(&mut self) { let system = &self.syncer.system; let data = &self.syncer.data; @@ -572,7 +565,7 @@ impl<F: TableSchema + 'static, R: TableReplication + 'static> SyncWorker<F, R> { } #[async_trait] -impl<F: TableSchema + 'static, R: TableReplication + 'static> Worker for SyncWorker<F, R> { +impl<F: TableSchema, R: TableReplication> Worker for SyncWorker<F, R> { fn name(&self) -> String { format!("{} sync", F::TABLE_NAME) } @@ -622,8 +615,8 @@ impl<F: TableSchema + 'static, R: TableReplication + 'static> Worker for SyncWor // ---- UTIL ---- -fn hash_of<T: Serialize>(x: &T) -> Result<Hash, Error> { - Ok(blake2sum(&rmp_to_vec_all_named(x)?[..])) +fn hash_of_merkle_node(x: &MerkleNode) -> Result<Hash, Error> { + Ok(blake2sum(&nonversioned_encode(x)?[..])) } fn join_ordered<'a, K: Ord + Eq, V1, V2>( |