aboutsummaryrefslogtreecommitdiff
path: root/src/garage/admin/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/garage/admin/mod.rs')
-rw-r--r--src/garage/admin/mod.rs46
1 files changed, 30 insertions, 16 deletions
diff --git a/src/garage/admin/mod.rs b/src/garage/admin/mod.rs
index e2468143..3bbc2b86 100644
--- a/src/garage/admin/mod.rs
+++ b/src/garage/admin/mod.rs
@@ -4,9 +4,11 @@ mod key;
use std::collections::HashMap;
use std::fmt::Write;
+use std::future::Future;
use std::sync::Arc;
-use async_trait::async_trait;
+use futures::future::FutureExt;
+
use serde::{Deserialize, Serialize};
use format_table::format_table_to_string;
@@ -144,7 +146,12 @@ impl AdminRpcHandler {
async fn handle_stats(&self, opt: StatsOpt) -> Result<AdminRpc, Error> {
if opt.all_nodes {
let mut ret = String::new();
- let all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec();
+ let mut all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec();
+ for node in self.garage.system.get_known_nodes().iter() {
+ if node.is_up && !all_nodes.contains(&node.id) {
+ all_nodes.push(node.id);
+ }
+ }
for node in all_nodes.iter() {
let mut opt = opt.clone();
@@ -482,7 +489,7 @@ impl AdminRpcHandler {
AdminRpc::MetaOperation(MetaOperation::Snapshot { all: false }),
PRIO_NORMAL,
)
- .await
+ .await?
}))
.await;
@@ -495,7 +502,11 @@ impl AdminRpcHandler {
ret.push(format!("{:?}\t{}", to, res_str));
}
- Ok(AdminRpc::Ok(format_table_to_string(ret)))
+ if resps.iter().any(Result::is_err) {
+ Err(GarageError::Message(format_table_to_string(ret)).into())
+ } else {
+ Ok(AdminRpc::Ok(format_table_to_string(ret)))
+ }
}
MetaOperation::Snapshot { all: false } => {
garage_model::snapshot::async_snapshot_metadata(&self.garage).await?;
@@ -505,22 +516,25 @@ impl AdminRpcHandler {
}
}
-#[async_trait]
impl EndpointHandler<AdminRpc> for AdminRpcHandler {
- async fn handle(
+ fn handle(
self: &Arc<Self>,
message: &AdminRpc,
_from: NodeID,
- ) -> Result<AdminRpc, Error> {
- match message {
- AdminRpc::BucketOperation(bo) => self.handle_bucket_cmd(bo).await,
- AdminRpc::KeyOperation(ko) => self.handle_key_cmd(ko).await,
- AdminRpc::LaunchRepair(opt) => self.handle_launch_repair(opt.clone()).await,
- AdminRpc::Stats(opt) => self.handle_stats(opt.clone()).await,
- AdminRpc::Worker(wo) => self.handle_worker_cmd(wo).await,
- AdminRpc::BlockOperation(bo) => self.handle_block_cmd(bo).await,
- AdminRpc::MetaOperation(mo) => self.handle_meta_cmd(mo).await,
- m => Err(GarageError::unexpected_rpc_message(m).into()),
+ ) -> impl Future<Output = Result<AdminRpc, Error>> + Send {
+ let self2 = self.clone();
+ async move {
+ match message {
+ AdminRpc::BucketOperation(bo) => self2.handle_bucket_cmd(bo).await,
+ AdminRpc::KeyOperation(ko) => self2.handle_key_cmd(ko).await,
+ AdminRpc::LaunchRepair(opt) => self2.handle_launch_repair(opt.clone()).await,
+ AdminRpc::Stats(opt) => self2.handle_stats(opt.clone()).await,
+ AdminRpc::Worker(wo) => self2.handle_worker_cmd(wo).await,
+ AdminRpc::BlockOperation(bo) => self2.handle_block_cmd(bo).await,
+ AdminRpc::MetaOperation(mo) => self2.handle_meta_cmd(mo).await,
+ m => Err(GarageError::unexpected_rpc_message(m).into()),
+ }
}
+ .boxed()
}
}