From 4067797d0142ee7860aff8da95d65820d6cc0889 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 14 Oct 2021 11:50:12 +0200 Subject: First port of Garage to Netapp --- src/garage/admin_rpc.rs | 80 +++++++++++++++++++++++++++---------------------- 1 file changed, 45 insertions(+), 35 deletions(-) (limited to 'src/garage/admin_rpc.rs') diff --git a/src/garage/admin_rpc.rs b/src/garage/admin_rpc.rs index fe5a9d88..b9e57c40 100644 --- a/src/garage/admin_rpc.rs +++ b/src/garage/admin_rpc.rs @@ -2,6 +2,7 @@ use std::collections::HashMap; use std::fmt::Write; use std::sync::Arc; +use async_trait::async_trait; use serde::{Deserialize, Serialize}; use garage_util::error::Error; @@ -10,8 +11,7 @@ use garage_table::crdt::Crdt; use garage_table::replication::*; use garage_table::*; -use garage_rpc::rpc_client::*; -use garage_rpc::rpc_server::*; +use garage_rpc::*; use garage_model::bucket_table::*; use garage_model::garage::Garage; @@ -19,10 +19,8 @@ use garage_model::key_table::*; use crate::cli::*; use crate::repair::Repair; -use crate::*; -pub const ADMIN_RPC_TIMEOUT: Duration = Duration::from_secs(30); -pub const ADMIN_RPC_PATH: &str = "_admin"; +pub const ADMIN_RPC_PATH: &str = "garage/admin_rpc.rs/Rpc"; #[derive(Debug, Serialize, Deserialize)] pub enum AdminRpc { @@ -33,41 +31,31 @@ pub enum AdminRpc { // Replies Ok(String), + Error(String), BucketList(Vec), BucketInfo(Bucket), KeyList(Vec<(String, String)>), KeyInfo(Key), } -impl RpcMessage for AdminRpc {} +impl Message for AdminRpc { + type Response = AdminRpc; +} pub struct AdminRpcHandler { garage: Arc, - rpc_client: Arc>, + endpoint: Arc>, } impl AdminRpcHandler { pub fn new(garage: Arc) -> Arc { - let rpc_client = garage.system.clone().rpc_client::(ADMIN_RPC_PATH); - Arc::new(Self { garage, rpc_client }) - } - - pub fn register_handler(self: Arc, rpc_server: &mut RpcServer) { - rpc_server.add_handler::(ADMIN_RPC_PATH.to_string(), move |msg, _addr| { - let self2 = self.clone(); - async move { - match msg { - AdminRpc::BucketOperation(bo) => self2.handle_bucket_cmd(bo).await, - AdminRpc::KeyOperation(ko) => self2.handle_key_cmd(ko).await, - AdminRpc::LaunchRepair(opt) => self2.handle_launch_repair(opt).await, - AdminRpc::Stats(opt) => self2.handle_stats(opt).await, - _ => Err(Error::BadRpc("Invalid RPC".to_string())), - } - } - }); + let endpoint = garage.system.netapp.endpoint(ADMIN_RPC_PATH.into()); + let admin = Arc::new(Self { garage, endpoint }); + admin.endpoint.set_handler(admin.clone()); + admin } - async fn handle_bucket_cmd(&self, cmd: BucketOperation) -> Result { + async fn handle_bucket_cmd(&self, cmd: &BucketOperation) -> Result { match cmd { BucketOperation::List => { let bucket_names = self @@ -187,7 +175,7 @@ impl AdminRpcHandler { } } - async fn handle_key_cmd(&self, cmd: KeyOperation) -> Result { + async fn handle_key_cmd(&self, cmd: &KeyOperation) -> Result { match cmd { KeyOperation::List => { let key_ids = self @@ -210,13 +198,13 @@ impl AdminRpcHandler { Ok(AdminRpc::KeyInfo(key)) } KeyOperation::New(query) => { - let key = Key::new(query.name); + let key = Key::new(query.name.clone()); self.garage.key_table.insert(&key).await?; Ok(AdminRpc::KeyInfo(key)) } KeyOperation::Rename(query) => { let mut key = self.get_existing_key(&query.key_pattern).await?; - key.name.update(query.new_name); + key.name.update(query.new_name.clone()); self.garage.key_table.insert(&key).await?; Ok(AdminRpc::KeyInfo(key)) } @@ -353,17 +341,18 @@ impl AdminRpcHandler { let mut failures = vec![]; let ring = self.garage.system.ring.borrow().clone(); for node in ring.config.members.keys() { + let node = NodeID::from_slice(node.as_slice()).unwrap(); if self - .rpc_client + .endpoint .call( - *node, - AdminRpc::LaunchRepair(opt_to_send.clone()), - ADMIN_RPC_TIMEOUT, + &node, + &AdminRpc::LaunchRepair(opt_to_send.clone()), + PRIO_NORMAL, ) .await .is_err() { - failures.push(*node); + failures.push(node); } } if failures.is_empty() { @@ -397,14 +386,16 @@ impl AdminRpcHandler { let ring = self.garage.system.ring.borrow().clone(); for node in ring.config.members.keys() { + let node = NodeID::from_slice(node.as_slice()).unwrap(); + let mut opt = opt.clone(); opt.all_nodes = false; writeln!(&mut ret, "\n======================").unwrap(); writeln!(&mut ret, "Stats for node {:?}:", node).unwrap(); match self - .rpc_client - .call(*node, AdminRpc::Stats(opt), ADMIN_RPC_TIMEOUT) + .endpoint + .call(&node, &AdminRpc::Stats(opt), PRIO_NORMAL) .await { Ok(AdminRpc::Ok(s)) => writeln!(&mut ret, "{}", s).unwrap(), @@ -495,4 +486,23 @@ impl AdminRpcHandler { .unwrap(); writeln!(to, " GC todo queue length: {}", t.data.gc_todo_len()).unwrap(); } + + async fn handle_rpc(self: &Arc, msg: &AdminRpc) -> Result { + match msg { + AdminRpc::BucketOperation(bo) => self.handle_bucket_cmd(bo).await, + AdminRpc::KeyOperation(ko) => self.handle_key_cmd(ko).await, + AdminRpc::LaunchRepair(opt) => self.handle_launch_repair(opt.clone()).await, + AdminRpc::Stats(opt) => self.handle_stats(opt.clone()).await, + _ => Err(Error::BadRpc("Invalid RPC".to_string())), + } + } +} + +#[async_trait] +impl EndpointHandler for AdminRpcHandler { + async fn handle(self: &Arc, message: &AdminRpc, _from: NodeID) -> AdminRpc { + self.handle_rpc(message) + .await + .unwrap_or_else(|e| AdminRpc::Error(format!("{}", e))) + } } -- cgit v1.2.3