aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2021-03-15 23:14:12 +0100
committerAlex Auvolat <alex@adnab.me>2021-03-15 23:14:12 +0100
commit6a8439fd1345ecae7414386f76dda7a03eb14df2 (patch)
treea6306030d7f0cc41158512e600683b8874f7d85e
parent0cd5b2ae19965b8c1f3176afeb8f678c4d8366dd (diff)
downloadgarage-6a8439fd1345ecae7414386f76dda7a03eb14df2.tar.gz
garage-6a8439fd1345ecae7414386f76dda7a03eb14df2.zip
Some improvements in background worker but we terminate late
-rw-r--r--src/garage/admin_rpc.rs29
-rw-r--r--src/garage/repair.rs6
-rw-r--r--src/garage/server.rs33
-rw-r--r--src/model/block.rs13
-rw-r--r--src/model/key_table.rs3
-rw-r--r--src/model/object_table.rs3
-rw-r--r--src/model/version_table.rs3
-rw-r--r--src/rpc/membership.rs25
-rw-r--r--src/rpc/ring.rs10
-rw-r--r--src/rpc/rpc_client.rs2
-rw-r--r--src/rpc/rpc_server.rs8
-rw-r--r--src/table/merkle.rs5
-rw-r--r--src/table/sync.rs2
-rw-r--r--src/util/background.rs175
14 files changed, 169 insertions, 148 deletions
diff --git a/src/garage/admin_rpc.rs b/src/garage/admin_rpc.rs
index 40674e75..b2145ca5 100644
--- a/src/garage/admin_rpc.rs
+++ b/src/garage/admin_rpc.rs
@@ -193,7 +193,12 @@ impl AdminRpcHandler {
let key_ids = self
.garage
.key_table
- .get_range(&EmptyKey, None, Some(KeyFilter::Deleted(DeletedFilter::NotDeleted)), 10000)
+ .get_range(
+ &EmptyKey,
+ None,
+ Some(KeyFilter::Deleted(DeletedFilter::NotDeleted)),
+ 10000,
+ )
.await?
.iter()
.map(|k| (k.key_id.to_string(), k.name.get().clone()))
@@ -257,15 +262,24 @@ impl AdminRpcHandler {
}
async fn get_existing_key(&self, pattern: &str) -> Result<Key, Error> {
- let candidates = self.garage
+ let candidates = self
+ .garage
.key_table
- .get_range(&EmptyKey, None, Some(KeyFilter::Matches(pattern.to_string())), 10)
+ .get_range(
+ &EmptyKey,
+ None,
+ Some(KeyFilter::Matches(pattern.to_string())),
+ 10,
+ )
.await?
.into_iter()
.filter(|k| !k.deleted.get())
.collect::<Vec<_>>();
if candidates.len() != 1 {
- Err(Error::Message(format!("{} matching keys", candidates.len())))
+ Err(Error::Message(format!(
+ "{} matching keys",
+ candidates.len()
+ )))
} else {
Ok(candidates.into_iter().next().unwrap())
}
@@ -469,12 +483,7 @@ impl AdminRpcHandler {
t.data.merkle_updater.merkle_tree_len()
)
.unwrap();
- writeln!(
- to,
- " GC todo queue length: {}",
- t.data.gc_todo_len()
- )
- .unwrap();
+ writeln!(to, " GC todo queue length: {}", t.data.gc_todo_len()).unwrap();
Ok(())
}
}
diff --git a/src/garage/repair.rs b/src/garage/repair.rs
index 4ee66452..8200f1f0 100644
--- a/src/garage/repair.rs
+++ b/src/garage/repair.rs
@@ -16,11 +16,7 @@ pub struct Repair {
}
impl Repair {
- pub async fn repair_worker(
- &self,
- opt: RepairOpt,
- must_exit: watch::Receiver<bool>,
- ) {
+ pub async fn repair_worker(&self, opt: RepairOpt, must_exit: watch::Receiver<bool>) {
if let Err(e) = self.repair_worker_aux(opt, must_exit).await {
warn!("Repair worker failed with error: {}", e);
}
diff --git a/src/garage/server.rs b/src/garage/server.rs
index ce90ecab..c45a69b8 100644
--- a/src/garage/server.rs
+++ b/src/garage/server.rs
@@ -47,10 +47,15 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
info!("Initializing background runner...");
let (send_cancel, watch_cancel) = watch::channel(false);
- let background = BackgroundRunner::new(16, watch_cancel.clone());
+ let (background, await_background_done) = BackgroundRunner::new(16, watch_cancel.clone());
info!("Initializing Garage main data store...");
- let garage = Garage::new(config, db, background.clone(), &mut rpc_server);
+ let garage = Garage::new(config.clone(), db, background, &mut rpc_server);
+ let bootstrap = garage.system.clone().bootstrap(
+ &config.bootstrap_peers[..],
+ config.consul_host,
+ config.consul_service_name,
+ );
info!("Crate admin RPC handler...");
AdminRpcHandler::new(garage.clone()).register_handler(&mut rpc_server);
@@ -58,21 +63,13 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
info!("Initializing RPC and API servers...");
let run_rpc_server = Arc::new(rpc_server).run(wait_from(watch_cancel.clone()));
let api_server = api_server::run_api_server(garage.clone(), wait_from(watch_cancel.clone()));
- let web_server = web_server::run_web_server(garage.clone(), wait_from(watch_cancel.clone()));
+ let web_server = web_server::run_web_server(garage, wait_from(watch_cancel.clone()));
futures::try_join!(
- garage
- .system
- .clone()
- .bootstrap(
- &garage.config.bootstrap_peers[..],
- garage.config.consul_host.clone(),
- garage.config.consul_service_name.clone()
- )
- .map(|rv| {
- info!("Bootstrap done");
- Ok(rv)
- }),
+ bootstrap.map(|rv| {
+ info!("Bootstrap done");
+ Ok(rv)
+ }),
run_rpc_server.map(|rv| {
info!("RPC server exited");
rv
@@ -85,9 +82,9 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
info!("Web server exited");
rv
}),
- background.run().map(|rv| {
- info!("Background runner exited");
- Ok(rv)
+ await_background_done.map(|rv| {
+ info!("Background runner exited: {:?}", rv);
+ Ok(())
}),
shutdown_signal(send_cancel),
)?;
diff --git a/src/model/block.rs b/src/model/block.rs
index 7185372c..a3958866 100644
--- a/src/model/block.rs
+++ b/src/model/block.rs
@@ -254,19 +254,18 @@ impl BlockManager {
Ok(())
}
- async fn resync_loop(
- self: Arc<Self>,
- mut must_exit: watch::Receiver<bool>,
- ) {
+ async fn resync_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) {
while !*must_exit.borrow() {
if let Err(e) = self.resync_iter(&mut must_exit).await {
warn!("Error in block resync loop: {}", e);
- tokio::time::sleep(Duration::from_secs(10)).await;
+ select! {
+ _ = tokio::time::sleep(Duration::from_secs(10)).fuse() => (),
+ _ = must_exit.changed().fuse() => (),
+ }
}
}
}
-
async fn resync_iter(&self, must_exit: &mut watch::Receiver<bool>) -> Result<(), Error> {
if let Some(first_item) = self.resync_queue.iter().next() {
let (time_bytes, hash_bytes) = first_item?;
@@ -280,7 +279,7 @@ impl BlockManager {
self.put_to_resync(&hash, RESYNC_RETRY_TIMEOUT)?;
}
self.resync_queue.remove(&time_bytes)?;
- res?; // propagate error to delay main loop
+ res?; // propagate error to delay main loop
} else {
let delay = tokio::time::sleep(Duration::from_millis(time_msec - now));
select! {
diff --git a/src/model/key_table.rs b/src/model/key_table.rs
index 88d7b4ff..02dcf68c 100644
--- a/src/model/key_table.rs
+++ b/src/model/key_table.rs
@@ -109,7 +109,8 @@ impl TableSchema for KeyTable {
KeyFilter::Deleted(df) => df.apply(entry.deleted.get()),
KeyFilter::Matches(pat) => {
let pat = pat.to_lowercase();
- entry.key_id.to_lowercase().starts_with(&pat) || entry.name.get().to_lowercase() == pat
+ entry.key_id.to_lowercase().starts_with(&pat)
+ || entry.name.get().to_lowercase() == pat
}
}
}
diff --git a/src/model/object_table.rs b/src/model/object_table.rs
index d08bba70..99fad3ce 100644
--- a/src/model/object_table.rs
+++ b/src/model/object_table.rs
@@ -195,8 +195,7 @@ impl TableSchema for ObjectTable {
fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) {
let version_table = self.version_table.clone();
- // TODO not cancellable
- self.background.spawn_cancellable(async move {
+ self.background.spawn(async move {
if let (Some(old_v), Some(new_v)) = (old, new) {
// Propagate deletion of old versions
for v in old_v.versions.iter() {
diff --git a/src/model/version_table.rs b/src/model/version_table.rs
index 19343890..841fbfea 100644
--- a/src/model/version_table.rs
+++ b/src/model/version_table.rs
@@ -110,8 +110,7 @@ impl TableSchema for VersionTable {
fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) {
let block_ref_table = self.block_ref_table.clone();
- // TODO not cancellable
- self.background.spawn_cancellable(async move {
+ self.background.spawn(async move {
if let (Some(old_v), Some(new_v)) = (old, new) {
// Propagate deletion of version blocks
if new_v.deleted.get() && !old_v.deleted.get() {
diff --git a/src/rpc/membership.rs b/src/rpc/membership.rs
index 6cc3ed2e..4e9822fa 100644
--- a/src/rpc/membership.rs
+++ b/src/rpc/membership.rs
@@ -11,9 +11,9 @@ use futures::future::join_all;
use futures::select;
use futures_util::future::*;
use serde::{Deserialize, Serialize};
+use tokio::io::AsyncWriteExt;
use tokio::sync::watch;
use tokio::sync::Mutex;
-use tokio::io::AsyncWriteExt;
use garage_util::background::BackgroundRunner;
use garage_util::data::*;
@@ -316,17 +316,16 @@ impl System {
self.clone().ping_nodes(bootstrap_peers).await;
let self2 = self.clone();
- self.clone()
- .background
- .spawn_worker(format!("ping loop"), |stop_signal| self2.ping_loop(stop_signal));
+ self.background
+ .spawn_worker(format!("ping loop"), |stop_signal| {
+ self2.ping_loop(stop_signal)
+ });
if let (Some(consul_host), Some(consul_service_name)) = (consul_host, consul_service_name) {
let self2 = self.clone();
- self.clone()
- .background
+ self.background
.spawn_worker(format!("Consul loop"), |stop_signal| {
- self2
- .consul_loop(stop_signal, consul_host, consul_service_name)
+ self2.consul_loop(stop_signal, consul_host, consul_service_name)
});
}
}
@@ -531,7 +530,7 @@ impl System {
.broadcast(Message::AdvertiseConfig(adv.clone()), PING_TIMEOUT)
.map(Ok),
);
- self.background.spawn(self.clone().save_network_config()).await;
+ self.background.spawn(self.clone().save_network_config());
}
Ok(Message::Ok)
@@ -568,7 +567,7 @@ impl System {
consul_host: String,
consul_service_name: String,
) {
- loop {
+ while !*stop_signal.borrow() {
let restart_at = tokio::time::sleep(CONSUL_INTERVAL);
match get_consul_nodes(&consul_host, &consul_service_name).await {
@@ -583,11 +582,7 @@ impl System {
select! {
_ = restart_at.fuse() => (),
- _ = stop_signal.changed().fuse() => {
- if *stop_signal.borrow() {
- return;
- }
- }
+ _ = stop_signal.changed().fuse() => (),
}
}
}
diff --git a/src/rpc/ring.rs b/src/rpc/ring.rs
index 215ab031..a89b730c 100644
--- a/src/rpc/ring.rs
+++ b/src/rpc/ring.rs
@@ -161,11 +161,11 @@ impl Ring {
})
.collect::<Vec<_>>();
- eprintln!("RING: --");
- for e in ring.iter() {
- eprintln!("{:?}", e);
- }
- eprintln!("END --");
+ // eprintln!("RING: --");
+ // for e in ring.iter() {
+ // eprintln!("{:?}", e);
+ // }
+ // eprintln!("END --");
Self { config, ring }
}
diff --git a/src/rpc/rpc_client.rs b/src/rpc/rpc_client.rs
index 60286256..cffcf106 100644
--- a/src/rpc/rpc_client.rs
+++ b/src/rpc/rpc_client.rs
@@ -198,7 +198,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
let wait_finished_fut = tokio::spawn(async move {
resp_stream.collect::<Vec<_>>().await;
});
- self.background.spawn(wait_finished_fut.map(|_| Ok(()))).await;
+ self.background.spawn(wait_finished_fut.map(|_| Ok(())));
}
Ok(results)
diff --git a/src/rpc/rpc_server.rs b/src/rpc/rpc_server.rs
index 3c5014c4..0c5bf6f9 100644
--- a/src/rpc/rpc_server.rs
+++ b/src/rpc/rpc_server.rs
@@ -13,9 +13,9 @@ use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Request, Response, Server, StatusCode};
use serde::{Deserialize, Serialize};
use tokio::net::{TcpListener, TcpStream};
-use tokio_stream::wrappers::TcpListenerStream;
use tokio_rustls::server::TlsStream;
use tokio_rustls::TlsAcceptor;
+use tokio_stream::wrappers::TcpListenerStream;
use garage_util::config::TlsConfig;
use garage_util::data::*;
@@ -52,7 +52,11 @@ where
trace!(
"Request message: {}",
- serde_json::to_string(&msg).unwrap_or("<json error>".into()).chars().take(100).collect::<String>()
+ serde_json::to_string(&msg)
+ .unwrap_or("<json error>".into())
+ .chars()
+ .take(100)
+ .collect::<String>()
);
match handler(msg, sockaddr).await {
diff --git a/src/table/merkle.rs b/src/table/merkle.rs
index 86289bf1..60b7833f 100644
--- a/src/table/merkle.rs
+++ b/src/table/merkle.rs
@@ -101,10 +101,7 @@ impl MerkleUpdater {
ret
}
- async fn updater_loop(
- self: Arc<Self>,
- mut must_exit: watch::Receiver<bool>,
- ) {
+ async fn updater_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) {
while !*must_exit.borrow() {
if let Some(x) = self.todo.iter().next() {
match x {
diff --git a/src/table/sync.rs b/src/table/sync.rs
index 65231cd5..f8fef53c 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -3,7 +3,7 @@ use std::convert::TryInto;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
-use futures::{select};
+use futures::select;
use futures_util::future::*;
use futures_util::stream::*;
use rand::Rng;
diff --git a/src/util/background.rs b/src/util/background.rs
index 0ec9779a..35d41d9f 100644
--- a/src/util/background.rs
+++ b/src/util/background.rs
@@ -1,10 +1,11 @@
use core::future::Future;
use std::pin::Pin;
-use std::sync::Mutex;
-
-use arc_swap::ArcSwapOption;
use std::sync::Arc;
-use tokio::sync::{mpsc, watch};
+use std::time::Duration;
+
+use futures::future::*;
+use futures::select;
+use tokio::sync::{mpsc, watch, Mutex};
use crate::error::Error;
@@ -14,99 +15,115 @@ type Job = Pin<Box<dyn Future<Output = JobOutput> + Send>>;
pub struct BackgroundRunner {
pub stop_signal: watch::Receiver<bool>,
- queue_in: ArcSwapOption<mpsc::UnboundedSender<(Job, bool)>>,
-
- workers: Mutex<Vec<tokio::task::JoinHandle<()>>>,
+ queue_in: mpsc::UnboundedSender<(Job, bool)>,
+ worker_in: mpsc::UnboundedSender<tokio::task::JoinHandle<()>>,
}
impl BackgroundRunner {
- pub fn new(n_runners: usize, stop_signal: watch::Receiver<bool>) -> Arc<Self> {
- let (queue_in, queue_out) = mpsc::unbounded_channel();
+ pub fn new(
+ n_runners: usize,
+ stop_signal: watch::Receiver<bool>,
+ ) -> (Arc<Self>, tokio::task::JoinHandle<()>) {
+ let (worker_in, mut worker_out) = mpsc::unbounded_channel();
+
+ let stop_signal_2 = stop_signal.clone();
+ let await_all_done = tokio::spawn(async move {
+ loop {
+ let wkr = {
+ select! {
+ item = worker_out.recv().fuse() => {
+ match item {
+ Some(x) => x,
+ None => break,
+ }
+ }
+ _ = tokio::time::sleep(Duration::from_secs(10)).fuse() => {
+ if *stop_signal_2.borrow() {
+ break;
+ } else {
+ continue;
+ }
+ }
+ }
+ };
+ if let Err(e) = wkr.await {
+ error!("Error while awaiting for worker: {}", e);
+ }
+ }
+ });
- let mut workers = vec![];
- let queue_out = Arc::new(tokio::sync::Mutex::new(queue_out));
+ let (queue_in, queue_out) = mpsc::unbounded_channel();
+ let queue_out = Arc::new(Mutex::new(queue_out));
for i in 0..n_runners {
let queue_out = queue_out.clone();
let stop_signal = stop_signal.clone();
- workers.push(tokio::spawn(async move {
- while let Some((job, cancellable)) = queue_out.lock().await.recv().await {
- if cancellable && *stop_signal.borrow() {
- continue;
- }
- if let Err(e) = job.await {
- error!("Job failed: {}", e)
+ worker_in
+ .send(tokio::spawn(async move {
+ loop {
+ let (job, cancellable) = {
+ select! {
+ item = wait_job(&queue_out).fuse() => match item {
+ // We received a task, process it
+ Some(x) => x,
+ // We received a signal that no more tasks will ever be sent
+ // because the sending side was dropped. Exit now.
+ None => break,
+ },
+ _ = tokio::time::sleep(Duration::from_secs(10)).fuse() => {
+ if *stop_signal.borrow() {
+ // Nothing has been going on for 10 secs, and we are shutting
+ // down. Exit now.
+ break;
+ } else {
+ // Nothing is going on but we don't want to exit.
+ continue;
+ }
+ }
+ }
+ };
+ if cancellable && *stop_signal.borrow() {
+ continue;
+ }
+ if let Err(e) = job.await {
+ error!("Job failed: {}", e)
+ }
}
- }
- info!("Worker {} exiting", i);
- }));
+ info!("Background worker {} exiting", i);
+ }))
+ .unwrap();
}
- Arc::new(Self {
+ let bgrunner = Arc::new(Self {
stop_signal,
- queue_in: ArcSwapOption::new(Some(Arc::new(queue_in))),
- workers: Mutex::new(workers),
- })
- }
-
- pub async fn run(self: Arc<Self>) {
- let mut stop_signal = self.stop_signal.clone();
-
- loop {
- let exit_now = match stop_signal.changed().await {
- Ok(()) => *stop_signal.borrow(),
- Err(e) => {
- error!("Watch .changed() error: {}", e);
- true
- }
- };
- if exit_now {
- break;
- }
- }
-
- info!("Closing background job queue_in...");
- drop(self.queue_in.swap(None));
-
- info!("Waiting for all workers to terminate...");
- while let Some(task) = self.workers.lock().unwrap().pop() {
- if let Err(e) = task.await {
- warn!("Error awaiting task: {}", e);
- }
- }
+ queue_in,
+ worker_in,
+ });
+ (bgrunner, await_all_done)
}
// Spawn a task to be run in background
- pub async fn spawn<T>(&self, job: T)
+ pub fn spawn<T>(&self, job: T)
where
T: Future<Output = JobOutput> + Send + 'static,
{
- match self.queue_in.load().as_ref() {
- Some(chan) => {
- let boxed: Job = Box::pin(job);
- chan.send((boxed, false)).map_err(|_| "send error").unwrap();
- }
- None => {
- warn!("Doing background job now because we are exiting...");
- if let Err(e) = job.await {
- warn!("Task failed: {}", e);
- }
- }
- }
+ let boxed: Job = Box::pin(job);
+ self.queue_in
+ .send((boxed, false))
+ .map_err(|_| "could not put job in queue")
+ .unwrap();
}
pub fn spawn_cancellable<T>(&self, job: T)
where
T: Future<Output = JobOutput> + Send + 'static,
{
- match self.queue_in.load().as_ref() {
- Some(chan) => {
- let boxed: Job = Box::pin(job);
- chan.send((boxed, false)).map_err(|_| "send error").unwrap();
- }
- None => (), // drop job if we are exiting
- }
+ let boxed: Job = Box::pin(job);
+ self.queue_in
+ .send((boxed, true))
+ .map_err(|_| "could not put job in queue")
+ .unwrap();
}
pub fn spawn_worker<F, T>(&self, name: String, worker: F)
@@ -114,11 +131,19 @@ impl BackgroundRunner {
F: FnOnce(watch::Receiver<bool>) -> T + Send + 'static,
T: Future<Output = ()> + Send + 'static,
{
- let mut workers = self.workers.lock().unwrap();
let stop_signal = self.stop_signal.clone();
- workers.push(tokio::spawn(async move {
+ let task = tokio::spawn(async move {
+ info!("Worker started: {}", name);
worker(stop_signal).await;
info!("Worker exited: {}", name);
- }));
+ });
+ self.worker_in
+ .send(task)
+ .map_err(|_| "could not put job in queue")
+ .unwrap();
}
}
+
+async fn wait_job(q: &Mutex<mpsc::UnboundedReceiver<(Job, bool)>>) -> Option<(Job, bool)> {
+ q.lock().await.recv().await
+}