aboutsummaryrefslogtreecommitdiff
path: root/src/rpc
diff options
context:
space:
mode:
Diffstat (limited to 'src/rpc')
-rw-r--r--src/rpc/membership.rs25
-rw-r--r--src/rpc/ring.rs10
-rw-r--r--src/rpc/rpc_client.rs2
-rw-r--r--src/rpc/rpc_server.rs8
4 files changed, 22 insertions, 23 deletions
diff --git a/src/rpc/membership.rs b/src/rpc/membership.rs
index 6cc3ed2e..4e9822fa 100644
--- a/src/rpc/membership.rs
+++ b/src/rpc/membership.rs
@@ -11,9 +11,9 @@ use futures::future::join_all;
use futures::select;
use futures_util::future::*;
use serde::{Deserialize, Serialize};
+use tokio::io::AsyncWriteExt;
use tokio::sync::watch;
use tokio::sync::Mutex;
-use tokio::io::AsyncWriteExt;
use garage_util::background::BackgroundRunner;
use garage_util::data::*;
@@ -316,17 +316,16 @@ impl System {
self.clone().ping_nodes(bootstrap_peers).await;
let self2 = self.clone();
- self.clone()
- .background
- .spawn_worker(format!("ping loop"), |stop_signal| self2.ping_loop(stop_signal));
+ self.background
+ .spawn_worker(format!("ping loop"), |stop_signal| {
+ self2.ping_loop(stop_signal)
+ });
if let (Some(consul_host), Some(consul_service_name)) = (consul_host, consul_service_name) {
let self2 = self.clone();
- self.clone()
- .background
+ self.background
.spawn_worker(format!("Consul loop"), |stop_signal| {
- self2
- .consul_loop(stop_signal, consul_host, consul_service_name)
+ self2.consul_loop(stop_signal, consul_host, consul_service_name)
});
}
}
@@ -531,7 +530,7 @@ impl System {
.broadcast(Message::AdvertiseConfig(adv.clone()), PING_TIMEOUT)
.map(Ok),
);
- self.background.spawn(self.clone().save_network_config()).await;
+ self.background.spawn(self.clone().save_network_config());
}
Ok(Message::Ok)
@@ -568,7 +567,7 @@ impl System {
consul_host: String,
consul_service_name: String,
) {
- loop {
+ while !*stop_signal.borrow() {
let restart_at = tokio::time::sleep(CONSUL_INTERVAL);
match get_consul_nodes(&consul_host, &consul_service_name).await {
@@ -583,11 +582,7 @@ impl System {
select! {
_ = restart_at.fuse() => (),
- _ = stop_signal.changed().fuse() => {
- if *stop_signal.borrow() {
- return;
- }
- }
+ _ = stop_signal.changed().fuse() => (),
}
}
}
diff --git a/src/rpc/ring.rs b/src/rpc/ring.rs
index 215ab031..a89b730c 100644
--- a/src/rpc/ring.rs
+++ b/src/rpc/ring.rs
@@ -161,11 +161,11 @@ impl Ring {
})
.collect::<Vec<_>>();
- eprintln!("RING: --");
- for e in ring.iter() {
- eprintln!("{:?}", e);
- }
- eprintln!("END --");
+ // eprintln!("RING: --");
+ // for e in ring.iter() {
+ // eprintln!("{:?}", e);
+ // }
+ // eprintln!("END --");
Self { config, ring }
}
diff --git a/src/rpc/rpc_client.rs b/src/rpc/rpc_client.rs
index 60286256..cffcf106 100644
--- a/src/rpc/rpc_client.rs
+++ b/src/rpc/rpc_client.rs
@@ -198,7 +198,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
let wait_finished_fut = tokio::spawn(async move {
resp_stream.collect::<Vec<_>>().await;
});
- self.background.spawn(wait_finished_fut.map(|_| Ok(()))).await;
+ self.background.spawn(wait_finished_fut.map(|_| Ok(())));
}
Ok(results)
diff --git a/src/rpc/rpc_server.rs b/src/rpc/rpc_server.rs
index 3c5014c4..0c5bf6f9 100644
--- a/src/rpc/rpc_server.rs
+++ b/src/rpc/rpc_server.rs
@@ -13,9 +13,9 @@ use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Request, Response, Server, StatusCode};
use serde::{Deserialize, Serialize};
use tokio::net::{TcpListener, TcpStream};
-use tokio_stream::wrappers::TcpListenerStream;
use tokio_rustls::server::TlsStream;
use tokio_rustls::TlsAcceptor;
+use tokio_stream::wrappers::TcpListenerStream;
use garage_util::config::TlsConfig;
use garage_util::data::*;
@@ -52,7 +52,11 @@ where
trace!(
"Request message: {}",
- serde_json::to_string(&msg).unwrap_or("<json error>".into()).chars().take(100).collect::<String>()
+ serde_json::to_string(&msg)
+ .unwrap_or("<json error>".into())
+ .chars()
+ .take(100)
+ .collect::<String>()
);
match handler(msg, sockaddr).await {