aboutsummaryrefslogtreecommitdiff
path: root/src/garage/cli_v2/mod.rs
blob: 28c7c8240af7e17c1900791b1626417a6ac98e2f (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
pub mod bucket;
pub mod cluster;
pub mod key;
pub mod layout;

pub mod block;
pub mod node;
pub mod worker;

use std::convert::TryFrom;
use std::sync::Arc;
use std::time::Duration;

use garage_util::error::*;

use garage_rpc::system::*;
use garage_rpc::*;

use garage_api_admin::api::*;
use garage_api_admin::api_server::{AdminRpc as ProxyRpc, AdminRpcResponse as ProxyRpcResponse};
use garage_api_admin::RequestHandler;

use crate::cli::structs::*;

pub struct Cli {
	pub system_rpc_endpoint: Arc<Endpoint<SystemRpc, ()>>,
	pub proxy_rpc_endpoint: Arc<Endpoint<ProxyRpc, ()>>,
	pub rpc_host: NodeID,
}

impl Cli {
	pub async fn handle(&self, cmd: Command) -> Result<(), Error> {
		match cmd {
			Command::Status => self.cmd_status().await,
			Command::Node(NodeOperation::Connect(connect_opt)) => {
				self.cmd_connect(connect_opt).await
			}
			Command::Layout(layout_opt) => self.layout_command_dispatch(layout_opt).await,
			Command::Bucket(bo) => self.cmd_bucket(bo).await,
			Command::Key(ko) => self.cmd_key(ko).await,
			Command::Worker(wo) => self.cmd_worker(wo).await,
			Command::Block(bo) => self.cmd_block(bo).await,
			Command::Meta(mo) => self.cmd_meta(mo).await,
			Command::Stats(so) => self.cmd_stats(so).await,
			Command::Repair(ro) => self.cmd_repair(ro).await,

			_ => unreachable!(),
		}
	}

	pub async fn api_request<T>(&self, req: T) -> Result<<T as RequestHandler>::Response, Error>
	where
		T: RequestHandler,
		AdminApiRequest: From<T>,
		<T as RequestHandler>::Response: TryFrom<TaggedAdminApiResponse>,
	{
		let req = AdminApiRequest::from(req);
		let req_name = req.name();
		match self
			.proxy_rpc_endpoint
			.call(&self.rpc_host, ProxyRpc::Proxy(req), PRIO_NORMAL)
			.await??
		{
			ProxyRpcResponse::ProxyApiOkResponse(resp) => {
				<T as RequestHandler>::Response::try_from(resp).map_err(|_| {
					Error::Message(format!("{} returned unexpected response", req_name))
				})
			}
			ProxyRpcResponse::ApiErrorResponse {
				http_code,
				error_code,
				message,
			} => Err(Error::Message(format!(
				"{} returned {} ({}): {}",
				req_name, error_code, http_code, message
			))),
			m => Err(Error::unexpected_rpc_message(m)),
		}
	}

	pub async fn local_api_request<T>(
		&self,
		req: T,
	) -> Result<<T as RequestHandler>::Response, Error>
	where
		T: RequestHandler,
		MultiRequest<T>: RequestHandler<Response = MultiResponse<<T as RequestHandler>::Response>>,
		AdminApiRequest: From<MultiRequest<T>>,
		<MultiRequest<T> as RequestHandler>::Response: TryFrom<TaggedAdminApiResponse>,
	{
		let req = MultiRequest {
			node: hex::encode(self.rpc_host),
			body: req,
		};
		let resp = self.api_request(req).await?;

		if let Some((_, e)) = resp.error.into_iter().next() {
			return Err(Error::Message(e));
		}
		if resp.success.len() != 1 {
			return Err(Error::Message(format!(
				"{} responses returned, expected 1",
				resp.success.len()
			)));
		}
		Ok(resp.success.into_iter().next().unwrap().1)
	}
}