aboutsummaryrefslogtreecommitdiff
path: root/src/garage
diff options
context:
space:
mode:
Diffstat (limited to 'src/garage')
-rw-r--r--src/garage/admin/mod.rs128
-rw-r--r--src/garage/cli/cmd.rs3
-rw-r--r--src/garage/cli/util.rs8
-rw-r--r--src/garage/cli_v2/mod.rs30
-rw-r--r--src/garage/cli_v2/worker.rs89
-rw-r--r--src/garage/main.rs4
-rw-r--r--src/garage/server.rs4
7 files changed, 113 insertions, 153 deletions
diff --git a/src/garage/admin/mod.rs b/src/garage/admin/mod.rs
index 910a875c..f493d0c5 100644
--- a/src/garage/admin/mod.rs
+++ b/src/garage/admin/mod.rs
@@ -27,7 +27,7 @@ use garage_model::s3::mpu_table::MultipartUpload;
use garage_model::s3::version_table::Version;
use garage_api_admin::api::{AdminApiRequest, TaggedAdminApiResponse};
-use garage_api_admin::EndpointHandler as AdminApiEndpoint;
+use garage_api_admin::RequestHandler as AdminApiEndpoint;
use garage_api_common::generic_server::ApiError;
use crate::cli::*;
@@ -50,7 +50,6 @@ pub enum AdminRpc {
HashMap<usize, garage_util::background::WorkerInfo>,
WorkerListOpt,
),
- WorkerVars(Vec<(Uuid, String, String)>),
WorkerInfo(usize, garage_util::background::WorkerInfo),
BlockErrorList(Vec<BlockResyncErrorInfo>),
BlockInfo {
@@ -59,15 +58,6 @@ pub enum AdminRpc {
versions: Vec<Result<Version, Uuid>>,
uploads: Vec<MultipartUpload>,
},
-
- // Proxying HTTP Admin API endpoints
- ApiRequest(AdminApiRequest),
- ApiOkResponse(TaggedAdminApiResponse),
- ApiErrorResponse {
- http_code: u16,
- error_code: String,
- message: String,
- },
}
impl Rpc for AdminRpc {
@@ -367,101 +357,7 @@ impl AdminRpcHandler {
.clone();
Ok(AdminRpc::WorkerInfo(*tid, info))
}
- WorkerOperation::Get {
- all_nodes,
- variable,
- } => self.handle_get_var(*all_nodes, variable).await,
- WorkerOperation::Set {
- all_nodes,
- variable,
- value,
- } => self.handle_set_var(*all_nodes, variable, value).await,
- }
- }
-
- async fn handle_get_var(
- &self,
- all_nodes: bool,
- variable: &Option<String>,
- ) -> Result<AdminRpc, Error> {
- if all_nodes {
- let mut ret = vec![];
- let all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec();
- for node in all_nodes.iter() {
- let node = (*node).into();
- match self
- .endpoint
- .call(
- &node,
- AdminRpc::Worker(WorkerOperation::Get {
- all_nodes: false,
- variable: variable.clone(),
- }),
- PRIO_NORMAL,
- )
- .await??
- {
- AdminRpc::WorkerVars(v) => ret.extend(v),
- m => return Err(GarageError::unexpected_rpc_message(m).into()),
- }
- }
- Ok(AdminRpc::WorkerVars(ret))
- } else {
- #[allow(clippy::collapsible_else_if)]
- if let Some(v) = variable {
- Ok(AdminRpc::WorkerVars(vec![(
- self.garage.system.id,
- v.clone(),
- self.garage.bg_vars.get(v)?,
- )]))
- } else {
- let mut vars = self.garage.bg_vars.get_all();
- vars.sort();
- Ok(AdminRpc::WorkerVars(
- vars.into_iter()
- .map(|(k, v)| (self.garage.system.id, k.to_string(), v))
- .collect(),
- ))
- }
- }
- }
-
- async fn handle_set_var(
- &self,
- all_nodes: bool,
- variable: &str,
- value: &str,
- ) -> Result<AdminRpc, Error> {
- if all_nodes {
- let mut ret = vec![];
- let all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec();
- for node in all_nodes.iter() {
- let node = (*node).into();
- match self
- .endpoint
- .call(
- &node,
- AdminRpc::Worker(WorkerOperation::Set {
- all_nodes: false,
- variable: variable.to_string(),
- value: value.to_string(),
- }),
- PRIO_NORMAL,
- )
- .await??
- {
- AdminRpc::WorkerVars(v) => ret.extend(v),
- m => return Err(GarageError::unexpected_rpc_message(m).into()),
- }
- }
- Ok(AdminRpc::WorkerVars(ret))
- } else {
- self.garage.bg_vars.set(variable, value)?;
- Ok(AdminRpc::WorkerVars(vec![(
- self.garage.system.id,
- variable.to_string(),
- value.to_string(),
- )]))
+ _ => unreachable!(),
}
}
@@ -501,25 +397,6 @@ impl AdminRpcHandler {
}
}
}
-
- // ================== PROXYING ADMIN API REQUESTS ===================
-
- async fn handle_api_request(
- self: &Arc<Self>,
- req: &AdminApiRequest,
- ) -> Result<AdminRpc, Error> {
- let req = req.clone();
- info!("Proxied admin API request: {}", req.name());
- let res = req.handle(&self.garage).await;
- match res {
- Ok(res) => Ok(AdminRpc::ApiOkResponse(res.tagged())),
- Err(e) => Ok(AdminRpc::ApiErrorResponse {
- http_code: e.http_status_code().as_u16(),
- error_code: e.code().to_string(),
- message: e.to_string(),
- }),
- }
- }
}
#[async_trait]
@@ -535,7 +412,6 @@ impl EndpointHandler<AdminRpc> for AdminRpcHandler {
AdminRpc::Worker(wo) => self.handle_worker_cmd(wo).await,
AdminRpc::BlockOperation(bo) => self.handle_block_cmd(bo).await,
AdminRpc::MetaOperation(mo) => self.handle_meta_cmd(mo).await,
- AdminRpc::ApiRequest(r) => self.handle_api_request(r).await,
m => Err(GarageError::unexpected_rpc_message(m).into()),
}
}
diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs
index a6540c65..6f1b0681 100644
--- a/src/garage/cli/cmd.rs
+++ b/src/garage/cli/cmd.rs
@@ -20,9 +20,6 @@ pub async fn cmd_admin(
AdminRpc::WorkerList(wi, wlo) => {
print_worker_list(wi, wlo);
}
- AdminRpc::WorkerVars(wv) => {
- print_worker_vars(wv);
- }
AdminRpc::WorkerInfo(tid, wi) => {
print_worker_info(tid, wi);
}
diff --git a/src/garage/cli/util.rs b/src/garage/cli/util.rs
index a3a1480e..8261fb3e 100644
--- a/src/garage/cli/util.rs
+++ b/src/garage/cli/util.rs
@@ -126,14 +126,6 @@ pub fn print_worker_info(tid: usize, info: WorkerInfo) {
format_table(table);
}
-pub fn print_worker_vars(wv: Vec<(Uuid, String, String)>) {
- let table = wv
- .into_iter()
- .map(|(n, k, v)| format!("{:?}\t{}\t{}", n, k, v))
- .collect::<Vec<_>>();
- format_table(table);
-}
-
pub fn print_block_error_list(el: Vec<BlockResyncErrorInfo>) {
let now = now_msec();
let tf = timeago::Formatter::new();
diff --git a/src/garage/cli_v2/mod.rs b/src/garage/cli_v2/mod.rs
index 6cc13b2d..b9bf05fe 100644
--- a/src/garage/cli_v2/mod.rs
+++ b/src/garage/cli_v2/mod.rs
@@ -3,6 +3,8 @@ pub mod cluster;
pub mod key;
pub mod layout;
+pub mod worker;
+
use std::convert::TryFrom;
use std::sync::Arc;
use std::time::Duration;
@@ -13,7 +15,8 @@ use garage_rpc::system::*;
use garage_rpc::*;
use garage_api_admin::api::*;
-use garage_api_admin::EndpointHandler as AdminApiEndpoint;
+use garage_api_admin::api_server::{AdminRpc as ProxyRpc, AdminRpcResponse as ProxyRpcResponse};
+use garage_api_admin::RequestHandler as AdminApiEndpoint;
use crate::admin::*;
use crate::cli as cli_v1;
@@ -23,6 +26,7 @@ use crate::cli::Command;
pub struct Cli {
pub system_rpc_endpoint: Arc<Endpoint<SystemRpc, ()>>,
pub admin_rpc_endpoint: Arc<Endpoint<AdminRpc, ()>>,
+ pub proxy_rpc_endpoint: Arc<Endpoint<ProxyRpc, ()>>,
pub rpc_host: NodeID,
}
@@ -36,6 +40,7 @@ impl Cli {
Command::Layout(layout_opt) => self.layout_command_dispatch(layout_opt).await,
Command::Bucket(bo) => self.cmd_bucket(bo).await,
Command::Key(ko) => self.cmd_key(ko).await,
+ Command::Worker(wo) => self.cmd_worker(wo).await,
// TODO
Command::Repair(ro) => cli_v1::cmd_admin(
@@ -50,13 +55,6 @@ impl Cli {
.await
.ok_or_message("cli_v1")
}
- Command::Worker(wo) => cli_v1::cmd_admin(
- &self.admin_rpc_endpoint,
- self.rpc_host,
- AdminRpc::Worker(wo),
- )
- .await
- .ok_or_message("cli_v1"),
Command::Block(bo) => cli_v1::cmd_admin(
&self.admin_rpc_endpoint,
self.rpc_host,
@@ -85,14 +83,16 @@ impl Cli {
let req = AdminApiRequest::from(req);
let req_name = req.name();
match self
- .admin_rpc_endpoint
- .call(&self.rpc_host, AdminRpc::ApiRequest(req), PRIO_NORMAL)
- .await?
- .ok_or_message("rpc")?
+ .proxy_rpc_endpoint
+ .call(&self.rpc_host, ProxyRpc::Proxy(req), PRIO_NORMAL)
+ .await??
{
- AdminRpc::ApiOkResponse(resp) => <T as AdminApiEndpoint>::Response::try_from(resp)
- .map_err(|_| Error::Message(format!("{} returned unexpected response", req_name))),
- AdminRpc::ApiErrorResponse {
+ ProxyRpcResponse::ProxyApiOkResponse(resp) => {
+ <T as AdminApiEndpoint>::Response::try_from(resp).map_err(|_| {
+ Error::Message(format!("{} returned unexpected response", req_name))
+ })
+ }
+ ProxyRpcResponse::ApiErrorResponse {
http_code,
error_code,
message,
diff --git a/src/garage/cli_v2/worker.rs b/src/garage/cli_v2/worker.rs
new file mode 100644
index 00000000..0dfe3e96
--- /dev/null
+++ b/src/garage/cli_v2/worker.rs
@@ -0,0 +1,89 @@
+//use bytesize::ByteSize;
+use format_table::format_table;
+
+use garage_util::error::*;
+
+use garage_api_admin::api::*;
+
+use crate::cli::structs::*;
+use crate::cli_v2::*;
+
+impl Cli {
+ pub async fn cmd_worker(&self, cmd: WorkerOperation) -> Result<(), Error> {
+ match cmd {
+ WorkerOperation::Get {
+ all_nodes,
+ variable,
+ } => self.cmd_get_var(all_nodes, variable).await,
+ WorkerOperation::Set {
+ all_nodes,
+ variable,
+ value,
+ } => self.cmd_set_var(all_nodes, variable, value).await,
+ wo => cli_v1::cmd_admin(
+ &self.admin_rpc_endpoint,
+ self.rpc_host,
+ AdminRpc::Worker(wo),
+ )
+ .await
+ .ok_or_message("cli_v1"),
+ }
+ }
+
+ pub async fn cmd_get_var(&self, all: bool, var: Option<String>) -> Result<(), Error> {
+ let res = self
+ .api_request(GetWorkerVariableRequest {
+ node: if all {
+ "*".to_string()
+ } else {
+ hex::encode(self.rpc_host)
+ },
+ body: LocalGetWorkerVariableRequest { variable: var },
+ })
+ .await?;
+
+ let mut table = vec![];
+ for (node, vars) in res.success.iter() {
+ for (key, val) in vars.0.iter() {
+ table.push(format!("{:.16}\t{}\t{}", node, key, val));
+ }
+ }
+ format_table(table);
+
+ for (node, err) in res.error.iter() {
+ eprintln!("{:.16}: error: {}", node, err);
+ }
+
+ Ok(())
+ }
+
+ pub async fn cmd_set_var(
+ &self,
+ all: bool,
+ variable: String,
+ value: String,
+ ) -> Result<(), Error> {
+ let res = self
+ .api_request(SetWorkerVariableRequest {
+ node: if all {
+ "*".to_string()
+ } else {
+ hex::encode(self.rpc_host)
+ },
+ body: LocalSetWorkerVariableRequest { variable, value },
+ })
+ .await?;
+
+ let mut table = vec![];
+ for (node, kv) in res.success.iter() {
+ table.push(format!("{:.16}\t{}\t{}", node, kv.variable, kv.value));
+ }
+ format_table(table);
+
+ for (node, err) in res.error.iter() {
+ eprintln!("{:.16}: error: {}", node, err);
+ }
+
+ Ok(())
+ }
+}
diff --git a/src/garage/main.rs b/src/garage/main.rs
index 08c7cee7..022841f5 100644
--- a/src/garage/main.rs
+++ b/src/garage/main.rs
@@ -35,6 +35,8 @@ use garage_util::error::*;
use garage_rpc::system::*;
use garage_rpc::*;
+use garage_api_admin::api_server::{AdminRpc as ProxyRpc, ADMIN_RPC_PATH as PROXY_RPC_PATH};
+
use admin::*;
use cli::*;
use secrets::Secrets;
@@ -282,10 +284,12 @@ async fn cli_command(opt: Opt) -> Result<(), Error> {
let system_rpc_endpoint = netapp.endpoint::<SystemRpc, ()>(SYSTEM_RPC_PATH.into());
let admin_rpc_endpoint = netapp.endpoint::<AdminRpc, ()>(ADMIN_RPC_PATH.into());
+ let proxy_rpc_endpoint = netapp.endpoint::<ProxyRpc, ()>(PROXY_RPC_PATH.into());
let cli = cli_v2::Cli {
system_rpc_endpoint,
admin_rpc_endpoint,
+ proxy_rpc_endpoint,
rpc_host: id,
};
diff --git a/src/garage/server.rs b/src/garage/server.rs
index 9e58fa6d..f17f641b 100644
--- a/src/garage/server.rs
+++ b/src/garage/server.rs
@@ -1,4 +1,5 @@
use std::path::PathBuf;
+use std::sync::Arc;
use tokio::sync::watch;
@@ -64,8 +65,9 @@ pub async fn run_server(config_file: PathBuf, secrets: Secrets) -> Result<(), Er
}
info!("Initialize Admin API server and metrics collector...");
- let admin_server = AdminApiServer::new(
+ let admin_server: Arc<AdminApiServer> = AdminApiServer::new(
garage.clone(),
+ background.clone(),
#[cfg(feature = "metrics")]
metrics_exporter,
);