diff options
author | Alex <alex@adnab.me> | 2023-01-03 15:28:24 +0000 |
---|---|---|
committer | Alex <alex@adnab.me> | 2023-01-03 15:28:24 +0000 |
commit | 73ed9c74039448c69ebe382e361acf3ecbfef70b (patch) | |
tree | 7fb21a559e53557d5dea5efd2b7dafe9f9751367 /src/table/sync.rs | |
parent | 582b0761790b7958a3ba10c4b549b466997d2dcd (diff) | |
parent | 1d5bdc17a46648eb3494ff629d0d360d0217c1e2 (diff) | |
download | garage-73ed9c74039448c69ebe382e361acf3ecbfef70b.tar.gz garage-73ed9c74039448c69ebe382e361acf3ecbfef70b.zip |
Merge pull request 'Refactor how things are migrated' (#461) from format-migration into main
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/461
Diffstat (limited to 'src/table/sync.rs')
-rw-r--r-- | src/table/sync.rs | 29 |
1 files changed, 11 insertions, 18 deletions
diff --git a/src/table/sync.rs b/src/table/sync.rs index d6d272ab..92a353c6 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -14,6 +14,7 @@ use tokio::sync::{mpsc, watch}; use garage_util::background::*; use garage_util::data::*; +use garage_util::encode::{debug_serialize, nonversioned_encode}; use garage_util::error::{Error, OkOrMessage}; use garage_rpc::ring::*; @@ -28,7 +29,7 @@ use crate::*; // Do anti-entropy every 10 minutes const ANTI_ENTROPY_INTERVAL: Duration = Duration::from_secs(10 * 60); -pub struct TableSyncer<F: TableSchema + 'static, R: TableReplication + 'static> { +pub struct TableSyncer<F: TableSchema, R: TableReplication> { system: Arc<System>, data: Arc<TableData<F, R>>, merkle: Arc<MerkleUpdater<F, R>>, @@ -61,11 +62,7 @@ struct TodoPartition { retain: bool, } -impl<F, R> TableSyncer<F, R> -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> { pub(crate) fn new( system: Arc<System>, data: Arc<TableData<F, R>>, @@ -302,7 +299,7 @@ where ); return Ok(()); } - let root_ck_hash = hash_of::<MerkleNode>(&root_ck)?; + let root_ck_hash = hash_of_merkle_node(&root_ck)?; // Check if they have the same root checksum // If so, do nothing. @@ -459,16 +456,12 @@ where // ======= SYNCHRONIZATION PROCEDURE -- RECEIVER SIDE ====== #[async_trait] -impl<F, R> EndpointHandler<SyncRpc> for TableSyncer<F, R> -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +impl<F: TableSchema, R: TableReplication> EndpointHandler<SyncRpc> for TableSyncer<F, R> { async fn handle(self: &Arc<Self>, message: &SyncRpc, from: NodeID) -> Result<SyncRpc, Error> { match message { SyncRpc::RootCkHash(range, h) => { let (_root_ck_key, root_ck) = self.get_root_ck(*range)?; - let hash = hash_of::<MerkleNode>(&root_ck)?; + let hash = hash_of_merkle_node(&root_ck)?; Ok(SyncRpc::RootCkDifferent(hash != *h)) } SyncRpc::GetNode(k) => { @@ -497,7 +490,7 @@ where // -------- Sync Worker --------- -struct SyncWorker<F: TableSchema + 'static, R: TableReplication + 'static> { +struct SyncWorker<F: TableSchema, R: TableReplication> { syncer: Arc<TableSyncer<F, R>>, ring_recv: watch::Receiver<Arc<Ring>>, ring: Arc<Ring>, @@ -506,7 +499,7 @@ struct SyncWorker<F: TableSchema + 'static, R: TableReplication + 'static> { next_full_sync: Instant, } -impl<F: TableSchema + 'static, R: TableReplication + 'static> SyncWorker<F, R> { +impl<F: TableSchema, R: TableReplication> SyncWorker<F, R> { fn add_full_sync(&mut self) { let system = &self.syncer.system; let data = &self.syncer.data; @@ -572,7 +565,7 @@ impl<F: TableSchema + 'static, R: TableReplication + 'static> SyncWorker<F, R> { } #[async_trait] -impl<F: TableSchema + 'static, R: TableReplication + 'static> Worker for SyncWorker<F, R> { +impl<F: TableSchema, R: TableReplication> Worker for SyncWorker<F, R> { fn name(&self) -> String { format!("{} sync", F::TABLE_NAME) } @@ -622,8 +615,8 @@ impl<F: TableSchema + 'static, R: TableReplication + 'static> Worker for SyncWor // ---- UTIL ---- -fn hash_of<T: Serialize>(x: &T) -> Result<Hash, Error> { - Ok(blake2sum(&rmp_to_vec_all_named(x)?[..])) +fn hash_of_merkle_node(x: &MerkleNode) -> Result<Hash, Error> { + Ok(blake2sum(&nonversioned_encode(x)?[..])) } fn join_ordered<'a, K: Ord + Eq, V1, V2>( |