aboutsummaryrefslogtreecommitdiff
path: root/src/garage/admin_rpc.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/garage/admin_rpc.rs')
-rw-r--r--src/garage/admin_rpc.rs80
1 files changed, 45 insertions, 35 deletions
diff --git a/src/garage/admin_rpc.rs b/src/garage/admin_rpc.rs
index fe5a9d88..b9e57c40 100644
--- a/src/garage/admin_rpc.rs
+++ b/src/garage/admin_rpc.rs
@@ -2,6 +2,7 @@ use std::collections::HashMap;
use std::fmt::Write;
use std::sync::Arc;
+use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use garage_util::error::Error;
@@ -10,8 +11,7 @@ use garage_table::crdt::Crdt;
use garage_table::replication::*;
use garage_table::*;
-use garage_rpc::rpc_client::*;
-use garage_rpc::rpc_server::*;
+use garage_rpc::*;
use garage_model::bucket_table::*;
use garage_model::garage::Garage;
@@ -19,10 +19,8 @@ use garage_model::key_table::*;
use crate::cli::*;
use crate::repair::Repair;
-use crate::*;
-pub const ADMIN_RPC_TIMEOUT: Duration = Duration::from_secs(30);
-pub const ADMIN_RPC_PATH: &str = "_admin";
+pub const ADMIN_RPC_PATH: &str = "garage/admin_rpc.rs/Rpc";
#[derive(Debug, Serialize, Deserialize)]
pub enum AdminRpc {
@@ -33,41 +31,31 @@ pub enum AdminRpc {
// Replies
Ok(String),
+ Error(String),
BucketList(Vec<String>),
BucketInfo(Bucket),
KeyList(Vec<(String, String)>),
KeyInfo(Key),
}
-impl RpcMessage for AdminRpc {}
+impl Message for AdminRpc {
+ type Response = AdminRpc;
+}
pub struct AdminRpcHandler {
garage: Arc<Garage>,
- rpc_client: Arc<RpcClient<AdminRpc>>,
+ endpoint: Arc<Endpoint<AdminRpc, Self>>,
}
impl AdminRpcHandler {
pub fn new(garage: Arc<Garage>) -> Arc<Self> {
- let rpc_client = garage.system.clone().rpc_client::<AdminRpc>(ADMIN_RPC_PATH);
- Arc::new(Self { garage, rpc_client })
- }
-
- pub fn register_handler(self: Arc<Self>, rpc_server: &mut RpcServer) {
- rpc_server.add_handler::<AdminRpc, _, _>(ADMIN_RPC_PATH.to_string(), move |msg, _addr| {
- let self2 = self.clone();
- async move {
- match msg {
- AdminRpc::BucketOperation(bo) => self2.handle_bucket_cmd(bo).await,
- AdminRpc::KeyOperation(ko) => self2.handle_key_cmd(ko).await,
- AdminRpc::LaunchRepair(opt) => self2.handle_launch_repair(opt).await,
- AdminRpc::Stats(opt) => self2.handle_stats(opt).await,
- _ => Err(Error::BadRpc("Invalid RPC".to_string())),
- }
- }
- });
+ let endpoint = garage.system.netapp.endpoint(ADMIN_RPC_PATH.into());
+ let admin = Arc::new(Self { garage, endpoint });
+ admin.endpoint.set_handler(admin.clone());
+ admin
}
- async fn handle_bucket_cmd(&self, cmd: BucketOperation) -> Result<AdminRpc, Error> {
+ async fn handle_bucket_cmd(&self, cmd: &BucketOperation) -> Result<AdminRpc, Error> {
match cmd {
BucketOperation::List => {
let bucket_names = self
@@ -187,7 +175,7 @@ impl AdminRpcHandler {
}
}
- async fn handle_key_cmd(&self, cmd: KeyOperation) -> Result<AdminRpc, Error> {
+ async fn handle_key_cmd(&self, cmd: &KeyOperation) -> Result<AdminRpc, Error> {
match cmd {
KeyOperation::List => {
let key_ids = self
@@ -210,13 +198,13 @@ impl AdminRpcHandler {
Ok(AdminRpc::KeyInfo(key))
}
KeyOperation::New(query) => {
- let key = Key::new(query.name);
+ let key = Key::new(query.name.clone());
self.garage.key_table.insert(&key).await?;
Ok(AdminRpc::KeyInfo(key))
}
KeyOperation::Rename(query) => {
let mut key = self.get_existing_key(&query.key_pattern).await?;
- key.name.update(query.new_name);
+ key.name.update(query.new_name.clone());
self.garage.key_table.insert(&key).await?;
Ok(AdminRpc::KeyInfo(key))
}
@@ -353,17 +341,18 @@ impl AdminRpcHandler {
let mut failures = vec![];
let ring = self.garage.system.ring.borrow().clone();
for node in ring.config.members.keys() {
+ let node = NodeID::from_slice(node.as_slice()).unwrap();
if self
- .rpc_client
+ .endpoint
.call(
- *node,
- AdminRpc::LaunchRepair(opt_to_send.clone()),
- ADMIN_RPC_TIMEOUT,
+ &node,
+ &AdminRpc::LaunchRepair(opt_to_send.clone()),
+ PRIO_NORMAL,
)
.await
.is_err()
{
- failures.push(*node);
+ failures.push(node);
}
}
if failures.is_empty() {
@@ -397,14 +386,16 @@ impl AdminRpcHandler {
let ring = self.garage.system.ring.borrow().clone();
for node in ring.config.members.keys() {
+ let node = NodeID::from_slice(node.as_slice()).unwrap();
+
let mut opt = opt.clone();
opt.all_nodes = false;
writeln!(&mut ret, "\n======================").unwrap();
writeln!(&mut ret, "Stats for node {:?}:", node).unwrap();
match self
- .rpc_client
- .call(*node, AdminRpc::Stats(opt), ADMIN_RPC_TIMEOUT)
+ .endpoint
+ .call(&node, &AdminRpc::Stats(opt), PRIO_NORMAL)
.await
{
Ok(AdminRpc::Ok(s)) => writeln!(&mut ret, "{}", s).unwrap(),
@@ -495,4 +486,23 @@ impl AdminRpcHandler {
.unwrap();
writeln!(to, " GC todo queue length: {}", t.data.gc_todo_len()).unwrap();
}
+
+ async fn handle_rpc(self: &Arc<Self>, msg: &AdminRpc) -> Result<AdminRpc, Error> {
+ match msg {
+ AdminRpc::BucketOperation(bo) => self.handle_bucket_cmd(bo).await,
+ AdminRpc::KeyOperation(ko) => self.handle_key_cmd(ko).await,
+ AdminRpc::LaunchRepair(opt) => self.handle_launch_repair(opt.clone()).await,
+ AdminRpc::Stats(opt) => self.handle_stats(opt.clone()).await,
+ _ => Err(Error::BadRpc("Invalid RPC".to_string())),
+ }
+ }
+}
+
+#[async_trait]
+impl EndpointHandler<AdminRpc> for AdminRpcHandler {
+ async fn handle(self: &Arc<Self>, message: &AdminRpc, _from: NodeID) -> AdminRpc {
+ self.handle_rpc(message)
+ .await
+ .unwrap_or_else(|e| AdminRpc::Error(format!("{}", e)))
+ }
}