From 8d63738cb062e816fc01c6aa2b32936ad31ff65b Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 11 Mar 2021 13:47:21 +0100 Subject: Checkpoint: add merkle tree in data table --- src/util/background.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) (limited to 'src/util') diff --git a/src/util/background.rs b/src/util/background.rs index 937062dd..8081f157 100644 --- a/src/util/background.rs +++ b/src/util/background.rs @@ -1,11 +1,11 @@ use core::future::Future; use std::pin::Pin; +use std::sync::Mutex; use futures::future::join_all; use futures::select; use futures_util::future::*; use std::sync::Arc; -use tokio::sync::Mutex; use tokio::sync::{mpsc, watch, Notify}; use crate::error::Error; @@ -38,7 +38,7 @@ impl BackgroundRunner { } pub async fn run(self: Arc) { - let mut workers = self.workers.lock().await; + let mut workers = self.workers.lock().unwrap(); for i in 0..self.n_runners { workers.push(tokio::spawn(self.clone().runner(i))); } @@ -47,7 +47,7 @@ impl BackgroundRunner { let mut stop_signal = self.stop_signal.clone(); while let Some(exit_now) = stop_signal.recv().await { if exit_now { - let mut workers = self.workers.lock().await; + let mut workers = self.workers.lock().unwrap(); let workers_vec = workers.drain(..).collect::>(); join_all(workers_vec).await; return; @@ -73,12 +73,12 @@ impl BackgroundRunner { self.job_notify.notify(); } - pub async fn spawn_worker(&self, name: String, worker: F) + pub fn spawn_worker(&self, name: String, worker: F) where F: FnOnce(watch::Receiver) -> T + Send + 'static, T: Future + Send + 'static, { - let mut workers = self.workers.lock().await; + let mut workers = self.workers.lock().unwrap(); let stop_signal = self.stop_signal.clone(); workers.push(tokio::spawn(async move { if let Err(e) = worker(stop_signal).await { @@ -93,7 +93,7 @@ impl BackgroundRunner { let mut stop_signal = self.stop_signal.clone(); loop { let must_exit: bool = *stop_signal.borrow(); - if let Some(job) = self.dequeue_job(must_exit).await { + if let Some(job) = self.dequeue_job(must_exit) { if let Err(e) = job.await { error!("Job failed: {}", e) } @@ -110,8 +110,8 @@ impl BackgroundRunner { } } - async fn dequeue_job(&self, must_exit: bool) -> Option { - let mut queue = self.queue_out.lock().await; + fn dequeue_job(&self, must_exit: bool) -> Option { + let mut queue = self.queue_out.lock().unwrap(); while let Ok((job, cancellable)) = queue.try_recv() { if cancellable && must_exit { continue; -- cgit v1.2.3