1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
|
pub mod bucket;
pub mod cluster;
pub mod key;
pub mod layout;
use std::collections::{HashMap, HashSet};
use std::convert::TryFrom;
use std::sync::Arc;
use std::time::Duration;
use garage_util::error::*;
use garage_rpc::system::*;
use garage_rpc::*;
use garage_api::admin::api::*;
use garage_api::admin::EndpointHandler as AdminApiEndpoint;
use crate::admin::*;
use crate::cli as cli_v1;
use crate::cli::structs::*;
use crate::cli::Command;
pub struct Cli {
pub system_rpc_endpoint: Arc<Endpoint<SystemRpc, ()>>,
pub admin_rpc_endpoint: Arc<Endpoint<AdminRpc, ()>>,
pub rpc_host: NodeID,
}
impl Cli {
pub async fn handle(&self, cmd: Command) -> Result<(), Error> {
match cmd {
Command::Status => self.cmd_status().await,
Command::Node(NodeOperation::Connect(connect_opt)) => {
self.cmd_connect(connect_opt).await
}
Command::Layout(layout_opt) => self.layout_command_dispatch(layout_opt).await,
Command::Bucket(bo) => self.cmd_bucket(bo).await,
Command::Key(ko) => self.cmd_key(ko).await,
// TODO
Command::Repair(ro) => cli_v1::cmd_admin(
&self.admin_rpc_endpoint,
self.rpc_host,
AdminRpc::LaunchRepair(ro),
)
.await
.ok_or_message("cli_v1"),
Command::Stats(so) => {
cli_v1::cmd_admin(&self.admin_rpc_endpoint, self.rpc_host, AdminRpc::Stats(so))
.await
.ok_or_message("cli_v1")
}
Command::Worker(wo) => cli_v1::cmd_admin(
&self.admin_rpc_endpoint,
self.rpc_host,
AdminRpc::Worker(wo),
)
.await
.ok_or_message("cli_v1"),
Command::Block(bo) => cli_v1::cmd_admin(
&self.admin_rpc_endpoint,
self.rpc_host,
AdminRpc::BlockOperation(bo),
)
.await
.ok_or_message("cli_v1"),
Command::Meta(mo) => cli_v1::cmd_admin(
&self.admin_rpc_endpoint,
self.rpc_host,
AdminRpc::MetaOperation(mo),
)
.await
.ok_or_message("cli_v1"),
_ => unreachable!(),
}
}
pub async fn api_request<T>(&self, req: T) -> Result<<T as AdminApiEndpoint>::Response, Error>
where
T: AdminApiEndpoint,
AdminApiRequest: From<T>,
<T as AdminApiEndpoint>::Response: TryFrom<TaggedAdminApiResponse>,
{
let req = AdminApiRequest::from(req);
let req_name = req.name();
match self
.admin_rpc_endpoint
.call(&self.rpc_host, AdminRpc::ApiRequest(req), PRIO_NORMAL)
.await?
.ok_or_message("rpc")?
{
AdminRpc::ApiOkResponse(resp) => <T as AdminApiEndpoint>::Response::try_from(resp)
.map_err(|_| Error::Message(format!("{} returned unexpected response", req_name))),
AdminRpc::ApiErrorResponse {
http_code,
error_code,
message,
} => Err(Error::Message(format!(
"{} returned {} ({}): {}",
req_name, error_code, http_code, message
))),
m => Err(Error::unexpected_rpc_message(m)),
}
}
}
|