From 6a8439fd1345ecae7414386f76dda7a03eb14df2 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 15 Mar 2021 23:14:12 +0100 Subject: Some improvements in background worker but we terminate late --- src/rpc/membership.rs | 25 ++++++++++--------------- src/rpc/ring.rs | 10 +++++----- src/rpc/rpc_client.rs | 2 +- src/rpc/rpc_server.rs | 8 ++++++-- 4 files changed, 22 insertions(+), 23 deletions(-) (limited to 'src/rpc') diff --git a/src/rpc/membership.rs b/src/rpc/membership.rs index 6cc3ed2e..4e9822fa 100644 --- a/src/rpc/membership.rs +++ b/src/rpc/membership.rs @@ -11,9 +11,9 @@ use futures::future::join_all; use futures::select; use futures_util::future::*; use serde::{Deserialize, Serialize}; +use tokio::io::AsyncWriteExt; use tokio::sync::watch; use tokio::sync::Mutex; -use tokio::io::AsyncWriteExt; use garage_util::background::BackgroundRunner; use garage_util::data::*; @@ -316,17 +316,16 @@ impl System { self.clone().ping_nodes(bootstrap_peers).await; let self2 = self.clone(); - self.clone() - .background - .spawn_worker(format!("ping loop"), |stop_signal| self2.ping_loop(stop_signal)); + self.background + .spawn_worker(format!("ping loop"), |stop_signal| { + self2.ping_loop(stop_signal) + }); if let (Some(consul_host), Some(consul_service_name)) = (consul_host, consul_service_name) { let self2 = self.clone(); - self.clone() - .background + self.background .spawn_worker(format!("Consul loop"), |stop_signal| { - self2 - .consul_loop(stop_signal, consul_host, consul_service_name) + self2.consul_loop(stop_signal, consul_host, consul_service_name) }); } } @@ -531,7 +530,7 @@ impl System { .broadcast(Message::AdvertiseConfig(adv.clone()), PING_TIMEOUT) .map(Ok), ); - self.background.spawn(self.clone().save_network_config()).await; + self.background.spawn(self.clone().save_network_config()); } Ok(Message::Ok) @@ -568,7 +567,7 @@ impl System { consul_host: String, consul_service_name: String, ) { - loop { + while !*stop_signal.borrow() { let restart_at = tokio::time::sleep(CONSUL_INTERVAL); match get_consul_nodes(&consul_host, &consul_service_name).await { @@ -583,11 +582,7 @@ impl System { select! { _ = restart_at.fuse() => (), - _ = stop_signal.changed().fuse() => { - if *stop_signal.borrow() { - return; - } - } + _ = stop_signal.changed().fuse() => (), } } } diff --git a/src/rpc/ring.rs b/src/rpc/ring.rs index 215ab031..a89b730c 100644 --- a/src/rpc/ring.rs +++ b/src/rpc/ring.rs @@ -161,11 +161,11 @@ impl Ring { }) .collect::>(); - eprintln!("RING: --"); - for e in ring.iter() { - eprintln!("{:?}", e); - } - eprintln!("END --"); + // eprintln!("RING: --"); + // for e in ring.iter() { + // eprintln!("{:?}", e); + // } + // eprintln!("END --"); Self { config, ring } } diff --git a/src/rpc/rpc_client.rs b/src/rpc/rpc_client.rs index 60286256..cffcf106 100644 --- a/src/rpc/rpc_client.rs +++ b/src/rpc/rpc_client.rs @@ -198,7 +198,7 @@ impl RpcClient { let wait_finished_fut = tokio::spawn(async move { resp_stream.collect::>().await; }); - self.background.spawn(wait_finished_fut.map(|_| Ok(()))).await; + self.background.spawn(wait_finished_fut.map(|_| Ok(()))); } Ok(results) diff --git a/src/rpc/rpc_server.rs b/src/rpc/rpc_server.rs index 3c5014c4..0c5bf6f9 100644 --- a/src/rpc/rpc_server.rs +++ b/src/rpc/rpc_server.rs @@ -13,9 +13,9 @@ use hyper::service::{make_service_fn, service_fn}; use hyper::{Body, Method, Request, Response, Server, StatusCode}; use serde::{Deserialize, Serialize}; use tokio::net::{TcpListener, TcpStream}; -use tokio_stream::wrappers::TcpListenerStream; use tokio_rustls::server::TlsStream; use tokio_rustls::TlsAcceptor; +use tokio_stream::wrappers::TcpListenerStream; use garage_util::config::TlsConfig; use garage_util::data::*; @@ -52,7 +52,11 @@ where trace!( "Request message: {}", - serde_json::to_string(&msg).unwrap_or("".into()).chars().take(100).collect::() + serde_json::to_string(&msg) + .unwrap_or("".into()) + .chars() + .take(100) + .collect::() ); match handler(msg, sockaddr).await { -- cgit v1.2.3