1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
|
pub mod bucket;
pub mod cluster;
pub mod key;
pub mod layout;
pub mod worker;
use std::convert::TryFrom;
use std::sync::Arc;
use std::time::Duration;
use garage_util::error::*;
use garage_rpc::system::*;
use garage_rpc::*;
use garage_api_admin::api::*;
use garage_api_admin::api_server::{AdminRpc as ProxyRpc, AdminRpcResponse as ProxyRpcResponse};
use garage_api_admin::RequestHandler;
use crate::admin::*;
use crate::cli as cli_v1;
use crate::cli::structs::*;
use crate::cli::Command;
/// State shared by the CLI command handlers: the RPC endpoints used to talk
/// to a node, and the identifier of the node that requests are sent to.
pub struct Cli {
/// Endpoint for `SystemRpc` messages.
/// NOTE(review): not used by the methods visible in this file — presumably
/// used by the command submodules; confirm.
pub system_rpc_endpoint: Arc<Endpoint<SystemRpc, ()>>,
/// Endpoint for `AdminRpc` messages; `handle` passes it to `cli_v1::cmd_admin`
/// for the repair, stats, block and meta commands.
pub admin_rpc_endpoint: Arc<Endpoint<AdminRpc, ()>>,
/// Endpoint for `ProxyRpc` messages; `api_request` uses it to send
/// `ProxyRpc::Proxy` requests.
pub proxy_rpc_endpoint: Arc<Endpoint<ProxyRpc, ()>>,
/// Node that RPC calls are addressed to. `local_api_request` also hex-encodes
/// it as the `node` field of the `MultiRequest` it builds.
pub rpc_host: NodeID,
}
impl Cli {
/// Executes a single CLI command.
///
/// Status, node-connect, layout, bucket, key and worker commands are
/// forwarded to the matching method on `self` and return that method's
/// result directly. Repair, stats, block and meta commands are translated
/// into an `AdminRpc` message and sent with `cli_v1::cmd_admin` to
/// `self.rpc_host`; a failure of that call is converted with
/// `ok_or_message("cli_v1")`.
///
/// # Panics
///
/// Hits `unreachable!()` for any other `Command` variant.
/// NOTE(review): presumably those variants are filtered out by the caller
/// before this method is invoked — confirm.
pub async fn handle(&self, cmd: Command) -> Result<(), Error> {
    // Commands with a native handler return early; the remaining ones only
    // pick the legacy message to send.
    let legacy_request = match cmd {
        Command::Status => return self.cmd_status().await,
        Command::Node(NodeOperation::Connect(connect_opt)) => {
            return self.cmd_connect(connect_opt).await;
        }
        Command::Layout(layout_opt) => {
            return self.layout_command_dispatch(layout_opt).await;
        }
        Command::Bucket(bo) => return self.cmd_bucket(bo).await,
        Command::Key(ko) => return self.cmd_key(ko).await,
        Command::Worker(wo) => return self.cmd_worker(wo).await,

        // TODO (carried over from the original code): these commands still
        // go through the `cli_v1` admin RPC call below.
        Command::Repair(ro) => AdminRpc::LaunchRepair(ro),
        Command::Stats(so) => AdminRpc::Stats(so),
        Command::Block(bo) => AdminRpc::BlockOperation(bo),
        Command::Meta(mo) => AdminRpc::MetaOperation(mo),

        _ => unreachable!(),
    };

    // Single call site for every legacy command.
    cli_v1::cmd_admin(&self.admin_rpc_endpoint, self.rpc_host, legacy_request)
        .await
        .ok_or_message("cli_v1")
}
pub async fn api_request<T>(&self, req: T) -> Result<<T as RequestHandler>::Response, Error>
where
T: RequestHandler,
AdminApiRequest: From<T>,
<T as RequestHandler>::Response: TryFrom<TaggedAdminApiResponse>,
{
let req = AdminApiRequest::from(req);
let req_name = req.name();
match self
.proxy_rpc_endpoint
.call(&self.rpc_host, ProxyRpc::Proxy(req), PRIO_NORMAL)
.await??
{
ProxyRpcResponse::ProxyApiOkResponse(resp) => {
<T as RequestHandler>::Response::try_from(resp).map_err(|_| {
Error::Message(format!("{} returned unexpected response", req_name))
})
}
ProxyRpcResponse::ApiErrorResponse {
http_code,
error_code,
message,
} => Err(Error::Message(format!(
"{} returned {} ({}): {}",
req_name, error_code, http_code, message
))),
m => Err(Error::unexpected_rpc_message(m)),
}
}
pub async fn local_api_request<T>(
&self,
req: T,
) -> Result<<T as RequestHandler>::Response, Error>
where
T: RequestHandler,
MultiRequest<T>: RequestHandler<Response = MultiResponse<<T as RequestHandler>::Response>>,
AdminApiRequest: From<MultiRequest<T>>,
<MultiRequest<T> as RequestHandler>::Response: TryFrom<TaggedAdminApiResponse>,
{
let req = MultiRequest {
node: hex::encode(self.rpc_host),
body: req,
};
let resp = self.api_request(req).await?;
if let Some((_, e)) = resp.error.into_iter().next() {
return Err(Error::Message(e));
}
if resp.success.len() != 1 {
return Err(Error::Message(format!(
"{} responses returned, expected 1",
resp.success.len()
)));
}
Ok(resp.success.into_iter().next().unwrap().1)
}
}
|