aboutsummaryrefslogtreecommitdiff
path: root/src/util/background.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/util/background.rs')
-rw-r--r--src/util/background.rs16
1 files changed, 8 insertions, 8 deletions
diff --git a/src/util/background.rs b/src/util/background.rs
index 937062dd..8081f157 100644
--- a/src/util/background.rs
+++ b/src/util/background.rs
@@ -1,11 +1,11 @@
use core::future::Future;
use std::pin::Pin;
+use std::sync::Mutex;
use futures::future::join_all;
use futures::select;
use futures_util::future::*;
use std::sync::Arc;
-use tokio::sync::Mutex;
use tokio::sync::{mpsc, watch, Notify};
use crate::error::Error;
@@ -38,7 +38,7 @@ impl BackgroundRunner {
}
pub async fn run(self: Arc<Self>) {
- let mut workers = self.workers.lock().await;
+ let mut workers = self.workers.lock().unwrap();
for i in 0..self.n_runners {
workers.push(tokio::spawn(self.clone().runner(i)));
}
@@ -47,7 +47,7 @@ impl BackgroundRunner {
let mut stop_signal = self.stop_signal.clone();
while let Some(exit_now) = stop_signal.recv().await {
if exit_now {
- let mut workers = self.workers.lock().await;
+ let mut workers = self.workers.lock().unwrap();
let workers_vec = workers.drain(..).collect::<Vec<_>>();
join_all(workers_vec).await;
return;
@@ -73,12 +73,12 @@ impl BackgroundRunner {
self.job_notify.notify();
}
- pub async fn spawn_worker<F, T>(&self, name: String, worker: F)
+ pub fn spawn_worker<F, T>(&self, name: String, worker: F)
where
F: FnOnce(watch::Receiver<bool>) -> T + Send + 'static,
T: Future<Output = JobOutput> + Send + 'static,
{
- let mut workers = self.workers.lock().await;
+ let mut workers = self.workers.lock().unwrap();
let stop_signal = self.stop_signal.clone();
workers.push(tokio::spawn(async move {
if let Err(e) = worker(stop_signal).await {
@@ -93,7 +93,7 @@ impl BackgroundRunner {
let mut stop_signal = self.stop_signal.clone();
loop {
let must_exit: bool = *stop_signal.borrow();
- if let Some(job) = self.dequeue_job(must_exit).await {
+ if let Some(job) = self.dequeue_job(must_exit) {
if let Err(e) = job.await {
error!("Job failed: {}", e)
}
@@ -110,8 +110,8 @@ impl BackgroundRunner {
}
}
- async fn dequeue_job(&self, must_exit: bool) -> Option<Job> {
- let mut queue = self.queue_out.lock().await;
+ fn dequeue_job(&self, must_exit: bool) -> Option<Job> {
+ let mut queue = self.queue_out.lock().unwrap();
while let Ok((job, cancellable)) = queue.try_recv() {
if cancellable && must_exit {
continue;