aboutsummaryrefslogtreecommitdiff
path: root/src/rpc/rpc_helper.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/rpc/rpc_helper.rs')
-rw-r--r--src/rpc/rpc_helper.rs11
1 files changed, 5 insertions, 6 deletions
diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs
index 3fdb4acd..ce291068 100644
--- a/src/rpc/rpc_helper.rs
+++ b/src/rpc/rpc_helper.rs
@@ -1,12 +1,11 @@
//! Contain structs related to making RPCs
-use std::sync::Arc;
+use std::sync::{Arc, RwLock};
use std::time::Duration;
use futures::future::join_all;
use futures::stream::futures_unordered::FuturesUnordered;
use futures::stream::StreamExt;
use tokio::select;
-use tokio::sync::watch;
use opentelemetry::KeyValue;
use opentelemetry::{
@@ -91,7 +90,7 @@ pub struct RpcHelper(Arc<RpcHelperInner>);
struct RpcHelperInner {
our_node_id: Uuid,
fullmesh: Arc<FullMeshPeeringStrategy>,
- layout_watch: watch::Receiver<Arc<LayoutHistory>>,
+ layout: Arc<RwLock<LayoutHistory>>,
metrics: RpcMetrics,
rpc_timeout: Duration,
}
@@ -100,7 +99,7 @@ impl RpcHelper {
pub(crate) fn new(
our_node_id: Uuid,
fullmesh: Arc<FullMeshPeeringStrategy>,
- layout_watch: watch::Receiver<Arc<LayoutHistory>>,
+ layout: Arc<RwLock<LayoutHistory>>,
rpc_timeout: Option<Duration>,
) -> Self {
let metrics = RpcMetrics::new();
@@ -108,7 +107,7 @@ impl RpcHelper {
Self(Arc::new(RpcHelperInner {
our_node_id,
fullmesh,
- layout_watch,
+ layout,
metrics,
rpc_timeout: rpc_timeout.unwrap_or(DEFAULT_TIMEOUT),
}))
@@ -392,7 +391,7 @@ impl RpcHelper {
pub fn request_order(&self, nodes: &[Uuid]) -> Vec<Uuid> {
// Retrieve some status variables that we will use to sort requests
let peer_list = self.0.fullmesh.get_peer_list();
- let layout: Arc<LayoutHistory> = self.0.layout_watch.borrow().clone();
+ let layout = self.0.layout.read().unwrap();
let our_zone = match layout.current().node_role(&self.0.our_node_id) {
Some(pc) => &pc.zone,
None => "",