diff options
author | Alex Auvolat <alex@adnab.me> | 2021-03-11 13:47:21 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2021-03-11 13:47:21 +0100 |
commit | 8d63738cb062e816fc01c6aa2b32936ad31ff65b (patch) | |
tree | d69bb200a86788e2d7cde822afb70d54f7e7f448 /src/util/background.rs | |
parent | 3214dd52dd144c99353830d7340ea158e262b06f (diff) | |
download | garage-8d63738cb062e816fc01c6aa2b32936ad31ff65b.tar.gz garage-8d63738cb062e816fc01c6aa2b32936ad31ff65b.zip |
Checkpoint: add merkle tree in data table
Diffstat (limited to 'src/util/background.rs')
-rw-r--r-- | src/util/background.rs | 16 |
1 files changed, 8 insertions, 8 deletions
diff --git a/src/util/background.rs b/src/util/background.rs index 937062dd..8081f157 100644 --- a/src/util/background.rs +++ b/src/util/background.rs @@ -1,11 +1,11 @@ use core::future::Future; use std::pin::Pin; +use std::sync::Mutex; use futures::future::join_all; use futures::select; use futures_util::future::*; use std::sync::Arc; -use tokio::sync::Mutex; use tokio::sync::{mpsc, watch, Notify}; use crate::error::Error; @@ -38,7 +38,7 @@ impl BackgroundRunner { } pub async fn run(self: Arc<Self>) { - let mut workers = self.workers.lock().await; + let mut workers = self.workers.lock().unwrap(); for i in 0..self.n_runners { workers.push(tokio::spawn(self.clone().runner(i))); } @@ -47,7 +47,7 @@ impl BackgroundRunner { let mut stop_signal = self.stop_signal.clone(); while let Some(exit_now) = stop_signal.recv().await { if exit_now { - let mut workers = self.workers.lock().await; + let mut workers = self.workers.lock().unwrap(); let workers_vec = workers.drain(..).collect::<Vec<_>>(); join_all(workers_vec).await; return; @@ -73,12 +73,12 @@ impl BackgroundRunner { self.job_notify.notify(); } - pub async fn spawn_worker<F, T>(&self, name: String, worker: F) + pub fn spawn_worker<F, T>(&self, name: String, worker: F) where F: FnOnce(watch::Receiver<bool>) -> T + Send + 'static, T: Future<Output = JobOutput> + Send + 'static, { - let mut workers = self.workers.lock().await; + let mut workers = self.workers.lock().unwrap(); let stop_signal = self.stop_signal.clone(); workers.push(tokio::spawn(async move { if let Err(e) = worker(stop_signal).await { @@ -93,7 +93,7 @@ impl BackgroundRunner { let mut stop_signal = self.stop_signal.clone(); loop { let must_exit: bool = *stop_signal.borrow(); - if let Some(job) = self.dequeue_job(must_exit).await { + if let Some(job) = self.dequeue_job(must_exit) { if let Err(e) = job.await { error!("Job failed: {}", e) } @@ -110,8 +110,8 @@ impl BackgroundRunner { } } - async fn dequeue_job(&self, must_exit: bool) -> Option<Job> { - let mut queue = self.queue_out.lock().await; + fn dequeue_job(&self, must_exit: bool) -> Option<Job> { + let mut queue = self.queue_out.lock().unwrap(); while let Ok((job, cancellable)) = queue.try_recv() { if cancellable && must_exit { continue; |