diff options
Diffstat (limited to 'src/rpc/rpc_helper.rs')
-rw-r--r-- | src/rpc/rpc_helper.rs | 11 |
1 files changed, 5 insertions, 6 deletions
diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index 3fdb4acd..ce291068 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -1,12 +1,11 @@ //! Contain structs related to making RPCs -use std::sync::Arc; +use std::sync::{Arc, RwLock}; use std::time::Duration; use futures::future::join_all; use futures::stream::futures_unordered::FuturesUnordered; use futures::stream::StreamExt; use tokio::select; -use tokio::sync::watch; use opentelemetry::KeyValue; use opentelemetry::{ @@ -91,7 +90,7 @@ pub struct RpcHelper(Arc<RpcHelperInner>); struct RpcHelperInner { our_node_id: Uuid, fullmesh: Arc<FullMeshPeeringStrategy>, - layout_watch: watch::Receiver<Arc<LayoutHistory>>, + layout: Arc<RwLock<LayoutHistory>>, metrics: RpcMetrics, rpc_timeout: Duration, } @@ -100,7 +99,7 @@ impl RpcHelper { pub(crate) fn new( our_node_id: Uuid, fullmesh: Arc<FullMeshPeeringStrategy>, - layout_watch: watch::Receiver<Arc<LayoutHistory>>, + layout: Arc<RwLock<LayoutHistory>>, rpc_timeout: Option<Duration>, ) -> Self { let metrics = RpcMetrics::new(); @@ -108,7 +107,7 @@ impl RpcHelper { Self(Arc::new(RpcHelperInner { our_node_id, fullmesh, - layout_watch, + layout, metrics, rpc_timeout: rpc_timeout.unwrap_or(DEFAULT_TIMEOUT), })) @@ -392,7 +391,7 @@ impl RpcHelper { pub fn request_order(&self, nodes: &[Uuid]) -> Vec<Uuid> { // Retrieve some status variables that we will use to sort requests let peer_list = self.0.fullmesh.get_peer_list(); - let layout: Arc<LayoutHistory> = self.0.layout_watch.borrow().clone(); + let layout = self.0.layout.read().unwrap(); let our_zone = match layout.current().node_role(&self.0.our_node_id) { Some(pc) => &pc.zone, None => "", |