aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/api/admin/Cargo.toml3
-rw-r--r--src/api/admin/api.rs312
-rw-r--r--src/api/admin/api_server.rs197
-rw-r--r--src/api/admin/block.rs274
-rw-r--r--src/api/admin/bucket.rs109
-rw-r--r--src/api/admin/cluster.rs67
-rw-r--r--src/api/admin/error.rs14
-rw-r--r--src/api/admin/key.rs54
-rw-r--r--src/api/admin/lib.rs18
-rw-r--r--src/api/admin/macros.rs133
-rw-r--r--src/api/admin/node.rs216
-rw-r--r--src/api/admin/repair.rs (renamed from src/garage/repair/online.rs)176
-rw-r--r--src/api/admin/router_v2.rs17
-rw-r--r--src/api/admin/special.rs126
-rw-r--r--src/api/admin/worker.rs118
-rw-r--r--src/api/common/router_macros.rs3
-rw-r--r--src/garage/Cargo.toml2
-rw-r--r--src/garage/admin/block.rs235
-rw-r--r--src/garage/admin/bucket.rs53
-rw-r--r--src/garage/admin/mod.rs545
-rw-r--r--src/garage/cli/cmd.rs60
-rw-r--r--src/garage/cli/layout.rs15
-rw-r--r--src/garage/cli/mod.rs11
-rw-r--r--src/garage/cli/repair.rs (renamed from src/garage/repair/offline.rs)0
-rw-r--r--src/garage/cli/structs.rs64
-rw-r--r--src/garage/cli/util.rs216
-rw-r--r--src/garage/cli_v2/block.rs145
-rw-r--r--src/garage/cli_v2/bucket.rs46
-rw-r--r--src/garage/cli_v2/mod.rs102
-rw-r--r--src/garage/cli_v2/node.rs113
-rw-r--r--src/garage/cli_v2/worker.rs213
-rw-r--r--src/garage/main.rs17
-rw-r--r--src/garage/repair/mod.rs2
-rw-r--r--src/garage/server.rs5
-rw-r--r--src/util/background/mod.rs5
-rw-r--r--src/util/background/worker.rs14
36 files changed, 2171 insertions, 1529 deletions
diff --git a/src/api/admin/Cargo.toml b/src/api/admin/Cargo.toml
index 94a321a6..9ac099e8 100644
--- a/src/api/admin/Cargo.toml
+++ b/src/api/admin/Cargo.toml
@@ -14,7 +14,9 @@ path = "lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
+format_table.workspace = true
garage_model.workspace = true
+garage_block.workspace = true
garage_table.workspace = true
garage_util.workspace = true
garage_rpc.workspace = true
@@ -22,6 +24,7 @@ garage_api_common.workspace = true
argon2.workspace = true
async-trait.workspace = true
+bytesize.workspace = true
err-derive.workspace = true
hex.workspace = true
paste.workspace = true
diff --git a/src/api/admin/api.rs b/src/api/admin/api.rs
index 99832564..97cde158 100644
--- a/src/api/admin/api.rs
+++ b/src/api/admin/api.rs
@@ -1,18 +1,22 @@
+use std::collections::HashMap;
use std::convert::TryFrom;
use std::net::SocketAddr;
use std::sync::Arc;
-use async_trait::async_trait;
use paste::paste;
use serde::{Deserialize, Serialize};
+use garage_rpc::*;
+
use garage_model::garage::Garage;
+use garage_api_common::common_error::CommonErrorDerivative;
use garage_api_common::helpers::is_default;
+use crate::api_server::{AdminRpc, AdminRpcResponse};
use crate::error::Error;
use crate::macros::*;
-use crate::EndpointHandler;
+use crate::{Admin, RequestHandler};
// This generates the following:
//
@@ -62,6 +66,7 @@ admin_endpoints![
CreateBucket,
UpdateBucket,
DeleteBucket,
+ CleanupIncompleteUploads,
// Operations on permissions for keys on buckets
AllowBucketKey,
@@ -70,8 +75,55 @@ admin_endpoints![
// Operations on bucket aliases
AddBucketAlias,
RemoveBucketAlias,
+
+ // Node operations
+ CreateMetadataSnapshot,
+ GetNodeStatistics,
+ GetClusterStatistics,
+ LaunchRepairOperation,
+
+ // Worker operations
+ ListWorkers,
+ GetWorkerInfo,
+ GetWorkerVariable,
+ SetWorkerVariable,
+
+ // Block operations
+ ListBlockErrors,
+ GetBlockInfo,
+ RetryBlockResync,
+ PurgeBlocks,
];
+local_admin_endpoints![
+ // Node operations
+ CreateMetadataSnapshot,
+ GetNodeStatistics,
+ LaunchRepairOperation,
+ // Background workers
+ ListWorkers,
+ GetWorkerInfo,
+ GetWorkerVariable,
+ SetWorkerVariable,
+ // Block operations
+ ListBlockErrors,
+ GetBlockInfo,
+ RetryBlockResync,
+ PurgeBlocks,
+];
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct MultiRequest<RB> {
+ pub node: String,
+ pub body: RB,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct MultiResponse<RB> {
+ pub success: HashMap<String, RB>,
+ pub error: HashMap<String, String>,
+}
+
// **********************************************
// Special endpoints
//
@@ -497,6 +549,19 @@ pub struct DeleteBucketRequest {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeleteBucketResponse;
+// ---- CleanupIncompleteUploads ----
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct CleanupIncompleteUploadsRequest {
+ pub bucket_id: String,
+ pub older_than_secs: u64,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct CleanupIncompleteUploadsResponse {
+ pub uploads_deleted: u64,
+}
+
// **********************************************
// Operations on permissions for keys on buckets
// **********************************************
@@ -566,3 +631,246 @@ pub struct RemoveBucketAliasRequest {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RemoveBucketAliasResponse(pub GetBucketInfoResponse);
+
+// **********************************************
+// Node operations
+// **********************************************
+
+// ---- CreateMetadataSnapshot ----
+
+#[derive(Debug, Clone, Serialize, Deserialize, Default)]
+pub struct LocalCreateMetadataSnapshotRequest;
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct LocalCreateMetadataSnapshotResponse;
+
+// ---- GetNodeStatistics ----
+
+#[derive(Debug, Clone, Serialize, Deserialize, Default)]
+pub struct LocalGetNodeStatisticsRequest;
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct LocalGetNodeStatisticsResponse {
+ pub freeform: String,
+}
+
+// ---- GetClusterStatistics ----
+
+#[derive(Debug, Clone, Serialize, Deserialize, Default)]
+pub struct GetClusterStatisticsRequest;
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct GetClusterStatisticsResponse {
+ pub freeform: String,
+}
+
+// ---- LaunchRepairOperation ----
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct LocalLaunchRepairOperationRequest {
+ pub repair_type: RepairType,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub enum RepairType {
+ Tables,
+ Blocks,
+ Versions,
+ MultipartUploads,
+ BlockRefs,
+ BlockRc,
+ Rebalance,
+ Scrub(ScrubCommand),
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub enum ScrubCommand {
+ Start,
+ Pause,
+ Resume,
+ Cancel,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct LocalLaunchRepairOperationResponse;
+
+// **********************************************
+// Worker operations
+// **********************************************
+
+// ---- GetWorkerList ----
+
+#[derive(Debug, Clone, Serialize, Deserialize, Default)]
+#[serde(rename_all = "camelCase")]
+pub struct LocalListWorkersRequest {
+ #[serde(default)]
+ pub busy_only: bool,
+ #[serde(default)]
+ pub error_only: bool,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct LocalListWorkersResponse(pub Vec<WorkerInfoResp>);
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct WorkerInfoResp {
+ pub id: u64,
+ pub name: String,
+ pub state: WorkerStateResp,
+ pub errors: u64,
+ pub consecutive_errors: u64,
+ pub last_error: Option<WorkerLastError>,
+ pub tranquility: Option<u32>,
+ pub progress: Option<String>,
+ pub queue_length: Option<u64>,
+ pub persistent_errors: Option<u64>,
+ pub freeform: Vec<String>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub enum WorkerStateResp {
+ Busy,
+ Throttled { duration_secs: f32 },
+ Idle,
+ Done,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct WorkerLastError {
+ pub message: String,
+ pub secs_ago: u64,
+}
+
+// ---- GetWorkerList ----
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct LocalGetWorkerInfoRequest {
+ pub id: u64,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct LocalGetWorkerInfoResponse(pub WorkerInfoResp);
+
+// ---- GetWorkerVariable ----
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct LocalGetWorkerVariableRequest {
+ pub variable: Option<String>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct LocalGetWorkerVariableResponse(pub HashMap<String, String>);
+
+// ---- SetWorkerVariable ----
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct LocalSetWorkerVariableRequest {
+ pub variable: String,
+ pub value: String,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct LocalSetWorkerVariableResponse {
+ pub variable: String,
+ pub value: String,
+}
+
+// **********************************************
+// Block operations
+// **********************************************
+
+// ---- ListBlockErrors ----
+
+#[derive(Debug, Clone, Serialize, Deserialize, Default)]
+pub struct LocalListBlockErrorsRequest;
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct LocalListBlockErrorsResponse(pub Vec<BlockError>);
+
+#[derive(Serialize, Deserialize, Clone, Debug)]
+#[serde(rename_all = "camelCase")]
+pub struct BlockError {
+ pub block_hash: String,
+ pub refcount: u64,
+ pub error_count: u64,
+ pub last_try_secs_ago: u64,
+ pub next_try_in_secs: u64,
+}
+
+// ---- GetBlockInfo ----
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct LocalGetBlockInfoRequest {
+ pub block_hash: String,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct LocalGetBlockInfoResponse {
+ pub block_hash: String,
+ pub refcount: u64,
+ pub versions: Vec<BlockVersion>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct BlockVersion {
+ pub version_id: String,
+ pub deleted: bool,
+ pub garbage_collected: bool,
+ pub backlink: Option<BlockVersionBacklink>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub enum BlockVersionBacklink {
+ Object {
+ bucket_id: String,
+ key: String,
+ },
+ Upload {
+ upload_id: String,
+ upload_deleted: bool,
+ upload_garbage_collected: bool,
+ bucket_id: Option<String>,
+ key: Option<String>,
+ },
+}
+
+// ---- RetryBlockResync ----
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(untagged)]
+pub enum LocalRetryBlockResyncRequest {
+ #[serde(rename_all = "camelCase")]
+ All { all: bool },
+ #[serde(rename_all = "camelCase")]
+ Blocks { block_hashes: Vec<String> },
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct LocalRetryBlockResyncResponse {
+ pub count: u64,
+}
+
+// ---- PurgeBlocks ----
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct LocalPurgeBlocksRequest(pub Vec<String>);
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct LocalPurgeBlocksResponse {
+ pub blocks_purged: u64,
+ pub objects_deleted: u64,
+ pub uploads_deleted: u64,
+ pub versions_deleted: u64,
+}
diff --git a/src/api/admin/api_server.rs b/src/api/admin/api_server.rs
index be29e617..1ab81be3 100644
--- a/src/api/admin/api_server.rs
+++ b/src/api/admin/api_server.rs
@@ -5,17 +5,18 @@ use argon2::password_hash::PasswordHash;
use async_trait::async_trait;
use http::header::{HeaderValue, ACCESS_CONTROL_ALLOW_ORIGIN, AUTHORIZATION};
-use hyper::{body::Incoming as IncomingBody, Request, Response, StatusCode};
+use hyper::{body::Incoming as IncomingBody, Request, Response};
+use serde::{Deserialize, Serialize};
use tokio::sync::watch;
use opentelemetry::trace::SpanRef;
#[cfg(feature = "metrics")]
use opentelemetry_prometheus::PrometheusExporter;
-#[cfg(feature = "metrics")]
-use prometheus::{Encoder, TextEncoder};
use garage_model::garage::Garage;
+use garage_rpc::{Endpoint as RpcEndpoint, *};
+use garage_util::background::BackgroundRunner;
use garage_util::error::Error as GarageError;
use garage_util::socket_address::UnixOrTCPSocketAddress;
@@ -27,19 +28,84 @@ use crate::error::*;
use crate::router_v0;
use crate::router_v1;
use crate::Authorization;
-use crate::EndpointHandler;
+use crate::RequestHandler;
+
+// ---- FOR RPC ----
+
+pub const ADMIN_RPC_PATH: &str = "garage_api/admin/rpc.rs/Rpc";
+
+#[derive(Debug, Serialize, Deserialize)]
+pub enum AdminRpc {
+ Proxy(AdminApiRequest),
+ Internal(LocalAdminApiRequest),
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub enum AdminRpcResponse {
+ ProxyApiOkResponse(TaggedAdminApiResponse),
+ InternalApiOkResponse(LocalAdminApiResponse),
+ ApiErrorResponse {
+ http_code: u16,
+ error_code: String,
+ message: String,
+ },
+}
+
+impl Rpc for AdminRpc {
+ type Response = Result<AdminRpcResponse, GarageError>;
+}
+
+#[async_trait]
+impl EndpointHandler<AdminRpc> for AdminApiServer {
+ async fn handle(
+ self: &Arc<Self>,
+ message: &AdminRpc,
+ _from: NodeID,
+ ) -> Result<AdminRpcResponse, GarageError> {
+ match message {
+ AdminRpc::Proxy(req) => {
+ info!("Proxied admin API request: {}", req.name());
+ let res = req.clone().handle(&self.garage, &self).await;
+ match res {
+ Ok(res) => Ok(AdminRpcResponse::ProxyApiOkResponse(res.tagged())),
+ Err(e) => Ok(AdminRpcResponse::ApiErrorResponse {
+ http_code: e.http_status_code().as_u16(),
+ error_code: e.code().to_string(),
+ message: e.to_string(),
+ }),
+ }
+ }
+ AdminRpc::Internal(req) => {
+ info!("Internal admin API request: {}", req.name());
+ let res = req.clone().handle(&self.garage, &self).await;
+ match res {
+ Ok(res) => Ok(AdminRpcResponse::InternalApiOkResponse(res)),
+ Err(e) => Ok(AdminRpcResponse::ApiErrorResponse {
+ http_code: e.http_status_code().as_u16(),
+ error_code: e.code().to_string(),
+ message: e.to_string(),
+ }),
+ }
+ }
+ }
+ }
+}
+
+// ---- FOR HTTP ----
pub type ResBody = BoxBody<Error>;
pub struct AdminApiServer {
garage: Arc<Garage>,
#[cfg(feature = "metrics")]
- exporter: PrometheusExporter,
+ pub(crate) exporter: PrometheusExporter,
metrics_token: Option<String>,
admin_token: Option<String>,
+ pub(crate) background: Arc<BackgroundRunner>,
+ pub(crate) endpoint: Arc<RpcEndpoint<AdminRpc, Self>>,
}
-pub enum Endpoint {
+pub enum HttpEndpoint {
Old(router_v1::Endpoint),
New(String),
}
@@ -47,91 +113,48 @@ pub enum Endpoint {
impl AdminApiServer {
pub fn new(
garage: Arc<Garage>,
+ background: Arc<BackgroundRunner>,
#[cfg(feature = "metrics")] exporter: PrometheusExporter,
- ) -> Self {
+ ) -> Arc<Self> {
let cfg = &garage.config.admin;
let metrics_token = cfg.metrics_token.as_deref().map(hash_bearer_token);
let admin_token = cfg.admin_token.as_deref().map(hash_bearer_token);
- Self {
+
+ let endpoint = garage.system.netapp.endpoint(ADMIN_RPC_PATH.into());
+ let admin = Arc::new(Self {
garage,
#[cfg(feature = "metrics")]
exporter,
metrics_token,
admin_token,
- }
+ background,
+ endpoint,
+ });
+ admin.endpoint.set_handler(admin.clone());
+ admin
}
pub async fn run(
- self,
+ self: Arc<Self>,
bind_addr: UnixOrTCPSocketAddress,
must_exit: watch::Receiver<bool>,
) -> Result<(), GarageError> {
let region = self.garage.config.s3_api.s3_region.clone();
- ApiServer::new(region, self)
+ ApiServer::new(region, ArcAdminApiServer(self))
.run_server(bind_addr, Some(0o220), must_exit)
.await
}
- fn handle_metrics(&self) -> Result<Response<ResBody>, Error> {
- #[cfg(feature = "metrics")]
- {
- use opentelemetry::trace::Tracer;
-
- let mut buffer = vec![];
- let encoder = TextEncoder::new();
-
- let tracer = opentelemetry::global::tracer("garage");
- let metric_families = tracer.in_span("admin/gather_metrics", |_| {
- self.exporter.registry().gather()
- });
-
- encoder
- .encode(&metric_families, &mut buffer)
- .ok_or_internal_error("Could not serialize metrics")?;
-
- Ok(Response::builder()
- .status(StatusCode::OK)
- .header(http::header::CONTENT_TYPE, encoder.format_type())
- .body(bytes_body(buffer.into()))?)
- }
- #[cfg(not(feature = "metrics"))]
- Err(Error::bad_request(
- "Garage was built without the metrics feature".to_string(),
- ))
- }
-}
-
-#[async_trait]
-impl ApiHandler for AdminApiServer {
- const API_NAME: &'static str = "admin";
- const API_NAME_DISPLAY: &'static str = "Admin";
-
- type Endpoint = Endpoint;
- type Error = Error;
-
- fn parse_endpoint(&self, req: &Request<IncomingBody>) -> Result<Endpoint, Error> {
- if req.uri().path().starts_with("/v0/") {
- let endpoint_v0 = router_v0::Endpoint::from_request(req)?;
- let endpoint_v1 = router_v1::Endpoint::from_v0(endpoint_v0)?;
- Ok(Endpoint::Old(endpoint_v1))
- } else if req.uri().path().starts_with("/v1/") {
- let endpoint_v1 = router_v1::Endpoint::from_request(req)?;
- Ok(Endpoint::Old(endpoint_v1))
- } else {
- Ok(Endpoint::New(req.uri().path().to_string()))
- }
- }
-
- async fn handle(
+ async fn handle_http_api(
&self,
req: Request<IncomingBody>,
- endpoint: Endpoint,
+ endpoint: HttpEndpoint,
) -> Result<Response<ResBody>, Error> {
let auth_header = req.headers().get(AUTHORIZATION).cloned();
let request = match endpoint {
- Endpoint::Old(endpoint_v1) => AdminApiRequest::from_v1(endpoint_v1, req).await?,
- Endpoint::New(_) => AdminApiRequest::from_request(req).await?,
+ HttpEndpoint::Old(endpoint_v1) => AdminApiRequest::from_v1(endpoint_v1, req).await?,
+ HttpEndpoint::New(_) => AdminApiRequest::from_request(req).await?,
};
let required_auth_hash =
@@ -156,12 +179,12 @@ impl ApiHandler for AdminApiServer {
}
match request {
- AdminApiRequest::Options(req) => req.handle(&self.garage).await,
- AdminApiRequest::CheckDomain(req) => req.handle(&self.garage).await,
- AdminApiRequest::Health(req) => req.handle(&self.garage).await,
- AdminApiRequest::Metrics(_req) => self.handle_metrics(),
+ AdminApiRequest::Options(req) => req.handle(&self.garage, &self).await,
+ AdminApiRequest::CheckDomain(req) => req.handle(&self.garage, &self).await,
+ AdminApiRequest::Health(req) => req.handle(&self.garage, &self).await,
+ AdminApiRequest::Metrics(req) => req.handle(&self.garage, &self).await,
req => {
- let res = req.handle(&self.garage).await?;
+ let res = req.handle(&self.garage, &self).await?;
let mut res = json_ok_response(&res)?;
res.headers_mut()
.insert(ACCESS_CONTROL_ALLOW_ORIGIN, HeaderValue::from_static("*"));
@@ -171,7 +194,39 @@ impl ApiHandler for AdminApiServer {
}
}
-impl ApiEndpoint for Endpoint {
+struct ArcAdminApiServer(Arc<AdminApiServer>);
+
+#[async_trait]
+impl ApiHandler for ArcAdminApiServer {
+ const API_NAME: &'static str = "admin";
+ const API_NAME_DISPLAY: &'static str = "Admin";
+
+ type Endpoint = HttpEndpoint;
+ type Error = Error;
+
+ fn parse_endpoint(&self, req: &Request<IncomingBody>) -> Result<HttpEndpoint, Error> {
+ if req.uri().path().starts_with("/v0/") {
+ let endpoint_v0 = router_v0::Endpoint::from_request(req)?;
+ let endpoint_v1 = router_v1::Endpoint::from_v0(endpoint_v0)?;
+ Ok(HttpEndpoint::Old(endpoint_v1))
+ } else if req.uri().path().starts_with("/v1/") {
+ let endpoint_v1 = router_v1::Endpoint::from_request(req)?;
+ Ok(HttpEndpoint::Old(endpoint_v1))
+ } else {
+ Ok(HttpEndpoint::New(req.uri().path().to_string()))
+ }
+ }
+
+ async fn handle(
+ &self,
+ req: Request<IncomingBody>,
+ endpoint: HttpEndpoint,
+ ) -> Result<Response<ResBody>, Error> {
+ self.0.handle_http_api(req, endpoint).await
+ }
+}
+
+impl ApiEndpoint for HttpEndpoint {
fn name(&self) -> Cow<'static, str> {
match self {
Self::Old(endpoint_v1) => Cow::Borrowed(endpoint_v1.name()),
diff --git a/src/api/admin/block.rs b/src/api/admin/block.rs
new file mode 100644
index 00000000..73d186a6
--- /dev/null
+++ b/src/api/admin/block.rs
@@ -0,0 +1,274 @@
+use std::sync::Arc;
+
+use garage_util::data::*;
+use garage_util::error::Error as GarageError;
+use garage_util::time::now_msec;
+
+use garage_table::EmptyKey;
+
+use garage_model::garage::Garage;
+use garage_model::s3::object_table::*;
+use garage_model::s3::version_table::*;
+
+use garage_api_common::common_error::CommonErrorDerivative;
+
+use crate::api::*;
+use crate::error::*;
+use crate::{Admin, RequestHandler};
+
+impl RequestHandler for LocalListBlockErrorsRequest {
+ type Response = LocalListBlockErrorsResponse;
+
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<LocalListBlockErrorsResponse, Error> {
+ let errors = garage.block_manager.list_resync_errors()?;
+ let now = now_msec();
+ let errors = errors
+ .into_iter()
+ .map(|e| BlockError {
+ block_hash: hex::encode(&e.hash),
+ refcount: e.refcount,
+ error_count: e.error_count,
+ last_try_secs_ago: now.saturating_sub(e.last_try) / 1000,
+ next_try_in_secs: e.next_try.saturating_sub(now) / 1000,
+ })
+ .collect();
+ Ok(LocalListBlockErrorsResponse(errors))
+ }
+}
+
+impl RequestHandler for LocalGetBlockInfoRequest {
+ type Response = LocalGetBlockInfoResponse;
+
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<LocalGetBlockInfoResponse, Error> {
+ let hash = find_block_hash_by_prefix(garage, &self.block_hash)?;
+ let refcount = garage.block_manager.get_block_rc(&hash)?;
+ let block_refs = garage
+ .block_ref_table
+ .get_range(&hash, None, None, 10000, Default::default())
+ .await?;
+ let mut versions = vec![];
+ for br in block_refs {
+ if let Some(v) = garage.version_table.get(&br.version, &EmptyKey).await? {
+ let bl = match &v.backlink {
+ VersionBacklink::MultipartUpload { upload_id } => {
+ if let Some(u) = garage.mpu_table.get(upload_id, &EmptyKey).await? {
+ BlockVersionBacklink::Upload {
+ upload_id: hex::encode(&upload_id),
+ upload_deleted: u.deleted.get(),
+ upload_garbage_collected: false,
+ bucket_id: Some(hex::encode(&u.bucket_id)),
+ key: Some(u.key.to_string()),
+ }
+ } else {
+ BlockVersionBacklink::Upload {
+ upload_id: hex::encode(&upload_id),
+ upload_deleted: true,
+ upload_garbage_collected: true,
+ bucket_id: None,
+ key: None,
+ }
+ }
+ }
+ VersionBacklink::Object { bucket_id, key } => BlockVersionBacklink::Object {
+ bucket_id: hex::encode(&bucket_id),
+ key: key.to_string(),
+ },
+ };
+ versions.push(BlockVersion {
+ version_id: hex::encode(&br.version),
+ deleted: v.deleted.get(),
+ garbage_collected: false,
+ backlink: Some(bl),
+ });
+ } else {
+ versions.push(BlockVersion {
+ version_id: hex::encode(&br.version),
+ deleted: true,
+ garbage_collected: true,
+ backlink: None,
+ });
+ }
+ }
+ Ok(LocalGetBlockInfoResponse {
+ block_hash: hex::encode(&hash),
+ refcount,
+ versions,
+ })
+ }
+}
+
+impl RequestHandler for LocalRetryBlockResyncRequest {
+ type Response = LocalRetryBlockResyncResponse;
+
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<LocalRetryBlockResyncResponse, Error> {
+ match self {
+ Self::All { all: true } => {
+ let blocks = garage.block_manager.list_resync_errors()?;
+ for b in blocks.iter() {
+ garage.block_manager.resync.clear_backoff(&b.hash)?;
+ }
+ Ok(LocalRetryBlockResyncResponse {
+ count: blocks.len() as u64,
+ })
+ }
+ Self::All { all: false } => Err(Error::bad_request("nonsense")),
+ Self::Blocks { block_hashes } => {
+ for hash in block_hashes.iter() {
+ let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?;
+ let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?;
+ garage.block_manager.resync.clear_backoff(&hash)?;
+ }
+ Ok(LocalRetryBlockResyncResponse {
+ count: block_hashes.len() as u64,
+ })
+ }
+ }
+ }
+}
+
+impl RequestHandler for LocalPurgeBlocksRequest {
+ type Response = LocalPurgeBlocksResponse;
+
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<LocalPurgeBlocksResponse, Error> {
+ let mut obj_dels = 0;
+ let mut mpu_dels = 0;
+ let mut ver_dels = 0;
+
+ for hash in self.0.iter() {
+ let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?;
+ let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?;
+ let block_refs = garage
+ .block_ref_table
+ .get_range(&hash, None, None, 10000, Default::default())
+ .await?;
+
+ for br in block_refs {
+ if let Some(version) = garage.version_table.get(&br.version, &EmptyKey).await? {
+ handle_block_purge_version_backlink(
+ garage,
+ &version,
+ &mut obj_dels,
+ &mut mpu_dels,
+ )
+ .await?;
+
+ if !version.deleted.get() {
+ let deleted_version = Version::new(version.uuid, version.backlink, true);
+ garage.version_table.insert(&deleted_version).await?;
+ ver_dels += 1;
+ }
+ }
+ }
+ }
+
+ Ok(LocalPurgeBlocksResponse {
+ blocks_purged: self.0.len() as u64,
+ versions_deleted: ver_dels,
+ objects_deleted: obj_dels,
+ uploads_deleted: mpu_dels,
+ })
+ }
+}
+
+fn find_block_hash_by_prefix(garage: &Arc<Garage>, prefix: &str) -> Result<Hash, Error> {
+ if prefix.len() < 4 {
+ return Err(Error::bad_request(
+ "Please specify at least 4 characters of the block hash",
+ ));
+ }
+
+ let prefix_bin = hex::decode(&prefix[..prefix.len() & !1]).ok_or_bad_request("invalid hash")?;
+
+ let iter = garage
+ .block_ref_table
+ .data
+ .store
+ .range(&prefix_bin[..]..)
+ .map_err(GarageError::from)?;
+ let mut found = None;
+ for item in iter {
+ let (k, _v) = item.map_err(GarageError::from)?;
+ let hash = Hash::try_from(&k[..32]).unwrap();
+ if &hash.as_slice()[..prefix_bin.len()] != prefix_bin {
+ break;
+ }
+ if hex::encode(hash.as_slice()).starts_with(prefix) {
+ match &found {
+ Some(x) if *x == hash => (),
+ Some(_) => {
+ return Err(Error::bad_request(format!(
+ "Several blocks match prefix `{}`",
+ prefix
+ )));
+ }
+ None => {
+ found = Some(hash);
+ }
+ }
+ }
+ }
+
+ found.ok_or_else(|| Error::NoSuchBlock(prefix.to_string()))
+}
+
+async fn handle_block_purge_version_backlink(
+ garage: &Arc<Garage>,
+ version: &Version,
+ obj_dels: &mut u64,
+ mpu_dels: &mut u64,
+) -> Result<(), Error> {
+ let (bucket_id, key, ov_id) = match &version.backlink {
+ VersionBacklink::Object { bucket_id, key } => (*bucket_id, key.clone(), version.uuid),
+ VersionBacklink::MultipartUpload { upload_id } => {
+ if let Some(mut mpu) = garage.mpu_table.get(upload_id, &EmptyKey).await? {
+ if !mpu.deleted.get() {
+ mpu.parts.clear();
+ mpu.deleted.set();
+ garage.mpu_table.insert(&mpu).await?;
+ *mpu_dels += 1;
+ }
+ (mpu.bucket_id, mpu.key.clone(), *upload_id)
+ } else {
+ return Ok(());
+ }
+ }
+ };
+
+ if let Some(object) = garage.object_table.get(&bucket_id, &key).await? {
+ let ov = object.versions().iter().rev().find(|v| v.is_complete());
+ if let Some(ov) = ov {
+ if ov.uuid == ov_id {
+ let del_uuid = gen_uuid();
+ let deleted_object = Object::new(
+ bucket_id,
+ key,
+ vec![ObjectVersion {
+ uuid: del_uuid,
+ timestamp: ov.timestamp + 1,
+ state: ObjectVersionState::Complete(ObjectVersionData::DeleteMarker),
+ }],
+ );
+ garage.object_table.insert(&deleted_object).await?;
+ *obj_dels += 1;
+ }
+ }
+ }
+
+ Ok(())
+}
diff --git a/src/api/admin/bucket.rs b/src/api/admin/bucket.rs
index 123956ca..d2bb62e0 100644
--- a/src/api/admin/bucket.rs
+++ b/src/api/admin/bucket.rs
@@ -1,7 +1,6 @@
use std::collections::HashMap;
use std::sync::Arc;
-
-use async_trait::async_trait;
+use std::time::Duration;
use garage_util::crdt::*;
use garage_util::data::*;
@@ -20,13 +19,16 @@ use garage_api_common::common_error::CommonError;
use crate::api::*;
use crate::error::*;
-use crate::EndpointHandler;
+use crate::{Admin, RequestHandler};
-#[async_trait]
-impl EndpointHandler for ListBucketsRequest {
+impl RequestHandler for ListBucketsRequest {
type Response = ListBucketsResponse;
- async fn handle(self, garage: &Arc<Garage>) -> Result<ListBucketsResponse, Error> {
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<ListBucketsResponse, Error> {
let buckets = garage
.bucket_table
.get_range(
@@ -69,11 +71,14 @@ impl EndpointHandler for ListBucketsRequest {
}
}
-#[async_trait]
-impl EndpointHandler for GetBucketInfoRequest {
+impl RequestHandler for GetBucketInfoRequest {
type Response = GetBucketInfoResponse;
- async fn handle(self, garage: &Arc<Garage>) -> Result<GetBucketInfoResponse, Error> {
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<GetBucketInfoResponse, Error> {
let bucket_id = match (self.id, self.global_alias, self.search) {
(Some(id), None, None) => parse_bucket_id(&id)?,
(None, Some(ga), None) => garage
@@ -221,11 +226,14 @@ async fn bucket_info_results(
Ok(res)
}
-#[async_trait]
-impl EndpointHandler for CreateBucketRequest {
+impl RequestHandler for CreateBucketRequest {
type Response = CreateBucketResponse;
- async fn handle(self, garage: &Arc<Garage>) -> Result<CreateBucketResponse, Error> {
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<CreateBucketResponse, Error> {
let helper = garage.locked_helper().await;
if let Some(ga) = &self.global_alias {
@@ -292,11 +300,14 @@ impl EndpointHandler for CreateBucketRequest {
}
}
-#[async_trait]
-impl EndpointHandler for DeleteBucketRequest {
+impl RequestHandler for DeleteBucketRequest {
type Response = DeleteBucketResponse;
- async fn handle(self, garage: &Arc<Garage>) -> Result<DeleteBucketResponse, Error> {
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<DeleteBucketResponse, Error> {
let helper = garage.locked_helper().await;
let bucket_id = parse_bucket_id(&self.id)?;
@@ -341,11 +352,14 @@ impl EndpointHandler for DeleteBucketRequest {
}
}
-#[async_trait]
-impl EndpointHandler for UpdateBucketRequest {
+impl RequestHandler for UpdateBucketRequest {
type Response = UpdateBucketResponse;
- async fn handle(self, garage: &Arc<Garage>) -> Result<UpdateBucketResponse, Error> {
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<UpdateBucketResponse, Error> {
let bucket_id = parse_bucket_id(&self.id)?;
let mut bucket = garage
@@ -388,23 +402,52 @@ impl EndpointHandler for UpdateBucketRequest {
}
}
+impl RequestHandler for CleanupIncompleteUploadsRequest {
+ type Response = CleanupIncompleteUploadsResponse;
+
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<CleanupIncompleteUploadsResponse, Error> {
+ let duration = Duration::from_secs(self.older_than_secs);
+
+ let bucket_id = parse_bucket_id(&self.bucket_id)?;
+
+ let count = garage
+ .bucket_helper()
+ .cleanup_incomplete_uploads(&bucket_id, duration)
+ .await?;
+
+ Ok(CleanupIncompleteUploadsResponse {
+ uploads_deleted: count as u64,
+ })
+ }
+}
+
// ---- BUCKET/KEY PERMISSIONS ----
-#[async_trait]
-impl EndpointHandler for AllowBucketKeyRequest {
+impl RequestHandler for AllowBucketKeyRequest {
type Response = AllowBucketKeyResponse;
- async fn handle(self, garage: &Arc<Garage>) -> Result<AllowBucketKeyResponse, Error> {
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<AllowBucketKeyResponse, Error> {
let res = handle_bucket_change_key_perm(garage, self.0, true).await?;
Ok(AllowBucketKeyResponse(res))
}
}
-#[async_trait]
-impl EndpointHandler for DenyBucketKeyRequest {
+impl RequestHandler for DenyBucketKeyRequest {
type Response = DenyBucketKeyResponse;
- async fn handle(self, garage: &Arc<Garage>) -> Result<DenyBucketKeyResponse, Error> {
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<DenyBucketKeyResponse, Error> {
let res = handle_bucket_change_key_perm(garage, self.0, false).await?;
Ok(DenyBucketKeyResponse(res))
}
@@ -449,11 +492,14 @@ pub async fn handle_bucket_change_key_perm(
// ---- BUCKET ALIASES ----
-#[async_trait]
-impl EndpointHandler for AddBucketAliasRequest {
+impl RequestHandler for AddBucketAliasRequest {
type Response = AddBucketAliasResponse;
- async fn handle(self, garage: &Arc<Garage>) -> Result<AddBucketAliasResponse, Error> {
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<AddBucketAliasResponse, Error> {
let bucket_id = parse_bucket_id(&self.bucket_id)?;
let helper = garage.locked_helper().await;
@@ -480,11 +526,14 @@ impl EndpointHandler for AddBucketAliasRequest {
}
}
-#[async_trait]
-impl EndpointHandler for RemoveBucketAliasRequest {
+impl RequestHandler for RemoveBucketAliasRequest {
type Response = RemoveBucketAliasResponse;
- async fn handle(self, garage: &Arc<Garage>) -> Result<RemoveBucketAliasResponse, Error> {
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<RemoveBucketAliasResponse, Error> {
let bucket_id = parse_bucket_id(&self.bucket_id)?;
let helper = garage.locked_helper().await;
diff --git a/src/api/admin/cluster.rs b/src/api/admin/cluster.rs
index dc16bd50..cb1fa493 100644
--- a/src/api/admin/cluster.rs
+++ b/src/api/admin/cluster.rs
@@ -1,8 +1,6 @@
use std::collections::HashMap;
use std::sync::Arc;
-use async_trait::async_trait;
-
use garage_util::crdt::*;
use garage_util::data::*;
@@ -12,13 +10,16 @@ use garage_model::garage::Garage;
use crate::api::*;
use crate::error::*;
-use crate::EndpointHandler;
+use crate::{Admin, RequestHandler};
-#[async_trait]
-impl EndpointHandler for GetClusterStatusRequest {
+impl RequestHandler for GetClusterStatusRequest {
type Response = GetClusterStatusResponse;
- async fn handle(self, garage: &Arc<Garage>) -> Result<GetClusterStatusResponse, Error> {
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<GetClusterStatusResponse, Error> {
let layout = garage.system.cluster_layout();
let mut nodes = garage
.system
@@ -116,11 +117,14 @@ impl EndpointHandler for GetClusterStatusRequest {
}
}
-#[async_trait]
-impl EndpointHandler for GetClusterHealthRequest {
+impl RequestHandler for GetClusterHealthRequest {
type Response = GetClusterHealthResponse;
- async fn handle(self, garage: &Arc<Garage>) -> Result<GetClusterHealthResponse, Error> {
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<GetClusterHealthResponse, Error> {
use garage_rpc::system::ClusterHealthStatus;
let health = garage.system.health();
let health = GetClusterHealthResponse {
@@ -142,11 +146,14 @@ impl EndpointHandler for GetClusterHealthRequest {
}
}
-#[async_trait]
-impl EndpointHandler for ConnectClusterNodesRequest {
+impl RequestHandler for ConnectClusterNodesRequest {
type Response = ConnectClusterNodesResponse;
- async fn handle(self, garage: &Arc<Garage>) -> Result<ConnectClusterNodesResponse, Error> {
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<ConnectClusterNodesResponse, Error> {
let res = futures::future::join_all(self.0.iter().map(|node| garage.system.connect(node)))
.await
.into_iter()
@@ -165,11 +172,14 @@ impl EndpointHandler for ConnectClusterNodesRequest {
}
}
-#[async_trait]
-impl EndpointHandler for GetClusterLayoutRequest {
+impl RequestHandler for GetClusterLayoutRequest {
type Response = GetClusterLayoutResponse;
- async fn handle(self, garage: &Arc<Garage>) -> Result<GetClusterLayoutResponse, Error> {
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<GetClusterLayoutResponse, Error> {
Ok(format_cluster_layout(
garage.system.cluster_layout().inner(),
))
@@ -225,11 +235,14 @@ fn format_cluster_layout(layout: &layout::LayoutHistory) -> GetClusterLayoutResp
// ---- update functions ----
-#[async_trait]
-impl EndpointHandler for UpdateClusterLayoutRequest {
+impl RequestHandler for UpdateClusterLayoutRequest {
type Response = UpdateClusterLayoutResponse;
- async fn handle(self, garage: &Arc<Garage>) -> Result<UpdateClusterLayoutResponse, Error> {
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<UpdateClusterLayoutResponse, Error> {
let mut layout = garage.system.cluster_layout().inner().clone();
let mut roles = layout.current().roles.clone();
@@ -271,11 +284,14 @@ impl EndpointHandler for UpdateClusterLayoutRequest {
}
}
-#[async_trait]
-impl EndpointHandler for ApplyClusterLayoutRequest {
+impl RequestHandler for ApplyClusterLayoutRequest {
type Response = ApplyClusterLayoutResponse;
- async fn handle(self, garage: &Arc<Garage>) -> Result<ApplyClusterLayoutResponse, Error> {
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<ApplyClusterLayoutResponse, Error> {
let layout = garage.system.cluster_layout().inner().clone();
let (layout, msg) = layout.apply_staged_changes(Some(self.version))?;
@@ -292,11 +308,14 @@ impl EndpointHandler for ApplyClusterLayoutRequest {
}
}
-#[async_trait]
-impl EndpointHandler for RevertClusterLayoutRequest {
+impl RequestHandler for RevertClusterLayoutRequest {
type Response = RevertClusterLayoutResponse;
- async fn handle(self, garage: &Arc<Garage>) -> Result<RevertClusterLayoutResponse, Error> {
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<RevertClusterLayoutResponse, Error> {
let layout = garage.system.cluster_layout().inner().clone();
let layout = layout.revert_staged_changes()?;
garage
diff --git a/src/api/admin/error.rs b/src/api/admin/error.rs
index 3712ee7d..d7ea7dc9 100644
--- a/src/api/admin/error.rs
+++ b/src/api/admin/error.rs
@@ -25,6 +25,14 @@ pub enum Error {
#[error(display = "Access key not found: {}", _0)]
NoSuchAccessKey(String),
+ /// The requested block does not exist
+ #[error(display = "Block not found: {}", _0)]
+ NoSuchBlock(String),
+
+ /// The requested worker does not exist
+ #[error(display = "Worker not found: {}", _0)]
+ NoSuchWorker(u64),
+
/// In Import key, the key already exists
#[error(
display = "Key {} already exists in data store. Even if it is deleted, we can't let you create a new key with the same ID. Sorry.",
@@ -53,6 +61,8 @@ impl Error {
match self {
Error::Common(c) => c.aws_code(),
Error::NoSuchAccessKey(_) => "NoSuchAccessKey",
+ Error::NoSuchWorker(_) => "NoSuchWorker",
+ Error::NoSuchBlock(_) => "NoSuchBlock",
Error::KeyAlreadyExists(_) => "KeyAlreadyExists",
}
}
@@ -63,7 +73,9 @@ impl ApiError for Error {
fn http_status_code(&self) -> StatusCode {
match self {
Error::Common(c) => c.http_status_code(),
- Error::NoSuchAccessKey(_) => StatusCode::NOT_FOUND,
+ Error::NoSuchAccessKey(_) | Error::NoSuchWorker(_) | Error::NoSuchBlock(_) => {
+ StatusCode::NOT_FOUND
+ }
Error::KeyAlreadyExists(_) => StatusCode::CONFLICT,
}
}
diff --git a/src/api/admin/key.rs b/src/api/admin/key.rs
index 5b7de075..dc6ae4e9 100644
--- a/src/api/admin/key.rs
+++ b/src/api/admin/key.rs
@@ -1,8 +1,6 @@
use std::collections::HashMap;
use std::sync::Arc;
-use async_trait::async_trait;
-
use garage_table::*;
use garage_model::garage::Garage;
@@ -10,13 +8,12 @@ use garage_model::key_table::*;
use crate::api::*;
use crate::error::*;
-use crate::EndpointHandler;
+use crate::{Admin, RequestHandler};
-#[async_trait]
-impl EndpointHandler for ListKeysRequest {
+impl RequestHandler for ListKeysRequest {
type Response = ListKeysResponse;
- async fn handle(self, garage: &Arc<Garage>) -> Result<ListKeysResponse, Error> {
+ async fn handle(self, garage: &Arc<Garage>, _admin: &Admin) -> Result<ListKeysResponse, Error> {
let res = garage
.key_table
.get_range(
@@ -38,11 +35,14 @@ impl EndpointHandler for ListKeysRequest {
}
}
-#[async_trait]
-impl EndpointHandler for GetKeyInfoRequest {
+impl RequestHandler for GetKeyInfoRequest {
type Response = GetKeyInfoResponse;
- async fn handle(self, garage: &Arc<Garage>) -> Result<GetKeyInfoResponse, Error> {
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<GetKeyInfoResponse, Error> {
let key = match (self.id, self.search) {
(Some(id), None) => garage.key_helper().get_existing_key(&id).await?,
(None, Some(search)) => {
@@ -62,11 +62,14 @@ impl EndpointHandler for GetKeyInfoRequest {
}
}
-#[async_trait]
-impl EndpointHandler for CreateKeyRequest {
+impl RequestHandler for CreateKeyRequest {
type Response = CreateKeyResponse;
- async fn handle(self, garage: &Arc<Garage>) -> Result<CreateKeyResponse, Error> {
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<CreateKeyResponse, Error> {
let key = Key::new(self.name.as_deref().unwrap_or("Unnamed key"));
garage.key_table.insert(&key).await?;
@@ -76,11 +79,14 @@ impl EndpointHandler for CreateKeyRequest {
}
}
-#[async_trait]
-impl EndpointHandler for ImportKeyRequest {
+impl RequestHandler for ImportKeyRequest {
type Response = ImportKeyResponse;
- async fn handle(self, garage: &Arc<Garage>) -> Result<ImportKeyResponse, Error> {
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<ImportKeyResponse, Error> {
let prev_key = garage.key_table.get(&EmptyKey, &self.access_key_id).await?;
if prev_key.is_some() {
return Err(Error::KeyAlreadyExists(self.access_key_id.to_string()));
@@ -100,11 +106,14 @@ impl EndpointHandler for ImportKeyRequest {
}
}
-#[async_trait]
-impl EndpointHandler for UpdateKeyRequest {
+impl RequestHandler for UpdateKeyRequest {
type Response = UpdateKeyResponse;
- async fn handle(self, garage: &Arc<Garage>) -> Result<UpdateKeyResponse, Error> {
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<UpdateKeyResponse, Error> {
let mut key = garage.key_helper().get_existing_key(&self.id).await?;
let key_state = key.state.as_option_mut().unwrap();
@@ -131,11 +140,14 @@ impl EndpointHandler for UpdateKeyRequest {
}
}
-#[async_trait]
-impl EndpointHandler for DeleteKeyRequest {
+impl RequestHandler for DeleteKeyRequest {
type Response = DeleteKeyResponse;
- async fn handle(self, garage: &Arc<Garage>) -> Result<DeleteKeyResponse, Error> {
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<DeleteKeyResponse, Error> {
let helper = garage.locked_helper().await;
let mut key = helper.key().get_existing_key(&self.id).await?;
diff --git a/src/api/admin/lib.rs b/src/api/admin/lib.rs
index 31b3874d..dd9b7ffd 100644
--- a/src/api/admin/lib.rs
+++ b/src/api/admin/lib.rs
@@ -15,21 +15,29 @@ mod cluster;
mod key;
mod special;
-use std::sync::Arc;
+mod block;
+mod node;
+mod repair;
+mod worker;
-use async_trait::async_trait;
+use std::sync::Arc;
use garage_model::garage::Garage;
+pub use api_server::AdminApiServer as Admin;
+
pub enum Authorization {
None,
MetricsToken,
AdminToken,
}
-#[async_trait]
-pub trait EndpointHandler {
+pub trait RequestHandler {
type Response;
- async fn handle(self, garage: &Arc<Garage>) -> Result<Self::Response, error::Error>;
+ fn handle(
+ self,
+ garage: &Arc<Garage>,
+ admin: &Admin,
+ ) -> impl std::future::Future<Output = Result<Self::Response, error::Error>> + Send;
}
diff --git a/src/api/admin/macros.rs b/src/api/admin/macros.rs
index 9521616e..df2762fe 100644
--- a/src/api/admin/macros.rs
+++ b/src/api/admin/macros.rs
@@ -70,11 +70,10 @@ macro_rules! admin_endpoints {
}
)*
- #[async_trait]
- impl EndpointHandler for AdminApiRequest {
+ impl RequestHandler for AdminApiRequest {
type Response = AdminApiResponse;
- async fn handle(self, garage: &Arc<Garage>) -> Result<AdminApiResponse, Error> {
+ async fn handle(self, garage: &Arc<Garage>, admin: &Admin) -> Result<AdminApiResponse, Error> {
Ok(match self {
$(
AdminApiRequest::$special_endpoint(_) => panic!(
@@ -82,7 +81,132 @@ macro_rules! admin_endpoints {
),
)*
$(
- AdminApiRequest::$endpoint(req) => AdminApiResponse::$endpoint(req.handle(garage).await?),
+ AdminApiRequest::$endpoint(req) => AdminApiResponse::$endpoint(req.handle(garage, admin).await?),
+ )*
+ })
+ }
+ }
+ }
+ };
+}
+
+macro_rules! local_admin_endpoints {
+ [
+ $($endpoint:ident,)*
+ ] => {
+ paste! {
+ #[derive(Debug, Clone, Serialize, Deserialize)]
+ pub enum LocalAdminApiRequest {
+ $(
+ $endpoint( [<Local $endpoint Request>] ),
+ )*
+ }
+
+ #[derive(Debug, Clone, Serialize, Deserialize)]
+ pub enum LocalAdminApiResponse {
+ $(
+ $endpoint( [<Local $endpoint Response>] ),
+ )*
+ }
+
+ $(
+ pub type [< $endpoint Request >] = MultiRequest< [< Local $endpoint Request >] >;
+
+ pub type [< $endpoint RequestBody >] = [< Local $endpoint Request >];
+
+ pub type [< $endpoint Response >] = MultiResponse< [< Local $endpoint Response >] >;
+
+ impl From< [< Local $endpoint Request >] > for LocalAdminApiRequest {
+ fn from(req: [< Local $endpoint Request >]) -> LocalAdminApiRequest {
+ LocalAdminApiRequest::$endpoint(req)
+ }
+ }
+
+ impl TryFrom<LocalAdminApiResponse> for [< Local $endpoint Response >] {
+ type Error = LocalAdminApiResponse;
+ fn try_from(resp: LocalAdminApiResponse) -> Result< [< Local $endpoint Response >], LocalAdminApiResponse> {
+ match resp {
+ LocalAdminApiResponse::$endpoint(v) => Ok(v),
+ x => Err(x),
+ }
+ }
+ }
+
+ impl RequestHandler for [< $endpoint Request >] {
+ type Response = [< $endpoint Response >];
+
+ async fn handle(self, garage: &Arc<Garage>, admin: &Admin) -> Result<Self::Response, Error> {
+ let to = match self.node.as_str() {
+ "*" => garage.system.cluster_layout().all_nodes().to_vec(),
+ id => {
+ let nodes = garage.system.cluster_layout().all_nodes()
+ .iter()
+ .filter(|x| hex::encode(x).starts_with(id))
+ .cloned()
+ .collect::<Vec<_>>();
+ if nodes.len() != 1 {
+ return Err(Error::bad_request(format!("Zero or multiple nodes matching {}: {:?}", id, nodes)));
+ }
+ nodes
+ }
+ };
+
+ let resps = garage.system.rpc_helper().call_many(&admin.endpoint,
+ &to,
+ AdminRpc::Internal(self.body.into()),
+ RequestStrategy::with_priority(PRIO_NORMAL),
+ ).await?;
+
+ let mut ret = [< $endpoint Response >] {
+ success: HashMap::new(),
+ error: HashMap::new(),
+ };
+ for (node, resp) in resps {
+ match resp {
+ Ok(AdminRpcResponse::InternalApiOkResponse(r)) => {
+ match [< Local $endpoint Response >]::try_from(r) {
+ Ok(r) => {
+ ret.success.insert(hex::encode(node), r);
+ }
+ Err(_) => {
+ ret.error.insert(hex::encode(node), "returned invalid value".to_string());
+ }
+ }
+ }
+ Ok(AdminRpcResponse::ApiErrorResponse{error_code, http_code, message}) => {
+ ret.error.insert(hex::encode(node), format!("{} ({}): {}", error_code, http_code, message));
+ }
+ Ok(_) => {
+ ret.error.insert(hex::encode(node), "returned invalid value".to_string());
+ }
+ Err(e) => {
+ ret.error.insert(hex::encode(node), e.to_string());
+ }
+ }
+ }
+
+ Ok(ret)
+ }
+ }
+ )*
+
+ impl LocalAdminApiRequest {
+ pub fn name(&self) -> &'static str {
+ match self {
+ $(
+ Self::$endpoint(_) => stringify!($endpoint),
+ )*
+ }
+ }
+ }
+
+ impl RequestHandler for LocalAdminApiRequest {
+ type Response = LocalAdminApiResponse;
+
+ async fn handle(self, garage: &Arc<Garage>, admin: &Admin) -> Result<LocalAdminApiResponse, Error> {
+ Ok(match self {
+ $(
+ LocalAdminApiRequest::$endpoint(req) => LocalAdminApiResponse::$endpoint(req.handle(garage, admin).await?),
)*
})
}
@@ -92,3 +216,4 @@ macro_rules! admin_endpoints {
}
pub(crate) use admin_endpoints;
+pub(crate) use local_admin_endpoints;
diff --git a/src/api/admin/node.rs b/src/api/admin/node.rs
new file mode 100644
index 00000000..f6f43d95
--- /dev/null
+++ b/src/api/admin/node.rs
@@ -0,0 +1,216 @@
+use std::collections::HashMap;
+use std::fmt::Write;
+use std::sync::Arc;
+
+use format_table::format_table_to_string;
+
+use garage_util::data::*;
+use garage_util::error::Error as GarageError;
+
+use garage_table::replication::*;
+use garage_table::*;
+
+use garage_rpc::layout::PARTITION_BITS;
+
+use garage_model::garage::Garage;
+
+use crate::api::*;
+use crate::error::Error;
+use crate::{Admin, RequestHandler};
+
+impl RequestHandler for LocalCreateMetadataSnapshotRequest {
+ type Response = LocalCreateMetadataSnapshotResponse;
+
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<LocalCreateMetadataSnapshotResponse, Error> {
+ garage_model::snapshot::async_snapshot_metadata(garage).await?;
+ Ok(LocalCreateMetadataSnapshotResponse)
+ }
+}
+
+impl RequestHandler for LocalGetNodeStatisticsRequest {
+ type Response = LocalGetNodeStatisticsResponse;
+
+ // FIXME: return this as a JSON struct instead of text
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<LocalGetNodeStatisticsResponse, Error> {
+ let mut ret = String::new();
+ writeln!(
+ &mut ret,
+ "Garage version: {} [features: {}]\nRust compiler version: {}",
+ garage_util::version::garage_version(),
+ garage_util::version::garage_features()
+ .map(|list| list.join(", "))
+ .unwrap_or_else(|| "(unknown)".into()),
+ garage_util::version::rust_version(),
+ )
+ .unwrap();
+
+ writeln!(&mut ret, "\nDatabase engine: {}", garage.db.engine()).unwrap();
+
+ // Gather table statistics
+ let mut table = vec![" Table\tItems\tMklItems\tMklTodo\tGcTodo".into()];
+ table.push(gather_table_stats(&garage.bucket_table)?);
+ table.push(gather_table_stats(&garage.key_table)?);
+ table.push(gather_table_stats(&garage.object_table)?);
+ table.push(gather_table_stats(&garage.version_table)?);
+ table.push(gather_table_stats(&garage.block_ref_table)?);
+ write!(
+ &mut ret,
+ "\nTable stats:\n{}",
+ format_table_to_string(table)
+ )
+ .unwrap();
+
+ // Gather block manager statistics
+ writeln!(&mut ret, "\nBlock manager stats:").unwrap();
+ let rc_len = garage.block_manager.rc_len()?.to_string();
+
+ writeln!(
+ &mut ret,
+ " number of RC entries (~= number of blocks): {}",
+ rc_len
+ )
+ .unwrap();
+ writeln!(
+ &mut ret,
+ " resync queue length: {}",
+ garage.block_manager.resync.queue_len()?
+ )
+ .unwrap();
+ writeln!(
+ &mut ret,
+ " blocks with resync errors: {}",
+ garage.block_manager.resync.errors_len()?
+ )
+ .unwrap();
+
+ Ok(LocalGetNodeStatisticsResponse { freeform: ret })
+ }
+}
+
+impl RequestHandler for GetClusterStatisticsRequest {
+ type Response = GetClusterStatisticsResponse;
+
+ // FIXME: return this as a JSON struct instead of text
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<GetClusterStatisticsResponse, Error> {
+ let mut ret = String::new();
+
+ // Gather storage node and free space statistics for current nodes
+ let layout = &garage.system.cluster_layout();
+ let mut node_partition_count = HashMap::<Uuid, u64>::new();
+ for short_id in layout.current().ring_assignment_data.iter() {
+ let id = layout.current().node_id_vec[*short_id as usize];
+ *node_partition_count.entry(id).or_default() += 1;
+ }
+ let node_info = garage
+ .system
+ .get_known_nodes()
+ .into_iter()
+ .map(|n| (n.id, n))
+ .collect::<HashMap<_, _>>();
+
+ let mut table = vec![" ID\tHostname\tZone\tCapacity\tPart.\tDataAvail\tMetaAvail".into()];
+ for (id, parts) in node_partition_count.iter() {
+ let info = node_info.get(id);
+ let status = info.map(|x| &x.status);
+ let role = layout.current().roles.get(id).and_then(|x| x.0.as_ref());
+ let hostname = status.and_then(|x| x.hostname.as_deref()).unwrap_or("?");
+ let zone = role.map(|x| x.zone.as_str()).unwrap_or("?");
+ let capacity = role
+ .map(|x| x.capacity_string())
+ .unwrap_or_else(|| "?".into());
+ let avail_str = |x| match x {
+ Some((avail, total)) => {
+ let pct = (avail as f64) / (total as f64) * 100.;
+ let avail = bytesize::ByteSize::b(avail);
+ let total = bytesize::ByteSize::b(total);
+ format!("{}/{} ({:.1}%)", avail, total, pct)
+ }
+ None => "?".into(),
+ };
+ let data_avail = avail_str(status.and_then(|x| x.data_disk_avail));
+ let meta_avail = avail_str(status.and_then(|x| x.meta_disk_avail));
+ table.push(format!(
+ " {:?}\t{}\t{}\t{}\t{}\t{}\t{}",
+ id, hostname, zone, capacity, parts, data_avail, meta_avail
+ ));
+ }
+ write!(
+ &mut ret,
+ "Storage nodes:\n{}",
+ format_table_to_string(table)
+ )
+ .unwrap();
+
+ let meta_part_avail = node_partition_count
+ .iter()
+ .filter_map(|(id, parts)| {
+ node_info
+ .get(id)
+ .and_then(|x| x.status.meta_disk_avail)
+ .map(|c| c.0 / *parts)
+ })
+ .collect::<Vec<_>>();
+ let data_part_avail = node_partition_count
+ .iter()
+ .filter_map(|(id, parts)| {
+ node_info
+ .get(id)
+ .and_then(|x| x.status.data_disk_avail)
+ .map(|c| c.0 / *parts)
+ })
+ .collect::<Vec<_>>();
+ if !meta_part_avail.is_empty() && !data_part_avail.is_empty() {
+ let meta_avail =
+ bytesize::ByteSize(meta_part_avail.iter().min().unwrap() * (1 << PARTITION_BITS));
+ let data_avail =
+ bytesize::ByteSize(data_part_avail.iter().min().unwrap() * (1 << PARTITION_BITS));
+ writeln!(
+ &mut ret,
+ "\nEstimated available storage space cluster-wide (might be lower in practice):"
+ )
+ .unwrap();
+ if meta_part_avail.len() < node_partition_count.len()
+ || data_part_avail.len() < node_partition_count.len()
+ {
+ writeln!(&mut ret, " data: < {}", data_avail).unwrap();
+ writeln!(&mut ret, " metadata: < {}", meta_avail).unwrap();
+ writeln!(&mut ret, "A precise estimate could not be given as information is missing for some storage nodes.").unwrap();
+ } else {
+ writeln!(&mut ret, " data: {}", data_avail).unwrap();
+ writeln!(&mut ret, " metadata: {}", meta_avail).unwrap();
+ }
+ }
+
+ Ok(GetClusterStatisticsResponse { freeform: ret })
+ }
+}
+
+fn gather_table_stats<F, R>(t: &Arc<Table<F, R>>) -> Result<String, Error>
+where
+ F: TableSchema + 'static,
+ R: TableReplication + 'static,
+{
+ let data_len = t.data.store.len().map_err(GarageError::from)?.to_string();
+ let mkl_len = t.merkle_updater.merkle_tree_len()?.to_string();
+
+ Ok(format!(
+ " {}\t{}\t{}\t{}\t{}",
+ F::TABLE_NAME,
+ data_len,
+ mkl_len,
+ t.merkle_updater.todo_len()?,
+ t.data.gc_todo_len()?
+ ))
+}
diff --git a/src/garage/repair/online.rs b/src/api/admin/repair.rs
index 2c5227d2..113ef636 100644
--- a/src/garage/repair/online.rs
+++ b/src/api/admin/repair.rs
@@ -4,6 +4,14 @@ use std::time::Duration;
use async_trait::async_trait;
use tokio::sync::watch;
+use garage_util::background::*;
+use garage_util::data::*;
+use garage_util::error::{Error as GarageError, OkOrMessage};
+use garage_util::migrate::Migrate;
+
+use garage_table::replication::*;
+use garage_table::*;
+
use garage_block::manager::BlockManager;
use garage_block::repair::ScrubWorkerCommand;
@@ -13,97 +21,90 @@ use garage_model::s3::mpu_table::*;
use garage_model::s3::object_table::*;
use garage_model::s3::version_table::*;
-use garage_table::replication::*;
-use garage_table::*;
-
-use garage_util::background::*;
-use garage_util::data::*;
-use garage_util::error::Error;
-use garage_util::migrate::Migrate;
-
-use crate::*;
+use crate::api::*;
+use crate::error::Error;
+use crate::{Admin, RequestHandler};
const RC_REPAIR_ITER_COUNT: usize = 64;
-pub async fn launch_online_repair(
- garage: &Arc<Garage>,
- bg: &BackgroundRunner,
- opt: RepairOpt,
-) -> Result<(), Error> {
- match opt.what {
- RepairWhat::Tables => {
- info!("Launching a full sync of tables");
- garage.bucket_table.syncer.add_full_sync()?;
- garage.object_table.syncer.add_full_sync()?;
- garage.version_table.syncer.add_full_sync()?;
- garage.block_ref_table.syncer.add_full_sync()?;
- garage.key_table.syncer.add_full_sync()?;
- }
- RepairWhat::Versions => {
- info!("Repairing the versions table");
- bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairVersions));
- }
- RepairWhat::MultipartUploads => {
- info!("Repairing the multipart uploads table");
- bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairMpu));
- }
- RepairWhat::BlockRefs => {
- info!("Repairing the block refs table");
- bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairBlockRefs));
- }
- RepairWhat::BlockRc => {
- info!("Repairing the block reference counters");
- bg.spawn_worker(BlockRcRepair::new(
- garage.block_manager.clone(),
- garage.block_ref_table.clone(),
- ));
- }
- RepairWhat::Blocks => {
- info!("Repairing the stored blocks");
- bg.spawn_worker(garage_block::repair::RepairWorker::new(
- garage.block_manager.clone(),
- ));
- }
- RepairWhat::Scrub { cmd } => {
- let cmd = match cmd {
- ScrubCmd::Start => ScrubWorkerCommand::Start,
- ScrubCmd::Pause => ScrubWorkerCommand::Pause(Duration::from_secs(3600 * 24)),
- ScrubCmd::Resume => ScrubWorkerCommand::Resume,
- ScrubCmd::Cancel => ScrubWorkerCommand::Cancel,
- ScrubCmd::SetTranquility { tranquility } => {
- garage
- .block_manager
- .scrub_persister
- .set_with(|x| x.tranquility = tranquility)?;
- return Ok(());
- }
- };
- info!("Sending command to scrub worker: {:?}", cmd);
- garage.block_manager.send_scrub_command(cmd).await?;
- }
- RepairWhat::Rebalance => {
- info!("Rebalancing the stored blocks among storage locations");
- bg.spawn_worker(garage_block::repair::RebalanceWorker::new(
- garage.block_manager.clone(),
- ));
+impl RequestHandler for LocalLaunchRepairOperationRequest {
+ type Response = LocalLaunchRepairOperationResponse;
+
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ admin: &Admin,
+ ) -> Result<LocalLaunchRepairOperationResponse, Error> {
+ let bg = &admin.background;
+ match self.repair_type {
+ RepairType::Tables => {
+ info!("Launching a full sync of tables");
+ garage.bucket_table.syncer.add_full_sync()?;
+ garage.object_table.syncer.add_full_sync()?;
+ garage.version_table.syncer.add_full_sync()?;
+ garage.block_ref_table.syncer.add_full_sync()?;
+ garage.key_table.syncer.add_full_sync()?;
+ }
+ RepairType::Versions => {
+ info!("Repairing the versions table");
+ bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairVersions));
+ }
+ RepairType::MultipartUploads => {
+ info!("Repairing the multipart uploads table");
+ bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairMpu));
+ }
+ RepairType::BlockRefs => {
+ info!("Repairing the block refs table");
+ bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairBlockRefs));
+ }
+ RepairType::BlockRc => {
+ info!("Repairing the block reference counters");
+ bg.spawn_worker(BlockRcRepair::new(
+ garage.block_manager.clone(),
+ garage.block_ref_table.clone(),
+ ));
+ }
+ RepairType::Blocks => {
+ info!("Repairing the stored blocks");
+ bg.spawn_worker(garage_block::repair::RepairWorker::new(
+ garage.block_manager.clone(),
+ ));
+ }
+ RepairType::Scrub(cmd) => {
+ let cmd = match cmd {
+ ScrubCommand::Start => ScrubWorkerCommand::Start,
+ ScrubCommand::Pause => {
+ ScrubWorkerCommand::Pause(Duration::from_secs(3600 * 24))
+ }
+ ScrubCommand::Resume => ScrubWorkerCommand::Resume,
+ ScrubCommand::Cancel => ScrubWorkerCommand::Cancel,
+ };
+ info!("Sending command to scrub worker: {:?}", cmd);
+ garage.block_manager.send_scrub_command(cmd).await?;
+ }
+ RepairType::Rebalance => {
+ info!("Rebalancing the stored blocks among storage locations");
+ bg.spawn_worker(garage_block::repair::RebalanceWorker::new(
+ garage.block_manager.clone(),
+ ));
+ }
}
+ Ok(LocalLaunchRepairOperationResponse)
}
- Ok(())
}
// ----
-#[async_trait]
trait TableRepair: Send + Sync + 'static {
type T: TableSchema;
fn table(garage: &Garage) -> &Table<Self::T, TableShardedReplication>;
- async fn process(
+ fn process(
&mut self,
garage: &Garage,
entry: <<Self as TableRepair>::T as TableSchema>::E,
- ) -> Result<bool, Error>;
+ ) -> impl std::future::Future<Output = Result<bool, GarageError>> + Send;
}
struct TableRepairWorker<T: TableRepair> {
@@ -139,7 +140,10 @@ impl<R: TableRepair> Worker for TableRepairWorker<R> {
}
}
- async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
+ async fn work(
+ &mut self,
+ _must_exit: &mut watch::Receiver<bool>,
+ ) -> Result<WorkerState, GarageError> {
let (item_bytes, next_pos) = match R::table(&self.garage).data.store.get_gt(&self.pos)? {
Some((k, v)) => (v, k),
None => {
@@ -174,7 +178,6 @@ impl<R: TableRepair> Worker for TableRepairWorker<R> {
struct RepairVersions;
-#[async_trait]
impl TableRepair for RepairVersions {
type T = VersionTable;
@@ -182,7 +185,7 @@ impl TableRepair for RepairVersions {
&garage.version_table
}
- async fn process(&mut self, garage: &Garage, version: Version) -> Result<bool, Error> {
+ async fn process(&mut self, garage: &Garage, version: Version) -> Result<bool, GarageError> {
if !version.deleted.get() {
let ref_exists = match &version.backlink {
VersionBacklink::Object { bucket_id, key } => garage
@@ -221,7 +224,6 @@ impl TableRepair for RepairVersions {
struct RepairBlockRefs;
-#[async_trait]
impl TableRepair for RepairBlockRefs {
type T = BlockRefTable;
@@ -229,7 +231,11 @@ impl TableRepair for RepairBlockRefs {
&garage.block_ref_table
}
- async fn process(&mut self, garage: &Garage, mut block_ref: BlockRef) -> Result<bool, Error> {
+ async fn process(
+ &mut self,
+ garage: &Garage,
+ mut block_ref: BlockRef,
+ ) -> Result<bool, GarageError> {
if !block_ref.deleted.get() {
let ref_exists = garage
.version_table
@@ -257,7 +263,6 @@ impl TableRepair for RepairBlockRefs {
struct RepairMpu;
-#[async_trait]
impl TableRepair for RepairMpu {
type T = MultipartUploadTable;
@@ -265,7 +270,11 @@ impl TableRepair for RepairMpu {
&garage.mpu_table
}
- async fn process(&mut self, garage: &Garage, mut mpu: MultipartUpload) -> Result<bool, Error> {
+ async fn process(
+ &mut self,
+ garage: &Garage,
+ mut mpu: MultipartUpload,
+ ) -> Result<bool, GarageError> {
if !mpu.deleted.get() {
let ref_exists = garage
.object_table
@@ -332,7 +341,10 @@ impl Worker for BlockRcRepair {
}
}
- async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
+ async fn work(
+ &mut self,
+ _must_exit: &mut watch::Receiver<bool>,
+ ) -> Result<WorkerState, GarageError> {
for _i in 0..RC_REPAIR_ITER_COUNT {
let next1 = self
.block_manager
diff --git a/src/api/admin/router_v2.rs b/src/api/admin/router_v2.rs
index b36bca34..4d5c015e 100644
--- a/src/api/admin/router_v2.rs
+++ b/src/api/admin/router_v2.rs
@@ -52,12 +52,28 @@ impl AdminApiRequest {
POST CreateBucket (body),
POST DeleteBucket (query::id),
POST UpdateBucket (body_field, query::id),
+ POST CleanupIncompleteUploads (body),
// Bucket-key permissions
POST AllowBucketKey (body),
POST DenyBucketKey (body),
// Bucket aliases
POST AddBucketAlias (body),
POST RemoveBucketAlias (body),
+ // Node APIs
+ POST CreateMetadataSnapshot (default::body, query::node),
+ GET GetNodeStatistics (default::body, query::node),
+ GET GetClusterStatistics (),
+ POST LaunchRepairOperation (body_field, query::node),
+ // Worker APIs
+ POST ListWorkers (body_field, query::node),
+ POST GetWorkerInfo (body_field, query::node),
+ POST GetWorkerVariable (body_field, query::node),
+ POST SetWorkerVariable (body_field, query::node),
+ // Block APIs
+ GET ListBlockErrors (default::body, query::node),
+ POST GetBlockInfo (body_field, query::node),
+ POST RetryBlockResync (body_field, query::node),
+ POST PurgeBlocks (body_field, query::node),
]);
if let Some(message) = query.nonempty_message() {
@@ -239,6 +255,7 @@ impl AdminApiRequest {
generateQueryParameters! {
keywords: [],
fields: [
+ "node" => node,
"domain" => domain,
"format" => format,
"id" => id,
diff --git a/src/api/admin/special.rs b/src/api/admin/special.rs
index 0b26fe32..0ecf82bc 100644
--- a/src/api/admin/special.rs
+++ b/src/api/admin/special.rs
@@ -1,27 +1,31 @@
use std::sync::Arc;
-use async_trait::async_trait;
-
use http::header::{
ACCESS_CONTROL_ALLOW_HEADERS, ACCESS_CONTROL_ALLOW_METHODS, ACCESS_CONTROL_ALLOW_ORIGIN, ALLOW,
};
use hyper::{Response, StatusCode};
+#[cfg(feature = "metrics")]
+use prometheus::{Encoder, TextEncoder};
+
use garage_model::garage::Garage;
use garage_rpc::system::ClusterHealthStatus;
use garage_api_common::helpers::*;
-use crate::api::{CheckDomainRequest, HealthRequest, OptionsRequest};
+use crate::api::{CheckDomainRequest, HealthRequest, MetricsRequest, OptionsRequest};
use crate::api_server::ResBody;
use crate::error::*;
-use crate::EndpointHandler;
+use crate::{Admin, RequestHandler};
-#[async_trait]
-impl EndpointHandler for OptionsRequest {
+impl RequestHandler for OptionsRequest {
type Response = Response<ResBody>;
- async fn handle(self, _garage: &Arc<Garage>) -> Result<Response<ResBody>, Error> {
+ async fn handle(
+ self,
+ _garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<Response<ResBody>, Error> {
Ok(Response::builder()
.status(StatusCode::OK)
.header(ALLOW, "OPTIONS,GET,POST")
@@ -32,11 +36,83 @@ impl EndpointHandler for OptionsRequest {
}
}
-#[async_trait]
-impl EndpointHandler for CheckDomainRequest {
+impl RequestHandler for MetricsRequest {
+ type Response = Response<ResBody>;
+
+ async fn handle(
+ self,
+ _garage: &Arc<Garage>,
+ admin: &Admin,
+ ) -> Result<Response<ResBody>, Error> {
+ #[cfg(feature = "metrics")]
+ {
+ use opentelemetry::trace::Tracer;
+
+ let mut buffer = vec![];
+ let encoder = TextEncoder::new();
+
+ let tracer = opentelemetry::global::tracer("garage");
+ let metric_families = tracer.in_span("admin/gather_metrics", |_| {
+ admin.exporter.registry().gather()
+ });
+
+ encoder
+ .encode(&metric_families, &mut buffer)
+ .ok_or_internal_error("Could not serialize metrics")?;
+
+ Ok(Response::builder()
+ .status(StatusCode::OK)
+ .header(http::header::CONTENT_TYPE, encoder.format_type())
+ .body(bytes_body(buffer.into()))?)
+ }
+ #[cfg(not(feature = "metrics"))]
+ Err(Error::bad_request(
+ "Garage was built without the metrics feature".to_string(),
+ ))
+ }
+}
+
+impl RequestHandler for HealthRequest {
+ type Response = Response<ResBody>;
+
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<Response<ResBody>, Error> {
+ let health = garage.system.health();
+
+ let (status, status_str) = match health.status {
+ ClusterHealthStatus::Healthy => (StatusCode::OK, "Garage is fully operational"),
+ ClusterHealthStatus::Degraded => (
+ StatusCode::OK,
+ "Garage is operational but some storage nodes are unavailable",
+ ),
+ ClusterHealthStatus::Unavailable => (
+ StatusCode::SERVICE_UNAVAILABLE,
+ "Quorum is not available for some/all partitions, reads and writes will fail",
+ ),
+ };
+ let status_str = format!(
+ "{}\nConsult the full health check API endpoint at /v2/GetClusterHealth for more details\n",
+ status_str
+ );
+
+ Ok(Response::builder()
+ .status(status)
+ .header(http::header::CONTENT_TYPE, "text/plain")
+ .body(string_body(status_str))?)
+ }
+}
+
+impl RequestHandler for CheckDomainRequest {
type Response = Response<ResBody>;
- async fn handle(self, garage: &Arc<Garage>) -> Result<Response<ResBody>, Error> {
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<Response<ResBody>, Error> {
if check_domain(garage, &self.domain).await? {
Ok(Response::builder()
.status(StatusCode::OK)
@@ -101,33 +177,3 @@ async fn check_domain(garage: &Arc<Garage>, domain: &str) -> Result<bool, Error>
None => Ok(false),
}
}
-
-#[async_trait]
-impl EndpointHandler for HealthRequest {
- type Response = Response<ResBody>;
-
- async fn handle(self, garage: &Arc<Garage>) -> Result<Response<ResBody>, Error> {
- let health = garage.system.health();
-
- let (status, status_str) = match health.status {
- ClusterHealthStatus::Healthy => (StatusCode::OK, "Garage is fully operational"),
- ClusterHealthStatus::Degraded => (
- StatusCode::OK,
- "Garage is operational but some storage nodes are unavailable",
- ),
- ClusterHealthStatus::Unavailable => (
- StatusCode::SERVICE_UNAVAILABLE,
- "Quorum is not available for some/all partitions, reads and writes will fail",
- ),
- };
- let status_str = format!(
- "{}\nConsult the full health check API endpoint at /v2/GetClusterHealth for more details\n",
- status_str
- );
-
- Ok(Response::builder()
- .status(status)
- .header(http::header::CONTENT_TYPE, "text/plain")
- .body(string_body(status_str))?)
- }
-}
diff --git a/src/api/admin/worker.rs b/src/api/admin/worker.rs
new file mode 100644
index 00000000..b3f4537b
--- /dev/null
+++ b/src/api/admin/worker.rs
@@ -0,0 +1,118 @@
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use garage_util::background::*;
+use garage_util::time::now_msec;
+
+use garage_model::garage::Garage;
+
+use crate::api::*;
+use crate::error::Error;
+use crate::{Admin, RequestHandler};
+
+impl RequestHandler for LocalListWorkersRequest {
+ type Response = LocalListWorkersResponse;
+
+ async fn handle(
+ self,
+ _garage: &Arc<Garage>,
+ admin: &Admin,
+ ) -> Result<LocalListWorkersResponse, Error> {
+ let workers = admin.background.get_worker_info();
+ let info = workers
+ .into_iter()
+ .filter(|(_, w)| {
+ (!self.busy_only
+ || matches!(w.state, WorkerState::Busy | WorkerState::Throttled(_)))
+ && (!self.error_only || w.errors > 0)
+ })
+ .map(|(id, w)| worker_info_to_api(id as u64, w))
+ .collect::<Vec<_>>();
+ Ok(LocalListWorkersResponse(info))
+ }
+}
+
+impl RequestHandler for LocalGetWorkerInfoRequest {
+ type Response = LocalGetWorkerInfoResponse;
+
+ async fn handle(
+ self,
+ _garage: &Arc<Garage>,
+ admin: &Admin,
+ ) -> Result<LocalGetWorkerInfoResponse, Error> {
+ let info = admin
+ .background
+ .get_worker_info()
+ .get(&(self.id as usize))
+ .ok_or(Error::NoSuchWorker(self.id))?
+ .clone();
+ Ok(LocalGetWorkerInfoResponse(worker_info_to_api(
+ self.id, info,
+ )))
+ }
+}
+
+impl RequestHandler for LocalGetWorkerVariableRequest {
+ type Response = LocalGetWorkerVariableResponse;
+
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<LocalGetWorkerVariableResponse, Error> {
+ let mut res = HashMap::new();
+ if let Some(k) = self.variable {
+ res.insert(k.clone(), garage.bg_vars.get(&k)?);
+ } else {
+ let vars = garage.bg_vars.get_all();
+ for (k, v) in vars.iter() {
+ res.insert(k.to_string(), v.to_string());
+ }
+ }
+ Ok(LocalGetWorkerVariableResponse(res))
+ }
+}
+
+impl RequestHandler for LocalSetWorkerVariableRequest {
+ type Response = LocalSetWorkerVariableResponse;
+
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<LocalSetWorkerVariableResponse, Error> {
+ garage.bg_vars.set(&self.variable, &self.value)?;
+
+ Ok(LocalSetWorkerVariableResponse {
+ variable: self.variable,
+ value: self.value,
+ })
+ }
+}
+
+// ---- helper functions ----
+
+fn worker_info_to_api(id: u64, info: WorkerInfo) -> WorkerInfoResp {
+ WorkerInfoResp {
+ id,
+ name: info.name,
+ state: match info.state {
+ WorkerState::Busy => WorkerStateResp::Busy,
+ WorkerState::Throttled(t) => WorkerStateResp::Throttled { duration_secs: t },
+ WorkerState::Idle => WorkerStateResp::Idle,
+ WorkerState::Done => WorkerStateResp::Done,
+ },
+ errors: info.errors as u64,
+ consecutive_errors: info.consecutive_errors as u64,
+ last_error: info.last_error.map(|(message, t)| WorkerLastError {
+ message,
+ secs_ago: now_msec().saturating_sub(t) / 1000,
+ }),
+
+ tranquility: info.status.tranquility,
+ progress: info.status.progress,
+ queue_length: info.status.queue_length,
+ persistent_errors: info.status.persistent_errors,
+ freeform: info.status.freeform,
+ }
+}
diff --git a/src/api/common/router_macros.rs b/src/api/common/router_macros.rs
index 299420f7..f4a93c67 100644
--- a/src/api/common/router_macros.rs
+++ b/src/api/common/router_macros.rs
@@ -141,6 +141,9 @@ macro_rules! router_match {
}
}};
+ (@@parse_param $query:expr, default, $param:ident) => {{
+ Default::default()
+ }};
(@@parse_param $query:expr, query_opt, $param:ident) => {{
// extract optional query parameter
$query.$param.take().map(|param| param.into_owned())
diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml
index 4f823fc6..c566c3e0 100644
--- a/src/garage/Cargo.toml
+++ b/src/garage/Cargo.toml
@@ -49,8 +49,6 @@ sodiumoxide.workspace = true
structopt.workspace = true
git-version.workspace = true
-serde.workspace = true
-
futures.workspace = true
tokio.workspace = true
diff --git a/src/garage/admin/block.rs b/src/garage/admin/block.rs
deleted file mode 100644
index edeb88c0..00000000
--- a/src/garage/admin/block.rs
+++ /dev/null
@@ -1,235 +0,0 @@
-use garage_util::data::*;
-
-use garage_table::*;
-
-use garage_model::helper::error::{Error, OkOrBadRequest};
-use garage_model::s3::object_table::*;
-use garage_model::s3::version_table::*;
-
-use crate::cli::*;
-
-use super::*;
-
-impl AdminRpcHandler {
- pub(super) async fn handle_block_cmd(&self, cmd: &BlockOperation) -> Result<AdminRpc, Error> {
- match cmd {
- BlockOperation::ListErrors => Ok(AdminRpc::BlockErrorList(
- self.garage.block_manager.list_resync_errors()?,
- )),
- BlockOperation::Info { hash } => self.handle_block_info(hash).await,
- BlockOperation::RetryNow { all, blocks } => {
- self.handle_block_retry_now(*all, blocks).await
- }
- BlockOperation::Purge { yes, blocks } => self.handle_block_purge(*yes, blocks).await,
- }
- }
-
- async fn handle_block_info(&self, hash: &String) -> Result<AdminRpc, Error> {
- let hash = self.find_block_hash_by_prefix(hash)?;
- let refcount = self.garage.block_manager.get_block_rc(&hash)?;
- let block_refs = self
- .garage
- .block_ref_table
- .get_range(&hash, None, None, 10000, Default::default())
- .await?;
- let mut versions = vec![];
- let mut uploads = vec![];
- for br in block_refs {
- if let Some(v) = self
- .garage
- .version_table
- .get(&br.version, &EmptyKey)
- .await?
- {
- if let VersionBacklink::MultipartUpload { upload_id } = &v.backlink {
- if let Some(u) = self.garage.mpu_table.get(upload_id, &EmptyKey).await? {
- uploads.push(u);
- }
- }
- versions.push(Ok(v));
- } else {
- versions.push(Err(br.version));
- }
- }
- Ok(AdminRpc::BlockInfo {
- hash,
- refcount,
- versions,
- uploads,
- })
- }
-
- async fn handle_block_retry_now(
- &self,
- all: bool,
- blocks: &[String],
- ) -> Result<AdminRpc, Error> {
- if all {
- if !blocks.is_empty() {
- return Err(Error::BadRequest(
- "--all was specified, cannot also specify blocks".into(),
- ));
- }
- let blocks = self.garage.block_manager.list_resync_errors()?;
- for b in blocks.iter() {
- self.garage.block_manager.resync.clear_backoff(&b.hash)?;
- }
- Ok(AdminRpc::Ok(format!(
- "{} blocks returned in queue for a retry now (check logs to see results)",
- blocks.len()
- )))
- } else {
- for hash in blocks {
- let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?;
- let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?;
- self.garage.block_manager.resync.clear_backoff(&hash)?;
- }
- Ok(AdminRpc::Ok(format!(
- "{} blocks returned in queue for a retry now (check logs to see results)",
- blocks.len()
- )))
- }
- }
-
- async fn handle_block_purge(&self, yes: bool, blocks: &[String]) -> Result<AdminRpc, Error> {
- if !yes {
- return Err(Error::BadRequest(
- "Pass the --yes flag to confirm block purge operation.".into(),
- ));
- }
-
- let mut obj_dels = 0;
- let mut mpu_dels = 0;
- let mut ver_dels = 0;
-
- for hash in blocks {
- let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?;
- let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?;
- let block_refs = self
- .garage
- .block_ref_table
- .get_range(&hash, None, None, 10000, Default::default())
- .await?;
-
- for br in block_refs {
- if let Some(version) = self
- .garage
- .version_table
- .get(&br.version, &EmptyKey)
- .await?
- {
- self.handle_block_purge_version_backlink(
- &version,
- &mut obj_dels,
- &mut mpu_dels,
- )
- .await?;
-
- if !version.deleted.get() {
- let deleted_version = Version::new(version.uuid, version.backlink, true);
- self.garage.version_table.insert(&deleted_version).await?;
- ver_dels += 1;
- }
- }
- }
- }
-
- Ok(AdminRpc::Ok(format!(
- "Purged {} blocks, {} versions, {} objects, {} multipart uploads",
- blocks.len(),
- ver_dels,
- obj_dels,
- mpu_dels,
- )))
- }
-
- async fn handle_block_purge_version_backlink(
- &self,
- version: &Version,
- obj_dels: &mut usize,
- mpu_dels: &mut usize,
- ) -> Result<(), Error> {
- let (bucket_id, key, ov_id) = match &version.backlink {
- VersionBacklink::Object { bucket_id, key } => (*bucket_id, key.clone(), version.uuid),
- VersionBacklink::MultipartUpload { upload_id } => {
- if let Some(mut mpu) = self.garage.mpu_table.get(upload_id, &EmptyKey).await? {
- if !mpu.deleted.get() {
- mpu.parts.clear();
- mpu.deleted.set();
- self.garage.mpu_table.insert(&mpu).await?;
- *mpu_dels += 1;
- }
- (mpu.bucket_id, mpu.key.clone(), *upload_id)
- } else {
- return Ok(());
- }
- }
- };
-
- if let Some(object) = self.garage.object_table.get(&bucket_id, &key).await? {
- let ov = object.versions().iter().rev().find(|v| v.is_complete());
- if let Some(ov) = ov {
- if ov.uuid == ov_id {
- let del_uuid = gen_uuid();
- let deleted_object = Object::new(
- bucket_id,
- key,
- vec![ObjectVersion {
- uuid: del_uuid,
- timestamp: ov.timestamp + 1,
- state: ObjectVersionState::Complete(ObjectVersionData::DeleteMarker),
- }],
- );
- self.garage.object_table.insert(&deleted_object).await?;
- *obj_dels += 1;
- }
- }
- }
-
- Ok(())
- }
-
- // ---- helper function ----
- fn find_block_hash_by_prefix(&self, prefix: &str) -> Result<Hash, Error> {
- if prefix.len() < 4 {
- return Err(Error::BadRequest(
- "Please specify at least 4 characters of the block hash".into(),
- ));
- }
-
- let prefix_bin =
- hex::decode(&prefix[..prefix.len() & !1]).ok_or_bad_request("invalid hash")?;
-
- let iter = self
- .garage
- .block_ref_table
- .data
- .store
- .range(&prefix_bin[..]..)
- .map_err(GarageError::from)?;
- let mut found = None;
- for item in iter {
- let (k, _v) = item.map_err(GarageError::from)?;
- let hash = Hash::try_from(&k[..32]).unwrap();
- if &hash.as_slice()[..prefix_bin.len()] != prefix_bin {
- break;
- }
- if hex::encode(hash.as_slice()).starts_with(prefix) {
- match &found {
- Some(x) if *x == hash => (),
- Some(_) => {
- return Err(Error::BadRequest(format!(
- "Several blocks match prefix `{}`",
- prefix
- )));
- }
- None => {
- found = Some(hash);
- }
- }
- }
- }
-
- found.ok_or_else(|| Error::BadRequest("No matching block found".into()))
- }
-}
diff --git a/src/garage/admin/bucket.rs b/src/garage/admin/bucket.rs
deleted file mode 100644
index 26d54084..00000000
--- a/src/garage/admin/bucket.rs
+++ /dev/null
@@ -1,53 +0,0 @@
-use std::fmt::Write;
-
-use garage_model::helper::error::{Error, OkOrBadRequest};
-
-use crate::cli::*;
-
-use super::*;
-
-impl AdminRpcHandler {
- pub(super) async fn handle_bucket_cmd(&self, cmd: &BucketOperation) -> Result<AdminRpc, Error> {
- match cmd {
- BucketOperation::CleanupIncompleteUploads(query) => {
- self.handle_bucket_cleanup_incomplete_uploads(query).await
- }
- _ => unreachable!(),
- }
- }
-
- async fn handle_bucket_cleanup_incomplete_uploads(
- &self,
- query: &CleanupIncompleteUploadsOpt,
- ) -> Result<AdminRpc, Error> {
- let mut bucket_ids = vec![];
- for b in query.buckets.iter() {
- bucket_ids.push(
- self.garage
- .bucket_helper()
- .admin_get_existing_matching_bucket(b)
- .await?,
- );
- }
-
- let duration = parse_duration::parse::parse(&query.older_than)
- .ok_or_bad_request("Invalid duration passed for --older-than parameter")?;
-
- let mut ret = String::new();
- for bucket in bucket_ids {
- let count = self
- .garage
- .bucket_helper()
- .cleanup_incomplete_uploads(&bucket, duration)
- .await?;
- writeln!(
- &mut ret,
- "Bucket {:?}: {} incomplete uploads aborted",
- bucket, count
- )
- .unwrap();
- }
-
- Ok(AdminRpc::Ok(ret))
- }
-}
diff --git a/src/garage/admin/mod.rs b/src/garage/admin/mod.rs
deleted file mode 100644
index 70f8ec67..00000000
--- a/src/garage/admin/mod.rs
+++ /dev/null
@@ -1,545 +0,0 @@
-mod block;
-mod bucket;
-
-use std::collections::HashMap;
-use std::fmt::Write;
-use std::sync::Arc;
-
-use async_trait::async_trait;
-use serde::{Deserialize, Serialize};
-
-use format_table::format_table_to_string;
-
-use garage_util::background::BackgroundRunner;
-use garage_util::data::*;
-use garage_util::error::Error as GarageError;
-
-use garage_table::replication::*;
-use garage_table::*;
-
-use garage_rpc::layout::PARTITION_BITS;
-use garage_rpc::*;
-
-use garage_block::manager::BlockResyncErrorInfo;
-
-use garage_model::garage::Garage;
-use garage_model::helper::error::{Error, OkOrBadRequest};
-use garage_model::s3::mpu_table::MultipartUpload;
-use garage_model::s3::version_table::Version;
-
-use garage_api_admin::api::{AdminApiRequest, TaggedAdminApiResponse};
-use garage_api_admin::EndpointHandler as AdminApiEndpoint;
-use garage_api_common::generic_server::ApiError;
-
-use crate::cli::*;
-use crate::repair::online::launch_online_repair;
-
-pub const ADMIN_RPC_PATH: &str = "garage/admin_rpc.rs/Rpc";
-
-#[derive(Debug, Serialize, Deserialize)]
-#[allow(clippy::large_enum_variant)]
-pub enum AdminRpc {
- BucketOperation(BucketOperation),
- LaunchRepair(RepairOpt),
- Stats(StatsOpt),
- Worker(WorkerOperation),
- BlockOperation(BlockOperation),
- MetaOperation(MetaOperation),
-
- // Replies
- Ok(String),
- WorkerList(
- HashMap<usize, garage_util::background::WorkerInfo>,
- WorkerListOpt,
- ),
- WorkerVars(Vec<(Uuid, String, String)>),
- WorkerInfo(usize, garage_util::background::WorkerInfo),
- BlockErrorList(Vec<BlockResyncErrorInfo>),
- BlockInfo {
- hash: Hash,
- refcount: u64,
- versions: Vec<Result<Version, Uuid>>,
- uploads: Vec<MultipartUpload>,
- },
-
- // Proxying HTTP Admin API endpoints
- ApiRequest(AdminApiRequest),
- ApiOkResponse(TaggedAdminApiResponse),
- ApiErrorResponse {
- http_code: u16,
- error_code: String,
- message: String,
- },
-}
-
-impl Rpc for AdminRpc {
- type Response = Result<AdminRpc, Error>;
-}
-
-pub struct AdminRpcHandler {
- garage: Arc<Garage>,
- background: Arc<BackgroundRunner>,
- endpoint: Arc<Endpoint<AdminRpc, Self>>,
-}
-
-impl AdminRpcHandler {
- pub fn new(garage: Arc<Garage>, background: Arc<BackgroundRunner>) -> Arc<Self> {
- let endpoint = garage.system.netapp.endpoint(ADMIN_RPC_PATH.into());
- let admin = Arc::new(Self {
- garage,
- background,
- endpoint,
- });
- admin.endpoint.set_handler(admin.clone());
- admin
- }
-
- // ================ REPAIR COMMANDS ====================
-
- async fn handle_launch_repair(self: &Arc<Self>, opt: RepairOpt) -> Result<AdminRpc, Error> {
- if !opt.yes {
- return Err(Error::BadRequest(
- "Please provide the --yes flag to initiate repair operations.".to_string(),
- ));
- }
- if opt.all_nodes {
- let mut opt_to_send = opt.clone();
- opt_to_send.all_nodes = false;
-
- let mut failures = vec![];
- let all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec();
- for node in all_nodes.iter() {
- let node = (*node).into();
- let resp = self
- .endpoint
- .call(
- &node,
- AdminRpc::LaunchRepair(opt_to_send.clone()),
- PRIO_NORMAL,
- )
- .await;
- if !matches!(resp, Ok(Ok(_))) {
- failures.push(node);
- }
- }
- if failures.is_empty() {
- Ok(AdminRpc::Ok("Repair launched on all nodes".to_string()))
- } else {
- Err(Error::BadRequest(format!(
- "Could not launch repair on nodes: {:?} (launched successfully on other nodes)",
- failures
- )))
- }
- } else {
- launch_online_repair(&self.garage, &self.background, opt).await?;
- Ok(AdminRpc::Ok(format!(
- "Repair launched on {:?}",
- self.garage.system.id
- )))
- }
- }
-
- // ================ STATS COMMANDS ====================
-
- async fn handle_stats(&self, opt: StatsOpt) -> Result<AdminRpc, Error> {
- if opt.all_nodes {
- let mut ret = String::new();
- let all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec();
-
- for node in all_nodes.iter() {
- let mut opt = opt.clone();
- opt.all_nodes = false;
- opt.skip_global = true;
-
- writeln!(&mut ret, "\n======================").unwrap();
- writeln!(&mut ret, "Stats for node {:?}:", node).unwrap();
-
- let node_id = (*node).into();
- match self
- .endpoint
- .call(&node_id, AdminRpc::Stats(opt), PRIO_NORMAL)
- .await
- {
- Ok(Ok(AdminRpc::Ok(s))) => writeln!(&mut ret, "{}", s).unwrap(),
- Ok(Ok(x)) => writeln!(&mut ret, "Bad answer: {:?}", x).unwrap(),
- Ok(Err(e)) => writeln!(&mut ret, "Remote error: {}", e).unwrap(),
- Err(e) => writeln!(&mut ret, "Network error: {}", e).unwrap(),
- }
- }
-
- writeln!(&mut ret, "\n======================").unwrap();
- write!(
- &mut ret,
- "Cluster statistics:\n\n{}",
- self.gather_cluster_stats()
- )
- .unwrap();
-
- Ok(AdminRpc::Ok(ret))
- } else {
- Ok(AdminRpc::Ok(self.gather_stats_local(opt)?))
- }
- }
-
- fn gather_stats_local(&self, opt: StatsOpt) -> Result<String, Error> {
- let mut ret = String::new();
- writeln!(
- &mut ret,
- "\nGarage version: {} [features: {}]\nRust compiler version: {}",
- garage_util::version::garage_version(),
- garage_util::version::garage_features()
- .map(|list| list.join(", "))
- .unwrap_or_else(|| "(unknown)".into()),
- garage_util::version::rust_version(),
- )
- .unwrap();
-
- writeln!(&mut ret, "\nDatabase engine: {}", self.garage.db.engine()).unwrap();
-
- // Gather table statistics
- let mut table = vec![" Table\tItems\tMklItems\tMklTodo\tGcTodo".into()];
- table.push(self.gather_table_stats(&self.garage.bucket_table)?);
- table.push(self.gather_table_stats(&self.garage.key_table)?);
- table.push(self.gather_table_stats(&self.garage.object_table)?);
- table.push(self.gather_table_stats(&self.garage.version_table)?);
- table.push(self.gather_table_stats(&self.garage.block_ref_table)?);
- write!(
- &mut ret,
- "\nTable stats:\n{}",
- format_table_to_string(table)
- )
- .unwrap();
-
- // Gather block manager statistics
- writeln!(&mut ret, "\nBlock manager stats:").unwrap();
- let rc_len = self.garage.block_manager.rc_len()?.to_string();
-
- writeln!(
- &mut ret,
- " number of RC entries (~= number of blocks): {}",
- rc_len
- )
- .unwrap();
- writeln!(
- &mut ret,
- " resync queue length: {}",
- self.garage.block_manager.resync.queue_len()?
- )
- .unwrap();
- writeln!(
- &mut ret,
- " blocks with resync errors: {}",
- self.garage.block_manager.resync.errors_len()?
- )
- .unwrap();
-
- if !opt.skip_global {
- write!(&mut ret, "\n{}", self.gather_cluster_stats()).unwrap();
- }
-
- Ok(ret)
- }
-
- fn gather_cluster_stats(&self) -> String {
- let mut ret = String::new();
-
- // Gather storage node and free space statistics for current nodes
- let layout = &self.garage.system.cluster_layout();
- let mut node_partition_count = HashMap::<Uuid, u64>::new();
- for short_id in layout.current().ring_assignment_data.iter() {
- let id = layout.current().node_id_vec[*short_id as usize];
- *node_partition_count.entry(id).or_default() += 1;
- }
- let node_info = self
- .garage
- .system
- .get_known_nodes()
- .into_iter()
- .map(|n| (n.id, n))
- .collect::<HashMap<_, _>>();
-
- let mut table = vec![" ID\tHostname\tZone\tCapacity\tPart.\tDataAvail\tMetaAvail".into()];
- for (id, parts) in node_partition_count.iter() {
- let info = node_info.get(id);
- let status = info.map(|x| &x.status);
- let role = layout.current().roles.get(id).and_then(|x| x.0.as_ref());
- let hostname = status.and_then(|x| x.hostname.as_deref()).unwrap_or("?");
- let zone = role.map(|x| x.zone.as_str()).unwrap_or("?");
- let capacity = role
- .map(|x| x.capacity_string())
- .unwrap_or_else(|| "?".into());
- let avail_str = |x| match x {
- Some((avail, total)) => {
- let pct = (avail as f64) / (total as f64) * 100.;
- let avail = bytesize::ByteSize::b(avail);
- let total = bytesize::ByteSize::b(total);
- format!("{}/{} ({:.1}%)", avail, total, pct)
- }
- None => "?".into(),
- };
- let data_avail = avail_str(status.and_then(|x| x.data_disk_avail));
- let meta_avail = avail_str(status.and_then(|x| x.meta_disk_avail));
- table.push(format!(
- " {:?}\t{}\t{}\t{}\t{}\t{}\t{}",
- id, hostname, zone, capacity, parts, data_avail, meta_avail
- ));
- }
- write!(
- &mut ret,
- "Storage nodes:\n{}",
- format_table_to_string(table)
- )
- .unwrap();
-
- let meta_part_avail = node_partition_count
- .iter()
- .filter_map(|(id, parts)| {
- node_info
- .get(id)
- .and_then(|x| x.status.meta_disk_avail)
- .map(|c| c.0 / *parts)
- })
- .collect::<Vec<_>>();
- let data_part_avail = node_partition_count
- .iter()
- .filter_map(|(id, parts)| {
- node_info
- .get(id)
- .and_then(|x| x.status.data_disk_avail)
- .map(|c| c.0 / *parts)
- })
- .collect::<Vec<_>>();
- if !meta_part_avail.is_empty() && !data_part_avail.is_empty() {
- let meta_avail =
- bytesize::ByteSize(meta_part_avail.iter().min().unwrap() * (1 << PARTITION_BITS));
- let data_avail =
- bytesize::ByteSize(data_part_avail.iter().min().unwrap() * (1 << PARTITION_BITS));
- writeln!(
- &mut ret,
- "\nEstimated available storage space cluster-wide (might be lower in practice):"
- )
- .unwrap();
- if meta_part_avail.len() < node_partition_count.len()
- || data_part_avail.len() < node_partition_count.len()
- {
- writeln!(&mut ret, " data: < {}", data_avail).unwrap();
- writeln!(&mut ret, " metadata: < {}", meta_avail).unwrap();
- writeln!(&mut ret, "A precise estimate could not be given as information is missing for some storage nodes.").unwrap();
- } else {
- writeln!(&mut ret, " data: {}", data_avail).unwrap();
- writeln!(&mut ret, " metadata: {}", meta_avail).unwrap();
- }
- }
-
- ret
- }
-
- fn gather_table_stats<F, R>(&self, t: &Arc<Table<F, R>>) -> Result<String, Error>
- where
- F: TableSchema + 'static,
- R: TableReplication + 'static,
- {
- let data_len = t.data.store.len().map_err(GarageError::from)?.to_string();
- let mkl_len = t.merkle_updater.merkle_tree_len()?.to_string();
-
- Ok(format!(
- " {}\t{}\t{}\t{}\t{}",
- F::TABLE_NAME,
- data_len,
- mkl_len,
- t.merkle_updater.todo_len()?,
- t.data.gc_todo_len()?
- ))
- }
-
- // ================ WORKER COMMANDS ====================
-
- async fn handle_worker_cmd(&self, cmd: &WorkerOperation) -> Result<AdminRpc, Error> {
- match cmd {
- WorkerOperation::List { opt } => {
- let workers = self.background.get_worker_info();
- Ok(AdminRpc::WorkerList(workers, *opt))
- }
- WorkerOperation::Info { tid } => {
- let info = self
- .background
- .get_worker_info()
- .get(tid)
- .ok_or_bad_request(format!("No worker with TID {}", tid))?
- .clone();
- Ok(AdminRpc::WorkerInfo(*tid, info))
- }
- WorkerOperation::Get {
- all_nodes,
- variable,
- } => self.handle_get_var(*all_nodes, variable).await,
- WorkerOperation::Set {
- all_nodes,
- variable,
- value,
- } => self.handle_set_var(*all_nodes, variable, value).await,
- }
- }
-
- async fn handle_get_var(
- &self,
- all_nodes: bool,
- variable: &Option<String>,
- ) -> Result<AdminRpc, Error> {
- if all_nodes {
- let mut ret = vec![];
- let all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec();
- for node in all_nodes.iter() {
- let node = (*node).into();
- match self
- .endpoint
- .call(
- &node,
- AdminRpc::Worker(WorkerOperation::Get {
- all_nodes: false,
- variable: variable.clone(),
- }),
- PRIO_NORMAL,
- )
- .await??
- {
- AdminRpc::WorkerVars(v) => ret.extend(v),
- m => return Err(GarageError::unexpected_rpc_message(m).into()),
- }
- }
- Ok(AdminRpc::WorkerVars(ret))
- } else {
- #[allow(clippy::collapsible_else_if)]
- if let Some(v) = variable {
- Ok(AdminRpc::WorkerVars(vec![(
- self.garage.system.id,
- v.clone(),
- self.garage.bg_vars.get(v)?,
- )]))
- } else {
- let mut vars = self.garage.bg_vars.get_all();
- vars.sort();
- Ok(AdminRpc::WorkerVars(
- vars.into_iter()
- .map(|(k, v)| (self.garage.system.id, k.to_string(), v))
- .collect(),
- ))
- }
- }
- }
-
- async fn handle_set_var(
- &self,
- all_nodes: bool,
- variable: &str,
- value: &str,
- ) -> Result<AdminRpc, Error> {
- if all_nodes {
- let mut ret = vec![];
- let all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec();
- for node in all_nodes.iter() {
- let node = (*node).into();
- match self
- .endpoint
- .call(
- &node,
- AdminRpc::Worker(WorkerOperation::Set {
- all_nodes: false,
- variable: variable.to_string(),
- value: value.to_string(),
- }),
- PRIO_NORMAL,
- )
- .await??
- {
- AdminRpc::WorkerVars(v) => ret.extend(v),
- m => return Err(GarageError::unexpected_rpc_message(m).into()),
- }
- }
- Ok(AdminRpc::WorkerVars(ret))
- } else {
- self.garage.bg_vars.set(variable, value)?;
- Ok(AdminRpc::WorkerVars(vec![(
- self.garage.system.id,
- variable.to_string(),
- value.to_string(),
- )]))
- }
- }
-
- // ================ META DB COMMANDS ====================
-
- async fn handle_meta_cmd(self: &Arc<Self>, mo: &MetaOperation) -> Result<AdminRpc, Error> {
- match mo {
- MetaOperation::Snapshot { all: true } => {
- let to = self.garage.system.cluster_layout().all_nodes().to_vec();
-
- let resps = futures::future::join_all(to.iter().map(|to| async move {
- let to = (*to).into();
- self.endpoint
- .call(
- &to,
- AdminRpc::MetaOperation(MetaOperation::Snapshot { all: false }),
- PRIO_NORMAL,
- )
- .await
- }))
- .await;
-
- let mut ret = vec![];
- for (to, resp) in to.iter().zip(resps.iter()) {
- let res_str = match resp {
- Ok(_) => "ok".to_string(),
- Err(e) => format!("error: {}", e),
- };
- ret.push(format!("{:?}\t{}", to, res_str));
- }
-
- Ok(AdminRpc::Ok(format_table_to_string(ret)))
- }
- MetaOperation::Snapshot { all: false } => {
- garage_model::snapshot::async_snapshot_metadata(&self.garage).await?;
- Ok(AdminRpc::Ok("Snapshot has been saved.".into()))
- }
- }
- }
-
- // ================== PROXYING ADMIN API REQUESTS ===================
-
- async fn handle_api_request(
- self: &Arc<Self>,
- req: &AdminApiRequest,
- ) -> Result<AdminRpc, Error> {
- let req = req.clone();
- info!("Proxied admin API request: {}", req.name());
- let res = req.handle(&self.garage).await;
- match res {
- Ok(res) => Ok(AdminRpc::ApiOkResponse(res.tagged())),
- Err(e) => Ok(AdminRpc::ApiErrorResponse {
- http_code: e.http_status_code().as_u16(),
- error_code: e.code().to_string(),
- message: e.to_string(),
- }),
- }
- }
-}
-
-#[async_trait]
-impl EndpointHandler<AdminRpc> for AdminRpcHandler {
- async fn handle(
- self: &Arc<Self>,
- message: &AdminRpc,
- _from: NodeID,
- ) -> Result<AdminRpc, Error> {
- match message {
- AdminRpc::BucketOperation(bo) => self.handle_bucket_cmd(bo).await,
- AdminRpc::LaunchRepair(opt) => self.handle_launch_repair(opt.clone()).await,
- AdminRpc::Stats(opt) => self.handle_stats(opt.clone()).await,
- AdminRpc::Worker(wo) => self.handle_worker_cmd(wo).await,
- AdminRpc::BlockOperation(bo) => self.handle_block_cmd(bo).await,
- AdminRpc::MetaOperation(mo) => self.handle_meta_cmd(mo).await,
- AdminRpc::ApiRequest(r) => self.handle_api_request(r).await,
- m => Err(GarageError::unexpected_rpc_message(m).into()),
- }
- }
-}
diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs
deleted file mode 100644
index a6540c65..00000000
--- a/src/garage/cli/cmd.rs
+++ /dev/null
@@ -1,60 +0,0 @@
-use garage_util::error::*;
-
-use garage_rpc::system::*;
-use garage_rpc::*;
-
-use garage_model::helper::error::Error as HelperError;
-
-use crate::admin::*;
-use crate::cli::*;
-
-pub async fn cmd_admin(
- rpc_cli: &Endpoint<AdminRpc, ()>,
- rpc_host: NodeID,
- args: AdminRpc,
-) -> Result<(), HelperError> {
- match rpc_cli.call(&rpc_host, args, PRIO_NORMAL).await?? {
- AdminRpc::Ok(msg) => {
- println!("{}", msg);
- }
- AdminRpc::WorkerList(wi, wlo) => {
- print_worker_list(wi, wlo);
- }
- AdminRpc::WorkerVars(wv) => {
- print_worker_vars(wv);
- }
- AdminRpc::WorkerInfo(tid, wi) => {
- print_worker_info(tid, wi);
- }
- AdminRpc::BlockErrorList(el) => {
- print_block_error_list(el);
- }
- AdminRpc::BlockInfo {
- hash,
- refcount,
- versions,
- uploads,
- } => {
- print_block_info(hash, refcount, versions, uploads);
- }
- r => {
- error!("Unexpected response: {:?}", r);
- }
- }
- Ok(())
-}
-
-// ---- utility ----
-
-pub async fn fetch_status(
- rpc_cli: &Endpoint<SystemRpc, ()>,
- rpc_host: NodeID,
-) -> Result<Vec<KnownNodeInfo>, Error> {
- match rpc_cli
- .call(&rpc_host, SystemRpc::GetKnownNodes, PRIO_NORMAL)
- .await??
- {
- SystemRpc::ReturnKnownNodes(nodes) => Ok(nodes),
- resp => Err(Error::unexpected_rpc_message(resp)),
- }
-}
diff --git a/src/garage/cli/layout.rs b/src/garage/cli/layout.rs
index bb81d144..bb77cc2a 100644
--- a/src/garage/cli/layout.rs
+++ b/src/garage/cli/layout.rs
@@ -7,7 +7,7 @@ use garage_rpc::layout::*;
use garage_rpc::system::*;
use garage_rpc::*;
-use crate::cli::*;
+use crate::cli::structs::*;
pub async fn cmd_show_layout(
rpc_cli: &Endpoint<SystemRpc, ()>,
@@ -260,6 +260,19 @@ pub async fn cmd_layout_skip_dead_nodes(
// --- utility ---
+pub async fn fetch_status(
+ rpc_cli: &Endpoint<SystemRpc, ()>,
+ rpc_host: NodeID,
+) -> Result<Vec<KnownNodeInfo>, Error> {
+ match rpc_cli
+ .call(&rpc_host, SystemRpc::GetKnownNodes, PRIO_NORMAL)
+ .await??
+ {
+ SystemRpc::ReturnKnownNodes(nodes) => Ok(nodes),
+ resp => Err(Error::unexpected_rpc_message(resp)),
+ }
+}
+
pub async fn fetch_layout(
rpc_cli: &Endpoint<SystemRpc, ()>,
rpc_host: NodeID,
diff --git a/src/garage/cli/mod.rs b/src/garage/cli/mod.rs
index 30f566e2..e007808b 100644
--- a/src/garage/cli/mod.rs
+++ b/src/garage/cli/mod.rs
@@ -1,12 +1,7 @@
-pub(crate) mod cmd;
-pub(crate) mod init;
-pub(crate) mod layout;
pub(crate) mod structs;
-pub(crate) mod util;
pub(crate) mod convert_db;
+pub(crate) mod init;
+pub(crate) mod repair;
-pub(crate) use cmd::*;
-pub(crate) use init::*;
-pub(crate) use structs::*;
-pub(crate) use util::*;
+pub(crate) mod layout;
diff --git a/src/garage/repair/offline.rs b/src/garage/cli/repair.rs
index 45024e71..45024e71 100644
--- a/src/garage/repair/offline.rs
+++ b/src/garage/cli/repair.rs
diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs
index 4ec35e68..c6471515 100644
--- a/src/garage/cli/structs.rs
+++ b/src/garage/cli/structs.rs
@@ -1,4 +1,3 @@
-use serde::{Deserialize, Serialize};
use structopt::StructOpt;
use garage_util::version::garage_version;
@@ -190,7 +189,7 @@ pub struct SkipDeadNodesOpt {
pub(crate) allow_missing_data: bool,
}
-#[derive(Serialize, Deserialize, StructOpt, Debug)]
+#[derive(StructOpt, Debug)]
pub enum BucketOperation {
/// List buckets
#[structopt(name = "list", version = garage_version())]
@@ -237,7 +236,7 @@ pub enum BucketOperation {
CleanupIncompleteUploads(CleanupIncompleteUploadsOpt),
}
-#[derive(Serialize, Deserialize, StructOpt, Debug)]
+#[derive(StructOpt, Debug)]
pub struct WebsiteOpt {
/// Create
#[structopt(long = "allow")]
@@ -259,13 +258,13 @@ pub struct WebsiteOpt {
pub error_document: Option<String>,
}
-#[derive(Serialize, Deserialize, StructOpt, Debug)]
+#[derive(StructOpt, Debug)]
pub struct BucketOpt {
/// Bucket name
pub name: String,
}
-#[derive(Serialize, Deserialize, StructOpt, Debug)]
+#[derive(StructOpt, Debug)]
pub struct DeleteBucketOpt {
/// Bucket name
pub name: String,
@@ -275,7 +274,7 @@ pub struct DeleteBucketOpt {
pub yes: bool,
}
-#[derive(Serialize, Deserialize, StructOpt, Debug)]
+#[derive(StructOpt, Debug)]
pub struct AliasBucketOpt {
/// Existing bucket name (its alias in global namespace or its full hex uuid)
pub existing_bucket: String,
@@ -288,7 +287,7 @@ pub struct AliasBucketOpt {
pub local: Option<String>,
}
-#[derive(Serialize, Deserialize, StructOpt, Debug)]
+#[derive(StructOpt, Debug)]
pub struct UnaliasBucketOpt {
/// Bucket name
pub name: String,
@@ -298,7 +297,7 @@ pub struct UnaliasBucketOpt {
pub local: Option<String>,
}
-#[derive(Serialize, Deserialize, StructOpt, Debug)]
+#[derive(StructOpt, Debug)]
pub struct PermBucketOpt {
/// Access key name or ID
#[structopt(long = "key")]
@@ -321,7 +320,7 @@ pub struct PermBucketOpt {
pub bucket: String,
}
-#[derive(Serialize, Deserialize, StructOpt, Debug)]
+#[derive(StructOpt, Debug)]
pub struct SetQuotasOpt {
/// Bucket name
pub bucket: String,
@@ -336,7 +335,7 @@ pub struct SetQuotasOpt {
pub max_objects: Option<String>,
}
-#[derive(Serialize, Deserialize, StructOpt, Debug)]
+#[derive(StructOpt, Debug)]
pub struct CleanupIncompleteUploadsOpt {
/// Abort multipart uploads older than this value
#[structopt(long = "older-than", default_value = "1d")]
@@ -347,7 +346,7 @@ pub struct CleanupIncompleteUploadsOpt {
pub buckets: Vec<String>,
}
-#[derive(Serialize, Deserialize, StructOpt, Debug)]
+#[derive(StructOpt, Debug)]
pub enum KeyOperation {
/// List keys
#[structopt(name = "list", version = garage_version())]
@@ -382,7 +381,7 @@ pub enum KeyOperation {
Import(KeyImportOpt),
}
-#[derive(Serialize, Deserialize, StructOpt, Debug)]
+#[derive(StructOpt, Debug)]
pub struct KeyInfoOpt {
/// ID or name of the key
pub key_pattern: String,
@@ -391,14 +390,14 @@ pub struct KeyInfoOpt {
pub show_secret: bool,
}
-#[derive(Serialize, Deserialize, StructOpt, Debug)]
+#[derive(StructOpt, Debug)]
pub struct KeyNewOpt {
/// Name of the key
#[structopt(default_value = "Unnamed key")]
pub name: String,
}
-#[derive(Serialize, Deserialize, StructOpt, Debug)]
+#[derive(StructOpt, Debug)]
pub struct KeyRenameOpt {
/// ID or name of the key
pub key_pattern: String,
@@ -407,7 +406,7 @@ pub struct KeyRenameOpt {
pub new_name: String,
}
-#[derive(Serialize, Deserialize, StructOpt, Debug)]
+#[derive(StructOpt, Debug)]
pub struct KeyDeleteOpt {
/// ID or name of the key
pub key_pattern: String,
@@ -417,7 +416,7 @@ pub struct KeyDeleteOpt {
pub yes: bool,
}
-#[derive(Serialize, Deserialize, StructOpt, Debug)]
+#[derive(StructOpt, Debug)]
pub struct KeyPermOpt {
/// ID or name of the key
pub key_pattern: String,
@@ -427,7 +426,7 @@ pub struct KeyPermOpt {
pub create_bucket: bool,
}
-#[derive(Serialize, Deserialize, StructOpt, Debug)]
+#[derive(StructOpt, Debug)]
pub struct KeyImportOpt {
/// Access key ID
pub key_id: String,
@@ -444,7 +443,7 @@ pub struct KeyImportOpt {
pub yes: bool,
}
-#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
+#[derive(StructOpt, Debug, Clone)]
pub struct RepairOpt {
/// Launch repair operation on all nodes
#[structopt(short = "a", long = "all-nodes")]
@@ -458,7 +457,7 @@ pub struct RepairOpt {
pub what: RepairWhat,
}
-#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
+#[derive(StructOpt, Debug, Eq, PartialEq, Clone)]
pub enum RepairWhat {
/// Do a full sync of metadata tables
#[structopt(name = "tables", version = garage_version())]
@@ -489,7 +488,7 @@ pub enum RepairWhat {
Rebalance,
}
-#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
+#[derive(StructOpt, Debug, Eq, PartialEq, Clone)]
pub enum ScrubCmd {
/// Start scrub
#[structopt(name = "start", version = garage_version())]
@@ -503,15 +502,9 @@ pub enum ScrubCmd {
/// Cancel scrub in progress
#[structopt(name = "cancel", version = garage_version())]
Cancel,
- /// Set tranquility level for in-progress and future scrubs
- #[structopt(name = "set-tranquility", version = garage_version())]
- SetTranquility {
- #[structopt()]
- tranquility: u32,
- },
}
-#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
+#[derive(StructOpt, Debug, Clone)]
pub struct OfflineRepairOpt {
/// Confirm the launch of the repair operation
#[structopt(long = "yes")]
@@ -521,7 +514,7 @@ pub struct OfflineRepairOpt {
pub what: OfflineRepairWhat,
}
-#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
+#[derive(StructOpt, Debug, Eq, PartialEq, Clone)]
pub enum OfflineRepairWhat {
/// Repair K2V item counters
#[cfg(feature = "k2v")]
@@ -532,19 +525,14 @@ pub enum OfflineRepairWhat {
ObjectCounters,
}
-#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
+#[derive(StructOpt, Debug, Clone)]
pub struct StatsOpt {
/// Gather statistics from all nodes
#[structopt(short = "a", long = "all-nodes")]
pub all_nodes: bool,
-
- /// Don't show global cluster stats (internal use in RPC)
- #[structopt(skip)]
- #[serde(default)]
- pub skip_global: bool,
}
-#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
+#[derive(StructOpt, Debug, Eq, PartialEq, Clone)]
pub enum WorkerOperation {
/// List all workers on Garage node
#[structopt(name = "list", version = garage_version())]
@@ -577,7 +565,7 @@ pub enum WorkerOperation {
},
}
-#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone, Copy)]
+#[derive(StructOpt, Debug, Eq, PartialEq, Clone, Copy)]
pub struct WorkerListOpt {
/// Show only busy workers
#[structopt(short = "b", long = "busy")]
@@ -587,7 +575,7 @@ pub struct WorkerListOpt {
pub errors: bool,
}
-#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
+#[derive(StructOpt, Debug, Eq, PartialEq, Clone)]
pub enum BlockOperation {
/// List all blocks that currently have a resync error
#[structopt(name = "list-errors", version = garage_version())]
@@ -619,7 +607,7 @@ pub enum BlockOperation {
},
}
-#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone, Copy)]
+#[derive(StructOpt, Debug, Eq, PartialEq, Clone, Copy)]
pub enum MetaOperation {
/// Save a snapshot of the metadata db file
#[structopt(name = "snapshot", version = garage_version())]
diff --git a/src/garage/cli/util.rs b/src/garage/cli/util.rs
deleted file mode 100644
index a3a1480e..00000000
--- a/src/garage/cli/util.rs
+++ /dev/null
@@ -1,216 +0,0 @@
-use std::collections::HashMap;
-use std::time::Duration;
-
-use format_table::format_table;
-use garage_util::background::*;
-use garage_util::data::*;
-use garage_util::time::*;
-
-use garage_block::manager::BlockResyncErrorInfo;
-
-use garage_model::s3::mpu_table::MultipartUpload;
-use garage_model::s3::version_table::*;
-
-use crate::cli::structs::WorkerListOpt;
-
-pub fn print_worker_list(wi: HashMap<usize, WorkerInfo>, wlo: WorkerListOpt) {
- let mut wi = wi.into_iter().collect::<Vec<_>>();
- wi.sort_by_key(|(tid, info)| {
- (
- match info.state {
- WorkerState::Busy | WorkerState::Throttled(_) => 0,
- WorkerState::Idle => 1,
- WorkerState::Done => 2,
- },
- *tid,
- )
- });
-
- let mut table = vec!["TID\tState\tName\tTranq\tDone\tQueue\tErrors\tConsec\tLast".to_string()];
- for (tid, info) in wi.iter() {
- if wlo.busy && !matches!(info.state, WorkerState::Busy | WorkerState::Throttled(_)) {
- continue;
- }
- if wlo.errors && info.errors == 0 {
- continue;
- }
-
- let tf = timeago::Formatter::new();
- let err_ago = info
- .last_error
- .as_ref()
- .map(|(_, t)| tf.convert(Duration::from_millis(now_msec() - t)))
- .unwrap_or_default();
- let (total_err, consec_err) = if info.errors > 0 {
- (info.errors.to_string(), info.consecutive_errors.to_string())
- } else {
- ("-".into(), "-".into())
- };
-
- table.push(format!(
- "{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}",
- tid,
- info.state,
- info.name,
- info.status
- .tranquility
- .as_ref()
- .map(ToString::to_string)
- .unwrap_or_else(|| "-".into()),
- info.status.progress.as_deref().unwrap_or("-"),
- info.status
- .queue_length
- .as_ref()
- .map(ToString::to_string)
- .unwrap_or_else(|| "-".into()),
- total_err,
- consec_err,
- err_ago,
- ));
- }
- format_table(table);
-}
-
-pub fn print_worker_info(tid: usize, info: WorkerInfo) {
- let mut table = vec![];
- table.push(format!("Task id:\t{}", tid));
- table.push(format!("Worker name:\t{}", info.name));
- match info.state {
- WorkerState::Throttled(t) => {
- table.push(format!(
- "Worker state:\tBusy (throttled, paused for {:.3}s)",
- t
- ));
- }
- s => {
- table.push(format!("Worker state:\t{}", s));
- }
- };
- if let Some(tql) = info.status.tranquility {
- table.push(format!("Tranquility:\t{}", tql));
- }
-
- table.push("".into());
- table.push(format!("Total errors:\t{}", info.errors));
- table.push(format!("Consecutive errs:\t{}", info.consecutive_errors));
- if let Some((s, t)) = info.last_error {
- table.push(format!("Last error:\t{}", s));
- let tf = timeago::Formatter::new();
- table.push(format!(
- "Last error time:\t{}",
- tf.convert(Duration::from_millis(now_msec() - t))
- ));
- }
-
- table.push("".into());
- if let Some(p) = info.status.progress {
- table.push(format!("Progress:\t{}", p));
- }
- if let Some(ql) = info.status.queue_length {
- table.push(format!("Queue length:\t{}", ql));
- }
- if let Some(pe) = info.status.persistent_errors {
- table.push(format!("Persistent errors:\t{}", pe));
- }
-
- for (i, s) in info.status.freeform.iter().enumerate() {
- if i == 0 {
- if table.last() != Some(&"".into()) {
- table.push("".into());
- }
- table.push(format!("Message:\t{}", s));
- } else {
- table.push(format!("\t{}", s));
- }
- }
- format_table(table);
-}
-
-pub fn print_worker_vars(wv: Vec<(Uuid, String, String)>) {
- let table = wv
- .into_iter()
- .map(|(n, k, v)| format!("{:?}\t{}\t{}", n, k, v))
- .collect::<Vec<_>>();
- format_table(table);
-}
-
-pub fn print_block_error_list(el: Vec<BlockResyncErrorInfo>) {
- let now = now_msec();
- let tf = timeago::Formatter::new();
- let mut tf2 = timeago::Formatter::new();
- tf2.ago("");
-
- let mut table = vec!["Hash\tRC\tErrors\tLast error\tNext try".into()];
- for e in el {
- let next_try = if e.next_try > now {
- tf2.convert(Duration::from_millis(e.next_try - now))
- } else {
- "asap".to_string()
- };
- table.push(format!(
- "{}\t{}\t{}\t{}\tin {}",
- hex::encode(e.hash.as_slice()),
- e.refcount,
- e.error_count,
- tf.convert(Duration::from_millis(now - e.last_try)),
- next_try
- ));
- }
- format_table(table);
-}
-
-pub fn print_block_info(
- hash: Hash,
- refcount: u64,
- versions: Vec<Result<Version, Uuid>>,
- uploads: Vec<MultipartUpload>,
-) {
- println!("Block hash: {}", hex::encode(hash.as_slice()));
- println!("Refcount: {}", refcount);
- println!();
-
- let mut table = vec!["Version\tBucket\tKey\tMPU\tDeleted".into()];
- let mut nondeleted_count = 0;
- for v in versions.iter() {
- match v {
- Ok(ver) => {
- match &ver.backlink {
- VersionBacklink::Object { bucket_id, key } => {
- table.push(format!(
- "{:?}\t{:?}\t{}\t\t{:?}",
- ver.uuid,
- bucket_id,
- key,
- ver.deleted.get()
- ));
- }
- VersionBacklink::MultipartUpload { upload_id } => {
- let upload = uploads.iter().find(|x| x.upload_id == *upload_id);
- table.push(format!(
- "{:?}\t{:?}\t{}\t{:?}\t{:?}",
- ver.uuid,
- upload.map(|u| u.bucket_id).unwrap_or_default(),
- upload.map(|u| u.key.as_str()).unwrap_or_default(),
- upload_id,
- ver.deleted.get()
- ));
- }
- }
- if !ver.deleted.get() {
- nondeleted_count += 1;
- }
- }
- Err(vh) => {
- table.push(format!("{:?}\t\t\t\tyes", vh));
- }
- }
- }
- format_table(table);
-
- if refcount != nondeleted_count {
- println!();
- println!(
- "Warning: refcount does not match number of non-deleted versions, you should try `garage repair block-rc`."
- );
- }
-}
diff --git a/src/garage/cli_v2/block.rs b/src/garage/cli_v2/block.rs
new file mode 100644
index 00000000..bfc0db4a
--- /dev/null
+++ b/src/garage/cli_v2/block.rs
@@ -0,0 +1,145 @@
+//use bytesize::ByteSize;
+use format_table::format_table;
+
+use garage_util::error::*;
+
+use garage_api_admin::api::*;
+
+use crate::cli::structs::*;
+use crate::cli_v2::*;
+
+impl Cli {
+ pub async fn cmd_block(&self, cmd: BlockOperation) -> Result<(), Error> {
+ match cmd {
+ BlockOperation::ListErrors => self.cmd_list_block_errors().await,
+ BlockOperation::Info { hash } => self.cmd_get_block_info(hash).await,
+ BlockOperation::RetryNow { all, blocks } => self.cmd_block_retry_now(all, blocks).await,
+ BlockOperation::Purge { yes, blocks } => self.cmd_block_purge(yes, blocks).await,
+ }
+ }
+
+ pub async fn cmd_list_block_errors(&self) -> Result<(), Error> {
+ let errors = self.local_api_request(LocalListBlockErrorsRequest).await?.0;
+
+ let tf = timeago::Formatter::new();
+ let mut tf2 = timeago::Formatter::new();
+ tf2.ago("");
+
+ let mut table = vec!["Hash\tRC\tErrors\tLast error\tNext try".into()];
+ for e in errors {
+ let next_try = if e.next_try_in_secs > 0 {
+ tf2.convert(Duration::from_secs(e.next_try_in_secs))
+ } else {
+ "asap".to_string()
+ };
+ table.push(format!(
+ "{}\t{}\t{}\t{}\tin {}",
+ e.block_hash,
+ e.refcount,
+ e.error_count,
+ tf.convert(Duration::from_secs(e.last_try_secs_ago)),
+ next_try
+ ));
+ }
+ format_table(table);
+
+ Ok(())
+ }
+
+ pub async fn cmd_get_block_info(&self, hash: String) -> Result<(), Error> {
+ let info = self
+ .local_api_request(LocalGetBlockInfoRequest { block_hash: hash })
+ .await?;
+
+ println!("Block hash: {}", info.block_hash);
+ println!("Refcount: {}", info.refcount);
+ println!();
+
+ let mut table = vec!["Version\tBucket\tKey\tMPU\tDeleted".into()];
+ let mut nondeleted_count = 0;
+ for ver in info.versions.iter() {
+ match &ver.backlink {
+ Some(BlockVersionBacklink::Object { bucket_id, key }) => {
+ table.push(format!(
+ "{:.16}\t{:.16}\t{}\t\t{:?}",
+ ver.version_id, bucket_id, key, ver.deleted
+ ));
+ }
+ Some(BlockVersionBacklink::Upload {
+ upload_id,
+ upload_deleted: _,
+ upload_garbage_collected: _,
+ bucket_id,
+ key,
+ }) => {
+ table.push(format!(
+ "{:.16}\t{:.16}\t{}\t{:.16}\t{:.16}",
+ ver.version_id,
+ bucket_id.as_deref().unwrap_or(""),
+ key.as_deref().unwrap_or(""),
+ upload_id,
+ ver.deleted
+ ));
+ }
+ None => {
+ table.push(format!("{:.16}\t\t\tyes", ver.version_id));
+ }
+ }
+ if !ver.deleted {
+ nondeleted_count += 1;
+ }
+ }
+ format_table(table);
+
+ if info.refcount != nondeleted_count {
+ println!();
+ println!(
+ "Warning: refcount does not match number of non-deleted versions, you should try `garage repair block-rc`."
+ );
+ }
+
+ Ok(())
+ }
+
+ pub async fn cmd_block_retry_now(&self, all: bool, blocks: Vec<String>) -> Result<(), Error> {
+ let req = match (all, blocks.len()) {
+ (true, 0) => LocalRetryBlockResyncRequest::All { all: true },
+ (false, n) if n > 0 => LocalRetryBlockResyncRequest::Blocks {
+ block_hashes: blocks,
+ },
+ _ => {
+ return Err(Error::Message(
+ "Please specify block hashes or --all (not both)".into(),
+ ))
+ }
+ };
+
+ let res = self.local_api_request(req).await?;
+
+ println!(
+ "{} blocks returned in queue for a retry now (check logs to see results)",
+ res.count
+ );
+
+ Ok(())
+ }
+
+ pub async fn cmd_block_purge(&self, yes: bool, blocks: Vec<String>) -> Result<(), Error> {
+ if !yes {
+ return Err(Error::Message(
+ "Pass the --yes flag to confirm block purge operation.".into(),
+ ));
+ }
+
+ let res = self
+ .local_api_request(LocalPurgeBlocksRequest(blocks))
+ .await?;
+
+ println!(
+ "Purged {} blocks: deleted {} versions, {} objects, {} multipart uploads",
+ res.blocks_purged, res.versions_deleted, res.objects_deleted, res.uploads_deleted,
+ );
+
+ Ok(())
+ }
+}
diff --git a/src/garage/cli_v2/bucket.rs b/src/garage/cli_v2/bucket.rs
index ee3b6800..c25c2c3e 100644
--- a/src/garage/cli_v2/bucket.rs
+++ b/src/garage/cli_v2/bucket.rs
@@ -5,7 +5,6 @@ use garage_util::error::*;
use garage_api_admin::api::*;
-use crate::cli as cli_v1;
use crate::cli::structs::*;
use crate::cli_v2::*;
@@ -22,15 +21,9 @@ impl Cli {
BucketOperation::Deny(query) => self.cmd_bucket_deny(query).await,
BucketOperation::Website(query) => self.cmd_bucket_website(query).await,
BucketOperation::SetQuotas(query) => self.cmd_bucket_set_quotas(query).await,
-
- // TODO
- x => cli_v1::cmd_admin(
- &self.admin_rpc_endpoint,
- self.rpc_host,
- AdminRpc::BucketOperation(x),
- )
- .await
- .ok_or_message("old error"),
+ BucketOperation::CleanupIncompleteUploads(query) => {
+ self.cmd_cleanup_incomplete_uploads(query).await
+ }
}
}
@@ -520,4 +513,37 @@ impl Cli {
Ok(())
}
+
+ pub async fn cmd_cleanup_incomplete_uploads(
+ &self,
+ opt: CleanupIncompleteUploadsOpt,
+ ) -> Result<(), Error> {
+ let older_than = parse_duration::parse::parse(&opt.older_than)
+ .ok_or_message("Invalid duration passed for --older-than parameter")?;
+
+ for b in opt.buckets.iter() {
+ let bucket = self
+ .api_request(GetBucketInfoRequest {
+ id: None,
+ global_alias: None,
+ search: Some(b.clone()),
+ })
+ .await?;
+
+ let res = self
+ .api_request(CleanupIncompleteUploadsRequest {
+ bucket_id: bucket.id.clone(),
+ older_than_secs: older_than.as_secs(),
+ })
+ .await?;
+
+ if res.uploads_deleted > 0 {
+ println!("{:.16}: {} uploads deleted", bucket.id, res.uploads_deleted);
+ } else {
+ println!("{:.16}: no uploads deleted", bucket.id);
+ }
+ }
+
+ Ok(())
+ }
}
diff --git a/src/garage/cli_v2/mod.rs b/src/garage/cli_v2/mod.rs
index 6cc13b2d..28c7c824 100644
--- a/src/garage/cli_v2/mod.rs
+++ b/src/garage/cli_v2/mod.rs
@@ -3,6 +3,10 @@ pub mod cluster;
pub mod key;
pub mod layout;
+pub mod block;
+pub mod node;
+pub mod worker;
+
use std::convert::TryFrom;
use std::sync::Arc;
use std::time::Duration;
@@ -13,16 +17,14 @@ use garage_rpc::system::*;
use garage_rpc::*;
use garage_api_admin::api::*;
-use garage_api_admin::EndpointHandler as AdminApiEndpoint;
+use garage_api_admin::api_server::{AdminRpc as ProxyRpc, AdminRpcResponse as ProxyRpcResponse};
+use garage_api_admin::RequestHandler;
-use crate::admin::*;
-use crate::cli as cli_v1;
use crate::cli::structs::*;
-use crate::cli::Command;
pub struct Cli {
pub system_rpc_endpoint: Arc<Endpoint<SystemRpc, ()>>,
- pub admin_rpc_endpoint: Arc<Endpoint<AdminRpc, ()>>,
+ pub proxy_rpc_endpoint: Arc<Endpoint<ProxyRpc, ()>>,
pub rpc_host: NodeID,
}
@@ -36,63 +38,35 @@ impl Cli {
Command::Layout(layout_opt) => self.layout_command_dispatch(layout_opt).await,
Command::Bucket(bo) => self.cmd_bucket(bo).await,
Command::Key(ko) => self.cmd_key(ko).await,
-
- // TODO
- Command::Repair(ro) => cli_v1::cmd_admin(
- &self.admin_rpc_endpoint,
- self.rpc_host,
- AdminRpc::LaunchRepair(ro),
- )
- .await
- .ok_or_message("cli_v1"),
- Command::Stats(so) => {
- cli_v1::cmd_admin(&self.admin_rpc_endpoint, self.rpc_host, AdminRpc::Stats(so))
- .await
- .ok_or_message("cli_v1")
- }
- Command::Worker(wo) => cli_v1::cmd_admin(
- &self.admin_rpc_endpoint,
- self.rpc_host,
- AdminRpc::Worker(wo),
- )
- .await
- .ok_or_message("cli_v1"),
- Command::Block(bo) => cli_v1::cmd_admin(
- &self.admin_rpc_endpoint,
- self.rpc_host,
- AdminRpc::BlockOperation(bo),
- )
- .await
- .ok_or_message("cli_v1"),
- Command::Meta(mo) => cli_v1::cmd_admin(
- &self.admin_rpc_endpoint,
- self.rpc_host,
- AdminRpc::MetaOperation(mo),
- )
- .await
- .ok_or_message("cli_v1"),
+ Command::Worker(wo) => self.cmd_worker(wo).await,
+ Command::Block(bo) => self.cmd_block(bo).await,
+ Command::Meta(mo) => self.cmd_meta(mo).await,
+ Command::Stats(so) => self.cmd_stats(so).await,
+ Command::Repair(ro) => self.cmd_repair(ro).await,
_ => unreachable!(),
}
}
- pub async fn api_request<T>(&self, req: T) -> Result<<T as AdminApiEndpoint>::Response, Error>
+ pub async fn api_request<T>(&self, req: T) -> Result<<T as RequestHandler>::Response, Error>
where
- T: AdminApiEndpoint,
+ T: RequestHandler,
AdminApiRequest: From<T>,
- <T as AdminApiEndpoint>::Response: TryFrom<TaggedAdminApiResponse>,
+ <T as RequestHandler>::Response: TryFrom<TaggedAdminApiResponse>,
{
let req = AdminApiRequest::from(req);
let req_name = req.name();
match self
- .admin_rpc_endpoint
- .call(&self.rpc_host, AdminRpc::ApiRequest(req), PRIO_NORMAL)
- .await?
- .ok_or_message("rpc")?
+ .proxy_rpc_endpoint
+ .call(&self.rpc_host, ProxyRpc::Proxy(req), PRIO_NORMAL)
+ .await??
{
- AdminRpc::ApiOkResponse(resp) => <T as AdminApiEndpoint>::Response::try_from(resp)
- .map_err(|_| Error::Message(format!("{} returned unexpected response", req_name))),
- AdminRpc::ApiErrorResponse {
+ ProxyRpcResponse::ProxyApiOkResponse(resp) => {
+ <T as RequestHandler>::Response::try_from(resp).map_err(|_| {
+ Error::Message(format!("{} returned unexpected response", req_name))
+ })
+ }
+ ProxyRpcResponse::ApiErrorResponse {
http_code,
error_code,
message,
@@ -103,4 +77,32 @@ impl Cli {
m => Err(Error::unexpected_rpc_message(m)),
}
}
+
+ pub async fn local_api_request<T>(
+ &self,
+ req: T,
+ ) -> Result<<T as RequestHandler>::Response, Error>
+ where
+ T: RequestHandler,
+ MultiRequest<T>: RequestHandler<Response = MultiResponse<<T as RequestHandler>::Response>>,
+ AdminApiRequest: From<MultiRequest<T>>,
+ <MultiRequest<T> as RequestHandler>::Response: TryFrom<TaggedAdminApiResponse>,
+ {
+ let req = MultiRequest {
+ node: hex::encode(self.rpc_host),
+ body: req,
+ };
+ let resp = self.api_request(req).await?;
+
+ if let Some((_, e)) = resp.error.into_iter().next() {
+ return Err(Error::Message(e));
+ }
+ if resp.success.len() != 1 {
+ return Err(Error::Message(format!(
+ "{} responses returned, expected 1",
+ resp.success.len()
+ )));
+ }
+ Ok(resp.success.into_iter().next().unwrap().1)
+ }
}
diff --git a/src/garage/cli_v2/node.rs b/src/garage/cli_v2/node.rs
new file mode 100644
index 00000000..c5d0cdea
--- /dev/null
+++ b/src/garage/cli_v2/node.rs
@@ -0,0 +1,113 @@
+use format_table::format_table;
+
+use garage_util::error::*;
+
+use garage_api_admin::api::*;
+
+use crate::cli::structs::*;
+use crate::cli_v2::*;
+
+impl Cli {
+ pub async fn cmd_meta(&self, cmd: MetaOperation) -> Result<(), Error> {
+ let MetaOperation::Snapshot { all } = cmd;
+
+ let res = self
+ .api_request(CreateMetadataSnapshotRequest {
+ node: if all {
+ "*".to_string()
+ } else {
+ hex::encode(self.rpc_host)
+ },
+ body: LocalCreateMetadataSnapshotRequest,
+ })
+ .await?;
+
+ let mut table = vec![];
+ for (node, err) in res.error.iter() {
+ table.push(format!("{:.16}\tError: {}", node, err));
+ }
+ for (node, _) in res.success.iter() {
+ table.push(format!("{:.16}\tSnapshot created", node));
+ }
+ format_table(table);
+
+ Ok(())
+ }
+
+ pub async fn cmd_stats(&self, cmd: StatsOpt) -> Result<(), Error> {
+ let res = self
+ .api_request(GetNodeStatisticsRequest {
+ node: if cmd.all_nodes {
+ "*".to_string()
+ } else {
+ hex::encode(self.rpc_host)
+ },
+ body: LocalGetNodeStatisticsRequest,
+ })
+ .await?;
+
+ for (node, res) in res.success.iter() {
+ println!("======================");
+ println!("Stats for node {:.16}:\n", node);
+ println!("{}\n", res.freeform);
+ }
+
+ for (node, err) in res.error.iter() {
+ println!("======================");
+ println!("Node {:.16}: error: {}\n", node, err);
+ }
+
+ let res = self.api_request(GetClusterStatisticsRequest).await?;
+ println!("======================");
+ println!("Cluster statistics:\n");
+ println!("{}\n", res.freeform);
+
+ Ok(())
+ }
+
+ pub async fn cmd_repair(&self, cmd: RepairOpt) -> Result<(), Error> {
+ if !cmd.yes {
+ return Err(Error::Message(
+ "Please add --yes to start the repair operation".into(),
+ ));
+ }
+
+ let repair_type = match cmd.what {
+ RepairWhat::Tables => RepairType::Tables,
+ RepairWhat::Blocks => RepairType::Blocks,
+ RepairWhat::Versions => RepairType::Versions,
+ RepairWhat::MultipartUploads => RepairType::MultipartUploads,
+ RepairWhat::BlockRefs => RepairType::BlockRefs,
+ RepairWhat::BlockRc => RepairType::BlockRc,
+ RepairWhat::Rebalance => RepairType::Rebalance,
+ RepairWhat::Scrub { cmd } => RepairType::Scrub(match cmd {
+ ScrubCmd::Start => ScrubCommand::Start,
+ ScrubCmd::Cancel => ScrubCommand::Cancel,
+ ScrubCmd::Pause => ScrubCommand::Pause,
+ ScrubCmd::Resume => ScrubCommand::Resume,
+ }),
+ };
+
+ let res = self
+ .api_request(LaunchRepairOperationRequest {
+ node: if cmd.all_nodes {
+ "*".to_string()
+ } else {
+ hex::encode(self.rpc_host)
+ },
+ body: LocalLaunchRepairOperationRequest { repair_type },
+ })
+ .await?;
+
+ let mut table = vec![];
+ for (node, err) in res.error.iter() {
+ table.push(format!("{:.16}\tError: {}", node, err));
+ }
+ for (node, _) in res.success.iter() {
+ table.push(format!("{:.16}\tRepair launched", node));
+ }
+ format_table(table);
+
+ Ok(())
+ }
+}
diff --git a/src/garage/cli_v2/worker.rs b/src/garage/cli_v2/worker.rs
new file mode 100644
index 00000000..9c248a39
--- /dev/null
+++ b/src/garage/cli_v2/worker.rs
@@ -0,0 +1,213 @@
+use format_table::format_table;
+
+use garage_util::error::*;
+
+use garage_api_admin::api::*;
+
+use crate::cli::structs::*;
+use crate::cli_v2::*;
+
+impl Cli {
+ pub async fn cmd_worker(&self, cmd: WorkerOperation) -> Result<(), Error> {
+ match cmd {
+ WorkerOperation::List { opt } => self.cmd_list_workers(opt).await,
+ WorkerOperation::Info { tid } => self.cmd_worker_info(tid).await,
+ WorkerOperation::Get {
+ all_nodes,
+ variable,
+ } => self.cmd_get_var(all_nodes, variable).await,
+ WorkerOperation::Set {
+ all_nodes,
+ variable,
+ value,
+ } => self.cmd_set_var(all_nodes, variable, value).await,
+ }
+ }
+
+ pub async fn cmd_list_workers(&self, opt: WorkerListOpt) -> Result<(), Error> {
+ let mut list = self
+ .local_api_request(LocalListWorkersRequest {
+ busy_only: opt.busy,
+ error_only: opt.errors,
+ })
+ .await?
+ .0;
+
+ list.sort_by_key(|info| {
+ (
+ match info.state {
+ WorkerStateResp::Busy | WorkerStateResp::Throttled { .. } => 0,
+ WorkerStateResp::Idle => 1,
+ WorkerStateResp::Done => 2,
+ },
+ info.id,
+ )
+ });
+
+ let mut table =
+ vec!["TID\tState\tName\tTranq\tDone\tQueue\tErrors\tConsec\tLast".to_string()];
+ let tf = timeago::Formatter::new();
+ for info in list.iter() {
+ let err_ago = info
+ .last_error
+ .as_ref()
+ .map(|x| tf.convert(Duration::from_secs(x.secs_ago)))
+ .unwrap_or_default();
+ let (total_err, consec_err) = if info.errors > 0 {
+ (info.errors.to_string(), info.consecutive_errors.to_string())
+ } else {
+ ("-".into(), "-".into())
+ };
+
+ table.push(format!(
+ "{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}",
+ info.id,
+ format_worker_state(&info.state),
+ info.name,
+ info.tranquility
+ .as_ref()
+ .map(ToString::to_string)
+ .unwrap_or_else(|| "-".into()),
+ info.progress.as_deref().unwrap_or("-"),
+ info.queue_length
+ .as_ref()
+ .map(ToString::to_string)
+ .unwrap_or_else(|| "-".into()),
+ total_err,
+ consec_err,
+ err_ago,
+ ));
+ }
+ format_table(table);
+
+ Ok(())
+ }
+
+ pub async fn cmd_worker_info(&self, tid: usize) -> Result<(), Error> {
+ let info = self
+ .local_api_request(LocalGetWorkerInfoRequest { id: tid as u64 })
+ .await?
+ .0;
+
+ let mut table = vec![];
+ table.push(format!("Task id:\t{}", info.id));
+ table.push(format!("Worker name:\t{}", info.name));
+ match &info.state {
+ WorkerStateResp::Throttled { duration_secs } => {
+ table.push(format!(
+ "Worker state:\tBusy (throttled, paused for {:.3}s)",
+ duration_secs
+ ));
+ }
+ s => {
+ table.push(format!("Worker state:\t{}", format_worker_state(s)));
+ }
+ };
+ if let Some(tql) = info.tranquility {
+ table.push(format!("Tranquility:\t{}", tql));
+ }
+
+ table.push("".into());
+ table.push(format!("Total errors:\t{}", info.errors));
+ table.push(format!("Consecutive errs:\t{}", info.consecutive_errors));
+ if let Some(err) = info.last_error {
+ table.push(format!("Last error:\t{}", err.message));
+ let tf = timeago::Formatter::new();
+ table.push(format!(
+ "Last error time:\t{}",
+ tf.convert(Duration::from_secs(err.secs_ago))
+ ));
+ }
+
+ table.push("".into());
+ if let Some(p) = info.progress {
+ table.push(format!("Progress:\t{}", p));
+ }
+ if let Some(ql) = info.queue_length {
+ table.push(format!("Queue length:\t{}", ql));
+ }
+ if let Some(pe) = info.persistent_errors {
+ table.push(format!("Persistent errors:\t{}", pe));
+ }
+
+ for (i, s) in info.freeform.iter().enumerate() {
+ if i == 0 {
+ if table.last() != Some(&"".into()) {
+ table.push("".into());
+ }
+ table.push(format!("Message:\t{}", s));
+ } else {
+ table.push(format!("\t{}", s));
+ }
+ }
+ format_table(table);
+
+ Ok(())
+ }
+
+ pub async fn cmd_get_var(&self, all: bool, var: Option<String>) -> Result<(), Error> {
+ let res = self
+ .api_request(GetWorkerVariableRequest {
+ node: if all {
+ "*".to_string()
+ } else {
+ hex::encode(self.rpc_host)
+ },
+ body: LocalGetWorkerVariableRequest { variable: var },
+ })
+ .await?;
+
+ let mut table = vec![];
+ for (node, vars) in res.success.iter() {
+ for (key, val) in vars.0.iter() {
+ table.push(format!("{:.16}\t{}\t{}", node, key, val));
+ }
+ }
+ format_table(table);
+
+ for (node, err) in res.error.iter() {
+ eprintln!("{:.16}: error: {}", node, err);
+ }
+
+ Ok(())
+ }
+
+ pub async fn cmd_set_var(
+ &self,
+ all: bool,
+ variable: String,
+ value: String,
+ ) -> Result<(), Error> {
+ let res = self
+ .api_request(SetWorkerVariableRequest {
+ node: if all {
+ "*".to_string()
+ } else {
+ hex::encode(self.rpc_host)
+ },
+ body: LocalSetWorkerVariableRequest { variable, value },
+ })
+ .await?;
+
+ let mut table = vec![];
+ for (node, kv) in res.success.iter() {
+ table.push(format!("{:.16}\t{}\t{}", node, kv.variable, kv.value));
+ }
+ format_table(table);
+
+ for (node, err) in res.error.iter() {
+ eprintln!("{:.16}: error: {}", node, err);
+ }
+
+ Ok(())
+ }
+}
+
+fn format_worker_state(s: &WorkerStateResp) -> &'static str {
+ match s {
+ WorkerStateResp::Busy => "Busy",
+ WorkerStateResp::Throttled { .. } => "Busy*",
+ WorkerStateResp::Idle => "Idle",
+ WorkerStateResp::Done => "Done",
+ }
+}
diff --git a/src/garage/main.rs b/src/garage/main.rs
index 08c7cee7..2a88d760 100644
--- a/src/garage/main.rs
+++ b/src/garage/main.rs
@@ -4,10 +4,8 @@
#[macro_use]
extern crate tracing;
-mod admin;
mod cli;
mod cli_v2;
-mod repair;
mod secrets;
mod server;
#[cfg(feature = "telemetry-otlp")]
@@ -35,8 +33,9 @@ use garage_util::error::*;
use garage_rpc::system::*;
use garage_rpc::*;
-use admin::*;
-use cli::*;
+use garage_api_admin::api_server::{AdminRpc as ProxyRpc, ADMIN_RPC_PATH as PROXY_RPC_PATH};
+
+use cli::structs::*;
use secrets::Secrets;
#[derive(StructOpt, Debug)]
@@ -144,13 +143,13 @@ async fn main() {
let res = match opt.cmd {
Command::Server => server::run_server(opt.config_file, opt.secrets).await,
Command::OfflineRepair(repair_opt) => {
- repair::offline::offline_repair(opt.config_file, opt.secrets, repair_opt).await
+ cli::repair::offline_repair(opt.config_file, opt.secrets, repair_opt).await
}
Command::ConvertDb(conv_opt) => {
cli::convert_db::do_conversion(conv_opt).map_err(From::from)
}
Command::Node(NodeOperation::NodeId(node_id_opt)) => {
- node_id_command(opt.config_file, node_id_opt.quiet)
+ cli::init::node_id_command(opt.config_file, node_id_opt.quiet)
}
_ => cli_command(opt).await,
};
@@ -251,7 +250,7 @@ async fn cli_command(opt: Opt) -> Result<(), Error> {
(id, addrs[0], false)
} else {
let node_id = garage_rpc::system::read_node_id(&config.as_ref().unwrap().metadata_dir)
- .err_context(READ_KEY_ERROR)?;
+ .err_context(cli::init::READ_KEY_ERROR)?;
if let Some(a) = config.as_ref().and_then(|c| c.rpc_public_addr.as_ref()) {
use std::net::ToSocketAddrs;
let a = a
@@ -281,11 +280,11 @@ async fn cli_command(opt: Opt) -> Result<(), Error> {
}
let system_rpc_endpoint = netapp.endpoint::<SystemRpc, ()>(SYSTEM_RPC_PATH.into());
- let admin_rpc_endpoint = netapp.endpoint::<AdminRpc, ()>(ADMIN_RPC_PATH.into());
+ let proxy_rpc_endpoint = netapp.endpoint::<ProxyRpc, ()>(PROXY_RPC_PATH.into());
let cli = cli_v2::Cli {
system_rpc_endpoint,
- admin_rpc_endpoint,
+ proxy_rpc_endpoint,
rpc_host: id,
};
diff --git a/src/garage/repair/mod.rs b/src/garage/repair/mod.rs
deleted file mode 100644
index 4699ace5..00000000
--- a/src/garage/repair/mod.rs
+++ /dev/null
@@ -1,2 +0,0 @@
-pub mod offline;
-pub mod online;
diff --git a/src/garage/server.rs b/src/garage/server.rs
index 9e58fa6d..131cc8aa 100644
--- a/src/garage/server.rs
+++ b/src/garage/server.rs
@@ -14,7 +14,6 @@ use garage_web::WebServer;
#[cfg(feature = "k2v")]
use garage_api_k2v::api_server::K2VApiServer;
-use crate::admin::*;
use crate::secrets::{fill_secrets, Secrets};
#[cfg(feature = "telemetry-otlp")]
use crate::tracing_setup::*;
@@ -66,6 +65,7 @@ pub async fn run_server(config_file: PathBuf, secrets: Secrets) -> Result<(), Er
info!("Initialize Admin API server and metrics collector...");
let admin_server = AdminApiServer::new(
garage.clone(),
+ background.clone(),
#[cfg(feature = "metrics")]
metrics_exporter,
);
@@ -73,9 +73,6 @@ pub async fn run_server(config_file: PathBuf, secrets: Secrets) -> Result<(), Er
info!("Launching internal Garage cluster communications...");
let run_system = tokio::spawn(garage.system.clone().run(watch_cancel.clone()));
- info!("Create admin RPC handler...");
- AdminRpcHandler::new(garage.clone(), background.clone());
-
// ---- Launch public-facing API servers ----
let mut servers = vec![];
diff --git a/src/util/background/mod.rs b/src/util/background/mod.rs
index 607cd7a3..cae3a462 100644
--- a/src/util/background/mod.rs
+++ b/src/util/background/mod.rs
@@ -6,7 +6,6 @@ pub mod worker;
use std::collections::HashMap;
use std::sync::Arc;
-use serde::{Deserialize, Serialize};
use tokio::sync::{mpsc, watch};
use worker::WorkerProcessor;
@@ -18,7 +17,7 @@ pub struct BackgroundRunner {
worker_info: Arc<std::sync::Mutex<HashMap<usize, WorkerInfo>>>,
}
-#[derive(Clone, Serialize, Deserialize, Debug)]
+#[derive(Clone, Debug)]
pub struct WorkerInfo {
pub name: String,
pub status: WorkerStatus,
@@ -30,7 +29,7 @@ pub struct WorkerInfo {
/// WorkerStatus is a struct returned by the worker with a bunch of canonical
/// fields to indicate their status to CLI users. All fields are optional.
-#[derive(Clone, Serialize, Deserialize, Debug, Default)]
+#[derive(Clone, Debug, Default)]
pub struct WorkerStatus {
pub tranquility: Option<u32>,
pub progress: Option<String>,
diff --git a/src/util/background/worker.rs b/src/util/background/worker.rs
index 76fb14e8..9028a052 100644
--- a/src/util/background/worker.rs
+++ b/src/util/background/worker.rs
@@ -6,7 +6,6 @@ use async_trait::async_trait;
use futures::future::*;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
-use serde::{Deserialize, Serialize};
use tokio::select;
use tokio::sync::{mpsc, watch};
@@ -18,7 +17,7 @@ use crate::time::now_msec;
// will be interrupted in the middle of whatever they are doing.
const EXIT_DEADLINE: Duration = Duration::from_secs(8);
-#[derive(PartialEq, Copy, Clone, Serialize, Deserialize, Debug)]
+#[derive(PartialEq, Copy, Clone, Debug)]
pub enum WorkerState {
Busy,
Throttled(f32),
@@ -26,17 +25,6 @@ pub enum WorkerState {
Done,
}
-impl std::fmt::Display for WorkerState {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- match self {
- WorkerState::Busy => write!(f, "Busy"),
- WorkerState::Throttled(_) => write!(f, "Busy*"),
- WorkerState::Idle => write!(f, "Idle"),
- WorkerState::Done => write!(f, "Done"),
- }
- }
-}
-
#[async_trait]
pub trait Worker: Send {
fn name(&self) -> String;