diff options
Diffstat (limited to 'src/table/sync.rs')
-rw-r--r-- | src/table/sync.rs | 46 |
1 files changed, 23 insertions, 23 deletions
diff --git a/src/table/sync.rs b/src/table/sync.rs index 33b01455..a3afbbba 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -34,11 +34,11 @@ pub struct TableSyncer<F: TableSchema, R: TableReplication> { merkle: Arc<MerkleUpdater<F, R>>, todo: Mutex<SyncTodo>, - rpc_client: Arc<RpcClient<SyncRPC>>, + rpc_client: Arc<RpcClient<SyncRpc>>, } #[derive(Serialize, Deserialize)] -pub(crate) enum SyncRPC { +pub(crate) enum SyncRpc { RootCkHash(Partition, Hash), RootCkDifferent(bool), GetNode(MerkleNodeKey), @@ -47,7 +47,7 @@ pub(crate) enum SyncRPC { Ok, } -impl RpcMessage for SyncRPC {} +impl RpcMessage for SyncRpc {} struct SyncTodo { todo: Vec<TodoPartition>, @@ -75,7 +75,7 @@ where rpc_server: &mut RpcServer, ) -> Arc<Self> { let rpc_path = format!("table_{}/sync", data.name); - let rpc_client = system.rpc_client::<SyncRPC>(&rpc_path); + let rpc_client = system.rpc_client::<SyncRpc>(&rpc_path); let todo = SyncTodo { todo: vec![] }; @@ -114,7 +114,7 @@ where fn register_handler(self: &Arc<Self>, rpc_server: &mut RpcServer, path: String) { let self2 = self.clone(); - rpc_server.add_handler::<SyncRPC, _, _>(path, move |msg, _addr| { + rpc_server.add_handler::<SyncRpc, _, _>(path, move |msg, _addr| { let self2 = self2.clone(); async move { self2.handle_rpc(&msg).await } }); @@ -317,14 +317,14 @@ where async fn offload_items( self: &Arc<Self>, items: &[(Vec<u8>, Arc<ByteBuf>)], - nodes: &[UUID], + nodes: &[Uuid], ) -> Result<(), Error> { let values = items.iter().map(|(_k, v)| v.clone()).collect::<Vec<_>>(); self.rpc_client .try_call_many( nodes, - SyncRPC::Items(values), + SyncRpc::Items(values), RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_SYNC_RPC_TIMEOUT), ) .await?; @@ -362,7 +362,7 @@ where async fn do_sync_with( self: Arc<Self>, partition: TodoPartition, - who: UUID, + who: Uuid, must_exit: watch::Receiver<bool>, ) -> Result<(), Error> { let (root_ck_key, root_ck) = self.get_root_ck(partition.partition)?; @@ -381,20 +381,20 @@ where .rpc_client .call( who, - SyncRPC::RootCkHash(partition.partition, root_ck_hash), + SyncRpc::RootCkHash(partition.partition, root_ck_hash), TABLE_SYNC_RPC_TIMEOUT, ) .await?; let mut todo = match root_resp { - SyncRPC::RootCkDifferent(false) => { + SyncRpc::RootCkDifferent(false) => { debug!( "({}) Sync {:?} with {:?}: no difference", self.data.name, partition, who ); return Ok(()); } - SyncRPC::RootCkDifferent(true) => VecDeque::from(vec![root_ck_key]), + SyncRpc::RootCkDifferent(true) => VecDeque::from(vec![root_ck_key]), x => { return Err(Error::Message(format!( "Invalid respone to RootCkHash RPC: {}", @@ -431,10 +431,10 @@ where // and compare it with local node let remote_node = match self .rpc_client - .call(who, SyncRPC::GetNode(key.clone()), TABLE_SYNC_RPC_TIMEOUT) + .call(who, SyncRpc::GetNode(key.clone()), TABLE_SYNC_RPC_TIMEOUT) .await? { - SyncRPC::Node(_, node) => node, + SyncRpc::Node(_, node) => node, x => { return Err(Error::Message(format!( "Invalid respone to GetNode RPC: {}", @@ -478,7 +478,7 @@ where Ok(()) } - async fn send_items(&self, who: UUID, item_value_list: Vec<Vec<u8>>) -> Result<(), Error> { + async fn send_items(&self, who: Uuid, item_value_list: Vec<Vec<u8>>) -> Result<(), Error> { info!( "({}) Sending {} items to {:?}", self.data.name, @@ -493,9 +493,9 @@ where let rpc_resp = self .rpc_client - .call(who, SyncRPC::Items(values), TABLE_SYNC_RPC_TIMEOUT) + .call(who, SyncRpc::Items(values), TABLE_SYNC_RPC_TIMEOUT) .await?; - if let SyncRPC::Ok = rpc_resp { + if let SyncRpc::Ok = rpc_resp { Ok(()) } else { Err(Error::Message(format!( @@ -507,20 +507,20 @@ where // ======= SYNCHRONIZATION PROCEDURE -- RECEIVER SIDE ====== - async fn handle_rpc(self: &Arc<Self>, message: &SyncRPC) -> Result<SyncRPC, Error> { + async fn handle_rpc(self: &Arc<Self>, message: &SyncRpc) -> Result<SyncRpc, Error> { match message { - SyncRPC::RootCkHash(range, h) => { + SyncRpc::RootCkHash(range, h) => { let (_root_ck_key, root_ck) = self.get_root_ck(*range)?; let hash = hash_of::<MerkleNode>(&root_ck)?; - Ok(SyncRPC::RootCkDifferent(hash != *h)) + Ok(SyncRpc::RootCkDifferent(hash != *h)) } - SyncRPC::GetNode(k) => { + SyncRpc::GetNode(k) => { let node = self.merkle.read_node(&k)?; - Ok(SyncRPC::Node(k.clone(), node)) + Ok(SyncRpc::Node(k.clone(), node)) } - SyncRPC::Items(items) => { + SyncRpc::Items(items) => { self.data.update_many(items)?; - Ok(SyncRPC::Ok) + Ok(SyncRpc::Ok) } _ => Err(Error::Message("Unexpected sync RPC".to_string())), } |