aboutsummaryrefslogtreecommitdiff
path: root/src/garage/admin_rpc.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2021-10-15 11:05:09 +0200
committerAlex Auvolat <alex@adnab.me>2021-10-22 16:55:24 +0200
commit1b450c4b493dfcb2ee88acbca3ea584beac8eb4b (patch)
treed6437f105a630fa197b67446b5c3b2902335c34a /src/garage/admin_rpc.rs
parent4067797d0142ee7860aff8da95d65820d6cc0889 (diff)
downloadgarage-1b450c4b493dfcb2ee88acbca3ea584beac8eb4b.tar.gz
garage-1b450c4b493dfcb2ee88acbca3ea584beac8eb4b.zip
Improvements to CLI and various fixes for netapp version
Discovery via consul, persist peer list to file
Diffstat (limited to 'src/garage/admin_rpc.rs')
-rw-r--r--src/garage/admin_rpc.rs46
1 files changed, 23 insertions, 23 deletions
diff --git a/src/garage/admin_rpc.rs b/src/garage/admin_rpc.rs
index b9e57c40..339d5bdb 100644
--- a/src/garage/admin_rpc.rs
+++ b/src/garage/admin_rpc.rs
@@ -31,15 +31,14 @@ pub enum AdminRpc {
// Replies
Ok(String),
- Error(String),
BucketList(Vec<String>),
BucketInfo(Bucket),
KeyList(Vec<(String, String)>),
KeyInfo(Key),
}
-impl Message for AdminRpc {
- type Response = AdminRpc;
+impl Rpc for AdminRpc {
+ type Response = Result<AdminRpc, Error>;
}
pub struct AdminRpcHandler {
@@ -341,17 +340,20 @@ impl AdminRpcHandler {
let mut failures = vec![];
let ring = self.garage.system.ring.borrow().clone();
for node in ring.config.members.keys() {
- let node = NodeID::from_slice(node.as_slice()).unwrap();
- if self
+ let node = (*node).into();
+ let resp = self
.endpoint
.call(
&node,
&AdminRpc::LaunchRepair(opt_to_send.clone()),
PRIO_NORMAL,
)
- .await
- .is_err()
- {
+ .await;
+ let is_err = match resp {
+ Ok(Ok(_)) => false,
+ _ => true,
+ };
+ if is_err {
failures.push(node);
}
}
@@ -386,17 +388,17 @@ impl AdminRpcHandler {
let ring = self.garage.system.ring.borrow().clone();
for node in ring.config.members.keys() {
- let node = NodeID::from_slice(node.as_slice()).unwrap();
-
let mut opt = opt.clone();
opt.all_nodes = false;
writeln!(&mut ret, "\n======================").unwrap();
writeln!(&mut ret, "Stats for node {:?}:", node).unwrap();
+
+ let node_id = (*node).into();
match self
.endpoint
- .call(&node, &AdminRpc::Stats(opt), PRIO_NORMAL)
- .await
+ .call(&node_id, &AdminRpc::Stats(opt), PRIO_NORMAL)
+ .await?
{
Ok(AdminRpc::Ok(s)) => writeln!(&mut ret, "{}", s).unwrap(),
Ok(x) => writeln!(&mut ret, "Bad answer: {:?}", x).unwrap(),
@@ -486,9 +488,16 @@ impl AdminRpcHandler {
.unwrap();
writeln!(to, " GC todo queue length: {}", t.data.gc_todo_len()).unwrap();
}
+}
- async fn handle_rpc(self: &Arc<Self>, msg: &AdminRpc) -> Result<AdminRpc, Error> {
- match msg {
+#[async_trait]
+impl EndpointHandler<AdminRpc> for AdminRpcHandler {
+ async fn handle(
+ self: &Arc<Self>,
+ message: &AdminRpc,
+ _from: NodeID,
+ ) -> Result<AdminRpc, Error> {
+ match message {
AdminRpc::BucketOperation(bo) => self.handle_bucket_cmd(bo).await,
AdminRpc::KeyOperation(ko) => self.handle_key_cmd(ko).await,
AdminRpc::LaunchRepair(opt) => self.handle_launch_repair(opt.clone()).await,
@@ -497,12 +506,3 @@ impl AdminRpcHandler {
}
}
}
-
-#[async_trait]
-impl EndpointHandler<AdminRpc> for AdminRpcHandler {
- async fn handle(self: &Arc<Self>, message: &AdminRpc, _from: NodeID) -> AdminRpc {
- self.handle_rpc(message)
- .await
- .unwrap_or_else(|e| AdminRpc::Error(format!("{}", e)))
- }
-}