aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2021-03-15 22:36:41 +0100
committerAlex Auvolat <alex@adnab.me>2021-03-15 22:36:41 +0100
commit0cd5b2ae19965b8c1f3176afeb8f678c4d8366dd (patch)
treea3f57c18da5377a618c38f3e4bba002c9eed1358 /src
parent4d4117f2b4eb69b63e2329f6e0b8929e6a8b5b31 (diff)
downloadgarage-0cd5b2ae19965b8c1f3176afeb8f678c4d8366dd.tar.gz
garage-0cd5b2ae19965b8c1f3176afeb8f678c4d8366dd.zip
WIP migrate to tokio 1
Diffstat (limited to 'src')
-rw-r--r--src/api/Cargo.toml4
-rw-r--r--src/garage/Cargo.toml2
-rw-r--r--src/garage/server.rs6
-rw-r--r--src/model/Cargo.toml3
-rw-r--r--src/model/block.rs23
-rw-r--r--src/model/object_table.rs3
-rw-r--r--src/model/version_table.rs3
-rw-r--r--src/rpc/Cargo.toml11
-rw-r--r--src/rpc/membership.rs30
-rw-r--r--src/rpc/rpc_client.rs2
-rw-r--r--src/rpc/rpc_server.rs5
-rw-r--r--src/table/Cargo.toml2
-rw-r--r--src/table/data.rs6
-rw-r--r--src/table/gc.rs4
-rw-r--r--src/table/merkle.rs4
-rw-r--r--src/table/sync.rs35
-rw-r--r--src/util/Cargo.toml7
-rw-r--r--src/util/background.rs131
-rw-r--r--src/util/error.rs8
-rw-r--r--src/web/Cargo.toml2
20 files changed, 147 insertions, 144 deletions
diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml
index 45388eff..c3208b66 100644
--- a/src/api/Cargo.toml
+++ b/src/api/Cargo.toml
@@ -31,10 +31,10 @@ rand = "0.7"
futures = "0.3"
futures-util = "0.3"
-tokio = { version = "0.2", default-features = false, features = ["rt-core", "rt-threaded", "io-driver", "net", "tcp", "time", "macros", "sync", "signal", "fs"] }
+tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
http = "0.2"
-hyper = "^0.13.6"
+hyper = "0.14"
url = "2.1"
httpdate = "0.3"
percent-encoding = "2.1.0"
diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml
index 8c28394b..36bbcd50 100644
--- a/src/garage/Cargo.toml
+++ b/src/garage/Cargo.toml
@@ -38,4 +38,4 @@ serde = { version = "1.0", default-features = false, features = ["derive", "rc"]
futures = "0.3"
futures-util = "0.3"
-tokio = { version = "0.2", default-features = false, features = ["rt-core", "rt-threaded", "io-driver", "net", "tcp", "time", "macros", "sync", "signal", "fs"] }
+tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
diff --git a/src/garage/server.rs b/src/garage/server.rs
index 29740feb..ce90ecab 100644
--- a/src/garage/server.rs
+++ b/src/garage/server.rs
@@ -21,13 +21,13 @@ async fn shutdown_signal(send_cancel: watch::Sender<bool>) -> Result<(), Error>
.await
.expect("failed to install CTRL+C signal handler");
info!("Received CTRL+C, shutting down.");
- send_cancel.broadcast(true)?;
+ send_cancel.send(true)?;
Ok(())
}
async fn wait_from(mut chan: watch::Receiver<bool>) -> () {
- while let Some(exit_now) = chan.recv().await {
- if exit_now {
+ while !*chan.borrow() {
+ if chan.changed().await.is_err() {
return;
}
}
diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml
index caeed66c..8f36cf2e 100644
--- a/src/model/Cargo.toml
+++ b/src/model/Cargo.toml
@@ -33,5 +33,4 @@ serde_bytes = "0.11"
async-trait = "0.1.30"
futures = "0.3"
futures-util = "0.3"
-tokio = { version = "0.2", default-features = false, features = ["rt-core", "rt-threaded", "io-driver", "net", "tcp", "time", "macros", "sync", "signal", "fs"] }
-
+tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
diff --git a/src/model/block.rs b/src/model/block.rs
index 023ed3ab..7185372c 100644
--- a/src/model/block.rs
+++ b/src/model/block.rs
@@ -5,10 +5,9 @@ use std::time::Duration;
use arc_swap::ArcSwapOption;
use futures::future::*;
use futures::select;
-use futures::stream::*;
use serde::{Deserialize, Serialize};
use tokio::fs;
-use tokio::prelude::*;
+use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::sync::{watch, Mutex, Notify};
use garage_util::data::*;
@@ -134,7 +133,7 @@ impl BlockManager {
let bm2 = self.clone();
let background = self.system.background.clone();
tokio::spawn(async move {
- tokio::time::delay_for(Duration::from_secs(10 * (i + 1))).await;
+ tokio::time::sleep(Duration::from_secs(10 * (i + 1))).await;
background.spawn_worker(format!("block resync worker {}", i), move |must_exit| {
bm2.resync_loop(must_exit)
});
@@ -251,7 +250,7 @@ impl BlockManager {
let mut key = u64::to_be_bytes(when).to_vec();
key.extend(hash.as_ref());
self.resync_queue.insert(key, hash.as_ref())?;
- self.resync_notify.notify();
+ self.resync_notify.notify_waiters();
Ok(())
}
@@ -262,7 +261,7 @@ impl BlockManager {
while !*must_exit.borrow() {
if let Err(e) = self.resync_iter(&mut must_exit).await {
warn!("Error in block resync loop: {}", e);
- tokio::time::delay_for(Duration::from_secs(10)).await;
+ tokio::time::sleep(Duration::from_secs(10)).await;
}
}
}
@@ -283,17 +282,17 @@ impl BlockManager {
self.resync_queue.remove(&time_bytes)?;
res?; // propagate error to delay main loop
} else {
- let delay = tokio::time::delay_for(Duration::from_millis(time_msec - now));
+ let delay = tokio::time::sleep(Duration::from_millis(time_msec - now));
select! {
_ = delay.fuse() => (),
_ = self.resync_notify.notified().fuse() => (),
- _ = must_exit.recv().fuse() => (),
+ _ = must_exit.changed().fuse() => (),
}
}
} else {
select! {
_ = self.resync_notify.notified().fuse() => (),
- _ = must_exit.recv().fuse() => (),
+ _ = must_exit.changed().fuse() => (),
}
}
Ok(())
@@ -467,8 +466,12 @@ impl BlockManager {
// so that we can offload them if necessary and then delete them locally.
async move {
let mut ls_data_dir = fs::read_dir(path).await?;
- while let Some(data_dir_ent) = ls_data_dir.next().await {
- let data_dir_ent = data_dir_ent?;
+ loop {
+ let data_dir_ent = ls_data_dir.next_entry().await?;
+ let data_dir_ent = match data_dir_ent {
+ Some(x) => x,
+ None => break,
+ };
let name = data_dir_ent.file_name();
let name = match name.into_string() {
Ok(x) => x,
diff --git a/src/model/object_table.rs b/src/model/object_table.rs
index 99fad3ce..d08bba70 100644
--- a/src/model/object_table.rs
+++ b/src/model/object_table.rs
@@ -195,7 +195,8 @@ impl TableSchema for ObjectTable {
fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) {
let version_table = self.version_table.clone();
- self.background.spawn(async move {
+ // TODO not cancellable
+ self.background.spawn_cancellable(async move {
if let (Some(old_v), Some(new_v)) = (old, new) {
// Propagate deletion of old versions
for v in old_v.versions.iter() {
diff --git a/src/model/version_table.rs b/src/model/version_table.rs
index 841fbfea..19343890 100644
--- a/src/model/version_table.rs
+++ b/src/model/version_table.rs
@@ -110,7 +110,8 @@ impl TableSchema for VersionTable {
fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) {
let block_ref_table = self.block_ref_table.clone();
- self.background.spawn(async move {
+ // TODO not cancellable
+ self.background.spawn_cancellable(async move {
if let (Some(old_v), Some(new_v)) = (old, new) {
// Propagate deletion of version blocks
if new_v.deleted.get() && !old_v.deleted.get() {
diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml
index 48f05755..fc066bef 100644
--- a/src/rpc/Cargo.toml
+++ b/src/rpc/Cargo.toml
@@ -29,13 +29,14 @@ serde_json = "1.0"
futures = "0.3"
futures-util = "0.3"
-tokio = { version = "0.2", default-features = false, features = ["rt-core", "rt-threaded", "io-driver", "net", "tcp", "time", "macros", "sync", "signal", "fs"] }
+tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
+tokio-stream = {version = "0.1", features = ["net"] }
http = "0.2"
-hyper = "0.13"
-rustls = "0.17"
-tokio-rustls = "0.13"
-hyper-rustls = { version = "0.20", default-features = false }
+hyper = { version = "0.14", features = ["full"] }
+rustls = "0.19"
+tokio-rustls = "0.22"
+hyper-rustls = { version = "0.22", default-features = false }
webpki = "0.21"
diff --git a/src/rpc/membership.rs b/src/rpc/membership.rs
index 6749478a..6cc3ed2e 100644
--- a/src/rpc/membership.rs
+++ b/src/rpc/membership.rs
@@ -11,9 +11,9 @@ use futures::future::join_all;
use futures::select;
use futures_util::future::*;
use serde::{Deserialize, Serialize};
-use tokio::prelude::*;
use tokio::sync::watch;
use tokio::sync::Mutex;
+use tokio::io::AsyncWriteExt;
use garage_util::background::BackgroundRunner;
use garage_util::data::*;
@@ -395,7 +395,7 @@ impl System {
if has_changes {
status.recalculate_hash();
}
- if let Err(e) = update_locked.0.broadcast(Arc::new(status)) {
+ if let Err(e) = update_locked.0.send(Arc::new(status)) {
error!("In ping_nodes: could not save status update ({})", e);
}
drop(update_locked);
@@ -421,7 +421,7 @@ impl System {
let status_hash = status.hash;
let config_version = self.ring.borrow().config.version;
- update_locked.0.broadcast(Arc::new(status))?;
+ update_locked.0.send(Arc::new(status))?;
drop(update_locked);
if is_new || status_hash != ping.status_hash {
@@ -503,7 +503,7 @@ impl System {
if has_changed {
status.recalculate_hash();
}
- update_lock.0.broadcast(Arc::new(status))?;
+ update_lock.0.send(Arc::new(status))?;
drop(update_lock);
if to_ping.len() > 0 {
@@ -523,7 +523,7 @@ impl System {
if adv.version > ring.config.version {
let ring = Ring::new(adv.clone());
- update_lock.1.broadcast(Arc::new(ring))?;
+ update_lock.1.send(Arc::new(ring))?;
drop(update_lock);
self.background.spawn_cancellable(
@@ -531,7 +531,7 @@ impl System {
.broadcast(Message::AdvertiseConfig(adv.clone()), PING_TIMEOUT)
.map(Ok),
);
- self.background.spawn(self.clone().save_network_config());
+ self.background.spawn(self.clone().save_network_config()).await;
}
Ok(Message::Ok)
@@ -539,7 +539,7 @@ impl System {
async fn ping_loop(self: Arc<Self>, mut stop_signal: watch::Receiver<bool>) {
loop {
- let restart_at = tokio::time::delay_for(PING_INTERVAL);
+ let restart_at = tokio::time::sleep(PING_INTERVAL);
let status = self.status.borrow().clone();
let ping_addrs = status
@@ -553,10 +553,9 @@ impl System {
select! {
_ = restart_at.fuse() => (),
- must_exit = stop_signal.recv().fuse() => {
- match must_exit {
- None | Some(true) => return,
- _ => (),
+ _ = stop_signal.changed().fuse() => {
+ if *stop_signal.borrow() {
+ return;
}
}
}
@@ -570,7 +569,7 @@ impl System {
consul_service_name: String,
) {
loop {
- let restart_at = tokio::time::delay_for(CONSUL_INTERVAL);
+ let restart_at = tokio::time::sleep(CONSUL_INTERVAL);
match get_consul_nodes(&consul_host, &consul_service_name).await {
Ok(mut node_list) => {
@@ -584,10 +583,9 @@ impl System {
select! {
_ = restart_at.fuse() => (),
- must_exit = stop_signal.recv().fuse() => {
- match must_exit {
- None | Some(true) => return,
- _ => (),
+ _ = stop_signal.changed().fuse() => {
+ if *stop_signal.borrow() {
+ return;
}
}
}
diff --git a/src/rpc/rpc_client.rs b/src/rpc/rpc_client.rs
index cffcf106..60286256 100644
--- a/src/rpc/rpc_client.rs
+++ b/src/rpc/rpc_client.rs
@@ -198,7 +198,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
let wait_finished_fut = tokio::spawn(async move {
resp_stream.collect::<Vec<_>>().await;
});
- self.background.spawn(wait_finished_fut.map(|_| Ok(())));
+ self.background.spawn(wait_finished_fut.map(|_| Ok(()))).await;
}
Ok(results)
diff --git a/src/rpc/rpc_server.rs b/src/rpc/rpc_server.rs
index 4d14b790..3c5014c4 100644
--- a/src/rpc/rpc_server.rs
+++ b/src/rpc/rpc_server.rs
@@ -13,6 +13,7 @@ use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Request, Response, Server, StatusCode};
use serde::{Deserialize, Serialize};
use tokio::net::{TcpListener, TcpStream};
+use tokio_stream::wrappers::TcpListenerStream;
use tokio_rustls::server::TlsStream;
use tokio_rustls::TlsAcceptor;
@@ -171,8 +172,8 @@ impl RpcServer {
config.set_single_cert([&node_certs[..], &ca_certs[..]].concat(), node_key)?;
let tls_acceptor = Arc::new(TlsAcceptor::from(Arc::new(config)));
- let mut listener = TcpListener::bind(&self.bind_addr).await?;
- let incoming = listener.incoming().filter_map(|socket| async {
+ let listener = TcpListener::bind(&self.bind_addr).await?;
+ let incoming = TcpListenerStream::new(listener).filter_map(|socket| async {
match socket {
Ok(stream) => match tls_acceptor.clone().accept(stream).await {
Ok(x) => Some(Ok::<_, hyper::Error>(x)),
diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml
index 6b3aaceb..8f73470e 100644
--- a/src/table/Cargo.toml
+++ b/src/table/Cargo.toml
@@ -31,5 +31,5 @@ serde_bytes = "0.11"
async-trait = "0.1.30"
futures = "0.3"
futures-util = "0.3"
-tokio = { version = "0.2", default-features = false, features = ["rt-core", "rt-threaded", "io-driver", "net", "tcp", "time", "macros", "sync", "signal", "fs"] }
+tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
diff --git a/src/table/data.rs b/src/table/data.rs
index 0a7b2cec..0029b936 100644
--- a/src/table/data.rs
+++ b/src/table/data.rs
@@ -159,7 +159,7 @@ where
if let Some((old_entry, new_entry, new_bytes_hash)) = changed {
let is_tombstone = new_entry.is_tombstone();
self.instance.updated(old_entry, Some(new_entry));
- self.merkle_updater.todo_notify.notify();
+ self.merkle_updater.todo_notify.notify_one();
if is_tombstone {
self.gc_todo.insert(&tree_key, new_bytes_hash.as_slice())?;
}
@@ -184,7 +184,7 @@ where
if removed {
let old_entry = self.decode_entry(v)?;
self.instance.updated(Some(old_entry), None);
- self.merkle_updater.todo_notify.notify();
+ self.merkle_updater.todo_notify.notify_one();
}
Ok(removed)
}
@@ -209,7 +209,7 @@ where
if let Some(old_v) = removed {
let old_entry = self.decode_entry(&old_v[..])?;
self.instance.updated(Some(old_entry), None);
- self.merkle_updater.todo_notify.notify();
+ self.merkle_updater.todo_notify.notify_one();
Ok(true)
} else {
Ok(false)
diff --git a/src/table/gc.rs b/src/table/gc.rs
index fd9a26d1..d37fdf35 100644
--- a/src/table/gc.rs
+++ b/src/table/gc.rs
@@ -85,8 +85,8 @@ where
}
}
select! {
- _ = tokio::time::delay_for(Duration::from_secs(10)).fuse() => (),
- _ = must_exit.recv().fuse() => (),
+ _ = tokio::time::sleep(Duration::from_secs(10)).fuse() => (),
+ _ = must_exit.changed().fuse() => (),
}
}
}
diff --git a/src/table/merkle.rs b/src/table/merkle.rs
index 5ce9cee3..86289bf1 100644
--- a/src/table/merkle.rs
+++ b/src/table/merkle.rs
@@ -121,13 +121,13 @@ impl MerkleUpdater {
"({}) Error while iterating on Merkle todo tree: {}",
self.table_name, e
);
- tokio::time::delay_for(Duration::from_secs(10)).await;
+ tokio::time::sleep(Duration::from_secs(10)).await;
}
}
} else {
select! {
_ = self.todo_notify.notified().fuse() => (),
- _ = must_exit.recv().fuse() => (),
+ _ = must_exit.changed().fuse() => (),
}
}
}
diff --git a/src/table/sync.rs b/src/table/sync.rs
index b344eb88..65231cd5 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -3,7 +3,7 @@ use std::convert::TryInto;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
-use futures::{pin_mut, select};
+use futures::{select};
use futures_util::future::*;
use futures_util::stream::*;
use rand::Rng;
@@ -110,7 +110,7 @@ where
let s3 = syncer.clone();
tokio::spawn(async move {
- tokio::time::delay_for(Duration::from_secs(20)).await;
+ tokio::time::sleep(Duration::from_secs(20)).await;
s3.add_full_sync();
});
@@ -142,23 +142,16 @@ where
let mut nothing_to_do_since = Some(Instant::now());
while !*must_exit.borrow() {
- let s_ring_recv = ring_recv.recv().fuse();
- let s_busy = busy_rx.recv().fuse();
- let s_must_exit = must_exit.recv().fuse();
- let s_timeout = tokio::time::delay_for(Duration::from_secs(1)).fuse();
- pin_mut!(s_ring_recv, s_busy, s_must_exit, s_timeout);
-
select! {
- new_ring_r = s_ring_recv => {
- if let Some(new_ring) = new_ring_r {
- if !Arc::ptr_eq(&new_ring, &prev_ring) {
- debug!("({}) Ring changed, adding full sync to syncer todo list", self.data.name);
- self.add_full_sync();
- prev_ring = new_ring;
- }
+ _ = ring_recv.changed().fuse() => {
+ let new_ring = ring_recv.borrow();
+ if !Arc::ptr_eq(&new_ring, &prev_ring) {
+ debug!("({}) Ring changed, adding full sync to syncer todo list", self.data.name);
+ self.add_full_sync();
+ prev_ring = new_ring.clone();
}
}
- busy_opt = s_busy => {
+ busy_opt = busy_rx.recv().fuse() => {
if let Some(busy) = busy_opt {
if busy {
nothing_to_do_since = None;
@@ -169,12 +162,8 @@ where
}
}
}
- must_exit_v = s_must_exit => {
- if must_exit_v.unwrap_or(false) {
- break;
- }
- }
- _ = s_timeout => {
+ _ = must_exit.changed().fuse() => (),
+ _ = tokio::time::sleep(Duration::from_secs(1)).fuse() => {
if nothing_to_do_since.map(|t| Instant::now() - t >= ANTI_ENTROPY_INTERVAL).unwrap_or(false) {
nothing_to_do_since = None;
debug!("({}) Interval passed, adding full sync to syncer todo list", self.data.name);
@@ -213,7 +202,7 @@ where
}
} else {
busy_tx.send(false).unwrap();
- tokio::time::delay_for(Duration::from_secs(1)).await;
+ tokio::time::sleep(Duration::from_secs(1)).await;
}
}
}
diff --git a/src/util/Cargo.toml b/src/util/Cargo.toml
index 7bb7cb31..2ae4796c 100644
--- a/src/util/Cargo.toml
+++ b/src/util/Cargo.toml
@@ -28,14 +28,15 @@ rmp-serde = "0.14.3"
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
serde_json = "1.0"
chrono = "0.4"
+arc-swap = "1.2"
futures = "0.3"
futures-util = "0.3"
-tokio = { version = "0.2", default-features = false, features = ["rt-core", "rt-threaded", "io-driver", "net", "tcp", "time", "macros", "sync", "signal", "fs"] }
+tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
http = "0.2"
-hyper = "0.13"
-rustls = "0.17"
+hyper = "0.14"
+rustls = "0.19"
webpki = "0.21"
roxmltree = "0.11"
diff --git a/src/util/background.rs b/src/util/background.rs
index 3e600fdf..0ec9779a 100644
--- a/src/util/background.rs
+++ b/src/util/background.rs
@@ -2,11 +2,9 @@ use core::future::Future;
use std::pin::Pin;
use std::sync::Mutex;
-use futures::future::join_all;
-use futures::select;
-use futures_util::future::*;
+use arc_swap::ArcSwapOption;
use std::sync::Arc;
-use tokio::sync::{mpsc, watch, Notify};
+use tokio::sync::{mpsc, watch};
use crate::error::Error;
@@ -14,12 +12,9 @@ type JobOutput = Result<(), Error>;
type Job = Pin<Box<dyn Future<Output = JobOutput> + Send>>;
pub struct BackgroundRunner {
- n_runners: usize,
pub stop_signal: watch::Receiver<bool>,
- queue_in: mpsc::UnboundedSender<(Job, bool)>,
- queue_out: Mutex<mpsc::UnboundedReceiver<(Job, bool)>>,
- job_notify: Notify,
+ queue_in: ArcSwapOption<mpsc::UnboundedSender<(Job, bool)>>,
workers: Mutex<Vec<tokio::task::JoinHandle<()>>>,
}
@@ -27,50 +22,91 @@ pub struct BackgroundRunner {
impl BackgroundRunner {
pub fn new(n_runners: usize, stop_signal: watch::Receiver<bool>) -> Arc<Self> {
let (queue_in, queue_out) = mpsc::unbounded_channel();
+
+ let mut workers = vec![];
+ let queue_out = Arc::new(tokio::sync::Mutex::new(queue_out));
+
+ for i in 0..n_runners {
+ let queue_out = queue_out.clone();
+ let stop_signal = stop_signal.clone();
+
+ workers.push(tokio::spawn(async move {
+ while let Some((job, cancellable)) = queue_out.lock().await.recv().await {
+ if cancellable && *stop_signal.borrow() {
+ continue;
+ }
+ if let Err(e) = job.await {
+ error!("Job failed: {}", e)
+ }
+ }
+ info!("Worker {} exiting", i);
+ }));
+ }
+
Arc::new(Self {
- n_runners,
stop_signal,
- queue_in,
- queue_out: Mutex::new(queue_out),
- job_notify: Notify::new(),
- workers: Mutex::new(Vec::new()),
+ queue_in: ArcSwapOption::new(Some(Arc::new(queue_in))),
+ workers: Mutex::new(workers),
})
}
pub async fn run(self: Arc<Self>) {
- let mut workers = self.workers.lock().unwrap();
- for i in 0..self.n_runners {
- workers.push(tokio::spawn(self.clone().runner(i)));
- }
- drop(workers);
-
let mut stop_signal = self.stop_signal.clone();
- while let Some(exit_now) = stop_signal.recv().await {
+
+ loop {
+ let exit_now = match stop_signal.changed().await {
+ Ok(()) => *stop_signal.borrow(),
+ Err(e) => {
+ error!("Watch .changed() error: {}", e);
+ true
+ }
+ };
if exit_now {
- let mut workers = self.workers.lock().unwrap();
- let workers_vec = workers.drain(..).collect::<Vec<_>>();
- join_all(workers_vec).await;
- return;
+ break;
+ }
+ }
+
+ info!("Closing background job queue_in...");
+ drop(self.queue_in.swap(None));
+
+ info!("Waiting for all workers to terminate...");
+ while let Some(task) = self.workers.lock().unwrap().pop() {
+ if let Err(e) = task.await {
+ warn!("Error awaiting task: {}", e);
}
}
}
- pub fn spawn<T>(&self, job: T)
+ // Spawn a task to be run in background
+ pub async fn spawn<T>(&self, job: T)
where
T: Future<Output = JobOutput> + Send + 'static,
{
- let boxed: Job = Box::pin(job);
- let _: Result<_, _> = self.queue_in.clone().send((boxed, false));
- self.job_notify.notify();
+ match self.queue_in.load().as_ref() {
+ Some(chan) => {
+ let boxed: Job = Box::pin(job);
+ chan.send((boxed, false)).map_err(|_| "send error").unwrap();
+ }
+ None => {
+ warn!("Doing background job now because we are exiting...");
+ if let Err(e) = job.await {
+ warn!("Task failed: {}", e);
+ }
+ }
+ }
}
pub fn spawn_cancellable<T>(&self, job: T)
where
T: Future<Output = JobOutput> + Send + 'static,
{
- let boxed: Job = Box::pin(job);
- let _: Result<_, _> = self.queue_in.clone().send((boxed, true));
- self.job_notify.notify();
+ match self.queue_in.load().as_ref() {
+ Some(chan) => {
+ let boxed: Job = Box::pin(job);
+ chan.send((boxed, false)).map_err(|_| "send error").unwrap();
+ }
+ None => (), // drop job if we are exiting
+ }
}
pub fn spawn_worker<F, T>(&self, name: String, worker: F)
@@ -85,37 +121,4 @@ impl BackgroundRunner {
info!("Worker exited: {}", name);
}));
}
-
- async fn runner(self: Arc<Self>, i: usize) {
- let mut stop_signal = self.stop_signal.clone();
- loop {
- let must_exit: bool = *stop_signal.borrow();
- if let Some(job) = self.dequeue_job(must_exit) {
- if let Err(e) = job.await {
- error!("Job failed: {}", e)
- }
- } else {
- if must_exit {
- info!("Background runner {} exiting", i);
- return;
- }
- select! {
- _ = self.job_notify.notified().fuse() => (),
- _ = stop_signal.recv().fuse() => (),
- }
- }
- }
- }
-
- fn dequeue_job(&self, must_exit: bool) -> Option<Job> {
- let mut queue = self.queue_out.lock().unwrap();
- while let Ok((job, cancellable)) = queue.try_recv() {
- if cancellable && must_exit {
- continue;
- } else {
- return Some(job);
- }
- }
- None
- }
}
diff --git a/src/util/error.rs b/src/util/error.rs
index dbf71ac1..a9bf0824 100644
--- a/src/util/error.rs
+++ b/src/util/error.rs
@@ -8,16 +8,22 @@ use crate::data::*;
pub enum RPCError {
#[error(display = "Node is down: {:?}.", _0)]
NodeDown(UUID),
+
#[error(display = "Timeout: {}", _0)]
- Timeout(#[error(source)] tokio::time::Elapsed),
+ Timeout(#[error(source)] tokio::time::error::Elapsed),
+
#[error(display = "HTTP error: {}", _0)]
HTTP(#[error(source)] http::Error),
+
#[error(display = "Hyper error: {}", _0)]
Hyper(#[error(source)] hyper::Error),
+
#[error(display = "Messagepack encode error: {}", _0)]
RMPEncode(#[error(source)] rmp_serde::encode::Error),
+
#[error(display = "Messagepack decode error: {}", _0)]
RMPDecode(#[error(source)] rmp_serde::decode::Error),
+
#[error(display = "Too many errors: {:?}", _0)]
TooManyErrors(Vec<String>),
}
diff --git a/src/web/Cargo.toml b/src/web/Cargo.toml
index 5cc8683c..8c340f6b 100644
--- a/src/web/Cargo.toml
+++ b/src/web/Cargo.toml
@@ -22,7 +22,7 @@ err-derive = "0.2.3"
log = "0.4"
futures = "0.3"
http = "0.2"
-hyper = "0.13"
+hyper = "0.14"
percent-encoding = "2.1.0"
roxmltree = "0.11"
idna = "0.2"