diff options
author | Alex Auvolat <alex@adnab.me> | 2021-03-15 22:36:41 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2021-03-15 22:36:41 +0100 |
commit | 0cd5b2ae19965b8c1f3176afeb8f678c4d8366dd (patch) | |
tree | a3f57c18da5377a618c38f3e4bba002c9eed1358 /src/rpc | |
parent | 4d4117f2b4eb69b63e2329f6e0b8929e6a8b5b31 (diff) | |
download | garage-0cd5b2ae19965b8c1f3176afeb8f678c4d8366dd.tar.gz garage-0cd5b2ae19965b8c1f3176afeb8f678c4d8366dd.zip |
WIP migrate to tokio 1
Diffstat (limited to 'src/rpc')
-rw-r--r-- | src/rpc/Cargo.toml | 11 | ||||
-rw-r--r-- | src/rpc/membership.rs | 30 | ||||
-rw-r--r-- | src/rpc/rpc_client.rs | 2 | ||||
-rw-r--r-- | src/rpc/rpc_server.rs | 5 |
4 files changed, 24 insertions, 24 deletions
diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml index 48f05755..fc066bef 100644 --- a/src/rpc/Cargo.toml +++ b/src/rpc/Cargo.toml @@ -29,13 +29,14 @@ serde_json = "1.0" futures = "0.3" futures-util = "0.3" -tokio = { version = "0.2", default-features = false, features = ["rt-core", "rt-threaded", "io-driver", "net", "tcp", "time", "macros", "sync", "signal", "fs"] } +tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } +tokio-stream = {version = "0.1", features = ["net"] } http = "0.2" -hyper = "0.13" -rustls = "0.17" -tokio-rustls = "0.13" -hyper-rustls = { version = "0.20", default-features = false } +hyper = { version = "0.14", features = ["full"] } +rustls = "0.19" +tokio-rustls = "0.22" +hyper-rustls = { version = "0.22", default-features = false } webpki = "0.21" diff --git a/src/rpc/membership.rs b/src/rpc/membership.rs index 6749478a..6cc3ed2e 100644 --- a/src/rpc/membership.rs +++ b/src/rpc/membership.rs @@ -11,9 +11,9 @@ use futures::future::join_all; use futures::select; use futures_util::future::*; use serde::{Deserialize, Serialize}; -use tokio::prelude::*; use tokio::sync::watch; use tokio::sync::Mutex; +use tokio::io::AsyncWriteExt; use garage_util::background::BackgroundRunner; use garage_util::data::*; @@ -395,7 +395,7 @@ impl System { if has_changes { status.recalculate_hash(); } - if let Err(e) = update_locked.0.broadcast(Arc::new(status)) { + if let Err(e) = update_locked.0.send(Arc::new(status)) { error!("In ping_nodes: could not save status update ({})", e); } drop(update_locked); @@ -421,7 +421,7 @@ impl System { let status_hash = status.hash; let config_version = self.ring.borrow().config.version; - update_locked.0.broadcast(Arc::new(status))?; + update_locked.0.send(Arc::new(status))?; drop(update_locked); if is_new || status_hash != ping.status_hash { @@ -503,7 +503,7 @@ impl System { if has_changed { status.recalculate_hash(); } - update_lock.0.broadcast(Arc::new(status))?; + update_lock.0.send(Arc::new(status))?; drop(update_lock); if to_ping.len() > 0 { @@ -523,7 +523,7 @@ impl System { if adv.version > ring.config.version { let ring = Ring::new(adv.clone()); - update_lock.1.broadcast(Arc::new(ring))?; + update_lock.1.send(Arc::new(ring))?; drop(update_lock); self.background.spawn_cancellable( @@ -531,7 +531,7 @@ impl System { .broadcast(Message::AdvertiseConfig(adv.clone()), PING_TIMEOUT) .map(Ok), ); - self.background.spawn(self.clone().save_network_config()); + self.background.spawn(self.clone().save_network_config()).await; } Ok(Message::Ok) @@ -539,7 +539,7 @@ impl System { async fn ping_loop(self: Arc<Self>, mut stop_signal: watch::Receiver<bool>) { loop { - let restart_at = tokio::time::delay_for(PING_INTERVAL); + let restart_at = tokio::time::sleep(PING_INTERVAL); let status = self.status.borrow().clone(); let ping_addrs = status @@ -553,10 +553,9 @@ impl System { select! { _ = restart_at.fuse() => (), - must_exit = stop_signal.recv().fuse() => { - match must_exit { - None | Some(true) => return, - _ => (), + _ = stop_signal.changed().fuse() => { + if *stop_signal.borrow() { + return; } } } @@ -570,7 +569,7 @@ impl System { consul_service_name: String, ) { loop { - let restart_at = tokio::time::delay_for(CONSUL_INTERVAL); + let restart_at = tokio::time::sleep(CONSUL_INTERVAL); match get_consul_nodes(&consul_host, &consul_service_name).await { Ok(mut node_list) => { @@ -584,10 +583,9 @@ impl System { select! { _ = restart_at.fuse() => (), - must_exit = stop_signal.recv().fuse() => { - match must_exit { - None | Some(true) => return, - _ => (), + _ = stop_signal.changed().fuse() => { + if *stop_signal.borrow() { + return; } } } diff --git a/src/rpc/rpc_client.rs b/src/rpc/rpc_client.rs index cffcf106..60286256 100644 --- a/src/rpc/rpc_client.rs +++ b/src/rpc/rpc_client.rs @@ -198,7 +198,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> { let wait_finished_fut = tokio::spawn(async move { resp_stream.collect::<Vec<_>>().await; }); - self.background.spawn(wait_finished_fut.map(|_| Ok(()))); + self.background.spawn(wait_finished_fut.map(|_| Ok(()))).await; } Ok(results) diff --git a/src/rpc/rpc_server.rs b/src/rpc/rpc_server.rs index 4d14b790..3c5014c4 100644 --- a/src/rpc/rpc_server.rs +++ b/src/rpc/rpc_server.rs @@ -13,6 +13,7 @@ use hyper::service::{make_service_fn, service_fn}; use hyper::{Body, Method, Request, Response, Server, StatusCode}; use serde::{Deserialize, Serialize}; use tokio::net::{TcpListener, TcpStream}; +use tokio_stream::wrappers::TcpListenerStream; use tokio_rustls::server::TlsStream; use tokio_rustls::TlsAcceptor; @@ -171,8 +172,8 @@ impl RpcServer { config.set_single_cert([&node_certs[..], &ca_certs[..]].concat(), node_key)?; let tls_acceptor = Arc::new(TlsAcceptor::from(Arc::new(config))); - let mut listener = TcpListener::bind(&self.bind_addr).await?; - let incoming = listener.incoming().filter_map(|socket| async { + let listener = TcpListener::bind(&self.bind_addr).await?; + let incoming = TcpListenerStream::new(listener).filter_map(|socket| async { match socket { Ok(stream) => match tls_acceptor.clone().accept(stream).await { Ok(x) => Some(Ok::<_, hyper::Error>(x)), |