diff options
Diffstat (limited to 'src/garage')
-rw-r--r-- | src/garage/admin/mod.rs | 128 | ||||
-rw-r--r-- | src/garage/cli/cmd.rs | 3 | ||||
-rw-r--r-- | src/garage/cli/util.rs | 8 | ||||
-rw-r--r-- | src/garage/cli_v2/mod.rs | 30 | ||||
-rw-r--r-- | src/garage/cli_v2/worker.rs | 89 | ||||
-rw-r--r-- | src/garage/main.rs | 4 | ||||
-rw-r--r-- | src/garage/server.rs | 4 |
7 files changed, 113 insertions, 153 deletions
diff --git a/src/garage/admin/mod.rs b/src/garage/admin/mod.rs index 910a875c..f493d0c5 100644 --- a/src/garage/admin/mod.rs +++ b/src/garage/admin/mod.rs @@ -27,7 +27,7 @@ use garage_model::s3::mpu_table::MultipartUpload; use garage_model::s3::version_table::Version; use garage_api_admin::api::{AdminApiRequest, TaggedAdminApiResponse}; -use garage_api_admin::EndpointHandler as AdminApiEndpoint; +use garage_api_admin::RequestHandler as AdminApiEndpoint; use garage_api_common::generic_server::ApiError; use crate::cli::*; @@ -50,7 +50,6 @@ pub enum AdminRpc { HashMap<usize, garage_util::background::WorkerInfo>, WorkerListOpt, ), - WorkerVars(Vec<(Uuid, String, String)>), WorkerInfo(usize, garage_util::background::WorkerInfo), BlockErrorList(Vec<BlockResyncErrorInfo>), BlockInfo { @@ -59,15 +58,6 @@ pub enum AdminRpc { versions: Vec<Result<Version, Uuid>>, uploads: Vec<MultipartUpload>, }, - - // Proxying HTTP Admin API endpoints - ApiRequest(AdminApiRequest), - ApiOkResponse(TaggedAdminApiResponse), - ApiErrorResponse { - http_code: u16, - error_code: String, - message: String, - }, } impl Rpc for AdminRpc { @@ -367,101 +357,7 @@ impl AdminRpcHandler { .clone(); Ok(AdminRpc::WorkerInfo(*tid, info)) } - WorkerOperation::Get { - all_nodes, - variable, - } => self.handle_get_var(*all_nodes, variable).await, - WorkerOperation::Set { - all_nodes, - variable, - value, - } => self.handle_set_var(*all_nodes, variable, value).await, - } - } - - async fn handle_get_var( - &self, - all_nodes: bool, - variable: &Option<String>, - ) -> Result<AdminRpc, Error> { - if all_nodes { - let mut ret = vec![]; - let all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec(); - for node in all_nodes.iter() { - let node = (*node).into(); - match self - .endpoint - .call( - &node, - AdminRpc::Worker(WorkerOperation::Get { - all_nodes: false, - variable: variable.clone(), - }), - PRIO_NORMAL, - ) - .await?? - { - AdminRpc::WorkerVars(v) => ret.extend(v), - m => return Err(GarageError::unexpected_rpc_message(m).into()), - } - } - Ok(AdminRpc::WorkerVars(ret)) - } else { - #[allow(clippy::collapsible_else_if)] - if let Some(v) = variable { - Ok(AdminRpc::WorkerVars(vec![( - self.garage.system.id, - v.clone(), - self.garage.bg_vars.get(v)?, - )])) - } else { - let mut vars = self.garage.bg_vars.get_all(); - vars.sort(); - Ok(AdminRpc::WorkerVars( - vars.into_iter() - .map(|(k, v)| (self.garage.system.id, k.to_string(), v)) - .collect(), - )) - } - } - } - - async fn handle_set_var( - &self, - all_nodes: bool, - variable: &str, - value: &str, - ) -> Result<AdminRpc, Error> { - if all_nodes { - let mut ret = vec![]; - let all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec(); - for node in all_nodes.iter() { - let node = (*node).into(); - match self - .endpoint - .call( - &node, - AdminRpc::Worker(WorkerOperation::Set { - all_nodes: false, - variable: variable.to_string(), - value: value.to_string(), - }), - PRIO_NORMAL, - ) - .await?? - { - AdminRpc::WorkerVars(v) => ret.extend(v), - m => return Err(GarageError::unexpected_rpc_message(m).into()), - } - } - Ok(AdminRpc::WorkerVars(ret)) - } else { - self.garage.bg_vars.set(variable, value)?; - Ok(AdminRpc::WorkerVars(vec![( - self.garage.system.id, - variable.to_string(), - value.to_string(), - )])) + _ => unreachable!(), } } @@ -501,25 +397,6 @@ impl AdminRpcHandler { } } } - - // ================== PROXYING ADMIN API REQUESTS =================== - - async fn handle_api_request( - self: &Arc<Self>, - req: &AdminApiRequest, - ) -> Result<AdminRpc, Error> { - let req = req.clone(); - info!("Proxied admin API request: {}", req.name()); - let res = req.handle(&self.garage).await; - match res { - Ok(res) => Ok(AdminRpc::ApiOkResponse(res.tagged())), - Err(e) => Ok(AdminRpc::ApiErrorResponse { - http_code: e.http_status_code().as_u16(), - error_code: e.code().to_string(), - message: e.to_string(), - }), - } - } } #[async_trait] @@ -535,7 +412,6 @@ impl EndpointHandler<AdminRpc> for AdminRpcHandler { AdminRpc::Worker(wo) => self.handle_worker_cmd(wo).await, AdminRpc::BlockOperation(bo) => self.handle_block_cmd(bo).await, AdminRpc::MetaOperation(mo) => self.handle_meta_cmd(mo).await, - AdminRpc::ApiRequest(r) => self.handle_api_request(r).await, m => Err(GarageError::unexpected_rpc_message(m).into()), } } diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs index a6540c65..6f1b0681 100644 --- a/src/garage/cli/cmd.rs +++ b/src/garage/cli/cmd.rs @@ -20,9 +20,6 @@ pub async fn cmd_admin( AdminRpc::WorkerList(wi, wlo) => { print_worker_list(wi, wlo); } - AdminRpc::WorkerVars(wv) => { - print_worker_vars(wv); - } AdminRpc::WorkerInfo(tid, wi) => { print_worker_info(tid, wi); } diff --git a/src/garage/cli/util.rs b/src/garage/cli/util.rs index a3a1480e..8261fb3e 100644 --- a/src/garage/cli/util.rs +++ b/src/garage/cli/util.rs @@ -126,14 +126,6 @@ pub fn print_worker_info(tid: usize, info: WorkerInfo) { format_table(table); } -pub fn print_worker_vars(wv: Vec<(Uuid, String, String)>) { - let table = wv - .into_iter() - .map(|(n, k, v)| format!("{:?}\t{}\t{}", n, k, v)) - .collect::<Vec<_>>(); - format_table(table); -} - pub fn print_block_error_list(el: Vec<BlockResyncErrorInfo>) { let now = now_msec(); let tf = timeago::Formatter::new(); diff --git a/src/garage/cli_v2/mod.rs b/src/garage/cli_v2/mod.rs index 6cc13b2d..b9bf05fe 100644 --- a/src/garage/cli_v2/mod.rs +++ b/src/garage/cli_v2/mod.rs @@ -3,6 +3,8 @@ pub mod cluster; pub mod key; pub mod layout; +pub mod worker; + use std::convert::TryFrom; use std::sync::Arc; use std::time::Duration; @@ -13,7 +15,8 @@ use garage_rpc::system::*; use garage_rpc::*; use garage_api_admin::api::*; -use garage_api_admin::EndpointHandler as AdminApiEndpoint; +use garage_api_admin::api_server::{AdminRpc as ProxyRpc, AdminRpcResponse as ProxyRpcResponse}; +use garage_api_admin::RequestHandler as AdminApiEndpoint; use crate::admin::*; use crate::cli as cli_v1; @@ -23,6 +26,7 @@ use crate::cli::Command; pub struct Cli { pub system_rpc_endpoint: Arc<Endpoint<SystemRpc, ()>>, pub admin_rpc_endpoint: Arc<Endpoint<AdminRpc, ()>>, + pub proxy_rpc_endpoint: Arc<Endpoint<ProxyRpc, ()>>, pub rpc_host: NodeID, } @@ -36,6 +40,7 @@ impl Cli { Command::Layout(layout_opt) => self.layout_command_dispatch(layout_opt).await, Command::Bucket(bo) => self.cmd_bucket(bo).await, Command::Key(ko) => self.cmd_key(ko).await, + Command::Worker(wo) => self.cmd_worker(wo).await, // TODO Command::Repair(ro) => cli_v1::cmd_admin( @@ -50,13 +55,6 @@ impl Cli { .await .ok_or_message("cli_v1") } - Command::Worker(wo) => cli_v1::cmd_admin( - &self.admin_rpc_endpoint, - self.rpc_host, - AdminRpc::Worker(wo), - ) - .await - .ok_or_message("cli_v1"), Command::Block(bo) => cli_v1::cmd_admin( &self.admin_rpc_endpoint, self.rpc_host, @@ -85,14 +83,16 @@ impl Cli { let req = AdminApiRequest::from(req); let req_name = req.name(); match self - .admin_rpc_endpoint - .call(&self.rpc_host, AdminRpc::ApiRequest(req), PRIO_NORMAL) - .await? - .ok_or_message("rpc")? + .proxy_rpc_endpoint + .call(&self.rpc_host, ProxyRpc::Proxy(req), PRIO_NORMAL) + .await?? { - AdminRpc::ApiOkResponse(resp) => <T as AdminApiEndpoint>::Response::try_from(resp) - .map_err(|_| Error::Message(format!("{} returned unexpected response", req_name))), - AdminRpc::ApiErrorResponse { + ProxyRpcResponse::ProxyApiOkResponse(resp) => { + <T as AdminApiEndpoint>::Response::try_from(resp).map_err(|_| { + Error::Message(format!("{} returned unexpected response", req_name)) + }) + } + ProxyRpcResponse::ApiErrorResponse { http_code, error_code, message, diff --git a/src/garage/cli_v2/worker.rs b/src/garage/cli_v2/worker.rs new file mode 100644 index 00000000..0dfe3e96 --- /dev/null +++ b/src/garage/cli_v2/worker.rs @@ -0,0 +1,89 @@ +//use bytesize::ByteSize; +use format_table::format_table; + +use garage_util::error::*; + +use garage_api_admin::api::*; + +use crate::cli::structs::*; +use crate::cli_v2::*; + +impl Cli { + pub async fn cmd_worker(&self, cmd: WorkerOperation) -> Result<(), Error> { + match cmd { + WorkerOperation::Get { + all_nodes, + variable, + } => self.cmd_get_var(all_nodes, variable).await, + WorkerOperation::Set { + all_nodes, + variable, + value, + } => self.cmd_set_var(all_nodes, variable, value).await, + wo => cli_v1::cmd_admin( + &self.admin_rpc_endpoint, + self.rpc_host, + AdminRpc::Worker(wo), + ) + .await + .ok_or_message("cli_v1"), + } + } + + pub async fn cmd_get_var(&self, all: bool, var: Option<String>) -> Result<(), Error> { + let res = self + .api_request(GetWorkerVariableRequest { + node: if all { + "*".to_string() + } else { + hex::encode(self.rpc_host) + }, + body: LocalGetWorkerVariableRequest { variable: var }, + }) + .await?; + + let mut table = vec![]; + for (node, vars) in res.success.iter() { + for (key, val) in vars.0.iter() { + table.push(format!("{:.16}\t{}\t{}", node, key, val)); + } + } + format_table(table); + + for (node, err) in res.error.iter() { + eprintln!("{:.16}: error: {}", node, err); + } + + Ok(()) + } + + pub async fn cmd_set_var( + &self, + all: bool, + variable: String, + value: String, + ) -> Result<(), Error> { + let res = self + .api_request(SetWorkerVariableRequest { + node: if all { + "*".to_string() + } else { + hex::encode(self.rpc_host) + }, + body: LocalSetWorkerVariableRequest { variable, value }, + }) + .await?; + + let mut table = vec![]; + for (node, kv) in res.success.iter() { + table.push(format!("{:.16}\t{}\t{}", node, kv.variable, kv.value)); + } + format_table(table); + + for (node, err) in res.error.iter() { + eprintln!("{:.16}: error: {}", node, err); + } + + Ok(()) + } +} diff --git a/src/garage/main.rs b/src/garage/main.rs index 08c7cee7..022841f5 100644 --- a/src/garage/main.rs +++ b/src/garage/main.rs @@ -35,6 +35,8 @@ use garage_util::error::*; use garage_rpc::system::*; use garage_rpc::*; +use garage_api_admin::api_server::{AdminRpc as ProxyRpc, ADMIN_RPC_PATH as PROXY_RPC_PATH}; + use admin::*; use cli::*; use secrets::Secrets; @@ -282,10 +284,12 @@ async fn cli_command(opt: Opt) -> Result<(), Error> { let system_rpc_endpoint = netapp.endpoint::<SystemRpc, ()>(SYSTEM_RPC_PATH.into()); let admin_rpc_endpoint = netapp.endpoint::<AdminRpc, ()>(ADMIN_RPC_PATH.into()); + let proxy_rpc_endpoint = netapp.endpoint::<ProxyRpc, ()>(PROXY_RPC_PATH.into()); let cli = cli_v2::Cli { system_rpc_endpoint, admin_rpc_endpoint, + proxy_rpc_endpoint, rpc_host: id, }; diff --git a/src/garage/server.rs b/src/garage/server.rs index 9e58fa6d..f17f641b 100644 --- a/src/garage/server.rs +++ b/src/garage/server.rs @@ -1,4 +1,5 @@ use std::path::PathBuf; +use std::sync::Arc; use tokio::sync::watch; @@ -64,8 +65,9 @@ pub async fn run_server(config_file: PathBuf, secrets: Secrets) -> Result<(), Er } info!("Initialize Admin API server and metrics collector..."); - let admin_server = AdminApiServer::new( + let admin_server: Arc<AdminApiServer> = AdminApiServer::new( garage.clone(), + background.clone(), #[cfg(feature = "metrics")] metrics_exporter, ); |