diff options
author | Alex Auvolat <alex@adnab.me> | 2021-10-15 11:05:09 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2021-10-22 16:55:24 +0200 |
commit | 1b450c4b493dfcb2ee88acbca3ea584beac8eb4b (patch) | |
tree | d6437f105a630fa197b67446b5c3b2902335c34a /src/table/sync.rs | |
parent | 4067797d0142ee7860aff8da95d65820d6cc0889 (diff) | |
download | garage-1b450c4b493dfcb2ee88acbca3ea584beac8eb4b.tar.gz garage-1b450c4b493dfcb2ee88acbca3ea584beac8eb4b.zip |
Improvements to CLI and various fixes for netapp version
Discovery via consul, persist peer list to file
Diffstat (limited to 'src/table/sync.rs')
-rw-r--r-- | src/table/sync.rs | 36 |
1 files changed, 15 insertions, 21 deletions
diff --git a/src/table/sync.rs b/src/table/sync.rs index c5db0987..4fcdc528 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -45,11 +45,10 @@ pub(crate) enum SyncRpc { Node(MerkleNodeKey, MerkleNode), Items(Vec<Arc<ByteBuf>>), Ok, - Error(String), } -impl Message for SyncRpc { - type Response = SyncRpc; +impl Rpc for SyncRpc { + type Response = Result<SyncRpc, Error>; } struct SyncTodo { @@ -305,7 +304,7 @@ where async fn offload_items( self: &Arc<Self>, items: &[(Vec<u8>, Arc<ByteBuf>)], - nodes: &[NodeID], + nodes: &[Uuid], ) -> Result<(), Error> { let values = items.iter().map(|(_k, v)| v.clone()).collect::<Vec<_>>(); @@ -354,7 +353,7 @@ where async fn do_sync_with( self: Arc<Self>, partition: TodoPartition, - who: NodeID, + who: Uuid, must_exit: watch::Receiver<bool>, ) -> Result<(), Error> { let (root_ck_key, root_ck) = self.get_root_ck(partition.partition)?; @@ -480,7 +479,7 @@ where Ok(()) } - async fn send_items(&self, who: NodeID, item_value_list: Vec<Vec<u8>>) -> Result<(), Error> { + async fn send_items(&self, who: Uuid, item_value_list: Vec<Vec<u8>>) -> Result<(), Error> { info!( "({}) Sending {} items to {:?}", self.data.name, @@ -513,9 +512,17 @@ where ))) } } +} + +// ======= SYNCHRONIZATION PROCEDURE -- RECEIVER SIDE ====== - // ======= SYNCHRONIZATION PROCEDURE -- RECEIVER SIDE ====== - async fn handle_rpc(self: &Arc<Self>, message: &SyncRpc) -> Result<SyncRpc, Error> { +#[async_trait] +impl<F, R> EndpointHandler<SyncRpc> for TableSyncer<F, R> +where + F: TableSchema + 'static, + R: TableReplication + 'static, +{ + async fn handle(self: &Arc<Self>, message: &SyncRpc, _from: NodeID) -> Result<SyncRpc, Error> { match message { SyncRpc::RootCkHash(range, h) => { let (_root_ck_key, root_ck) = self.get_root_ck(*range)?; @@ -535,19 +542,6 @@ where } } -#[async_trait] -impl<F, R> EndpointHandler<SyncRpc> for TableSyncer<F, R> -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ - async fn handle(self: &Arc<Self>, message: &SyncRpc, _from: NodeID) -> SyncRpc { - self.handle_rpc(message) - .await - .unwrap_or_else(|e| SyncRpc::Error(format!("{}", e))) - } -} - impl SyncTodo { fn add_full_sync<F: TableSchema, R: TableReplication>( &mut self, |