aboutsummaryrefslogtreecommitdiff
path: root/src/garage
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2021-03-15 23:14:12 +0100
committerAlex Auvolat <alex@adnab.me>2021-03-15 23:14:12 +0100
commit6a8439fd1345ecae7414386f76dda7a03eb14df2 (patch)
treea6306030d7f0cc41158512e600683b8874f7d85e /src/garage
parent0cd5b2ae19965b8c1f3176afeb8f678c4d8366dd (diff)
downloadgarage-6a8439fd1345ecae7414386f76dda7a03eb14df2.tar.gz
garage-6a8439fd1345ecae7414386f76dda7a03eb14df2.zip
Some improvements in background worker but we terminate late
Diffstat (limited to 'src/garage')
-rw-r--r--src/garage/admin_rpc.rs29
-rw-r--r--src/garage/repair.rs6
-rw-r--r--src/garage/server.rs33
3 files changed, 35 insertions, 33 deletions
diff --git a/src/garage/admin_rpc.rs b/src/garage/admin_rpc.rs
index 40674e75..b2145ca5 100644
--- a/src/garage/admin_rpc.rs
+++ b/src/garage/admin_rpc.rs
@@ -193,7 +193,12 @@ impl AdminRpcHandler {
let key_ids = self
.garage
.key_table
- .get_range(&EmptyKey, None, Some(KeyFilter::Deleted(DeletedFilter::NotDeleted)), 10000)
+ .get_range(
+ &EmptyKey,
+ None,
+ Some(KeyFilter::Deleted(DeletedFilter::NotDeleted)),
+ 10000,
+ )
.await?
.iter()
.map(|k| (k.key_id.to_string(), k.name.get().clone()))
@@ -257,15 +262,24 @@ impl AdminRpcHandler {
}
async fn get_existing_key(&self, pattern: &str) -> Result<Key, Error> {
- let candidates = self.garage
+ let candidates = self
+ .garage
.key_table
- .get_range(&EmptyKey, None, Some(KeyFilter::Matches(pattern.to_string())), 10)
+ .get_range(
+ &EmptyKey,
+ None,
+ Some(KeyFilter::Matches(pattern.to_string())),
+ 10,
+ )
.await?
.into_iter()
.filter(|k| !k.deleted.get())
.collect::<Vec<_>>();
if candidates.len() != 1 {
- Err(Error::Message(format!("{} matching keys", candidates.len())))
+ Err(Error::Message(format!(
+ "{} matching keys",
+ candidates.len()
+ )))
} else {
Ok(candidates.into_iter().next().unwrap())
}
@@ -469,12 +483,7 @@ impl AdminRpcHandler {
t.data.merkle_updater.merkle_tree_len()
)
.unwrap();
- writeln!(
- to,
- " GC todo queue length: {}",
- t.data.gc_todo_len()
- )
- .unwrap();
+ writeln!(to, " GC todo queue length: {}", t.data.gc_todo_len()).unwrap();
Ok(())
}
}
diff --git a/src/garage/repair.rs b/src/garage/repair.rs
index 4ee66452..8200f1f0 100644
--- a/src/garage/repair.rs
+++ b/src/garage/repair.rs
@@ -16,11 +16,7 @@ pub struct Repair {
}
impl Repair {
- pub async fn repair_worker(
- &self,
- opt: RepairOpt,
- must_exit: watch::Receiver<bool>,
- ) {
+ pub async fn repair_worker(&self, opt: RepairOpt, must_exit: watch::Receiver<bool>) {
if let Err(e) = self.repair_worker_aux(opt, must_exit).await {
warn!("Repair worker failed with error: {}", e);
}
diff --git a/src/garage/server.rs b/src/garage/server.rs
index ce90ecab..c45a69b8 100644
--- a/src/garage/server.rs
+++ b/src/garage/server.rs
@@ -47,10 +47,15 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
info!("Initializing background runner...");
let (send_cancel, watch_cancel) = watch::channel(false);
- let background = BackgroundRunner::new(16, watch_cancel.clone());
+ let (background, await_background_done) = BackgroundRunner::new(16, watch_cancel.clone());
info!("Initializing Garage main data store...");
- let garage = Garage::new(config, db, background.clone(), &mut rpc_server);
+ let garage = Garage::new(config.clone(), db, background, &mut rpc_server);
+ let bootstrap = garage.system.clone().bootstrap(
+ &config.bootstrap_peers[..],
+ config.consul_host,
+ config.consul_service_name,
+ );
info!("Crate admin RPC handler...");
AdminRpcHandler::new(garage.clone()).register_handler(&mut rpc_server);
@@ -58,21 +63,13 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
info!("Initializing RPC and API servers...");
let run_rpc_server = Arc::new(rpc_server).run(wait_from(watch_cancel.clone()));
let api_server = api_server::run_api_server(garage.clone(), wait_from(watch_cancel.clone()));
- let web_server = web_server::run_web_server(garage.clone(), wait_from(watch_cancel.clone()));
+ let web_server = web_server::run_web_server(garage, wait_from(watch_cancel.clone()));
futures::try_join!(
- garage
- .system
- .clone()
- .bootstrap(
- &garage.config.bootstrap_peers[..],
- garage.config.consul_host.clone(),
- garage.config.consul_service_name.clone()
- )
- .map(|rv| {
- info!("Bootstrap done");
- Ok(rv)
- }),
+ bootstrap.map(|rv| {
+ info!("Bootstrap done");
+ Ok(rv)
+ }),
run_rpc_server.map(|rv| {
info!("RPC server exited");
rv
@@ -85,9 +82,9 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
info!("Web server exited");
rv
}),
- background.run().map(|rv| {
- info!("Background runner exited");
- Ok(rv)
+ await_background_done.map(|rv| {
+ info!("Background runner exited: {:?}", rv);
+ Ok(())
}),
shutdown_signal(send_cancel),
)?;