From bfb1845fdc981a370539d641a5d80f438f184f07 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 9 Nov 2023 14:12:05 +0100 Subject: layout: refactor to use a RwLock on LayoutHistory --- src/rpc/rpc_helper.rs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) (limited to 'src/rpc/rpc_helper.rs') diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index 3fdb4acd..ce291068 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -1,12 +1,11 @@ //! Contain structs related to making RPCs -use std::sync::Arc; +use std::sync::{Arc, RwLock}; use std::time::Duration; use futures::future::join_all; use futures::stream::futures_unordered::FuturesUnordered; use futures::stream::StreamExt; use tokio::select; -use tokio::sync::watch; use opentelemetry::KeyValue; use opentelemetry::{ @@ -91,7 +90,7 @@ pub struct RpcHelper(Arc); struct RpcHelperInner { our_node_id: Uuid, fullmesh: Arc, - layout_watch: watch::Receiver>, + layout: Arc>, metrics: RpcMetrics, rpc_timeout: Duration, } @@ -100,7 +99,7 @@ impl RpcHelper { pub(crate) fn new( our_node_id: Uuid, fullmesh: Arc, - layout_watch: watch::Receiver>, + layout: Arc>, rpc_timeout: Option, ) -> Self { let metrics = RpcMetrics::new(); @@ -108,7 +107,7 @@ impl RpcHelper { Self(Arc::new(RpcHelperInner { our_node_id, fullmesh, - layout_watch, + layout, metrics, rpc_timeout: rpc_timeout.unwrap_or(DEFAULT_TIMEOUT), })) @@ -392,7 +391,7 @@ impl RpcHelper { pub fn request_order(&self, nodes: &[Uuid]) -> Vec { // Retrieve some status variables that we will use to sort requests let peer_list = self.0.fullmesh.get_peer_list(); - let layout: Arc = self.0.layout_watch.borrow().clone(); + let layout = self.0.layout.read().unwrap(); let our_zone = match layout.current().node_role(&self.0.our_node_id) { Some(pc) => &pc.zone, None => "", -- cgit v1.2.3