aboutsummaryrefslogtreecommitdiff
path: root/src/rpc
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2021-03-15 22:36:41 +0100
committerAlex Auvolat <alex@adnab.me>2021-03-15 22:36:41 +0100
commit0cd5b2ae19965b8c1f3176afeb8f678c4d8366dd (patch)
treea3f57c18da5377a618c38f3e4bba002c9eed1358 /src/rpc
parent4d4117f2b4eb69b63e2329f6e0b8929e6a8b5b31 (diff)
downloadgarage-0cd5b2ae19965b8c1f3176afeb8f678c4d8366dd.tar.gz
garage-0cd5b2ae19965b8c1f3176afeb8f678c4d8366dd.zip
WIP migrate to tokio 1
Diffstat (limited to 'src/rpc')
-rw-r--r--src/rpc/Cargo.toml11
-rw-r--r--src/rpc/membership.rs30
-rw-r--r--src/rpc/rpc_client.rs2
-rw-r--r--src/rpc/rpc_server.rs5
4 files changed, 24 insertions, 24 deletions
diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml
index 48f05755..fc066bef 100644
--- a/src/rpc/Cargo.toml
+++ b/src/rpc/Cargo.toml
@@ -29,13 +29,14 @@ serde_json = "1.0"
futures = "0.3"
futures-util = "0.3"
-tokio = { version = "0.2", default-features = false, features = ["rt-core", "rt-threaded", "io-driver", "net", "tcp", "time", "macros", "sync", "signal", "fs"] }
+tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
+tokio-stream = {version = "0.1", features = ["net"] }
http = "0.2"
-hyper = "0.13"
-rustls = "0.17"
-tokio-rustls = "0.13"
-hyper-rustls = { version = "0.20", default-features = false }
+hyper = { version = "0.14", features = ["full"] }
+rustls = "0.19"
+tokio-rustls = "0.22"
+hyper-rustls = { version = "0.22", default-features = false }
webpki = "0.21"
diff --git a/src/rpc/membership.rs b/src/rpc/membership.rs
index 6749478a..6cc3ed2e 100644
--- a/src/rpc/membership.rs
+++ b/src/rpc/membership.rs
@@ -11,9 +11,9 @@ use futures::future::join_all;
use futures::select;
use futures_util::future::*;
use serde::{Deserialize, Serialize};
-use tokio::prelude::*;
use tokio::sync::watch;
use tokio::sync::Mutex;
+use tokio::io::AsyncWriteExt;
use garage_util::background::BackgroundRunner;
use garage_util::data::*;
@@ -395,7 +395,7 @@ impl System {
if has_changes {
status.recalculate_hash();
}
- if let Err(e) = update_locked.0.broadcast(Arc::new(status)) {
+ if let Err(e) = update_locked.0.send(Arc::new(status)) {
error!("In ping_nodes: could not save status update ({})", e);
}
drop(update_locked);
@@ -421,7 +421,7 @@ impl System {
let status_hash = status.hash;
let config_version = self.ring.borrow().config.version;
- update_locked.0.broadcast(Arc::new(status))?;
+ update_locked.0.send(Arc::new(status))?;
drop(update_locked);
if is_new || status_hash != ping.status_hash {
@@ -503,7 +503,7 @@ impl System {
if has_changed {
status.recalculate_hash();
}
- update_lock.0.broadcast(Arc::new(status))?;
+ update_lock.0.send(Arc::new(status))?;
drop(update_lock);
if to_ping.len() > 0 {
@@ -523,7 +523,7 @@ impl System {
if adv.version > ring.config.version {
let ring = Ring::new(adv.clone());
- update_lock.1.broadcast(Arc::new(ring))?;
+ update_lock.1.send(Arc::new(ring))?;
drop(update_lock);
self.background.spawn_cancellable(
@@ -531,7 +531,7 @@ impl System {
.broadcast(Message::AdvertiseConfig(adv.clone()), PING_TIMEOUT)
.map(Ok),
);
- self.background.spawn(self.clone().save_network_config());
+ self.background.spawn(self.clone().save_network_config()).await;
}
Ok(Message::Ok)
@@ -539,7 +539,7 @@ impl System {
async fn ping_loop(self: Arc<Self>, mut stop_signal: watch::Receiver<bool>) {
loop {
- let restart_at = tokio::time::delay_for(PING_INTERVAL);
+ let restart_at = tokio::time::sleep(PING_INTERVAL);
let status = self.status.borrow().clone();
let ping_addrs = status
@@ -553,10 +553,9 @@ impl System {
select! {
_ = restart_at.fuse() => (),
- must_exit = stop_signal.recv().fuse() => {
- match must_exit {
- None | Some(true) => return,
- _ => (),
+ _ = stop_signal.changed().fuse() => {
+ if *stop_signal.borrow() {
+ return;
}
}
}
@@ -570,7 +569,7 @@ impl System {
consul_service_name: String,
) {
loop {
- let restart_at = tokio::time::delay_for(CONSUL_INTERVAL);
+ let restart_at = tokio::time::sleep(CONSUL_INTERVAL);
match get_consul_nodes(&consul_host, &consul_service_name).await {
Ok(mut node_list) => {
@@ -584,10 +583,9 @@ impl System {
select! {
_ = restart_at.fuse() => (),
- must_exit = stop_signal.recv().fuse() => {
- match must_exit {
- None | Some(true) => return,
- _ => (),
+ _ = stop_signal.changed().fuse() => {
+ if *stop_signal.borrow() {
+ return;
}
}
}
diff --git a/src/rpc/rpc_client.rs b/src/rpc/rpc_client.rs
index cffcf106..60286256 100644
--- a/src/rpc/rpc_client.rs
+++ b/src/rpc/rpc_client.rs
@@ -198,7 +198,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
let wait_finished_fut = tokio::spawn(async move {
resp_stream.collect::<Vec<_>>().await;
});
- self.background.spawn(wait_finished_fut.map(|_| Ok(())));
+ self.background.spawn(wait_finished_fut.map(|_| Ok(()))).await;
}
Ok(results)
diff --git a/src/rpc/rpc_server.rs b/src/rpc/rpc_server.rs
index 4d14b790..3c5014c4 100644
--- a/src/rpc/rpc_server.rs
+++ b/src/rpc/rpc_server.rs
@@ -13,6 +13,7 @@ use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Request, Response, Server, StatusCode};
use serde::{Deserialize, Serialize};
use tokio::net::{TcpListener, TcpStream};
+use tokio_stream::wrappers::TcpListenerStream;
use tokio_rustls::server::TlsStream;
use tokio_rustls::TlsAcceptor;
@@ -171,8 +172,8 @@ impl RpcServer {
config.set_single_cert([&node_certs[..], &ca_certs[..]].concat(), node_key)?;
let tls_acceptor = Arc::new(TlsAcceptor::from(Arc::new(config)));
- let mut listener = TcpListener::bind(&self.bind_addr).await?;
- let incoming = listener.incoming().filter_map(|socket| async {
+ let listener = TcpListener::bind(&self.bind_addr).await?;
+ let incoming = TcpListenerStream::new(listener).filter_map(|socket| async {
match socket {
Ok(stream) => match tls_acceptor.clone().accept(stream).await {
Ok(x) => Some(Ok::<_, hyper::Error>(x)),