aboutsummaryrefslogtreecommitdiff
path: root/src/api
diff options
context:
space:
mode:
Diffstat (limited to 'src/api')
-rw-r--r--src/api/admin/Cargo.toml4
-rw-r--r--src/api/admin/api.rs876
-rw-r--r--src/api/admin/api_server.rs393
-rw-r--r--src/api/admin/block.rs274
-rw-r--r--src/api/admin/bucket.rs769
-rw-r--r--src/api/admin/cluster.rs557
-rw-r--r--src/api/admin/error.rs16
-rw-r--r--src/api/admin/key.rs293
-rw-r--r--src/api/admin/lib.rs32
-rw-r--r--src/api/admin/macros.rs219
-rw-r--r--src/api/admin/node.rs216
-rw-r--r--src/api/admin/repair.rs403
-rw-r--r--src/api/admin/router_v1.rs15
-rw-r--r--src/api/admin/router_v2.rs268
-rw-r--r--src/api/admin/special.rs179
-rw-r--r--src/api/admin/worker.rs118
-rw-r--r--src/api/common/generic_server.rs3
-rw-r--r--src/api/common/router_macros.rs85
-rw-r--r--src/api/k2v/api_server.rs5
-rw-r--r--src/api/s3/api_server.rs5
20 files changed, 3581 insertions, 1149 deletions
diff --git a/src/api/admin/Cargo.toml b/src/api/admin/Cargo.toml
index adddf306..9ac099e8 100644
--- a/src/api/admin/Cargo.toml
+++ b/src/api/admin/Cargo.toml
@@ -14,7 +14,9 @@ path = "lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
+format_table.workspace = true
garage_model.workspace = true
+garage_block.workspace = true
garage_table.workspace = true
garage_util.workspace = true
garage_rpc.workspace = true
@@ -22,8 +24,10 @@ garage_api_common.workspace = true
argon2.workspace = true
async-trait.workspace = true
+bytesize.workspace = true
err-derive.workspace = true
hex.workspace = true
+paste.workspace = true
tracing.workspace = true
futures.workspace = true
diff --git a/src/api/admin/api.rs b/src/api/admin/api.rs
new file mode 100644
index 00000000..97cde158
--- /dev/null
+++ b/src/api/admin/api.rs
@@ -0,0 +1,876 @@
+use std::collections::HashMap;
+use std::convert::TryFrom;
+use std::net::SocketAddr;
+use std::sync::Arc;
+
+use paste::paste;
+use serde::{Deserialize, Serialize};
+
+use garage_rpc::*;
+
+use garage_model::garage::Garage;
+
+use garage_api_common::common_error::CommonErrorDerivative;
+use garage_api_common::helpers::is_default;
+
+use crate::api_server::{AdminRpc, AdminRpcResponse};
+use crate::error::Error;
+use crate::macros::*;
+use crate::{Admin, RequestHandler};
+
+// This generates the following:
+//
+// - An enum AdminApiRequest that contains a variant for all endpoints
+//
+// - An enum AdminApiResponse that contains a variant for all non-special endpoints.
+// This enum is serialized in api_server.rs, without the enum tag,
+// which gives directly the JSON response corresponding to the API call.
+// This enum does not implement Deserialize as its meaning can be ambiguous.
+//
+// - An enum TaggedAdminApiResponse that contains the same variants, but
+// serializes as a tagged enum. This allows it to be transmitted through
+// Garage RPC and deserialized correctly upon receival.
+// Conversion from untagged to tagged can be done using the `.tagged()` method.
+//
+// - AdminApiRequest::name() that returns the name of the endpoint
+//
+// - impl EndpointHandler for AdminApiHandler, that uses the impl EndpointHandler
+// of each request type below for non-special endpoints
+admin_endpoints![
+ // Special endpoints of the Admin API
+ @special Options,
+ @special CheckDomain,
+ @special Health,
+ @special Metrics,
+
+ // Cluster operations
+ GetClusterStatus,
+ GetClusterHealth,
+ ConnectClusterNodes,
+ GetClusterLayout,
+ UpdateClusterLayout,
+ ApplyClusterLayout,
+ RevertClusterLayout,
+
+ // Access key operations
+ ListKeys,
+ GetKeyInfo,
+ CreateKey,
+ ImportKey,
+ UpdateKey,
+ DeleteKey,
+
+ // Bucket operations
+ ListBuckets,
+ GetBucketInfo,
+ CreateBucket,
+ UpdateBucket,
+ DeleteBucket,
+ CleanupIncompleteUploads,
+
+ // Operations on permissions for keys on buckets
+ AllowBucketKey,
+ DenyBucketKey,
+
+ // Operations on bucket aliases
+ AddBucketAlias,
+ RemoveBucketAlias,
+
+ // Node operations
+ CreateMetadataSnapshot,
+ GetNodeStatistics,
+ GetClusterStatistics,
+ LaunchRepairOperation,
+
+ // Worker operations
+ ListWorkers,
+ GetWorkerInfo,
+ GetWorkerVariable,
+ SetWorkerVariable,
+
+ // Block operations
+ ListBlockErrors,
+ GetBlockInfo,
+ RetryBlockResync,
+ PurgeBlocks,
+];
+
+local_admin_endpoints![
+ // Node operations
+ CreateMetadataSnapshot,
+ GetNodeStatistics,
+ LaunchRepairOperation,
+ // Background workers
+ ListWorkers,
+ GetWorkerInfo,
+ GetWorkerVariable,
+ SetWorkerVariable,
+ // Block operations
+ ListBlockErrors,
+ GetBlockInfo,
+ RetryBlockResync,
+ PurgeBlocks,
+];
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct MultiRequest<RB> {
+ pub node: String,
+ pub body: RB,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct MultiResponse<RB> {
+ pub success: HashMap<String, RB>,
+ pub error: HashMap<String, String>,
+}
+
+// **********************************************
+// Special endpoints
+//
+// These endpoints don't have associated *Response structs
+// because they directly produce an http::Response
+// **********************************************
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct OptionsRequest;
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct CheckDomainRequest {
+ pub domain: String,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct HealthRequest;
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct MetricsRequest;
+
+// **********************************************
+// Cluster operations
+// **********************************************
+
+// ---- GetClusterStatus ----
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct GetClusterStatusRequest;
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct GetClusterStatusResponse {
+ pub node: String,
+ pub garage_version: String,
+ pub garage_features: Option<Vec<String>>,
+ pub rust_version: String,
+ pub db_engine: String,
+ pub layout_version: u64,
+ pub nodes: Vec<NodeResp>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, Default)]
+#[serde(rename_all = "camelCase")]
+pub struct NodeResp {
+ pub id: String,
+ pub role: Option<NodeRoleResp>,
+ pub addr: Option<SocketAddr>,
+ pub hostname: Option<String>,
+ pub is_up: bool,
+ pub last_seen_secs_ago: Option<u64>,
+ pub draining: bool,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub data_partition: Option<FreeSpaceResp>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub metadata_partition: Option<FreeSpaceResp>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct NodeRoleResp {
+ pub id: String,
+ pub zone: String,
+ pub capacity: Option<u64>,
+ pub tags: Vec<String>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct FreeSpaceResp {
+ pub available: u64,
+ pub total: u64,
+}
+
+// ---- GetClusterHealth ----
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct GetClusterHealthRequest;
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct GetClusterHealthResponse {
+ pub status: String,
+ pub known_nodes: usize,
+ pub connected_nodes: usize,
+ pub storage_nodes: usize,
+ pub storage_nodes_ok: usize,
+ pub partitions: usize,
+ pub partitions_quorum: usize,
+ pub partitions_all_ok: usize,
+}
+
+// ---- ConnectClusterNodes ----
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct ConnectClusterNodesRequest(pub Vec<String>);
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct ConnectClusterNodesResponse(pub Vec<ConnectNodeResponse>);
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct ConnectNodeResponse {
+ pub success: bool,
+ pub error: Option<String>,
+}
+
+// ---- GetClusterLayout ----
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct GetClusterLayoutRequest;
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct GetClusterLayoutResponse {
+ pub version: u64,
+ pub roles: Vec<NodeRoleResp>,
+ pub staged_role_changes: Vec<NodeRoleChange>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct NodeRoleChange {
+ pub id: String,
+ #[serde(flatten)]
+ pub action: NodeRoleChangeEnum,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(untagged)]
+pub enum NodeRoleChangeEnum {
+ #[serde(rename_all = "camelCase")]
+ Remove { remove: bool },
+ #[serde(rename_all = "camelCase")]
+ Update {
+ zone: String,
+ capacity: Option<u64>,
+ tags: Vec<String>,
+ },
+}
+
+// ---- UpdateClusterLayout ----
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct UpdateClusterLayoutRequest(pub Vec<NodeRoleChange>);
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct UpdateClusterLayoutResponse(pub GetClusterLayoutResponse);
+
+// ---- ApplyClusterLayout ----
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct ApplyClusterLayoutRequest {
+ pub version: u64,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct ApplyClusterLayoutResponse {
+ pub message: Vec<String>,
+ pub layout: GetClusterLayoutResponse,
+}
+
+// ---- RevertClusterLayout ----
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct RevertClusterLayoutRequest;
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct RevertClusterLayoutResponse(pub GetClusterLayoutResponse);
+
+// **********************************************
+// Access key operations
+// **********************************************
+
+// ---- ListKeys ----
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct ListKeysRequest;
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct ListKeysResponse(pub Vec<ListKeysResponseItem>);
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct ListKeysResponseItem {
+ pub id: String,
+ pub name: String,
+}
+
+// ---- GetKeyInfo ----
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct GetKeyInfoRequest {
+ pub id: Option<String>,
+ pub search: Option<String>,
+ pub show_secret_key: bool,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct GetKeyInfoResponse {
+ pub name: String,
+ pub access_key_id: String,
+ #[serde(skip_serializing_if = "is_default")]
+ pub secret_access_key: Option<String>,
+ pub permissions: KeyPerm,
+ pub buckets: Vec<KeyInfoBucketResponse>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct KeyPerm {
+ #[serde(default)]
+ pub create_bucket: bool,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct KeyInfoBucketResponse {
+ pub id: String,
+ pub global_aliases: Vec<String>,
+ pub local_aliases: Vec<String>,
+ pub permissions: ApiBucketKeyPerm,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, Default)]
+#[serde(rename_all = "camelCase")]
+pub struct ApiBucketKeyPerm {
+ #[serde(default)]
+ pub read: bool,
+ #[serde(default)]
+ pub write: bool,
+ #[serde(default)]
+ pub owner: bool,
+}
+
+// ---- CreateKey ----
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct CreateKeyRequest {
+ pub name: Option<String>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct CreateKeyResponse(pub GetKeyInfoResponse);
+
+// ---- ImportKey ----
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct ImportKeyRequest {
+ pub access_key_id: String,
+ pub secret_access_key: String,
+ pub name: Option<String>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct ImportKeyResponse(pub GetKeyInfoResponse);
+
+// ---- UpdateKey ----
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct UpdateKeyRequest {
+ pub id: String,
+ pub body: UpdateKeyRequestBody,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct UpdateKeyResponse(pub GetKeyInfoResponse);
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct UpdateKeyRequestBody {
+ pub name: Option<String>,
+ pub allow: Option<KeyPerm>,
+ pub deny: Option<KeyPerm>,
+}
+
+// ---- DeleteKey ----
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct DeleteKeyRequest {
+ pub id: String,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct DeleteKeyResponse;
+
+// **********************************************
+// Bucket operations
+// **********************************************
+
+// ---- ListBuckets ----
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct ListBucketsRequest;
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct ListBucketsResponse(pub Vec<ListBucketsResponseItem>);
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct ListBucketsResponseItem {
+ pub id: String,
+ pub global_aliases: Vec<String>,
+ pub local_aliases: Vec<BucketLocalAlias>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct BucketLocalAlias {
+ pub access_key_id: String,
+ pub alias: String,
+}
+
+// ---- GetBucketInfo ----
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct GetBucketInfoRequest {
+ pub id: Option<String>,
+ pub global_alias: Option<String>,
+ pub search: Option<String>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct GetBucketInfoResponse {
+ pub id: String,
+ pub global_aliases: Vec<String>,
+ pub website_access: bool,
+ #[serde(default)]
+ pub website_config: Option<GetBucketInfoWebsiteResponse>,
+ pub keys: Vec<GetBucketInfoKey>,
+ pub objects: i64,
+ pub bytes: i64,
+ pub unfinished_uploads: i64,
+ pub unfinished_multipart_uploads: i64,
+ pub unfinished_multipart_upload_parts: i64,
+ pub unfinished_multipart_upload_bytes: i64,
+ pub quotas: ApiBucketQuotas,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct GetBucketInfoWebsiteResponse {
+ pub index_document: String,
+ pub error_document: Option<String>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct GetBucketInfoKey {
+ pub access_key_id: String,
+ pub name: String,
+ pub permissions: ApiBucketKeyPerm,
+ pub bucket_local_aliases: Vec<String>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct ApiBucketQuotas {
+ pub max_size: Option<u64>,
+ pub max_objects: Option<u64>,
+}
+
+// ---- CreateBucket ----
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct CreateBucketRequest {
+ pub global_alias: Option<String>,
+ pub local_alias: Option<CreateBucketLocalAlias>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct CreateBucketResponse(pub GetBucketInfoResponse);
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct CreateBucketLocalAlias {
+ pub access_key_id: String,
+ pub alias: String,
+ #[serde(default)]
+ pub allow: ApiBucketKeyPerm,
+}
+
+// ---- UpdateBucket ----
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct UpdateBucketRequest {
+ pub id: String,
+ pub body: UpdateBucketRequestBody,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct UpdateBucketResponse(pub GetBucketInfoResponse);
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct UpdateBucketRequestBody {
+ pub website_access: Option<UpdateBucketWebsiteAccess>,
+ pub quotas: Option<ApiBucketQuotas>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct UpdateBucketWebsiteAccess {
+ pub enabled: bool,
+ pub index_document: Option<String>,
+ pub error_document: Option<String>,
+}
+
+// ---- DeleteBucket ----
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct DeleteBucketRequest {
+ pub id: String,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct DeleteBucketResponse;
+
+// ---- CleanupIncompleteUploads ----
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct CleanupIncompleteUploadsRequest {
+ pub bucket_id: String,
+ pub older_than_secs: u64,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct CleanupIncompleteUploadsResponse {
+ pub uploads_deleted: u64,
+}
+
+// **********************************************
+// Operations on permissions for keys on buckets
+// **********************************************
+
+// ---- AllowBucketKey ----
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct AllowBucketKeyRequest(pub BucketKeyPermChangeRequest);
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct AllowBucketKeyResponse(pub GetBucketInfoResponse);
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct BucketKeyPermChangeRequest {
+ pub bucket_id: String,
+ pub access_key_id: String,
+ pub permissions: ApiBucketKeyPerm,
+}
+
+// ---- DenyBucketKey ----
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct DenyBucketKeyRequest(pub BucketKeyPermChangeRequest);
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct DenyBucketKeyResponse(pub GetBucketInfoResponse);
+
+// **********************************************
+// Operations on bucket aliases
+// **********************************************
+
+// ---- AddBucketAlias ----
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct AddBucketAliasRequest {
+ pub bucket_id: String,
+ #[serde(flatten)]
+ pub alias: BucketAliasEnum,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct AddBucketAliasResponse(pub GetBucketInfoResponse);
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(untagged)]
+pub enum BucketAliasEnum {
+ #[serde(rename_all = "camelCase")]
+ Global { global_alias: String },
+ #[serde(rename_all = "camelCase")]
+ Local {
+ local_alias: String,
+ access_key_id: String,
+ },
+}
+
+// ---- RemoveBucketAlias ----
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct RemoveBucketAliasRequest {
+ pub bucket_id: String,
+ #[serde(flatten)]
+ pub alias: BucketAliasEnum,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct RemoveBucketAliasResponse(pub GetBucketInfoResponse);
+
+// **********************************************
+// Node operations
+// **********************************************
+
+// ---- CreateMetadataSnapshot ----
+
+#[derive(Debug, Clone, Serialize, Deserialize, Default)]
+pub struct LocalCreateMetadataSnapshotRequest;
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct LocalCreateMetadataSnapshotResponse;
+
+// ---- GetNodeStatistics ----
+
+#[derive(Debug, Clone, Serialize, Deserialize, Default)]
+pub struct LocalGetNodeStatisticsRequest;
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct LocalGetNodeStatisticsResponse {
+ pub freeform: String,
+}
+
+// ---- GetClusterStatistics ----
+
+#[derive(Debug, Clone, Serialize, Deserialize, Default)]
+pub struct GetClusterStatisticsRequest;
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct GetClusterStatisticsResponse {
+ pub freeform: String,
+}
+
+// ---- LaunchRepairOperation ----
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct LocalLaunchRepairOperationRequest {
+ pub repair_type: RepairType,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub enum RepairType {
+ Tables,
+ Blocks,
+ Versions,
+ MultipartUploads,
+ BlockRefs,
+ BlockRc,
+ Rebalance,
+ Scrub(ScrubCommand),
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub enum ScrubCommand {
+ Start,
+ Pause,
+ Resume,
+ Cancel,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct LocalLaunchRepairOperationResponse;
+
+// **********************************************
+// Worker operations
+// **********************************************
+
+// ---- GetWorkerList ----
+
+#[derive(Debug, Clone, Serialize, Deserialize, Default)]
+#[serde(rename_all = "camelCase")]
+pub struct LocalListWorkersRequest {
+ #[serde(default)]
+ pub busy_only: bool,
+ #[serde(default)]
+ pub error_only: bool,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct LocalListWorkersResponse(pub Vec<WorkerInfoResp>);
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct WorkerInfoResp {
+ pub id: u64,
+ pub name: String,
+ pub state: WorkerStateResp,
+ pub errors: u64,
+ pub consecutive_errors: u64,
+ pub last_error: Option<WorkerLastError>,
+ pub tranquility: Option<u32>,
+ pub progress: Option<String>,
+ pub queue_length: Option<u64>,
+ pub persistent_errors: Option<u64>,
+ pub freeform: Vec<String>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub enum WorkerStateResp {
+ Busy,
+ Throttled { duration_secs: f32 },
+ Idle,
+ Done,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct WorkerLastError {
+ pub message: String,
+ pub secs_ago: u64,
+}
+
+// ---- GetWorkerList ----
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct LocalGetWorkerInfoRequest {
+ pub id: u64,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct LocalGetWorkerInfoResponse(pub WorkerInfoResp);
+
+// ---- GetWorkerVariable ----
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct LocalGetWorkerVariableRequest {
+ pub variable: Option<String>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct LocalGetWorkerVariableResponse(pub HashMap<String, String>);
+
+// ---- SetWorkerVariable ----
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct LocalSetWorkerVariableRequest {
+ pub variable: String,
+ pub value: String,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct LocalSetWorkerVariableResponse {
+ pub variable: String,
+ pub value: String,
+}
+
+// **********************************************
+// Block operations
+// **********************************************
+
+// ---- ListBlockErrors ----
+
+#[derive(Debug, Clone, Serialize, Deserialize, Default)]
+pub struct LocalListBlockErrorsRequest;
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct LocalListBlockErrorsResponse(pub Vec<BlockError>);
+
+#[derive(Serialize, Deserialize, Clone, Debug)]
+#[serde(rename_all = "camelCase")]
+pub struct BlockError {
+ pub block_hash: String,
+ pub refcount: u64,
+ pub error_count: u64,
+ pub last_try_secs_ago: u64,
+ pub next_try_in_secs: u64,
+}
+
+// ---- GetBlockInfo ----
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct LocalGetBlockInfoRequest {
+ pub block_hash: String,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct LocalGetBlockInfoResponse {
+ pub block_hash: String,
+ pub refcount: u64,
+ pub versions: Vec<BlockVersion>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct BlockVersion {
+ pub version_id: String,
+ pub deleted: bool,
+ pub garbage_collected: bool,
+ pub backlink: Option<BlockVersionBacklink>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub enum BlockVersionBacklink {
+ Object {
+ bucket_id: String,
+ key: String,
+ },
+ Upload {
+ upload_id: String,
+ upload_deleted: bool,
+ upload_garbage_collected: bool,
+ bucket_id: Option<String>,
+ key: Option<String>,
+ },
+}
+
+// ---- RetryBlockResync ----
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(untagged)]
+pub enum LocalRetryBlockResyncRequest {
+ #[serde(rename_all = "camelCase")]
+ All { all: bool },
+ #[serde(rename_all = "camelCase")]
+ Blocks { block_hashes: Vec<String> },
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct LocalRetryBlockResyncResponse {
+ pub count: u64,
+}
+
+// ---- PurgeBlocks ----
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct LocalPurgeBlocksRequest(pub Vec<String>);
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct LocalPurgeBlocksResponse {
+ pub blocks_purged: u64,
+ pub objects_deleted: u64,
+ pub uploads_deleted: u64,
+ pub versions_deleted: u64,
+}
diff --git a/src/api/admin/api_server.rs b/src/api/admin/api_server.rs
index 6f0c474f..37574dcf 100644
--- a/src/api/admin/api_server.rs
+++ b/src/api/admin/api_server.rs
@@ -1,333 +1,234 @@
-use std::collections::HashMap;
+use std::borrow::Cow;
use std::sync::Arc;
use argon2::password_hash::PasswordHash;
-use http::header::{ACCESS_CONTROL_ALLOW_METHODS, ACCESS_CONTROL_ALLOW_ORIGIN, ALLOW};
-use hyper::{body::Incoming as IncomingBody, Request, Response, StatusCode};
+use http::header::{HeaderValue, ACCESS_CONTROL_ALLOW_ORIGIN, AUTHORIZATION};
+use hyper::{body::Incoming as IncomingBody, Request, Response};
+use serde::{Deserialize, Serialize};
use tokio::sync::watch;
use opentelemetry::trace::SpanRef;
#[cfg(feature = "metrics")]
use opentelemetry_prometheus::PrometheusExporter;
-#[cfg(feature = "metrics")]
-use prometheus::{Encoder, TextEncoder};
use garage_model::garage::Garage;
-use garage_rpc::system::ClusterHealthStatus;
+use garage_rpc::{Endpoint as RpcEndpoint, *};
+use garage_util::background::BackgroundRunner;
use garage_util::error::Error as GarageError;
use garage_util::socket_address::UnixOrTCPSocketAddress;
use garage_api_common::generic_server::*;
use garage_api_common::helpers::*;
-use crate::bucket::*;
-use crate::cluster::*;
+use crate::api::*;
use crate::error::*;
-use crate::key::*;
use crate::router_v0;
-use crate::router_v1::{Authorization, Endpoint};
+use crate::router_v1;
+use crate::Authorization;
+use crate::RequestHandler;
+
+// ---- FOR RPC ----
+
+pub const ADMIN_RPC_PATH: &str = "garage_api/admin/rpc.rs/Rpc";
+
+#[derive(Debug, Serialize, Deserialize)]
+pub enum AdminRpc {
+ Proxy(AdminApiRequest),
+ Internal(LocalAdminApiRequest),
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub enum AdminRpcResponse {
+ ProxyApiOkResponse(TaggedAdminApiResponse),
+ InternalApiOkResponse(LocalAdminApiResponse),
+ ApiErrorResponse {
+ http_code: u16,
+ error_code: String,
+ message: String,
+ },
+}
+
+impl Rpc for AdminRpc {
+ type Response = Result<AdminRpcResponse, GarageError>;
+}
+
+impl EndpointHandler<AdminRpc> for AdminApiServer {
+ async fn handle(
+ self: &Arc<Self>,
+ message: &AdminRpc,
+ _from: NodeID,
+ ) -> Result<AdminRpcResponse, GarageError> {
+ match message {
+ AdminRpc::Proxy(req) => {
+ info!("Proxied admin API request: {}", req.name());
+ let res = req.clone().handle(&self.garage, &self).await;
+ match res {
+ Ok(res) => Ok(AdminRpcResponse::ProxyApiOkResponse(res.tagged())),
+ Err(e) => Ok(AdminRpcResponse::ApiErrorResponse {
+ http_code: e.http_status_code().as_u16(),
+ error_code: e.code().to_string(),
+ message: e.to_string(),
+ }),
+ }
+ }
+ AdminRpc::Internal(req) => {
+ info!("Internal admin API request: {}", req.name());
+ let res = req.clone().handle(&self.garage, &self).await;
+ match res {
+ Ok(res) => Ok(AdminRpcResponse::InternalApiOkResponse(res)),
+ Err(e) => Ok(AdminRpcResponse::ApiErrorResponse {
+ http_code: e.http_status_code().as_u16(),
+ error_code: e.code().to_string(),
+ message: e.to_string(),
+ }),
+ }
+ }
+ }
+ }
+}
+
+// ---- FOR HTTP ----
pub type ResBody = BoxBody<Error>;
pub struct AdminApiServer {
garage: Arc<Garage>,
#[cfg(feature = "metrics")]
- exporter: PrometheusExporter,
+ pub(crate) exporter: PrometheusExporter,
metrics_token: Option<String>,
admin_token: Option<String>,
+ pub(crate) background: Arc<BackgroundRunner>,
+ pub(crate) endpoint: Arc<RpcEndpoint<AdminRpc, Self>>,
+}
+
+pub enum HttpEndpoint {
+ Old(router_v1::Endpoint),
+ New(String),
}
impl AdminApiServer {
pub fn new(
garage: Arc<Garage>,
+ background: Arc<BackgroundRunner>,
#[cfg(feature = "metrics")] exporter: PrometheusExporter,
- ) -> Self {
+ ) -> Arc<Self> {
let cfg = &garage.config.admin;
let metrics_token = cfg.metrics_token.as_deref().map(hash_bearer_token);
let admin_token = cfg.admin_token.as_deref().map(hash_bearer_token);
- Self {
+
+ let endpoint = garage.system.netapp.endpoint(ADMIN_RPC_PATH.into());
+ let admin = Arc::new(Self {
garage,
#[cfg(feature = "metrics")]
exporter,
metrics_token,
admin_token,
- }
+ background,
+ endpoint,
+ });
+ admin.endpoint.set_handler(admin.clone());
+ admin
}
pub async fn run(
- self,
+ self: Arc<Self>,
bind_addr: UnixOrTCPSocketAddress,
must_exit: watch::Receiver<bool>,
) -> Result<(), GarageError> {
let region = self.garage.config.s3_api.s3_region.clone();
- ApiServer::new(region, self)
+ ApiServer::new(region, ArcAdminApiServer(self))
.run_server(bind_addr, Some(0o220), must_exit)
.await
}
- fn handle_options(&self, _req: &Request<IncomingBody>) -> Result<Response<ResBody>, Error> {
- Ok(Response::builder()
- .status(StatusCode::NO_CONTENT)
- .header(ALLOW, "OPTIONS, GET, POST")
- .header(ACCESS_CONTROL_ALLOW_METHODS, "OPTIONS, GET, POST")
- .header(ACCESS_CONTROL_ALLOW_ORIGIN, "*")
- .body(empty_body())?)
- }
-
- async fn handle_check_domain(
+ async fn handle_http_api(
&self,
req: Request<IncomingBody>,
+ endpoint: HttpEndpoint,
) -> Result<Response<ResBody>, Error> {
- let query_params: HashMap<String, String> = req
- .uri()
- .query()
- .map(|v| {
- url::form_urlencoded::parse(v.as_bytes())
- .into_owned()
- .collect()
- })
- .unwrap_or_else(HashMap::new);
-
- let has_domain_key = query_params.contains_key("domain");
-
- if !has_domain_key {
- return Err(Error::bad_request("No domain query string found"));
- }
-
- let domain = query_params
- .get("domain")
- .ok_or_internal_error("Could not parse domain query string")?;
-
- if self.check_domain(domain).await? {
- Ok(Response::builder()
- .status(StatusCode::OK)
- .body(string_body(format!(
- "Domain '{domain}' is managed by Garage"
- )))?)
- } else {
- Err(Error::bad_request(format!(
- "Domain '{domain}' is not managed by Garage"
- )))
- }
- }
-
- async fn check_domain(&self, domain: &str) -> Result<bool, Error> {
- // Resolve bucket from domain name, inferring if the website must be activated for the
- // domain to be valid.
- let (bucket_name, must_check_website) = if let Some(bname) = self
- .garage
- .config
- .s3_api
- .root_domain
- .as_ref()
- .and_then(|rd| host_to_bucket(domain, rd))
- {
- (bname.to_string(), false)
- } else if let Some(bname) = self
- .garage
- .config
- .s3_web
- .as_ref()
- .and_then(|sw| host_to_bucket(domain, sw.root_domain.as_str()))
- {
- (bname.to_string(), true)
- } else {
- (domain.to_string(), true)
- };
+ let auth_header = req.headers().get(AUTHORIZATION).cloned();
- let bucket_id = match self
- .garage
- .bucket_helper()
- .resolve_global_bucket_name(&bucket_name)
- .await?
- {
- Some(bucket_id) => bucket_id,
- None => return Ok(false),
+ let request = match endpoint {
+ HttpEndpoint::Old(endpoint_v1) => AdminApiRequest::from_v1(endpoint_v1, req).await?,
+ HttpEndpoint::New(_) => AdminApiRequest::from_request(req).await?,
};
- if !must_check_website {
- return Ok(true);
- }
-
- let bucket = self
- .garage
- .bucket_helper()
- .get_existing_bucket(bucket_id)
- .await?;
-
- let bucket_state = bucket.state.as_option().unwrap();
- let bucket_website_config = bucket_state.website_config.get();
+ let required_auth_hash =
+ match request.authorization_type() {
+ Authorization::None => None,
+ Authorization::MetricsToken => self.metrics_token.as_deref(),
+ Authorization::AdminToken => match self.admin_token.as_deref() {
+ None => return Err(Error::forbidden(
+ "Admin token isn't configured, admin API access is disabled for security.",
+ )),
+ Some(t) => Some(t),
+ },
+ };
- match bucket_website_config {
- Some(_v) => Ok(true),
- None => Ok(false),
+ if let Some(password_hash) = required_auth_hash {
+ match auth_header {
+ None => return Err(Error::forbidden("Authorization token must be provided")),
+ Some(authorization) => {
+ verify_bearer_token(&authorization, password_hash)?;
+ }
+ }
}
- }
-
- fn handle_health(&self) -> Result<Response<ResBody>, Error> {
- let health = self.garage.system.health();
-
- let (status, status_str) = match health.status {
- ClusterHealthStatus::Healthy => (StatusCode::OK, "Garage is fully operational"),
- ClusterHealthStatus::Degraded => (
- StatusCode::OK,
- "Garage is operational but some storage nodes are unavailable",
- ),
- ClusterHealthStatus::Unavailable => (
- StatusCode::SERVICE_UNAVAILABLE,
- "Quorum is not available for some/all partitions, reads and writes will fail",
- ),
- };
- let status_str = format!(
- "{}\nConsult the full health check API endpoint at /v1/health for more details\n",
- status_str
- );
-
- Ok(Response::builder()
- .status(status)
- .header(http::header::CONTENT_TYPE, "text/plain")
- .body(string_body(status_str))?)
- }
- fn handle_metrics(&self) -> Result<Response<ResBody>, Error> {
- #[cfg(feature = "metrics")]
- {
- use opentelemetry::trace::Tracer;
-
- let mut buffer = vec![];
- let encoder = TextEncoder::new();
-
- let tracer = opentelemetry::global::tracer("garage");
- let metric_families = tracer.in_span("admin/gather_metrics", |_| {
- self.exporter.registry().gather()
- });
-
- encoder
- .encode(&metric_families, &mut buffer)
- .ok_or_internal_error("Could not serialize metrics")?;
-
- Ok(Response::builder()
- .status(StatusCode::OK)
- .header(http::header::CONTENT_TYPE, encoder.format_type())
- .body(bytes_body(buffer.into()))?)
+ match request {
+ AdminApiRequest::Options(req) => req.handle(&self.garage, &self).await,
+ AdminApiRequest::CheckDomain(req) => req.handle(&self.garage, &self).await,
+ AdminApiRequest::Health(req) => req.handle(&self.garage, &self).await,
+ AdminApiRequest::Metrics(req) => req.handle(&self.garage, &self).await,
+ req => {
+ let res = req.handle(&self.garage, &self).await?;
+ let mut res = json_ok_response(&res)?;
+ res.headers_mut()
+ .insert(ACCESS_CONTROL_ALLOW_ORIGIN, HeaderValue::from_static("*"));
+ Ok(res)
+ }
}
- #[cfg(not(feature = "metrics"))]
- Err(Error::bad_request(
- "Garage was built without the metrics feature".to_string(),
- ))
}
}
-impl ApiHandler for AdminApiServer {
+struct ArcAdminApiServer(Arc<AdminApiServer>);
+
+impl ApiHandler for ArcAdminApiServer {
const API_NAME: &'static str = "admin";
const API_NAME_DISPLAY: &'static str = "Admin";
- type Endpoint = Endpoint;
+ type Endpoint = HttpEndpoint;
type Error = Error;
- fn parse_endpoint(&self, req: &Request<IncomingBody>) -> Result<Endpoint, Error> {
+ fn parse_endpoint(&self, req: &Request<IncomingBody>) -> Result<HttpEndpoint, Error> {
if req.uri().path().starts_with("/v0/") {
let endpoint_v0 = router_v0::Endpoint::from_request(req)?;
- Endpoint::from_v0(endpoint_v0)
+ let endpoint_v1 = router_v1::Endpoint::from_v0(endpoint_v0)?;
+ Ok(HttpEndpoint::Old(endpoint_v1))
+ } else if req.uri().path().starts_with("/v1/") {
+ let endpoint_v1 = router_v1::Endpoint::from_request(req)?;
+ Ok(HttpEndpoint::Old(endpoint_v1))
} else {
- Endpoint::from_request(req)
+ Ok(HttpEndpoint::New(req.uri().path().to_string()))
}
}
async fn handle(
&self,
req: Request<IncomingBody>,
- endpoint: Endpoint,
+ endpoint: HttpEndpoint,
) -> Result<Response<ResBody>, Error> {
- let required_auth_hash =
- match endpoint.authorization_type() {
- Authorization::None => None,
- Authorization::MetricsToken => self.metrics_token.as_deref(),
- Authorization::AdminToken => match self.admin_token.as_deref() {
- None => return Err(Error::forbidden(
- "Admin token isn't configured, admin API access is disabled for security.",
- )),
- Some(t) => Some(t),
- },
- };
-
- if let Some(password_hash) = required_auth_hash {
- match req.headers().get("Authorization") {
- None => return Err(Error::forbidden("Authorization token must be provided")),
- Some(authorization) => {
- verify_bearer_token(&authorization, password_hash)?;
- }
- }
- }
-
- match endpoint {
- Endpoint::Options => self.handle_options(&req),
- Endpoint::CheckDomain => self.handle_check_domain(req).await,
- Endpoint::Health => self.handle_health(),
- Endpoint::Metrics => self.handle_metrics(),
- Endpoint::GetClusterStatus => handle_get_cluster_status(&self.garage).await,
- Endpoint::GetClusterHealth => handle_get_cluster_health(&self.garage).await,
- Endpoint::ConnectClusterNodes => handle_connect_cluster_nodes(&self.garage, req).await,
- // Layout
- Endpoint::GetClusterLayout => handle_get_cluster_layout(&self.garage).await,
- Endpoint::UpdateClusterLayout => handle_update_cluster_layout(&self.garage, req).await,
- Endpoint::ApplyClusterLayout => handle_apply_cluster_layout(&self.garage, req).await,
- Endpoint::RevertClusterLayout => handle_revert_cluster_layout(&self.garage).await,
- // Keys
- Endpoint::ListKeys => handle_list_keys(&self.garage).await,
- Endpoint::GetKeyInfo {
- id,
- search,
- show_secret_key,
- } => {
- let show_secret_key = show_secret_key.map(|x| x == "true").unwrap_or(false);
- handle_get_key_info(&self.garage, id, search, show_secret_key).await
- }
- Endpoint::CreateKey => handle_create_key(&self.garage, req).await,
- Endpoint::ImportKey => handle_import_key(&self.garage, req).await,
- Endpoint::UpdateKey { id } => handle_update_key(&self.garage, id, req).await,
- Endpoint::DeleteKey { id } => handle_delete_key(&self.garage, id).await,
- // Buckets
- Endpoint::ListBuckets => handle_list_buckets(&self.garage).await,
- Endpoint::GetBucketInfo { id, global_alias } => {
- handle_get_bucket_info(&self.garage, id, global_alias).await
- }
- Endpoint::CreateBucket => handle_create_bucket(&self.garage, req).await,
- Endpoint::DeleteBucket { id } => handle_delete_bucket(&self.garage, id).await,
- Endpoint::UpdateBucket { id } => handle_update_bucket(&self.garage, id, req).await,
- // Bucket-key permissions
- Endpoint::BucketAllowKey => {
- handle_bucket_change_key_perm(&self.garage, req, true).await
- }
- Endpoint::BucketDenyKey => {
- handle_bucket_change_key_perm(&self.garage, req, false).await
- }
- // Bucket aliasing
- Endpoint::GlobalAliasBucket { id, alias } => {
- handle_global_alias_bucket(&self.garage, id, alias).await
- }
- Endpoint::GlobalUnaliasBucket { id, alias } => {
- handle_global_unalias_bucket(&self.garage, id, alias).await
- }
- Endpoint::LocalAliasBucket {
- id,
- access_key_id,
- alias,
- } => handle_local_alias_bucket(&self.garage, id, access_key_id, alias).await,
- Endpoint::LocalUnaliasBucket {
- id,
- access_key_id,
- alias,
- } => handle_local_unalias_bucket(&self.garage, id, access_key_id, alias).await,
- }
+ self.0.handle_http_api(req, endpoint).await
}
}
-impl ApiEndpoint for Endpoint {
- fn name(&self) -> &'static str {
- Endpoint::name(self)
+impl ApiEndpoint for HttpEndpoint {
+ fn name(&self) -> Cow<'static, str> {
+ match self {
+ Self::Old(endpoint_v1) => Cow::Borrowed(endpoint_v1.name()),
+ Self::New(path) => Cow::Owned(path.clone()),
+ }
}
fn add_span_attributes(&self, _span: SpanRef<'_>) {}
diff --git a/src/api/admin/block.rs b/src/api/admin/block.rs
new file mode 100644
index 00000000..73d186a6
--- /dev/null
+++ b/src/api/admin/block.rs
@@ -0,0 +1,274 @@
+use std::sync::Arc;
+
+use garage_util::data::*;
+use garage_util::error::Error as GarageError;
+use garage_util::time::now_msec;
+
+use garage_table::EmptyKey;
+
+use garage_model::garage::Garage;
+use garage_model::s3::object_table::*;
+use garage_model::s3::version_table::*;
+
+use garage_api_common::common_error::CommonErrorDerivative;
+
+use crate::api::*;
+use crate::error::*;
+use crate::{Admin, RequestHandler};
+
+impl RequestHandler for LocalListBlockErrorsRequest {
+ type Response = LocalListBlockErrorsResponse;
+
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<LocalListBlockErrorsResponse, Error> {
+ let errors = garage.block_manager.list_resync_errors()?;
+ let now = now_msec();
+ let errors = errors
+ .into_iter()
+ .map(|e| BlockError {
+ block_hash: hex::encode(&e.hash),
+ refcount: e.refcount,
+ error_count: e.error_count,
+ last_try_secs_ago: now.saturating_sub(e.last_try) / 1000,
+ next_try_in_secs: e.next_try.saturating_sub(now) / 1000,
+ })
+ .collect();
+ Ok(LocalListBlockErrorsResponse(errors))
+ }
+}
+
+impl RequestHandler for LocalGetBlockInfoRequest {
+ type Response = LocalGetBlockInfoResponse;
+
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<LocalGetBlockInfoResponse, Error> {
+ let hash = find_block_hash_by_prefix(garage, &self.block_hash)?;
+ let refcount = garage.block_manager.get_block_rc(&hash)?;
+ let block_refs = garage
+ .block_ref_table
+ .get_range(&hash, None, None, 10000, Default::default())
+ .await?;
+ let mut versions = vec![];
+ for br in block_refs {
+ if let Some(v) = garage.version_table.get(&br.version, &EmptyKey).await? {
+ let bl = match &v.backlink {
+ VersionBacklink::MultipartUpload { upload_id } => {
+ if let Some(u) = garage.mpu_table.get(upload_id, &EmptyKey).await? {
+ BlockVersionBacklink::Upload {
+ upload_id: hex::encode(&upload_id),
+ upload_deleted: u.deleted.get(),
+ upload_garbage_collected: false,
+ bucket_id: Some(hex::encode(&u.bucket_id)),
+ key: Some(u.key.to_string()),
+ }
+ } else {
+ BlockVersionBacklink::Upload {
+ upload_id: hex::encode(&upload_id),
+ upload_deleted: true,
+ upload_garbage_collected: true,
+ bucket_id: None,
+ key: None,
+ }
+ }
+ }
+ VersionBacklink::Object { bucket_id, key } => BlockVersionBacklink::Object {
+ bucket_id: hex::encode(&bucket_id),
+ key: key.to_string(),
+ },
+ };
+ versions.push(BlockVersion {
+ version_id: hex::encode(&br.version),
+ deleted: v.deleted.get(),
+ garbage_collected: false,
+ backlink: Some(bl),
+ });
+ } else {
+ versions.push(BlockVersion {
+ version_id: hex::encode(&br.version),
+ deleted: true,
+ garbage_collected: true,
+ backlink: None,
+ });
+ }
+ }
+ Ok(LocalGetBlockInfoResponse {
+ block_hash: hex::encode(&hash),
+ refcount,
+ versions,
+ })
+ }
+}
+
+impl RequestHandler for LocalRetryBlockResyncRequest {
+ type Response = LocalRetryBlockResyncResponse;
+
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<LocalRetryBlockResyncResponse, Error> {
+ match self {
+ Self::All { all: true } => {
+ let blocks = garage.block_manager.list_resync_errors()?;
+ for b in blocks.iter() {
+ garage.block_manager.resync.clear_backoff(&b.hash)?;
+ }
+ Ok(LocalRetryBlockResyncResponse {
+ count: blocks.len() as u64,
+ })
+ }
+ Self::All { all: false } => Err(Error::bad_request("nonsense")),
+ Self::Blocks { block_hashes } => {
+ for hash in block_hashes.iter() {
+ let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?;
+ let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?;
+ garage.block_manager.resync.clear_backoff(&hash)?;
+ }
+ Ok(LocalRetryBlockResyncResponse {
+ count: block_hashes.len() as u64,
+ })
+ }
+ }
+ }
+}
+
+impl RequestHandler for LocalPurgeBlocksRequest {
+ type Response = LocalPurgeBlocksResponse;
+
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<LocalPurgeBlocksResponse, Error> {
+ let mut obj_dels = 0;
+ let mut mpu_dels = 0;
+ let mut ver_dels = 0;
+
+ for hash in self.0.iter() {
+ let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?;
+ let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?;
+ let block_refs = garage
+ .block_ref_table
+ .get_range(&hash, None, None, 10000, Default::default())
+ .await?;
+
+ for br in block_refs {
+ if let Some(version) = garage.version_table.get(&br.version, &EmptyKey).await? {
+ handle_block_purge_version_backlink(
+ garage,
+ &version,
+ &mut obj_dels,
+ &mut mpu_dels,
+ )
+ .await?;
+
+ if !version.deleted.get() {
+ let deleted_version = Version::new(version.uuid, version.backlink, true);
+ garage.version_table.insert(&deleted_version).await?;
+ ver_dels += 1;
+ }
+ }
+ }
+ }
+
+ Ok(LocalPurgeBlocksResponse {
+ blocks_purged: self.0.len() as u64,
+ versions_deleted: ver_dels,
+ objects_deleted: obj_dels,
+ uploads_deleted: mpu_dels,
+ })
+ }
+}
+
+fn find_block_hash_by_prefix(garage: &Arc<Garage>, prefix: &str) -> Result<Hash, Error> {
+ if prefix.len() < 4 {
+ return Err(Error::bad_request(
+ "Please specify at least 4 characters of the block hash",
+ ));
+ }
+
+ let prefix_bin = hex::decode(&prefix[..prefix.len() & !1]).ok_or_bad_request("invalid hash")?;
+
+ let iter = garage
+ .block_ref_table
+ .data
+ .store
+ .range(&prefix_bin[..]..)
+ .map_err(GarageError::from)?;
+ let mut found = None;
+ for item in iter {
+ let (k, _v) = item.map_err(GarageError::from)?;
+ let hash = Hash::try_from(&k[..32]).unwrap();
+ if &hash.as_slice()[..prefix_bin.len()] != prefix_bin {
+ break;
+ }
+ if hex::encode(hash.as_slice()).starts_with(prefix) {
+ match &found {
+ Some(x) if *x == hash => (),
+ Some(_) => {
+ return Err(Error::bad_request(format!(
+ "Several blocks match prefix `{}`",
+ prefix
+ )));
+ }
+ None => {
+ found = Some(hash);
+ }
+ }
+ }
+ }
+
+ found.ok_or_else(|| Error::NoSuchBlock(prefix.to_string()))
+}
+
+async fn handle_block_purge_version_backlink(
+ garage: &Arc<Garage>,
+ version: &Version,
+ obj_dels: &mut u64,
+ mpu_dels: &mut u64,
+) -> Result<(), Error> {
+ let (bucket_id, key, ov_id) = match &version.backlink {
+ VersionBacklink::Object { bucket_id, key } => (*bucket_id, key.clone(), version.uuid),
+ VersionBacklink::MultipartUpload { upload_id } => {
+ if let Some(mut mpu) = garage.mpu_table.get(upload_id, &EmptyKey).await? {
+ if !mpu.deleted.get() {
+ mpu.parts.clear();
+ mpu.deleted.set();
+ garage.mpu_table.insert(&mpu).await?;
+ *mpu_dels += 1;
+ }
+ (mpu.bucket_id, mpu.key.clone(), *upload_id)
+ } else {
+ return Ok(());
+ }
+ }
+ };
+
+ if let Some(object) = garage.object_table.get(&bucket_id, &key).await? {
+ let ov = object.versions().iter().rev().find(|v| v.is_complete());
+ if let Some(ov) = ov {
+ if ov.uuid == ov_id {
+ let del_uuid = gen_uuid();
+ let deleted_object = Object::new(
+ bucket_id,
+ key,
+ vec![ObjectVersion {
+ uuid: del_uuid,
+ timestamp: ov.timestamp + 1,
+ state: ObjectVersionState::Complete(ObjectVersionData::DeleteMarker),
+ }],
+ );
+ garage.object_table.insert(&deleted_object).await?;
+ *obj_dels += 1;
+ }
+ }
+ }
+
+ Ok(())
+}
diff --git a/src/api/admin/bucket.rs b/src/api/admin/bucket.rs
index 2537bfc9..d2bb62e0 100644
--- a/src/api/admin/bucket.rs
+++ b/src/api/admin/bucket.rs
@@ -1,8 +1,6 @@
use std::collections::HashMap;
use std::sync::Arc;
-
-use hyper::{body::Incoming as IncomingBody, Request, Response, StatusCode};
-use serde::{Deserialize, Serialize};
+use std::time::Duration;
use garage_util::crdt::*;
use garage_util::data::*;
@@ -18,102 +16,97 @@ use garage_model::s3::mpu_table;
use garage_model::s3::object_table::*;
use garage_api_common::common_error::CommonError;
-use garage_api_common::helpers::*;
-use crate::api_server::ResBody;
+use crate::api::*;
use crate::error::*;
-use crate::key::ApiBucketKeyPerm;
-
-pub async fn handle_list_buckets(garage: &Arc<Garage>) -> Result<Response<ResBody>, Error> {
- let buckets = garage
- .bucket_table
- .get_range(
- &EmptyKey,
- None,
- Some(DeletedFilter::NotDeleted),
- 10000,
- EnumerationOrder::Forward,
- )
- .await?;
-
- let res = buckets
- .into_iter()
- .map(|b| {
- let state = b.state.as_option().unwrap();
- ListBucketResultItem {
- id: hex::encode(b.id),
- global_aliases: state
- .aliases
- .items()
- .iter()
- .filter(|(_, _, a)| *a)
- .map(|(n, _, _)| n.to_string())
- .collect::<Vec<_>>(),
- local_aliases: state
- .local_aliases
- .items()
- .iter()
- .filter(|(_, _, a)| *a)
- .map(|((k, n), _, _)| BucketLocalAlias {
- access_key_id: k.to_string(),
- alias: n.to_string(),
- })
- .collect::<Vec<_>>(),
- }
- })
- .collect::<Vec<_>>();
-
- Ok(json_ok_response(&res)?)
-}
-
-#[derive(Serialize)]
-#[serde(rename_all = "camelCase")]
-struct ListBucketResultItem {
- id: String,
- global_aliases: Vec<String>,
- local_aliases: Vec<BucketLocalAlias>,
-}
+use crate::{Admin, RequestHandler};
+
+impl RequestHandler for ListBucketsRequest {
+ type Response = ListBucketsResponse;
+
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<ListBucketsResponse, Error> {
+ let buckets = garage
+ .bucket_table
+ .get_range(
+ &EmptyKey,
+ None,
+ Some(DeletedFilter::NotDeleted),
+ 10000,
+ EnumerationOrder::Forward,
+ )
+ .await?;
-#[derive(Serialize)]
-#[serde(rename_all = "camelCase")]
-struct BucketLocalAlias {
- access_key_id: String,
- alias: String,
-}
+ let res = buckets
+ .into_iter()
+ .map(|b| {
+ let state = b.state.as_option().unwrap();
+ ListBucketsResponseItem {
+ id: hex::encode(b.id),
+ global_aliases: state
+ .aliases
+ .items()
+ .iter()
+ .filter(|(_, _, a)| *a)
+ .map(|(n, _, _)| n.to_string())
+ .collect::<Vec<_>>(),
+ local_aliases: state
+ .local_aliases
+ .items()
+ .iter()
+ .filter(|(_, _, a)| *a)
+ .map(|((k, n), _, _)| BucketLocalAlias {
+ access_key_id: k.to_string(),
+ alias: n.to_string(),
+ })
+ .collect::<Vec<_>>(),
+ }
+ })
+ .collect::<Vec<_>>();
-#[derive(Serialize, Deserialize)]
-#[serde(rename_all = "camelCase")]
-struct ApiBucketQuotas {
- max_size: Option<u64>,
- max_objects: Option<u64>,
+ Ok(ListBucketsResponse(res))
+ }
}
-pub async fn handle_get_bucket_info(
- garage: &Arc<Garage>,
- id: Option<String>,
- global_alias: Option<String>,
-) -> Result<Response<ResBody>, Error> {
- let bucket_id = match (id, global_alias) {
- (Some(id), None) => parse_bucket_id(&id)?,
- (None, Some(ga)) => garage
- .bucket_helper()
- .resolve_global_bucket_name(&ga)
- .await?
- .ok_or_else(|| HelperError::NoSuchBucket(ga.to_string()))?,
- _ => {
- return Err(Error::bad_request(
- "Either id or globalAlias must be provided (but not both)",
- ));
- }
- };
+impl RequestHandler for GetBucketInfoRequest {
+ type Response = GetBucketInfoResponse;
+
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<GetBucketInfoResponse, Error> {
+ let bucket_id = match (self.id, self.global_alias, self.search) {
+ (Some(id), None, None) => parse_bucket_id(&id)?,
+ (None, Some(ga), None) => garage
+ .bucket_helper()
+ .resolve_global_bucket_name(&ga)
+ .await?
+ .ok_or_else(|| HelperError::NoSuchBucket(ga.to_string()))?,
+ (None, None, Some(search)) => {
+ garage
+ .bucket_helper()
+ .admin_get_existing_matching_bucket(&search)
+ .await?
+ }
+ _ => {
+ return Err(Error::bad_request(
+ "Either id, globalAlias or search must be provided (but not several of them)",
+ ));
+ }
+ };
- bucket_info_results(garage, bucket_id).await
+ bucket_info_results(garage, bucket_id).await
+ }
}
async fn bucket_info_results(
garage: &Arc<Garage>,
bucket_id: Uuid,
-) -> Result<Response<ResBody>, Error> {
+) -> Result<GetBucketInfoResponse, Error> {
let bucket = garage
.bucket_helper()
.get_existing_bucket(bucket_id)
@@ -176,301 +169,295 @@ async fn bucket_info_results(
let state = bucket.state.as_option().unwrap();
let quotas = state.quotas.get();
- let res =
- GetBucketInfoResult {
- id: hex::encode(bucket.id),
- global_aliases: state
- .aliases
- .items()
- .iter()
- .filter(|(_, _, a)| *a)
- .map(|(n, _, _)| n.to_string())
- .collect::<Vec<_>>(),
- website_access: state.website_config.get().is_some(),
- website_config: state.website_config.get().clone().map(|wsc| {
- GetBucketInfoWebsiteResult {
- index_document: wsc.index_document,
- error_document: wsc.error_document,
+ let res = GetBucketInfoResponse {
+ id: hex::encode(bucket.id),
+ global_aliases: state
+ .aliases
+ .items()
+ .iter()
+ .filter(|(_, _, a)| *a)
+ .map(|(n, _, _)| n.to_string())
+ .collect::<Vec<_>>(),
+ website_access: state.website_config.get().is_some(),
+ website_config: state.website_config.get().clone().map(|wsc| {
+ GetBucketInfoWebsiteResponse {
+ index_document: wsc.index_document,
+ error_document: wsc.error_document,
+ }
+ }),
+ keys: relevant_keys
+ .into_values()
+ .map(|key| {
+ let p = key.state.as_option().unwrap();
+ GetBucketInfoKey {
+ access_key_id: key.key_id,
+ name: p.name.get().to_string(),
+ permissions: p
+ .authorized_buckets
+ .get(&bucket.id)
+ .map(|p| ApiBucketKeyPerm {
+ read: p.allow_read,
+ write: p.allow_write,
+ owner: p.allow_owner,
+ })
+ .unwrap_or_default(),
+ bucket_local_aliases: p
+ .local_aliases
+ .items()
+ .iter()
+ .filter(|(_, _, b)| *b == Some(bucket.id))
+ .map(|(n, _, _)| n.to_string())
+ .collect::<Vec<_>>(),
}
- }),
- keys: relevant_keys
- .into_values()
- .map(|key| {
- let p = key.state.as_option().unwrap();
- GetBucketInfoKey {
- access_key_id: key.key_id,
- name: p.name.get().to_string(),
- permissions: p
- .authorized_buckets
- .get(&bucket.id)
- .map(|p| ApiBucketKeyPerm {
- read: p.allow_read,
- write: p.allow_write,
- owner: p.allow_owner,
- })
- .unwrap_or_default(),
- bucket_local_aliases: p
- .local_aliases
- .items()
- .iter()
- .filter(|(_, _, b)| *b == Some(bucket.id))
- .map(|(n, _, _)| n.to_string())
- .collect::<Vec<_>>(),
- }
- })
- .collect::<Vec<_>>(),
- objects: *counters.get(OBJECTS).unwrap_or(&0),
- bytes: *counters.get(BYTES).unwrap_or(&0),
- unfinished_uploads: *counters.get(UNFINISHED_UPLOADS).unwrap_or(&0),
- unfinished_multipart_uploads: *mpu_counters.get(mpu_table::UPLOADS).unwrap_or(&0),
- unfinished_multipart_upload_parts: *mpu_counters.get(mpu_table::PARTS).unwrap_or(&0),
- unfinished_multipart_upload_bytes: *mpu_counters.get(mpu_table::BYTES).unwrap_or(&0),
- quotas: ApiBucketQuotas {
- max_size: quotas.max_size,
- max_objects: quotas.max_objects,
- },
- };
-
- Ok(json_ok_response(&res)?)
-}
-
-#[derive(Serialize)]
-#[serde(rename_all = "camelCase")]
-struct GetBucketInfoResult {
- id: String,
- global_aliases: Vec<String>,
- website_access: bool,
- #[serde(default)]
- website_config: Option<GetBucketInfoWebsiteResult>,
- keys: Vec<GetBucketInfoKey>,
- objects: i64,
- bytes: i64,
- unfinished_uploads: i64,
- unfinished_multipart_uploads: i64,
- unfinished_multipart_upload_parts: i64,
- unfinished_multipart_upload_bytes: i64,
- quotas: ApiBucketQuotas,
-}
-
-#[derive(Serialize)]
-#[serde(rename_all = "camelCase")]
-struct GetBucketInfoWebsiteResult {
- index_document: String,
- error_document: Option<String>,
-}
+ })
+ .collect::<Vec<_>>(),
+ objects: *counters.get(OBJECTS).unwrap_or(&0),
+ bytes: *counters.get(BYTES).unwrap_or(&0),
+ unfinished_uploads: *counters.get(UNFINISHED_UPLOADS).unwrap_or(&0),
+ unfinished_multipart_uploads: *mpu_counters.get(mpu_table::UPLOADS).unwrap_or(&0),
+ unfinished_multipart_upload_parts: *mpu_counters.get(mpu_table::PARTS).unwrap_or(&0),
+ unfinished_multipart_upload_bytes: *mpu_counters.get(mpu_table::BYTES).unwrap_or(&0),
+ quotas: ApiBucketQuotas {
+ max_size: quotas.max_size,
+ max_objects: quotas.max_objects,
+ },
+ };
-#[derive(Serialize)]
-#[serde(rename_all = "camelCase")]
-struct GetBucketInfoKey {
- access_key_id: String,
- name: String,
- permissions: ApiBucketKeyPerm,
- bucket_local_aliases: Vec<String>,
+ Ok(res)
}
-pub async fn handle_create_bucket(
- garage: &Arc<Garage>,
- req: Request<IncomingBody>,
-) -> Result<Response<ResBody>, Error> {
- let req = parse_json_body::<CreateBucketRequest, _, Error>(req).await?;
-
- let helper = garage.locked_helper().await;
-
- if let Some(ga) = &req.global_alias {
- if !is_valid_bucket_name(ga) {
- return Err(Error::bad_request(format!(
- "{}: {}",
- ga, INVALID_BUCKET_NAME_MESSAGE
- )));
- }
+impl RequestHandler for CreateBucketRequest {
+ type Response = CreateBucketResponse;
+
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<CreateBucketResponse, Error> {
+ let helper = garage.locked_helper().await;
+
+ if let Some(ga) = &self.global_alias {
+ if !is_valid_bucket_name(ga) {
+ return Err(Error::bad_request(format!(
+ "{}: {}",
+ ga, INVALID_BUCKET_NAME_MESSAGE
+ )));
+ }
- if let Some(alias) = garage.bucket_alias_table.get(&EmptyKey, ga).await? {
- if alias.state.get().is_some() {
- return Err(CommonError::BucketAlreadyExists.into());
+ if let Some(alias) = garage.bucket_alias_table.get(&EmptyKey, ga).await? {
+ if alias.state.get().is_some() {
+ return Err(CommonError::BucketAlreadyExists.into());
+ }
}
}
- }
- if let Some(la) = &req.local_alias {
- if !is_valid_bucket_name(&la.alias) {
- return Err(Error::bad_request(format!(
- "{}: {}",
- la.alias, INVALID_BUCKET_NAME_MESSAGE
- )));
- }
+ if let Some(la) = &self.local_alias {
+ if !is_valid_bucket_name(&la.alias) {
+ return Err(Error::bad_request(format!(
+ "{}: {}",
+ la.alias, INVALID_BUCKET_NAME_MESSAGE
+ )));
+ }
- let key = helper.key().get_existing_key(&la.access_key_id).await?;
- let state = key.state.as_option().unwrap();
- if matches!(state.local_aliases.get(&la.alias), Some(_)) {
- return Err(Error::bad_request("Local alias already exists"));
+ let key = helper.key().get_existing_key(&la.access_key_id).await?;
+ let state = key.state.as_option().unwrap();
+ if matches!(state.local_aliases.get(&la.alias), Some(_)) {
+ return Err(Error::bad_request("Local alias already exists"));
+ }
}
- }
- let bucket = Bucket::new();
- garage.bucket_table.insert(&bucket).await?;
-
- if let Some(ga) = &req.global_alias {
- helper.set_global_bucket_alias(bucket.id, ga).await?;
- }
+ let bucket = Bucket::new();
+ garage.bucket_table.insert(&bucket).await?;
- if let Some(la) = &req.local_alias {
- helper
- .set_local_bucket_alias(bucket.id, &la.access_key_id, &la.alias)
- .await?;
+ if let Some(ga) = &self.global_alias {
+ helper.set_global_bucket_alias(bucket.id, ga).await?;
+ }
- if la.allow.read || la.allow.write || la.allow.owner {
+ if let Some(la) = &self.local_alias {
helper
- .set_bucket_key_permissions(
- bucket.id,
- &la.access_key_id,
- BucketKeyPerm {
- timestamp: now_msec(),
- allow_read: la.allow.read,
- allow_write: la.allow.write,
- allow_owner: la.allow.owner,
- },
- )
+ .set_local_bucket_alias(bucket.id, &la.access_key_id, &la.alias)
.await?;
- }
- }
- bucket_info_results(garage, bucket.id).await
-}
+ if la.allow.read || la.allow.write || la.allow.owner {
+ helper
+ .set_bucket_key_permissions(
+ bucket.id,
+ &la.access_key_id,
+ BucketKeyPerm {
+ timestamp: now_msec(),
+ allow_read: la.allow.read,
+ allow_write: la.allow.write,
+ allow_owner: la.allow.owner,
+ },
+ )
+ .await?;
+ }
+ }
-#[derive(Deserialize)]
-#[serde(rename_all = "camelCase")]
-struct CreateBucketRequest {
- global_alias: Option<String>,
- local_alias: Option<CreateBucketLocalAlias>,
+ Ok(CreateBucketResponse(
+ bucket_info_results(garage, bucket.id).await?,
+ ))
+ }
}
-#[derive(Deserialize)]
-#[serde(rename_all = "camelCase")]
-struct CreateBucketLocalAlias {
- access_key_id: String,
- alias: String,
- #[serde(default)]
- allow: ApiBucketKeyPerm,
-}
+impl RequestHandler for DeleteBucketRequest {
+ type Response = DeleteBucketResponse;
-pub async fn handle_delete_bucket(
- garage: &Arc<Garage>,
- id: String,
-) -> Result<Response<ResBody>, Error> {
- let helper = garage.locked_helper().await;
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<DeleteBucketResponse, Error> {
+ let helper = garage.locked_helper().await;
- let bucket_id = parse_bucket_id(&id)?;
+ let bucket_id = parse_bucket_id(&self.id)?;
- let mut bucket = helper.bucket().get_existing_bucket(bucket_id).await?;
- let state = bucket.state.as_option().unwrap();
+ let mut bucket = helper.bucket().get_existing_bucket(bucket_id).await?;
+ let state = bucket.state.as_option().unwrap();
- // Check bucket is empty
- if !helper.bucket().is_bucket_empty(bucket_id).await? {
- return Err(CommonError::BucketNotEmpty.into());
- }
+ // Check bucket is empty
+ if !helper.bucket().is_bucket_empty(bucket_id).await? {
+ return Err(CommonError::BucketNotEmpty.into());
+ }
- // --- done checking, now commit ---
- // 1. delete authorization from keys that had access
- for (key_id, perm) in bucket.authorized_keys() {
- if perm.is_any() {
- helper
- .set_bucket_key_permissions(bucket.id, key_id, BucketKeyPerm::NO_PERMISSIONS)
- .await?;
+ // --- done checking, now commit ---
+ // 1. delete authorization from keys that had access
+ for (key_id, perm) in bucket.authorized_keys() {
+ if perm.is_any() {
+ helper
+ .set_bucket_key_permissions(bucket.id, key_id, BucketKeyPerm::NO_PERMISSIONS)
+ .await?;
+ }
}
- }
- // 2. delete all local aliases
- for ((key_id, alias), _, active) in state.local_aliases.items().iter() {
- if *active {
- helper
- .unset_local_bucket_alias(bucket.id, key_id, alias)
- .await?;
+ // 2. delete all local aliases
+ for ((key_id, alias), _, active) in state.local_aliases.items().iter() {
+ if *active {
+ helper
+ .unset_local_bucket_alias(bucket.id, key_id, alias)
+ .await?;
+ }
}
- }
- // 3. delete all global aliases
- for (alias, _, active) in state.aliases.items().iter() {
- if *active {
- helper.purge_global_bucket_alias(bucket.id, alias).await?;
+ // 3. delete all global aliases
+ for (alias, _, active) in state.aliases.items().iter() {
+ if *active {
+ helper.purge_global_bucket_alias(bucket.id, alias).await?;
+ }
}
- }
- // 4. delete bucket
- bucket.state = Deletable::delete();
- garage.bucket_table.insert(&bucket).await?;
+ // 4. delete bucket
+ bucket.state = Deletable::delete();
+ garage.bucket_table.insert(&bucket).await?;
- Ok(Response::builder()
- .status(StatusCode::NO_CONTENT)
- .body(empty_body())?)
+ Ok(DeleteBucketResponse)
+ }
}
-pub async fn handle_update_bucket(
- garage: &Arc<Garage>,
- id: String,
- req: Request<IncomingBody>,
-) -> Result<Response<ResBody>, Error> {
- let req = parse_json_body::<UpdateBucketRequest, _, Error>(req).await?;
- let bucket_id = parse_bucket_id(&id)?;
+impl RequestHandler for UpdateBucketRequest {
+ type Response = UpdateBucketResponse;
- let mut bucket = garage
- .bucket_helper()
- .get_existing_bucket(bucket_id)
- .await?;
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<UpdateBucketResponse, Error> {
+ let bucket_id = parse_bucket_id(&self.id)?;
- let state = bucket.state.as_option_mut().unwrap();
-
- if let Some(wa) = req.website_access {
- if wa.enabled {
- state.website_config.update(Some(WebsiteConfig {
- index_document: wa.index_document.ok_or_bad_request(
- "Please specify indexDocument when enabling website access.",
- )?,
- error_document: wa.error_document,
- }));
- } else {
- if wa.index_document.is_some() || wa.error_document.is_some() {
- return Err(Error::bad_request(
- "Cannot specify indexDocument or errorDocument when disabling website access.",
- ));
+ let mut bucket = garage
+ .bucket_helper()
+ .get_existing_bucket(bucket_id)
+ .await?;
+
+ let state = bucket.state.as_option_mut().unwrap();
+
+ if let Some(wa) = self.body.website_access {
+ if wa.enabled {
+ state.website_config.update(Some(WebsiteConfig {
+ index_document: wa.index_document.ok_or_bad_request(
+ "Please specify indexDocument when enabling website access.",
+ )?,
+ error_document: wa.error_document,
+ }));
+ } else {
+ if wa.index_document.is_some() || wa.error_document.is_some() {
+ return Err(Error::bad_request(
+ "Cannot specify indexDocument or errorDocument when disabling website access.",
+ ));
+ }
+ state.website_config.update(None);
}
- state.website_config.update(None);
}
- }
- if let Some(q) = req.quotas {
- state.quotas.update(BucketQuotas {
- max_size: q.max_size,
- max_objects: q.max_objects,
- });
- }
+ if let Some(q) = self.body.quotas {
+ state.quotas.update(BucketQuotas {
+ max_size: q.max_size,
+ max_objects: q.max_objects,
+ });
+ }
- garage.bucket_table.insert(&bucket).await?;
+ garage.bucket_table.insert(&bucket).await?;
- bucket_info_results(garage, bucket_id).await
+ Ok(UpdateBucketResponse(
+ bucket_info_results(garage, bucket_id).await?,
+ ))
+ }
}
-#[derive(Deserialize)]
-#[serde(rename_all = "camelCase")]
-struct UpdateBucketRequest {
- website_access: Option<UpdateBucketWebsiteAccess>,
- quotas: Option<ApiBucketQuotas>,
-}
+impl RequestHandler for CleanupIncompleteUploadsRequest {
+ type Response = CleanupIncompleteUploadsResponse;
+
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<CleanupIncompleteUploadsResponse, Error> {
+ let duration = Duration::from_secs(self.older_than_secs);
+
+ let bucket_id = parse_bucket_id(&self.bucket_id)?;
+
+ let count = garage
+ .bucket_helper()
+ .cleanup_incomplete_uploads(&bucket_id, duration)
+ .await?;
-#[derive(Deserialize)]
-#[serde(rename_all = "camelCase")]
-struct UpdateBucketWebsiteAccess {
- enabled: bool,
- index_document: Option<String>,
- error_document: Option<String>,
+ Ok(CleanupIncompleteUploadsResponse {
+ uploads_deleted: count as u64,
+ })
+ }
}
// ---- BUCKET/KEY PERMISSIONS ----
+impl RequestHandler for AllowBucketKeyRequest {
+ type Response = AllowBucketKeyResponse;
+
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<AllowBucketKeyResponse, Error> {
+ let res = handle_bucket_change_key_perm(garage, self.0, true).await?;
+ Ok(AllowBucketKeyResponse(res))
+ }
+}
+
+impl RequestHandler for DenyBucketKeyRequest {
+ type Response = DenyBucketKeyResponse;
+
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<DenyBucketKeyResponse, Error> {
+ let res = handle_bucket_change_key_perm(garage, self.0, false).await?;
+ Ok(DenyBucketKeyResponse(res))
+ }
+}
+
pub async fn handle_bucket_change_key_perm(
garage: &Arc<Garage>,
- req: Request<IncomingBody>,
+ req: BucketKeyPermChangeRequest,
new_perm_flag: bool,
-) -> Result<Response<ResBody>, Error> {
- let req = parse_json_body::<BucketKeyPermChangeRequest, _, Error>(req).await?;
-
+) -> Result<GetBucketInfoResponse, Error> {
let helper = garage.locked_helper().await;
let bucket_id = parse_bucket_id(&req.bucket_id)?;
@@ -503,76 +490,74 @@ pub async fn handle_bucket_change_key_perm(
bucket_info_results(garage, bucket.id).await
}
-#[derive(Deserialize)]
-#[serde(rename_all = "camelCase")]
-struct BucketKeyPermChangeRequest {
- bucket_id: String,
- access_key_id: String,
- permissions: ApiBucketKeyPerm,
-}
-
// ---- BUCKET ALIASES ----
-pub async fn handle_global_alias_bucket(
- garage: &Arc<Garage>,
- bucket_id: String,
- alias: String,
-) -> Result<Response<ResBody>, Error> {
- let bucket_id = parse_bucket_id(&bucket_id)?;
-
- let helper = garage.locked_helper().await;
+impl RequestHandler for AddBucketAliasRequest {
+ type Response = AddBucketAliasResponse;
- helper.set_global_bucket_alias(bucket_id, &alias).await?;
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<AddBucketAliasResponse, Error> {
+ let bucket_id = parse_bucket_id(&self.bucket_id)?;
- bucket_info_results(garage, bucket_id).await
-}
+ let helper = garage.locked_helper().await;
-pub async fn handle_global_unalias_bucket(
- garage: &Arc<Garage>,
- bucket_id: String,
- alias: String,
-) -> Result<Response<ResBody>, Error> {
- let bucket_id = parse_bucket_id(&bucket_id)?;
-
- let helper = garage.locked_helper().await;
-
- helper.unset_global_bucket_alias(bucket_id, &alias).await?;
+ match self.alias {
+ BucketAliasEnum::Global { global_alias } => {
+ helper
+ .set_global_bucket_alias(bucket_id, &global_alias)
+ .await?;
+ }
+ BucketAliasEnum::Local {
+ local_alias,
+ access_key_id,
+ } => {
+ helper
+ .set_local_bucket_alias(bucket_id, &access_key_id, &local_alias)
+ .await?;
+ }
+ }
- bucket_info_results(garage, bucket_id).await
+ Ok(AddBucketAliasResponse(
+ bucket_info_results(garage, bucket_id).await?,
+ ))
+ }
}
-pub async fn handle_local_alias_bucket(
- garage: &Arc<Garage>,
- bucket_id: String,
- access_key_id: String,
- alias: String,
-) -> Result<Response<ResBody>, Error> {
- let bucket_id = parse_bucket_id(&bucket_id)?;
-
- let helper = garage.locked_helper().await;
-
- helper
- .set_local_bucket_alias(bucket_id, &access_key_id, &alias)
- .await?;
-
- bucket_info_results(garage, bucket_id).await
-}
+impl RequestHandler for RemoveBucketAliasRequest {
+ type Response = RemoveBucketAliasResponse;
-pub async fn handle_local_unalias_bucket(
- garage: &Arc<Garage>,
- bucket_id: String,
- access_key_id: String,
- alias: String,
-) -> Result<Response<ResBody>, Error> {
- let bucket_id = parse_bucket_id(&bucket_id)?;
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<RemoveBucketAliasResponse, Error> {
+ let bucket_id = parse_bucket_id(&self.bucket_id)?;
- let helper = garage.locked_helper().await;
+ let helper = garage.locked_helper().await;
- helper
- .unset_local_bucket_alias(bucket_id, &access_key_id, &alias)
- .await?;
+ match self.alias {
+ BucketAliasEnum::Global { global_alias } => {
+ helper
+ .unset_global_bucket_alias(bucket_id, &global_alias)
+ .await?;
+ }
+ BucketAliasEnum::Local {
+ local_alias,
+ access_key_id,
+ } => {
+ helper
+ .unset_local_bucket_alias(bucket_id, &access_key_id, &local_alias)
+ .await?;
+ }
+ }
- bucket_info_results(garage, bucket_id).await
+ Ok(RemoveBucketAliasResponse(
+ bucket_info_results(garage, bucket_id).await?,
+ ))
+ }
}
// ---- HELPER ----
diff --git a/src/api/admin/cluster.rs b/src/api/admin/cluster.rs
index ffa0fa71..cb1fa493 100644
--- a/src/api/admin/cluster.rs
+++ b/src/api/admin/cluster.rs
@@ -1,10 +1,6 @@
use std::collections::HashMap;
-use std::net::SocketAddr;
use std::sync::Arc;
-use hyper::{body::Incoming as IncomingBody, Request, Response};
-use serde::{Deserialize, Serialize};
-
use garage_util::crdt::*;
use garage_util::data::*;
@@ -12,158 +8,182 @@ use garage_rpc::layout;
use garage_model::garage::Garage;
-use garage_api_common::helpers::{json_ok_response, parse_json_body};
-
-use crate::api_server::ResBody;
+use crate::api::*;
use crate::error::*;
-
-pub async fn handle_get_cluster_status(garage: &Arc<Garage>) -> Result<Response<ResBody>, Error> {
- let layout = garage.system.cluster_layout();
- let mut nodes = garage
- .system
- .get_known_nodes()
- .into_iter()
- .map(|i| {
- (
- i.id,
- NodeResp {
- id: hex::encode(i.id),
- addr: i.addr,
- hostname: i.status.hostname,
- is_up: i.is_up,
- last_seen_secs_ago: i.last_seen_secs_ago,
- data_partition: i
- .status
- .data_disk_avail
- .map(|(avail, total)| FreeSpaceResp {
- available: avail,
- total,
+use crate::{Admin, RequestHandler};
+
+impl RequestHandler for GetClusterStatusRequest {
+ type Response = GetClusterStatusResponse;
+
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<GetClusterStatusResponse, Error> {
+ let layout = garage.system.cluster_layout();
+ let mut nodes = garage
+ .system
+ .get_known_nodes()
+ .into_iter()
+ .map(|i| {
+ (
+ i.id,
+ NodeResp {
+ id: hex::encode(i.id),
+ addr: i.addr,
+ hostname: i.status.hostname,
+ is_up: i.is_up,
+ last_seen_secs_ago: i.last_seen_secs_ago,
+ data_partition: i.status.data_disk_avail.map(|(avail, total)| {
+ FreeSpaceResp {
+ available: avail,
+ total,
+ }
}),
- metadata_partition: i.status.meta_disk_avail.map(|(avail, total)| {
- FreeSpaceResp {
- available: avail,
- total,
- }
- }),
- ..Default::default()
- },
- )
- })
- .collect::<HashMap<_, _>>();
-
- for (id, _, role) in layout.current().roles.items().iter() {
- if let layout::NodeRoleV(Some(r)) = role {
- let role = NodeRoleResp {
- id: hex::encode(id),
- zone: r.zone.to_string(),
- capacity: r.capacity,
- tags: r.tags.clone(),
- };
- match nodes.get_mut(id) {
- None => {
- nodes.insert(
- *id,
- NodeResp {
- id: hex::encode(id),
- role: Some(role),
- ..Default::default()
- },
- );
- }
- Some(n) => {
- n.role = Some(role);
- }
- }
- }
- }
+ metadata_partition: i.status.meta_disk_avail.map(|(avail, total)| {
+ FreeSpaceResp {
+ available: avail,
+ total,
+ }
+ }),
+ ..Default::default()
+ },
+ )
+ })
+ .collect::<HashMap<_, _>>();
- for ver in layout.versions().iter().rev().skip(1) {
- for (id, _, role) in ver.roles.items().iter() {
+ for (id, _, role) in layout.current().roles.items().iter() {
if let layout::NodeRoleV(Some(r)) = role {
- if r.capacity.is_some() {
- if let Some(n) = nodes.get_mut(id) {
- if n.role.is_none() {
- n.draining = true;
- }
- } else {
+ let role = NodeRoleResp {
+ id: hex::encode(id),
+ zone: r.zone.to_string(),
+ capacity: r.capacity,
+ tags: r.tags.clone(),
+ };
+ match nodes.get_mut(id) {
+ None => {
nodes.insert(
*id,
NodeResp {
id: hex::encode(id),
- draining: true,
+ role: Some(role),
..Default::default()
},
);
}
+ Some(n) => {
+ n.role = Some(role);
+ }
}
}
}
- }
-
- let mut nodes = nodes.into_values().collect::<Vec<_>>();
- nodes.sort_by(|x, y| x.id.cmp(&y.id));
- let res = GetClusterStatusResponse {
- node: hex::encode(garage.system.id),
- garage_version: garage_util::version::garage_version(),
- garage_features: garage_util::version::garage_features(),
- rust_version: garage_util::version::rust_version(),
- db_engine: garage.db.engine(),
- layout_version: layout.current().version,
- nodes,
- };
+ for ver in layout.versions().iter().rev().skip(1) {
+ for (id, _, role) in ver.roles.items().iter() {
+ if let layout::NodeRoleV(Some(r)) = role {
+ if r.capacity.is_some() {
+ if let Some(n) = nodes.get_mut(id) {
+ if n.role.is_none() {
+ n.draining = true;
+ }
+ } else {
+ nodes.insert(
+ *id,
+ NodeResp {
+ id: hex::encode(id),
+ draining: true,
+ ..Default::default()
+ },
+ );
+ }
+ }
+ }
+ }
+ }
- Ok(json_ok_response(&res)?)
+ let mut nodes = nodes.into_values().collect::<Vec<_>>();
+ nodes.sort_by(|x, y| x.id.cmp(&y.id));
+
+ Ok(GetClusterStatusResponse {
+ node: hex::encode(garage.system.id),
+ garage_version: garage_util::version::garage_version().to_string(),
+ garage_features: garage_util::version::garage_features()
+ .map(|features| features.iter().map(ToString::to_string).collect()),
+ rust_version: garage_util::version::rust_version().to_string(),
+ db_engine: garage.db.engine(),
+ layout_version: layout.current().version,
+ nodes,
+ })
+ }
}
-pub async fn handle_get_cluster_health(garage: &Arc<Garage>) -> Result<Response<ResBody>, Error> {
- use garage_rpc::system::ClusterHealthStatus;
- let health = garage.system.health();
- let health = ClusterHealth {
- status: match health.status {
- ClusterHealthStatus::Healthy => "healthy",
- ClusterHealthStatus::Degraded => "degraded",
- ClusterHealthStatus::Unavailable => "unavailable",
- },
- known_nodes: health.known_nodes,
- connected_nodes: health.connected_nodes,
- storage_nodes: health.storage_nodes,
- storage_nodes_ok: health.storage_nodes_ok,
- partitions: health.partitions,
- partitions_quorum: health.partitions_quorum,
- partitions_all_ok: health.partitions_all_ok,
- };
- Ok(json_ok_response(&health)?)
+impl RequestHandler for GetClusterHealthRequest {
+ type Response = GetClusterHealthResponse;
+
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<GetClusterHealthResponse, Error> {
+ use garage_rpc::system::ClusterHealthStatus;
+ let health = garage.system.health();
+ let health = GetClusterHealthResponse {
+ status: match health.status {
+ ClusterHealthStatus::Healthy => "healthy",
+ ClusterHealthStatus::Degraded => "degraded",
+ ClusterHealthStatus::Unavailable => "unavailable",
+ }
+ .to_string(),
+ known_nodes: health.known_nodes,
+ connected_nodes: health.connected_nodes,
+ storage_nodes: health.storage_nodes,
+ storage_nodes_ok: health.storage_nodes_ok,
+ partitions: health.partitions,
+ partitions_quorum: health.partitions_quorum,
+ partitions_all_ok: health.partitions_all_ok,
+ };
+ Ok(health)
+ }
}
-pub async fn handle_connect_cluster_nodes(
- garage: &Arc<Garage>,
- req: Request<IncomingBody>,
-) -> Result<Response<ResBody>, Error> {
- let req = parse_json_body::<Vec<String>, _, Error>(req).await?;
-
- let res = futures::future::join_all(req.iter().map(|node| garage.system.connect(node)))
- .await
- .into_iter()
- .map(|r| match r {
- Ok(()) => ConnectClusterNodesResponse {
- success: true,
- error: None,
- },
- Err(e) => ConnectClusterNodesResponse {
- success: false,
- error: Some(format!("{}", e)),
- },
- })
- .collect::<Vec<_>>();
-
- Ok(json_ok_response(&res)?)
+impl RequestHandler for ConnectClusterNodesRequest {
+ type Response = ConnectClusterNodesResponse;
+
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<ConnectClusterNodesResponse, Error> {
+ let res = futures::future::join_all(self.0.iter().map(|node| garage.system.connect(node)))
+ .await
+ .into_iter()
+ .map(|r| match r {
+ Ok(()) => ConnectNodeResponse {
+ success: true,
+ error: None,
+ },
+ Err(e) => ConnectNodeResponse {
+ success: false,
+ error: Some(format!("{}", e)),
+ },
+ })
+ .collect::<Vec<_>>();
+ Ok(ConnectClusterNodesResponse(res))
+ }
}
-pub async fn handle_get_cluster_layout(garage: &Arc<Garage>) -> Result<Response<ResBody>, Error> {
- let res = format_cluster_layout(garage.system.cluster_layout().inner());
-
- Ok(json_ok_response(&res)?)
+impl RequestHandler for GetClusterLayoutRequest {
+ type Response = GetClusterLayoutResponse;
+
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<GetClusterLayoutResponse, Error> {
+ Ok(format_cluster_layout(
+ garage.system.cluster_layout().inner(),
+ ))
+ }
}
fn format_cluster_layout(layout: &layout::LayoutHistory) -> GetClusterLayoutResponse {
@@ -213,199 +233,98 @@ fn format_cluster_layout(layout: &layout::LayoutHistory) -> GetClusterLayoutResp
// ----
-#[derive(Debug, Clone, Copy, Serialize)]
-#[serde(rename_all = "camelCase")]
-pub struct ClusterHealth {
- status: &'static str,
- known_nodes: usize,
- connected_nodes: usize,
- storage_nodes: usize,
- storage_nodes_ok: usize,
- partitions: usize,
- partitions_quorum: usize,
- partitions_all_ok: usize,
-}
-
-#[derive(Serialize)]
-#[serde(rename_all = "camelCase")]
-struct GetClusterStatusResponse {
- node: String,
- garage_version: &'static str,
- garage_features: Option<&'static [&'static str]>,
- rust_version: &'static str,
- db_engine: String,
- layout_version: u64,
- nodes: Vec<NodeResp>,
-}
-
-#[derive(Serialize)]
-#[serde(rename_all = "camelCase")]
-struct ApplyClusterLayoutResponse {
- message: Vec<String>,
- layout: GetClusterLayoutResponse,
-}
-
-#[derive(Serialize)]
-#[serde(rename_all = "camelCase")]
-struct ConnectClusterNodesResponse {
- success: bool,
- error: Option<String>,
-}
-
-#[derive(Serialize)]
-#[serde(rename_all = "camelCase")]
-struct GetClusterLayoutResponse {
- version: u64,
- roles: Vec<NodeRoleResp>,
- staged_role_changes: Vec<NodeRoleChange>,
-}
-
-#[derive(Serialize)]
-#[serde(rename_all = "camelCase")]
-struct NodeRoleResp {
- id: String,
- zone: String,
- capacity: Option<u64>,
- tags: Vec<String>,
-}
-
-#[derive(Serialize, Default)]
-#[serde(rename_all = "camelCase")]
-struct FreeSpaceResp {
- available: u64,
- total: u64,
-}
-
-#[derive(Serialize, Default)]
-#[serde(rename_all = "camelCase")]
-struct NodeResp {
- id: String,
- role: Option<NodeRoleResp>,
- addr: Option<SocketAddr>,
- hostname: Option<String>,
- is_up: bool,
- last_seen_secs_ago: Option<u64>,
- draining: bool,
- #[serde(skip_serializing_if = "Option::is_none")]
- data_partition: Option<FreeSpaceResp>,
- #[serde(skip_serializing_if = "Option::is_none")]
- metadata_partition: Option<FreeSpaceResp>,
-}
-
// ---- update functions ----
-pub async fn handle_update_cluster_layout(
- garage: &Arc<Garage>,
- req: Request<IncomingBody>,
-) -> Result<Response<ResBody>, Error> {
- let updates = parse_json_body::<UpdateClusterLayoutRequest, _, Error>(req).await?;
-
- let mut layout = garage.system.cluster_layout().inner().clone();
-
- let mut roles = layout.current().roles.clone();
- roles.merge(&layout.staging.get().roles);
-
- for change in updates {
- let node = hex::decode(&change.id).ok_or_bad_request("Invalid node identifier")?;
- let node = Uuid::try_from(&node).ok_or_bad_request("Invalid node identifier")?;
-
- let new_role = match change.action {
- NodeRoleChangeEnum::Remove { remove: true } => None,
- NodeRoleChangeEnum::Update {
- zone,
- capacity,
- tags,
- } => Some(layout::NodeRole {
- zone,
- capacity,
- tags,
- }),
- _ => return Err(Error::bad_request("Invalid layout change")),
- };
-
- layout
- .staging
- .get_mut()
- .roles
- .merge(&roles.update_mutator(node, layout::NodeRoleV(new_role)));
- }
-
- garage
- .system
- .layout_manager
- .update_cluster_layout(&layout)
- .await?;
-
- let res = format_cluster_layout(&layout);
- Ok(json_ok_response(&res)?)
-}
-
-pub async fn handle_apply_cluster_layout(
- garage: &Arc<Garage>,
- req: Request<IncomingBody>,
-) -> Result<Response<ResBody>, Error> {
- let param = parse_json_body::<ApplyLayoutRequest, _, Error>(req).await?;
-
- let layout = garage.system.cluster_layout().inner().clone();
- let (layout, msg) = layout.apply_staged_changes(Some(param.version))?;
-
- garage
- .system
- .layout_manager
- .update_cluster_layout(&layout)
- .await?;
-
- let res = ApplyClusterLayoutResponse {
- message: msg,
- layout: format_cluster_layout(&layout),
- };
- Ok(json_ok_response(&res)?)
-}
-
-pub async fn handle_revert_cluster_layout(
- garage: &Arc<Garage>,
-) -> Result<Response<ResBody>, Error> {
- let layout = garage.system.cluster_layout().inner().clone();
- let layout = layout.revert_staged_changes()?;
- garage
- .system
- .layout_manager
- .update_cluster_layout(&layout)
- .await?;
-
- let res = format_cluster_layout(&layout);
- Ok(json_ok_response(&res)?)
-}
+impl RequestHandler for UpdateClusterLayoutRequest {
+ type Response = UpdateClusterLayoutResponse;
+
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<UpdateClusterLayoutResponse, Error> {
+ let mut layout = garage.system.cluster_layout().inner().clone();
+
+ let mut roles = layout.current().roles.clone();
+ roles.merge(&layout.staging.get().roles);
+
+ for change in self.0 {
+ let node = hex::decode(&change.id).ok_or_bad_request("Invalid node identifier")?;
+ let node = Uuid::try_from(&node).ok_or_bad_request("Invalid node identifier")?;
+
+ let new_role = match change.action {
+ NodeRoleChangeEnum::Remove { remove: true } => None,
+ NodeRoleChangeEnum::Update {
+ zone,
+ capacity,
+ tags,
+ } => Some(layout::NodeRole {
+ zone,
+ capacity,
+ tags,
+ }),
+ _ => return Err(Error::bad_request("Invalid layout change")),
+ };
-// ----
+ layout
+ .staging
+ .get_mut()
+ .roles
+ .merge(&roles.update_mutator(node, layout::NodeRoleV(new_role)));
+ }
-type UpdateClusterLayoutRequest = Vec<NodeRoleChange>;
+ garage
+ .system
+ .layout_manager
+ .update_cluster_layout(&layout)
+ .await?;
-#[derive(Deserialize)]
-#[serde(rename_all = "camelCase")]
-struct ApplyLayoutRequest {
- version: u64,
+ let res = format_cluster_layout(&layout);
+ Ok(UpdateClusterLayoutResponse(res))
+ }
}
-// ----
-
-#[derive(Serialize, Deserialize)]
-#[serde(rename_all = "camelCase")]
-struct NodeRoleChange {
- id: String,
- #[serde(flatten)]
- action: NodeRoleChangeEnum,
+impl RequestHandler for ApplyClusterLayoutRequest {
+ type Response = ApplyClusterLayoutResponse;
+
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<ApplyClusterLayoutResponse, Error> {
+ let layout = garage.system.cluster_layout().inner().clone();
+ let (layout, msg) = layout.apply_staged_changes(Some(self.version))?;
+
+ garage
+ .system
+ .layout_manager
+ .update_cluster_layout(&layout)
+ .await?;
+
+ Ok(ApplyClusterLayoutResponse {
+ message: msg,
+ layout: format_cluster_layout(&layout),
+ })
+ }
}
-#[derive(Serialize, Deserialize)]
-#[serde(untagged)]
-enum NodeRoleChangeEnum {
- #[serde(rename_all = "camelCase")]
- Remove { remove: bool },
- #[serde(rename_all = "camelCase")]
- Update {
- zone: String,
- capacity: Option<u64>,
- tags: Vec<String>,
- },
+impl RequestHandler for RevertClusterLayoutRequest {
+ type Response = RevertClusterLayoutResponse;
+
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<RevertClusterLayoutResponse, Error> {
+ let layout = garage.system.cluster_layout().inner().clone();
+ let layout = layout.revert_staged_changes()?;
+ garage
+ .system
+ .layout_manager
+ .update_cluster_layout(&layout)
+ .await?;
+
+ let res = format_cluster_layout(&layout);
+ Ok(RevertClusterLayoutResponse(res))
+ }
}
diff --git a/src/api/admin/error.rs b/src/api/admin/error.rs
index 201f9b40..d7ea7dc9 100644
--- a/src/api/admin/error.rs
+++ b/src/api/admin/error.rs
@@ -25,6 +25,14 @@ pub enum Error {
#[error(display = "Access key not found: {}", _0)]
NoSuchAccessKey(String),
+ /// The requested block does not exist
+ #[error(display = "Block not found: {}", _0)]
+ NoSuchBlock(String),
+
+ /// The requested worker does not exist
+ #[error(display = "Worker not found: {}", _0)]
+ NoSuchWorker(u64),
+
/// In Import key, the key already exists
#[error(
display = "Key {} already exists in data store. Even if it is deleted, we can't let you create a new key with the same ID. Sorry.",
@@ -49,10 +57,12 @@ impl From<HelperError> for Error {
}
impl Error {
- fn code(&self) -> &'static str {
+ pub fn code(&self) -> &'static str {
match self {
Error::Common(c) => c.aws_code(),
Error::NoSuchAccessKey(_) => "NoSuchAccessKey",
+ Error::NoSuchWorker(_) => "NoSuchWorker",
+ Error::NoSuchBlock(_) => "NoSuchBlock",
Error::KeyAlreadyExists(_) => "KeyAlreadyExists",
}
}
@@ -63,7 +73,9 @@ impl ApiError for Error {
fn http_status_code(&self) -> StatusCode {
match self {
Error::Common(c) => c.http_status_code(),
- Error::NoSuchAccessKey(_) => StatusCode::NOT_FOUND,
+ Error::NoSuchAccessKey(_) | Error::NoSuchWorker(_) | Error::NoSuchBlock(_) => {
+ StatusCode::NOT_FOUND
+ }
Error::KeyAlreadyExists(_) => StatusCode::CONFLICT,
}
}
diff --git a/src/api/admin/key.rs b/src/api/admin/key.rs
index bebf3063..dc6ae4e9 100644
--- a/src/api/admin/key.rs
+++ b/src/api/admin/key.rs
@@ -1,173 +1,168 @@
use std::collections::HashMap;
use std::sync::Arc;
-use hyper::{body::Incoming as IncomingBody, Request, Response, StatusCode};
-use serde::{Deserialize, Serialize};
-
use garage_table::*;
use garage_model::garage::Garage;
use garage_model::key_table::*;
-use garage_api_common::helpers::*;
-
-use crate::api_server::ResBody;
+use crate::api::*;
use crate::error::*;
-
-pub async fn handle_list_keys(garage: &Arc<Garage>) -> Result<Response<ResBody>, Error> {
- let res = garage
- .key_table
- .get_range(
- &EmptyKey,
- None,
- Some(KeyFilter::Deleted(DeletedFilter::NotDeleted)),
- 10000,
- EnumerationOrder::Forward,
- )
- .await?
- .iter()
- .map(|k| ListKeyResultItem {
- id: k.key_id.to_string(),
- name: k.params().unwrap().name.get().clone(),
- })
- .collect::<Vec<_>>();
-
- Ok(json_ok_response(&res)?)
-}
-
-#[derive(Serialize)]
-#[serde(rename_all = "camelCase")]
-struct ListKeyResultItem {
- id: String,
- name: String,
-}
-
-pub async fn handle_get_key_info(
- garage: &Arc<Garage>,
- id: Option<String>,
- search: Option<String>,
- show_secret_key: bool,
-) -> Result<Response<ResBody>, Error> {
- let key = if let Some(id) = id {
- garage.key_helper().get_existing_key(&id).await?
- } else if let Some(search) = search {
- garage
- .key_helper()
- .get_existing_matching_key(&search)
+use crate::{Admin, RequestHandler};
+
+impl RequestHandler for ListKeysRequest {
+ type Response = ListKeysResponse;
+
+ async fn handle(self, garage: &Arc<Garage>, _admin: &Admin) -> Result<ListKeysResponse, Error> {
+ let res = garage
+ .key_table
+ .get_range(
+ &EmptyKey,
+ None,
+ Some(KeyFilter::Deleted(DeletedFilter::NotDeleted)),
+ 10000,
+ EnumerationOrder::Forward,
+ )
.await?
- } else {
- unreachable!();
- };
+ .iter()
+ .map(|k| ListKeysResponseItem {
+ id: k.key_id.to_string(),
+ name: k.params().unwrap().name.get().clone(),
+ })
+ .collect::<Vec<_>>();
- key_info_results(garage, key, show_secret_key).await
+ Ok(ListKeysResponse(res))
+ }
}
-pub async fn handle_create_key(
- garage: &Arc<Garage>,
- req: Request<IncomingBody>,
-) -> Result<Response<ResBody>, Error> {
- let req = parse_json_body::<CreateKeyRequest, _, Error>(req).await?;
-
- let key = Key::new(req.name.as_deref().unwrap_or("Unnamed key"));
- garage.key_table.insert(&key).await?;
+impl RequestHandler for GetKeyInfoRequest {
+ type Response = GetKeyInfoResponse;
+
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<GetKeyInfoResponse, Error> {
+ let key = match (self.id, self.search) {
+ (Some(id), None) => garage.key_helper().get_existing_key(&id).await?,
+ (None, Some(search)) => {
+ garage
+ .key_helper()
+ .get_existing_matching_key(&search)
+ .await?
+ }
+ _ => {
+ return Err(Error::bad_request(
+ "Either id or search must be provided (but not both)",
+ ));
+ }
+ };
- key_info_results(garage, key, true).await
+ Ok(key_info_results(garage, key, self.show_secret_key).await?)
+ }
}
-#[derive(Deserialize)]
-#[serde(rename_all = "camelCase")]
-struct CreateKeyRequest {
- name: Option<String>,
-}
+impl RequestHandler for CreateKeyRequest {
+ type Response = CreateKeyResponse;
-pub async fn handle_import_key(
- garage: &Arc<Garage>,
- req: Request<IncomingBody>,
-) -> Result<Response<ResBody>, Error> {
- let req = parse_json_body::<ImportKeyRequest, _, Error>(req).await?;
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<CreateKeyResponse, Error> {
+ let key = Key::new(self.name.as_deref().unwrap_or("Unnamed key"));
+ garage.key_table.insert(&key).await?;
- let prev_key = garage.key_table.get(&EmptyKey, &req.access_key_id).await?;
- if prev_key.is_some() {
- return Err(Error::KeyAlreadyExists(req.access_key_id.to_string()));
+ Ok(CreateKeyResponse(
+ key_info_results(garage, key, true).await?,
+ ))
}
+}
- let imported_key = Key::import(
- &req.access_key_id,
- &req.secret_access_key,
- req.name.as_deref().unwrap_or("Imported key"),
- )
- .ok_or_bad_request("Invalid key format")?;
- garage.key_table.insert(&imported_key).await?;
+impl RequestHandler for ImportKeyRequest {
+ type Response = ImportKeyResponse;
+
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<ImportKeyResponse, Error> {
+ let prev_key = garage.key_table.get(&EmptyKey, &self.access_key_id).await?;
+ if prev_key.is_some() {
+ return Err(Error::KeyAlreadyExists(self.access_key_id.to_string()));
+ }
- key_info_results(garage, imported_key, false).await
-}
+ let imported_key = Key::import(
+ &self.access_key_id,
+ &self.secret_access_key,
+ self.name.as_deref().unwrap_or("Imported key"),
+ )
+ .ok_or_bad_request("Invalid key format")?;
+ garage.key_table.insert(&imported_key).await?;
-#[derive(Deserialize)]
-#[serde(rename_all = "camelCase")]
-struct ImportKeyRequest {
- access_key_id: String,
- secret_access_key: String,
- name: Option<String>,
+ Ok(ImportKeyResponse(
+ key_info_results(garage, imported_key, false).await?,
+ ))
+ }
}
-pub async fn handle_update_key(
- garage: &Arc<Garage>,
- id: String,
- req: Request<IncomingBody>,
-) -> Result<Response<ResBody>, Error> {
- let req = parse_json_body::<UpdateKeyRequest, _, Error>(req).await?;
+impl RequestHandler for UpdateKeyRequest {
+ type Response = UpdateKeyResponse;
- let mut key = garage.key_helper().get_existing_key(&id).await?;
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<UpdateKeyResponse, Error> {
+ let mut key = garage.key_helper().get_existing_key(&self.id).await?;
- let key_state = key.state.as_option_mut().unwrap();
+ let key_state = key.state.as_option_mut().unwrap();
- if let Some(new_name) = req.name {
- key_state.name.update(new_name);
- }
- if let Some(allow) = req.allow {
- if allow.create_bucket {
- key_state.allow_create_bucket.update(true);
+ if let Some(new_name) = self.body.name {
+ key_state.name.update(new_name);
}
- }
- if let Some(deny) = req.deny {
- if deny.create_bucket {
- key_state.allow_create_bucket.update(false);
+ if let Some(allow) = self.body.allow {
+ if allow.create_bucket {
+ key_state.allow_create_bucket.update(true);
+ }
+ }
+ if let Some(deny) = self.body.deny {
+ if deny.create_bucket {
+ key_state.allow_create_bucket.update(false);
+ }
}
- }
- garage.key_table.insert(&key).await?;
+ garage.key_table.insert(&key).await?;
- key_info_results(garage, key, false).await
+ Ok(UpdateKeyResponse(
+ key_info_results(garage, key, false).await?,
+ ))
+ }
}
-#[derive(Deserialize)]
-#[serde(rename_all = "camelCase")]
-struct UpdateKeyRequest {
- name: Option<String>,
- allow: Option<KeyPerm>,
- deny: Option<KeyPerm>,
-}
+impl RequestHandler for DeleteKeyRequest {
+ type Response = DeleteKeyResponse;
-pub async fn handle_delete_key(
- garage: &Arc<Garage>,
- id: String,
-) -> Result<Response<ResBody>, Error> {
- let helper = garage.locked_helper().await;
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<DeleteKeyResponse, Error> {
+ let helper = garage.locked_helper().await;
- let mut key = helper.key().get_existing_key(&id).await?;
+ let mut key = helper.key().get_existing_key(&self.id).await?;
- helper.delete_key(&mut key).await?;
+ helper.delete_key(&mut key).await?;
- Ok(Response::builder()
- .status(StatusCode::NO_CONTENT)
- .body(empty_body())?)
+ Ok(DeleteKeyResponse)
+ }
}
async fn key_info_results(
garage: &Arc<Garage>,
key: Key,
show_secret: bool,
-) -> Result<Response<ResBody>, Error> {
+) -> Result<GetKeyInfoResponse, Error> {
let mut relevant_buckets = HashMap::new();
let key_state = key.state.as_option().unwrap();
@@ -193,7 +188,7 @@ async fn key_info_results(
}
}
- let res = GetKeyInfoResult {
+ let res = GetKeyInfoResponse {
name: key_state.name.get().clone(),
access_key_id: key.key_id.clone(),
secret_access_key: if show_secret {
@@ -208,7 +203,7 @@ async fn key_info_results(
.into_values()
.map(|bucket| {
let state = bucket.state.as_option().unwrap();
- KeyInfoBucketResult {
+ KeyInfoBucketResponse {
id: hex::encode(bucket.id),
global_aliases: state
.aliases
@@ -238,43 +233,5 @@ async fn key_info_results(
.collect::<Vec<_>>(),
};
- Ok(json_ok_response(&res)?)
-}
-
-#[derive(Serialize)]
-#[serde(rename_all = "camelCase")]
-struct GetKeyInfoResult {
- name: String,
- access_key_id: String,
- #[serde(skip_serializing_if = "is_default")]
- secret_access_key: Option<String>,
- permissions: KeyPerm,
- buckets: Vec<KeyInfoBucketResult>,
-}
-
-#[derive(Serialize, Deserialize)]
-#[serde(rename_all = "camelCase")]
-struct KeyPerm {
- #[serde(default)]
- create_bucket: bool,
-}
-
-#[derive(Serialize)]
-#[serde(rename_all = "camelCase")]
-struct KeyInfoBucketResult {
- id: String,
- global_aliases: Vec<String>,
- local_aliases: Vec<String>,
- permissions: ApiBucketKeyPerm,
-}
-
-#[derive(Serialize, Deserialize, Default)]
-#[serde(rename_all = "camelCase")]
-pub(crate) struct ApiBucketKeyPerm {
- #[serde(default)]
- pub(crate) read: bool,
- #[serde(default)]
- pub(crate) write: bool,
- #[serde(default)]
- pub(crate) owner: bool,
+ Ok(res)
}
diff --git a/src/api/admin/lib.rs b/src/api/admin/lib.rs
index 599e9b44..dd9b7ffd 100644
--- a/src/api/admin/lib.rs
+++ b/src/api/admin/lib.rs
@@ -3,9 +3,41 @@ extern crate tracing;
pub mod api_server;
mod error;
+mod macros;
+
+pub mod api;
mod router_v0;
mod router_v1;
+mod router_v2;
mod bucket;
mod cluster;
mod key;
+mod special;
+
+mod block;
+mod node;
+mod repair;
+mod worker;
+
+use std::sync::Arc;
+
+use garage_model::garage::Garage;
+
+pub use api_server::AdminApiServer as Admin;
+
+pub enum Authorization {
+ None,
+ MetricsToken,
+ AdminToken,
+}
+
+pub trait RequestHandler {
+ type Response;
+
+ fn handle(
+ self,
+ garage: &Arc<Garage>,
+ admin: &Admin,
+ ) -> impl std::future::Future<Output = Result<Self::Response, error::Error>> + Send;
+}
diff --git a/src/api/admin/macros.rs b/src/api/admin/macros.rs
new file mode 100644
index 00000000..df2762fe
--- /dev/null
+++ b/src/api/admin/macros.rs
@@ -0,0 +1,219 @@
+macro_rules! admin_endpoints {
+ [
+ $(@special $special_endpoint:ident,)*
+ $($endpoint:ident,)*
+ ] => {
+ paste! {
+ #[derive(Debug, Clone, Serialize, Deserialize)]
+ pub enum AdminApiRequest {
+ $(
+ $special_endpoint( [<$special_endpoint Request>] ),
+ )*
+ $(
+ $endpoint( [<$endpoint Request>] ),
+ )*
+ }
+
+ #[derive(Debug, Clone, Serialize)]
+ #[serde(untagged)]
+ pub enum AdminApiResponse {
+ $(
+ $endpoint( [<$endpoint Response>] ),
+ )*
+ }
+
+ #[derive(Debug, Clone, Serialize, Deserialize)]
+ pub enum TaggedAdminApiResponse {
+ $(
+ $endpoint( [<$endpoint Response>] ),
+ )*
+ }
+
+ impl AdminApiRequest {
+ pub fn name(&self) -> &'static str {
+ match self {
+ $(
+ Self::$special_endpoint(_) => stringify!($special_endpoint),
+ )*
+ $(
+ Self::$endpoint(_) => stringify!($endpoint),
+ )*
+ }
+ }
+ }
+
+ impl AdminApiResponse {
+ pub fn tagged(self) -> TaggedAdminApiResponse {
+ match self {
+ $(
+ Self::$endpoint(res) => TaggedAdminApiResponse::$endpoint(res),
+ )*
+ }
+ }
+ }
+
+ $(
+ impl From< [< $endpoint Request >] > for AdminApiRequest {
+ fn from(req: [< $endpoint Request >]) -> AdminApiRequest {
+ AdminApiRequest::$endpoint(req)
+ }
+ }
+
+ impl TryFrom<TaggedAdminApiResponse> for [< $endpoint Response >] {
+ type Error = TaggedAdminApiResponse;
+ fn try_from(resp: TaggedAdminApiResponse) -> Result< [< $endpoint Response >], TaggedAdminApiResponse> {
+ match resp {
+ TaggedAdminApiResponse::$endpoint(v) => Ok(v),
+ x => Err(x),
+ }
+ }
+ }
+ )*
+
+ impl RequestHandler for AdminApiRequest {
+ type Response = AdminApiResponse;
+
+ async fn handle(self, garage: &Arc<Garage>, admin: &Admin) -> Result<AdminApiResponse, Error> {
+ Ok(match self {
+ $(
+ AdminApiRequest::$special_endpoint(_) => panic!(
+ concat!(stringify!($special_endpoint), " needs to go through a special handler")
+ ),
+ )*
+ $(
+ AdminApiRequest::$endpoint(req) => AdminApiResponse::$endpoint(req.handle(garage, admin).await?),
+ )*
+ })
+ }
+ }
+ }
+ };
+}
+
+macro_rules! local_admin_endpoints {
+ [
+ $($endpoint:ident,)*
+ ] => {
+ paste! {
+ #[derive(Debug, Clone, Serialize, Deserialize)]
+ pub enum LocalAdminApiRequest {
+ $(
+ $endpoint( [<Local $endpoint Request>] ),
+ )*
+ }
+
+ #[derive(Debug, Clone, Serialize, Deserialize)]
+ pub enum LocalAdminApiResponse {
+ $(
+ $endpoint( [<Local $endpoint Response>] ),
+ )*
+ }
+
+ $(
+ pub type [< $endpoint Request >] = MultiRequest< [< Local $endpoint Request >] >;
+
+ pub type [< $endpoint RequestBody >] = [< Local $endpoint Request >];
+
+ pub type [< $endpoint Response >] = MultiResponse< [< Local $endpoint Response >] >;
+
+ impl From< [< Local $endpoint Request >] > for LocalAdminApiRequest {
+ fn from(req: [< Local $endpoint Request >]) -> LocalAdminApiRequest {
+ LocalAdminApiRequest::$endpoint(req)
+ }
+ }
+
+ impl TryFrom<LocalAdminApiResponse> for [< Local $endpoint Response >] {
+ type Error = LocalAdminApiResponse;
+ fn try_from(resp: LocalAdminApiResponse) -> Result< [< Local $endpoint Response >], LocalAdminApiResponse> {
+ match resp {
+ LocalAdminApiResponse::$endpoint(v) => Ok(v),
+ x => Err(x),
+ }
+ }
+ }
+
+ impl RequestHandler for [< $endpoint Request >] {
+ type Response = [< $endpoint Response >];
+
+ async fn handle(self, garage: &Arc<Garage>, admin: &Admin) -> Result<Self::Response, Error> {
+ let to = match self.node.as_str() {
+ "*" => garage.system.cluster_layout().all_nodes().to_vec(),
+ id => {
+ let nodes = garage.system.cluster_layout().all_nodes()
+ .iter()
+ .filter(|x| hex::encode(x).starts_with(id))
+ .cloned()
+ .collect::<Vec<_>>();
+ if nodes.len() != 1 {
+ return Err(Error::bad_request(format!("Zero or multiple nodes matching {}: {:?}", id, nodes)));
+ }
+ nodes
+ }
+ };
+
+ let resps = garage.system.rpc_helper().call_many(&admin.endpoint,
+ &to,
+ AdminRpc::Internal(self.body.into()),
+ RequestStrategy::with_priority(PRIO_NORMAL),
+ ).await?;
+
+ let mut ret = [< $endpoint Response >] {
+ success: HashMap::new(),
+ error: HashMap::new(),
+ };
+ for (node, resp) in resps {
+ match resp {
+ Ok(AdminRpcResponse::InternalApiOkResponse(r)) => {
+ match [< Local $endpoint Response >]::try_from(r) {
+ Ok(r) => {
+ ret.success.insert(hex::encode(node), r);
+ }
+ Err(_) => {
+ ret.error.insert(hex::encode(node), "returned invalid value".to_string());
+ }
+ }
+ }
+ Ok(AdminRpcResponse::ApiErrorResponse{error_code, http_code, message}) => {
+ ret.error.insert(hex::encode(node), format!("{} ({}): {}", error_code, http_code, message));
+ }
+ Ok(_) => {
+ ret.error.insert(hex::encode(node), "returned invalid value".to_string());
+ }
+ Err(e) => {
+ ret.error.insert(hex::encode(node), e.to_string());
+ }
+ }
+ }
+
+ Ok(ret)
+ }
+ }
+ )*
+
+ impl LocalAdminApiRequest {
+ pub fn name(&self) -> &'static str {
+ match self {
+ $(
+ Self::$endpoint(_) => stringify!($endpoint),
+ )*
+ }
+ }
+ }
+
+ impl RequestHandler for LocalAdminApiRequest {
+ type Response = LocalAdminApiResponse;
+
+ async fn handle(self, garage: &Arc<Garage>, admin: &Admin) -> Result<LocalAdminApiResponse, Error> {
+ Ok(match self {
+ $(
+ LocalAdminApiRequest::$endpoint(req) => LocalAdminApiResponse::$endpoint(req.handle(garage, admin).await?),
+ )*
+ })
+ }
+ }
+ }
+ };
+}
+
+pub(crate) use admin_endpoints;
+pub(crate) use local_admin_endpoints;
diff --git a/src/api/admin/node.rs b/src/api/admin/node.rs
new file mode 100644
index 00000000..f6f43d95
--- /dev/null
+++ b/src/api/admin/node.rs
@@ -0,0 +1,216 @@
+use std::collections::HashMap;
+use std::fmt::Write;
+use std::sync::Arc;
+
+use format_table::format_table_to_string;
+
+use garage_util::data::*;
+use garage_util::error::Error as GarageError;
+
+use garage_table::replication::*;
+use garage_table::*;
+
+use garage_rpc::layout::PARTITION_BITS;
+
+use garage_model::garage::Garage;
+
+use crate::api::*;
+use crate::error::Error;
+use crate::{Admin, RequestHandler};
+
+impl RequestHandler for LocalCreateMetadataSnapshotRequest {
+ type Response = LocalCreateMetadataSnapshotResponse;
+
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<LocalCreateMetadataSnapshotResponse, Error> {
+ garage_model::snapshot::async_snapshot_metadata(garage).await?;
+ Ok(LocalCreateMetadataSnapshotResponse)
+ }
+}
+
+impl RequestHandler for LocalGetNodeStatisticsRequest {
+ type Response = LocalGetNodeStatisticsResponse;
+
+ // FIXME: return this as a JSON struct instead of text
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<LocalGetNodeStatisticsResponse, Error> {
+ let mut ret = String::new();
+ writeln!(
+ &mut ret,
+ "Garage version: {} [features: {}]\nRust compiler version: {}",
+ garage_util::version::garage_version(),
+ garage_util::version::garage_features()
+ .map(|list| list.join(", "))
+ .unwrap_or_else(|| "(unknown)".into()),
+ garage_util::version::rust_version(),
+ )
+ .unwrap();
+
+ writeln!(&mut ret, "\nDatabase engine: {}", garage.db.engine()).unwrap();
+
+ // Gather table statistics
+ let mut table = vec![" Table\tItems\tMklItems\tMklTodo\tGcTodo".into()];
+ table.push(gather_table_stats(&garage.bucket_table)?);
+ table.push(gather_table_stats(&garage.key_table)?);
+ table.push(gather_table_stats(&garage.object_table)?);
+ table.push(gather_table_stats(&garage.version_table)?);
+ table.push(gather_table_stats(&garage.block_ref_table)?);
+ write!(
+ &mut ret,
+ "\nTable stats:\n{}",
+ format_table_to_string(table)
+ )
+ .unwrap();
+
+ // Gather block manager statistics
+ writeln!(&mut ret, "\nBlock manager stats:").unwrap();
+ let rc_len = garage.block_manager.rc_len()?.to_string();
+
+ writeln!(
+ &mut ret,
+ " number of RC entries (~= number of blocks): {}",
+ rc_len
+ )
+ .unwrap();
+ writeln!(
+ &mut ret,
+ " resync queue length: {}",
+ garage.block_manager.resync.queue_len()?
+ )
+ .unwrap();
+ writeln!(
+ &mut ret,
+ " blocks with resync errors: {}",
+ garage.block_manager.resync.errors_len()?
+ )
+ .unwrap();
+
+ Ok(LocalGetNodeStatisticsResponse { freeform: ret })
+ }
+}
+
+impl RequestHandler for GetClusterStatisticsRequest {
+ type Response = GetClusterStatisticsResponse;
+
+ // FIXME: return this as a JSON struct instead of text
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<GetClusterStatisticsResponse, Error> {
+ let mut ret = String::new();
+
+ // Gather storage node and free space statistics for current nodes
+ let layout = &garage.system.cluster_layout();
+ let mut node_partition_count = HashMap::<Uuid, u64>::new();
+ for short_id in layout.current().ring_assignment_data.iter() {
+ let id = layout.current().node_id_vec[*short_id as usize];
+ *node_partition_count.entry(id).or_default() += 1;
+ }
+ let node_info = garage
+ .system
+ .get_known_nodes()
+ .into_iter()
+ .map(|n| (n.id, n))
+ .collect::<HashMap<_, _>>();
+
+ let mut table = vec![" ID\tHostname\tZone\tCapacity\tPart.\tDataAvail\tMetaAvail".into()];
+ for (id, parts) in node_partition_count.iter() {
+ let info = node_info.get(id);
+ let status = info.map(|x| &x.status);
+ let role = layout.current().roles.get(id).and_then(|x| x.0.as_ref());
+ let hostname = status.and_then(|x| x.hostname.as_deref()).unwrap_or("?");
+ let zone = role.map(|x| x.zone.as_str()).unwrap_or("?");
+ let capacity = role
+ .map(|x| x.capacity_string())
+ .unwrap_or_else(|| "?".into());
+ let avail_str = |x| match x {
+ Some((avail, total)) => {
+ let pct = (avail as f64) / (total as f64) * 100.;
+ let avail = bytesize::ByteSize::b(avail);
+ let total = bytesize::ByteSize::b(total);
+ format!("{}/{} ({:.1}%)", avail, total, pct)
+ }
+ None => "?".into(),
+ };
+ let data_avail = avail_str(status.and_then(|x| x.data_disk_avail));
+ let meta_avail = avail_str(status.and_then(|x| x.meta_disk_avail));
+ table.push(format!(
+ " {:?}\t{}\t{}\t{}\t{}\t{}\t{}",
+ id, hostname, zone, capacity, parts, data_avail, meta_avail
+ ));
+ }
+ write!(
+ &mut ret,
+ "Storage nodes:\n{}",
+ format_table_to_string(table)
+ )
+ .unwrap();
+
+ let meta_part_avail = node_partition_count
+ .iter()
+ .filter_map(|(id, parts)| {
+ node_info
+ .get(id)
+ .and_then(|x| x.status.meta_disk_avail)
+ .map(|c| c.0 / *parts)
+ })
+ .collect::<Vec<_>>();
+ let data_part_avail = node_partition_count
+ .iter()
+ .filter_map(|(id, parts)| {
+ node_info
+ .get(id)
+ .and_then(|x| x.status.data_disk_avail)
+ .map(|c| c.0 / *parts)
+ })
+ .collect::<Vec<_>>();
+ if !meta_part_avail.is_empty() && !data_part_avail.is_empty() {
+ let meta_avail =
+ bytesize::ByteSize(meta_part_avail.iter().min().unwrap() * (1 << PARTITION_BITS));
+ let data_avail =
+ bytesize::ByteSize(data_part_avail.iter().min().unwrap() * (1 << PARTITION_BITS));
+ writeln!(
+ &mut ret,
+ "\nEstimated available storage space cluster-wide (might be lower in practice):"
+ )
+ .unwrap();
+ if meta_part_avail.len() < node_partition_count.len()
+ || data_part_avail.len() < node_partition_count.len()
+ {
+ writeln!(&mut ret, " data: < {}", data_avail).unwrap();
+ writeln!(&mut ret, " metadata: < {}", meta_avail).unwrap();
+ writeln!(&mut ret, "A precise estimate could not be given as information is missing for some storage nodes.").unwrap();
+ } else {
+ writeln!(&mut ret, " data: {}", data_avail).unwrap();
+ writeln!(&mut ret, " metadata: {}", meta_avail).unwrap();
+ }
+ }
+
+ Ok(GetClusterStatisticsResponse { freeform: ret })
+ }
+}
+
+fn gather_table_stats<F, R>(t: &Arc<Table<F, R>>) -> Result<String, Error>
+where
+ F: TableSchema + 'static,
+ R: TableReplication + 'static,
+{
+ let data_len = t.data.store.len().map_err(GarageError::from)?.to_string();
+ let mkl_len = t.merkle_updater.merkle_tree_len()?.to_string();
+
+ Ok(format!(
+ " {}\t{}\t{}\t{}\t{}",
+ F::TABLE_NAME,
+ data_len,
+ mkl_len,
+ t.merkle_updater.todo_len()?,
+ t.data.gc_todo_len()?
+ ))
+}
diff --git a/src/api/admin/repair.rs b/src/api/admin/repair.rs
new file mode 100644
index 00000000..a9b8c36a
--- /dev/null
+++ b/src/api/admin/repair.rs
@@ -0,0 +1,403 @@
+use std::future::Future;
+use std::sync::Arc;
+use std::time::Duration;
+
+use async_trait::async_trait;
+use tokio::sync::watch;
+
+use garage_util::background::*;
+use garage_util::data::*;
+use garage_util::error::{Error as GarageError, OkOrMessage};
+use garage_util::migrate::Migrate;
+
+use garage_table::replication::*;
+use garage_table::*;
+
+use garage_block::manager::BlockManager;
+use garage_block::repair::ScrubWorkerCommand;
+
+use garage_model::garage::Garage;
+use garage_model::s3::block_ref_table::*;
+use garage_model::s3::mpu_table::*;
+use garage_model::s3::object_table::*;
+use garage_model::s3::version_table::*;
+
+use crate::api::*;
+use crate::error::Error;
+use crate::{Admin, RequestHandler};
+
+const RC_REPAIR_ITER_COUNT: usize = 64;
+
+impl RequestHandler for LocalLaunchRepairOperationRequest {
+ type Response = LocalLaunchRepairOperationResponse;
+
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ admin: &Admin,
+ ) -> Result<LocalLaunchRepairOperationResponse, Error> {
+ let bg = &admin.background;
+ match self.repair_type {
+ RepairType::Tables => {
+ info!("Launching a full sync of tables");
+ garage.bucket_table.syncer.add_full_sync()?;
+ garage.object_table.syncer.add_full_sync()?;
+ garage.version_table.syncer.add_full_sync()?;
+ garage.block_ref_table.syncer.add_full_sync()?;
+ garage.key_table.syncer.add_full_sync()?;
+ }
+ RepairType::Versions => {
+ info!("Repairing the versions table");
+ bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairVersions));
+ }
+ RepairType::MultipartUploads => {
+ info!("Repairing the multipart uploads table");
+ bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairMpu));
+ }
+ RepairType::BlockRefs => {
+ info!("Repairing the block refs table");
+ bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairBlockRefs));
+ }
+ RepairType::BlockRc => {
+ info!("Repairing the block reference counters");
+ bg.spawn_worker(BlockRcRepair::new(
+ garage.block_manager.clone(),
+ garage.block_ref_table.clone(),
+ ));
+ }
+ RepairType::Blocks => {
+ info!("Repairing the stored blocks");
+ bg.spawn_worker(garage_block::repair::RepairWorker::new(
+ garage.block_manager.clone(),
+ ));
+ }
+ RepairType::Scrub(cmd) => {
+ let cmd = match cmd {
+ ScrubCommand::Start => ScrubWorkerCommand::Start,
+ ScrubCommand::Pause => {
+ ScrubWorkerCommand::Pause(Duration::from_secs(3600 * 24))
+ }
+ ScrubCommand::Resume => ScrubWorkerCommand::Resume,
+ ScrubCommand::Cancel => ScrubWorkerCommand::Cancel,
+ };
+ info!("Sending command to scrub worker: {:?}", cmd);
+ garage.block_manager.send_scrub_command(cmd).await?;
+ }
+ RepairType::Rebalance => {
+ info!("Rebalancing the stored blocks among storage locations");
+ bg.spawn_worker(garage_block::repair::RebalanceWorker::new(
+ garage.block_manager.clone(),
+ ));
+ }
+ }
+ Ok(LocalLaunchRepairOperationResponse)
+ }
+}
+
+// ----
+
+trait TableRepair: Send + Sync + 'static {
+ type T: TableSchema;
+
+ fn table(garage: &Garage) -> &Table<Self::T, TableShardedReplication>;
+
+ fn process(
+ &mut self,
+ garage: &Garage,
+ entry: <<Self as TableRepair>::T as TableSchema>::E,
+ ) -> impl Future<Output = Result<bool, GarageError>> + Send;
+}
+
+struct TableRepairWorker<T: TableRepair> {
+ garage: Arc<Garage>,
+ pos: Vec<u8>,
+ counter: usize,
+ repairs: usize,
+ inner: T,
+}
+
+impl<R: TableRepair> TableRepairWorker<R> {
+ fn new(garage: Arc<Garage>, inner: R) -> Self {
+ Self {
+ garage,
+ inner,
+ pos: vec![],
+ counter: 0,
+ repairs: 0,
+ }
+ }
+}
+
+#[async_trait]
+impl<R: TableRepair> Worker for TableRepairWorker<R> {
+ fn name(&self) -> String {
+ format!("{} repair worker", R::T::TABLE_NAME)
+ }
+
+ fn status(&self) -> WorkerStatus {
+ WorkerStatus {
+ progress: Some(format!("{} ({})", self.counter, self.repairs)),
+ ..Default::default()
+ }
+ }
+
+ async fn work(
+ &mut self,
+ _must_exit: &mut watch::Receiver<bool>,
+ ) -> Result<WorkerState, GarageError> {
+ let (item_bytes, next_pos) = match R::table(&self.garage).data.store.get_gt(&self.pos)? {
+ Some((k, v)) => (v, k),
+ None => {
+ info!(
+ "{}: finished, done {}, fixed {}",
+ self.name(),
+ self.counter,
+ self.repairs
+ );
+ return Ok(WorkerState::Done);
+ }
+ };
+
+ let entry = <R::T as TableSchema>::E::decode(&item_bytes)
+ .ok_or_message("Cannot decode table entry")?;
+ if self.inner.process(&self.garage, entry).await? {
+ self.repairs += 1;
+ }
+
+ self.counter += 1;
+ self.pos = next_pos;
+
+ Ok(WorkerState::Busy)
+ }
+
+ async fn wait_for_work(&mut self) -> WorkerState {
+ unreachable!()
+ }
+}
+
+// ----
+
+struct RepairVersions;
+
+impl TableRepair for RepairVersions {
+ type T = VersionTable;
+
+ fn table(garage: &Garage) -> &Table<Self::T, TableShardedReplication> {
+ &garage.version_table
+ }
+
+ async fn process(&mut self, garage: &Garage, version: Version) -> Result<bool, GarageError> {
+ if !version.deleted.get() {
+ let ref_exists = match &version.backlink {
+ VersionBacklink::Object { bucket_id, key } => garage
+ .object_table
+ .get(bucket_id, key)
+ .await?
+ .map(|o| {
+ o.versions().iter().any(|x| {
+ x.uuid == version.uuid && x.state != ObjectVersionState::Aborted
+ })
+ })
+ .unwrap_or(false),
+ VersionBacklink::MultipartUpload { upload_id } => garage
+ .mpu_table
+ .get(upload_id, &EmptyKey)
+ .await?
+ .map(|u| !u.deleted.get())
+ .unwrap_or(false),
+ };
+
+ if !ref_exists {
+ info!("Repair versions: marking version as deleted: {:?}", version);
+ garage
+ .version_table
+ .insert(&Version::new(version.uuid, version.backlink, true))
+ .await?;
+ return Ok(true);
+ }
+ }
+
+ Ok(false)
+ }
+}
+
+// ----
+
+struct RepairBlockRefs;
+
+impl TableRepair for RepairBlockRefs {
+ type T = BlockRefTable;
+
+ fn table(garage: &Garage) -> &Table<Self::T, TableShardedReplication> {
+ &garage.block_ref_table
+ }
+
+ async fn process(
+ &mut self,
+ garage: &Garage,
+ mut block_ref: BlockRef,
+ ) -> Result<bool, GarageError> {
+ if !block_ref.deleted.get() {
+ let ref_exists = garage
+ .version_table
+ .get(&block_ref.version, &EmptyKey)
+ .await?
+ .map(|v| !v.deleted.get())
+ .unwrap_or(false);
+
+ if !ref_exists {
+ info!(
+ "Repair block ref: marking block_ref as deleted: {:?}",
+ block_ref
+ );
+ block_ref.deleted.set();
+ garage.block_ref_table.insert(&block_ref).await?;
+ return Ok(true);
+ }
+ }
+
+ Ok(false)
+ }
+}
+
+// ----
+
+struct RepairMpu;
+
+impl TableRepair for RepairMpu {
+ type T = MultipartUploadTable;
+
+ fn table(garage: &Garage) -> &Table<Self::T, TableShardedReplication> {
+ &garage.mpu_table
+ }
+
+ async fn process(
+ &mut self,
+ garage: &Garage,
+ mut mpu: MultipartUpload,
+ ) -> Result<bool, GarageError> {
+ if !mpu.deleted.get() {
+ let ref_exists = garage
+ .object_table
+ .get(&mpu.bucket_id, &mpu.key)
+ .await?
+ .map(|o| {
+ o.versions()
+ .iter()
+ .any(|x| x.uuid == mpu.upload_id && x.is_uploading(Some(true)))
+ })
+ .unwrap_or(false);
+
+ if !ref_exists {
+ info!(
+ "Repair multipart uploads: marking mpu as deleted: {:?}",
+ mpu
+ );
+ mpu.parts.clear();
+ mpu.deleted.set();
+ garage.mpu_table.insert(&mpu).await?;
+ return Ok(true);
+ }
+ }
+
+ Ok(false)
+ }
+}
+
+// ===== block reference counter repair =====
+
+pub struct BlockRcRepair {
+ block_manager: Arc<BlockManager>,
+ block_ref_table: Arc<Table<BlockRefTable, TableShardedReplication>>,
+ cursor: Hash,
+ counter: u64,
+ repairs: u64,
+}
+
+impl BlockRcRepair {
+ fn new(
+ block_manager: Arc<BlockManager>,
+ block_ref_table: Arc<Table<BlockRefTable, TableShardedReplication>>,
+ ) -> Self {
+ Self {
+ block_manager,
+ block_ref_table,
+ cursor: [0u8; 32].into(),
+ counter: 0,
+ repairs: 0,
+ }
+ }
+}
+
+#[async_trait]
+impl Worker for BlockRcRepair {
+ fn name(&self) -> String {
+ format!("Block refcount repair worker")
+ }
+
+ fn status(&self) -> WorkerStatus {
+ WorkerStatus {
+ progress: Some(format!("{} ({})", self.counter, self.repairs)),
+ ..Default::default()
+ }
+ }
+
+ async fn work(
+ &mut self,
+ _must_exit: &mut watch::Receiver<bool>,
+ ) -> Result<WorkerState, GarageError> {
+ for _i in 0..RC_REPAIR_ITER_COUNT {
+ let next1 = self
+ .block_manager
+ .rc
+ .rc_table
+ .range(self.cursor.as_slice()..)?
+ .next()
+ .transpose()?
+ .map(|(k, _)| Hash::try_from(k.as_slice()).unwrap());
+ let next2 = self
+ .block_ref_table
+ .data
+ .store
+ .range(self.cursor.as_slice()..)?
+ .next()
+ .transpose()?
+ .map(|(k, _)| Hash::try_from(&k[..32]).unwrap());
+ let next = match (next1, next2) {
+ (Some(k1), Some(k2)) => std::cmp::min(k1, k2),
+ (Some(k), None) | (None, Some(k)) => k,
+ (None, None) => {
+ info!(
+ "{}: finished, done {}, fixed {}",
+ self.name(),
+ self.counter,
+ self.repairs
+ );
+ return Ok(WorkerState::Done);
+ }
+ };
+
+ if self.block_manager.rc.recalculate_rc(&next)?.1 {
+ self.repairs += 1;
+ }
+ self.counter += 1;
+ if let Some(next_incr) = next.increment() {
+ self.cursor = next_incr;
+ } else {
+ info!(
+ "{}: finished, done {}, fixed {}",
+ self.name(),
+ self.counter,
+ self.repairs
+ );
+ return Ok(WorkerState::Done);
+ }
+ }
+
+ Ok(WorkerState::Busy)
+ }
+
+ async fn wait_for_work(&mut self) -> WorkerState {
+ unreachable!()
+ }
+}
diff --git a/src/api/admin/router_v1.rs b/src/api/admin/router_v1.rs
index 0b4901ea..138a801d 100644
--- a/src/api/admin/router_v1.rs
+++ b/src/api/admin/router_v1.rs
@@ -7,12 +7,6 @@ use garage_api_common::router_macros::*;
use crate::error::*;
use crate::router_v0;
-pub enum Authorization {
- None,
- MetricsToken,
- AdminToken,
-}
-
router_match! {@func
/// List of all Admin API endpoints.
@@ -211,15 +205,6 @@ impl Endpoint {
))),
}
}
- /// Get the kind of authorization which is required to perform the operation.
- pub fn authorization_type(&self) -> Authorization {
- match self {
- Self::Health => Authorization::None,
- Self::CheckDomain => Authorization::None,
- Self::Metrics => Authorization::MetricsToken,
- _ => Authorization::AdminToken,
- }
- }
}
generateQueryParameters! {
diff --git a/src/api/admin/router_v2.rs b/src/api/admin/router_v2.rs
new file mode 100644
index 00000000..4d5c015e
--- /dev/null
+++ b/src/api/admin/router_v2.rs
@@ -0,0 +1,268 @@
+use std::borrow::Cow;
+
+use hyper::body::Incoming as IncomingBody;
+use hyper::{Method, Request};
+use paste::paste;
+
+use garage_api_common::helpers::*;
+use garage_api_common::router_macros::*;
+
+use crate::api::*;
+use crate::error::*;
+use crate::router_v1;
+use crate::Authorization;
+
+impl AdminApiRequest {
+ /// Determine which S3 endpoint a request is for using the request, and a bucket which was
+ /// possibly extracted from the Host header.
+ /// Returns Self plus bucket name, if endpoint is not Endpoint::ListBuckets
+ pub async fn from_request(req: Request<IncomingBody>) -> Result<Self, Error> {
+ let uri = req.uri().clone();
+ let path = uri.path();
+ let query = uri.query();
+
+ let method = req.method().clone();
+
+ let mut query = QueryParameters::from_query(query.unwrap_or_default())?;
+
+ let res = router_match!(@gen_path_parser_v2 (&method, path, "/v2/", query, req) [
+ @special OPTIONS _ => Options (),
+ @special GET "/check" => CheckDomain (query::domain),
+ @special GET "/health" => Health (),
+ @special GET "/metrics" => Metrics (),
+ // Cluster endpoints
+ GET GetClusterStatus (),
+ GET GetClusterHealth (),
+ POST ConnectClusterNodes (body),
+ // Layout endpoints
+ GET GetClusterLayout (),
+ POST UpdateClusterLayout (body),
+ POST ApplyClusterLayout (body),
+ POST RevertClusterLayout (),
+ // API key endpoints
+ GET GetKeyInfo (query_opt::id, query_opt::search, parse_default(false)::show_secret_key),
+ POST UpdateKey (body_field, query::id),
+ POST CreateKey (body),
+ POST ImportKey (body),
+ POST DeleteKey (query::id),
+ GET ListKeys (),
+ // Bucket endpoints
+ GET GetBucketInfo (query_opt::id, query_opt::global_alias, query_opt::search),
+ GET ListBuckets (),
+ POST CreateBucket (body),
+ POST DeleteBucket (query::id),
+ POST UpdateBucket (body_field, query::id),
+ POST CleanupIncompleteUploads (body),
+ // Bucket-key permissions
+ POST AllowBucketKey (body),
+ POST DenyBucketKey (body),
+ // Bucket aliases
+ POST AddBucketAlias (body),
+ POST RemoveBucketAlias (body),
+ // Node APIs
+ POST CreateMetadataSnapshot (default::body, query::node),
+ GET GetNodeStatistics (default::body, query::node),
+ GET GetClusterStatistics (),
+ POST LaunchRepairOperation (body_field, query::node),
+ // Worker APIs
+ POST ListWorkers (body_field, query::node),
+ POST GetWorkerInfo (body_field, query::node),
+ POST GetWorkerVariable (body_field, query::node),
+ POST SetWorkerVariable (body_field, query::node),
+ // Block APIs
+ GET ListBlockErrors (default::body, query::node),
+ POST GetBlockInfo (body_field, query::node),
+ POST RetryBlockResync (body_field, query::node),
+ POST PurgeBlocks (body_field, query::node),
+ ]);
+
+ if let Some(message) = query.nonempty_message() {
+ debug!("Unused query parameter: {}", message)
+ }
+
+ Ok(res)
+ }
+
+ /// Some endpoints work exactly the same in their v2/ version as they did in their v1/ version.
+ /// For these endpoints, we can convert a v1/ call to its equivalent as if it was made using
+ /// its v2/ URL.
+ pub async fn from_v1(
+ v1_endpoint: router_v1::Endpoint,
+ req: Request<IncomingBody>,
+ ) -> Result<Self, Error> {
+ use router_v1::Endpoint;
+
+ match v1_endpoint {
+ Endpoint::GetClusterStatus => {
+ Ok(AdminApiRequest::GetClusterStatus(GetClusterStatusRequest))
+ }
+ Endpoint::GetClusterHealth => {
+ Ok(AdminApiRequest::GetClusterHealth(GetClusterHealthRequest))
+ }
+ Endpoint::ConnectClusterNodes => {
+ let req = parse_json_body::<ConnectClusterNodesRequest, _, Error>(req).await?;
+ Ok(AdminApiRequest::ConnectClusterNodes(req))
+ }
+
+ // Layout
+ Endpoint::GetClusterLayout => {
+ Ok(AdminApiRequest::GetClusterLayout(GetClusterLayoutRequest))
+ }
+ Endpoint::UpdateClusterLayout => {
+ let updates = parse_json_body::<UpdateClusterLayoutRequest, _, Error>(req).await?;
+ Ok(AdminApiRequest::UpdateClusterLayout(updates))
+ }
+ Endpoint::ApplyClusterLayout => {
+ let param = parse_json_body::<ApplyClusterLayoutRequest, _, Error>(req).await?;
+ Ok(AdminApiRequest::ApplyClusterLayout(param))
+ }
+ Endpoint::RevertClusterLayout => Ok(AdminApiRequest::RevertClusterLayout(
+ RevertClusterLayoutRequest,
+ )),
+
+ // Keys
+ Endpoint::ListKeys => Ok(AdminApiRequest::ListKeys(ListKeysRequest)),
+ Endpoint::GetKeyInfo {
+ id,
+ search,
+ show_secret_key,
+ } => {
+ let show_secret_key = show_secret_key.map(|x| x == "true").unwrap_or(false);
+ Ok(AdminApiRequest::GetKeyInfo(GetKeyInfoRequest {
+ id,
+ search,
+ show_secret_key,
+ }))
+ }
+ Endpoint::CreateKey => {
+ let req = parse_json_body::<CreateKeyRequest, _, Error>(req).await?;
+ Ok(AdminApiRequest::CreateKey(req))
+ }
+ Endpoint::ImportKey => {
+ let req = parse_json_body::<ImportKeyRequest, _, Error>(req).await?;
+ Ok(AdminApiRequest::ImportKey(req))
+ }
+ Endpoint::UpdateKey { id } => {
+ let body = parse_json_body::<UpdateKeyRequestBody, _, Error>(req).await?;
+ Ok(AdminApiRequest::UpdateKey(UpdateKeyRequest { id, body }))
+ }
+
+ // DeleteKey semantics changed:
+ // - in v1/ : HTTP DELETE => HTTP 204 No Content
+ // - in v2/ : HTTP POST => HTTP 200 Ok
+ // Endpoint::DeleteKey { id } => Ok(AdminApiRequest::DeleteKey(DeleteKeyRequest { id })),
+
+ // Buckets
+ Endpoint::ListBuckets => Ok(AdminApiRequest::ListBuckets(ListBucketsRequest)),
+ Endpoint::GetBucketInfo { id, global_alias } => {
+ Ok(AdminApiRequest::GetBucketInfo(GetBucketInfoRequest {
+ id,
+ global_alias,
+ search: None,
+ }))
+ }
+ Endpoint::CreateBucket => {
+ let req = parse_json_body::<CreateBucketRequest, _, Error>(req).await?;
+ Ok(AdminApiRequest::CreateBucket(req))
+ }
+
+ // DeleteBucket semantics changed::
+ // - in v1/ : HTTP DELETE => HTTP 204 No Content
+ // - in v2/ : HTTP POST => HTTP 200 Ok
+ // Endpoint::DeleteBucket { id } => {
+ // Ok(AdminApiRequest::DeleteBucket(DeleteBucketRequest { id }))
+ // }
+ Endpoint::UpdateBucket { id } => {
+ let body = parse_json_body::<UpdateBucketRequestBody, _, Error>(req).await?;
+ Ok(AdminApiRequest::UpdateBucket(UpdateBucketRequest {
+ id,
+ body,
+ }))
+ }
+
+ // Bucket-key permissions
+ Endpoint::BucketAllowKey => {
+ let req = parse_json_body::<BucketKeyPermChangeRequest, _, Error>(req).await?;
+ Ok(AdminApiRequest::AllowBucketKey(AllowBucketKeyRequest(req)))
+ }
+ Endpoint::BucketDenyKey => {
+ let req = parse_json_body::<BucketKeyPermChangeRequest, _, Error>(req).await?;
+ Ok(AdminApiRequest::DenyBucketKey(DenyBucketKeyRequest(req)))
+ }
+ // Bucket aliasing
+ Endpoint::GlobalAliasBucket { id, alias } => {
+ Ok(AdminApiRequest::AddBucketAlias(AddBucketAliasRequest {
+ bucket_id: id,
+ alias: BucketAliasEnum::Global {
+ global_alias: alias,
+ },
+ }))
+ }
+ Endpoint::GlobalUnaliasBucket { id, alias } => Ok(AdminApiRequest::RemoveBucketAlias(
+ RemoveBucketAliasRequest {
+ bucket_id: id,
+ alias: BucketAliasEnum::Global {
+ global_alias: alias,
+ },
+ },
+ )),
+ Endpoint::LocalAliasBucket {
+ id,
+ access_key_id,
+ alias,
+ } => Ok(AdminApiRequest::AddBucketAlias(AddBucketAliasRequest {
+ bucket_id: id,
+ alias: BucketAliasEnum::Local {
+ local_alias: alias,
+ access_key_id,
+ },
+ })),
+ Endpoint::LocalUnaliasBucket {
+ id,
+ access_key_id,
+ alias,
+ } => Ok(AdminApiRequest::RemoveBucketAlias(
+ RemoveBucketAliasRequest {
+ bucket_id: id,
+ alias: BucketAliasEnum::Local {
+ local_alias: alias,
+ access_key_id,
+ },
+ },
+ )),
+
+ // For endpoints that have different body content syntax, issue
+ // deprecation warning
+ _ => Err(Error::bad_request(format!(
+ "v1/ endpoint is no longer supported: {}",
+ v1_endpoint.name()
+ ))),
+ }
+ }
+
+ /// Get the kind of authorization which is required to perform the operation.
+ pub fn authorization_type(&self) -> Authorization {
+ match self {
+ Self::Options(_) => Authorization::None,
+ Self::Health(_) => Authorization::None,
+ Self::CheckDomain(_) => Authorization::None,
+ Self::Metrics(_) => Authorization::MetricsToken,
+ _ => Authorization::AdminToken,
+ }
+ }
+}
+
+generateQueryParameters! {
+ keywords: [],
+ fields: [
+ "node" => node,
+ "domain" => domain,
+ "format" => format,
+ "id" => id,
+ "search" => search,
+ "globalAlias" => global_alias,
+ "alias" => alias,
+ "accessKeyId" => access_key_id,
+ "showSecretKey" => show_secret_key
+ ]
+}
diff --git a/src/api/admin/special.rs b/src/api/admin/special.rs
new file mode 100644
index 00000000..0ecf82bc
--- /dev/null
+++ b/src/api/admin/special.rs
@@ -0,0 +1,179 @@
+use std::sync::Arc;
+
+use http::header::{
+ ACCESS_CONTROL_ALLOW_HEADERS, ACCESS_CONTROL_ALLOW_METHODS, ACCESS_CONTROL_ALLOW_ORIGIN, ALLOW,
+};
+use hyper::{Response, StatusCode};
+
+#[cfg(feature = "metrics")]
+use prometheus::{Encoder, TextEncoder};
+
+use garage_model::garage::Garage;
+use garage_rpc::system::ClusterHealthStatus;
+
+use garage_api_common::helpers::*;
+
+use crate::api::{CheckDomainRequest, HealthRequest, MetricsRequest, OptionsRequest};
+use crate::api_server::ResBody;
+use crate::error::*;
+use crate::{Admin, RequestHandler};
+
+impl RequestHandler for OptionsRequest {
+ type Response = Response<ResBody>;
+
+ async fn handle(
+ self,
+ _garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<Response<ResBody>, Error> {
+ Ok(Response::builder()
+ .status(StatusCode::OK)
+ .header(ALLOW, "OPTIONS,GET,POST")
+ .header(ACCESS_CONTROL_ALLOW_METHODS, "OPTIONS,GET,POST")
+ .header(ACCESS_CONTROL_ALLOW_HEADERS, "authorization,content-type")
+ .header(ACCESS_CONTROL_ALLOW_ORIGIN, "*")
+ .body(empty_body())?)
+ }
+}
+
+impl RequestHandler for MetricsRequest {
+ type Response = Response<ResBody>;
+
+ async fn handle(
+ self,
+ _garage: &Arc<Garage>,
+ admin: &Admin,
+ ) -> Result<Response<ResBody>, Error> {
+ #[cfg(feature = "metrics")]
+ {
+ use opentelemetry::trace::Tracer;
+
+ let mut buffer = vec![];
+ let encoder = TextEncoder::new();
+
+ let tracer = opentelemetry::global::tracer("garage");
+ let metric_families = tracer.in_span("admin/gather_metrics", |_| {
+ admin.exporter.registry().gather()
+ });
+
+ encoder
+ .encode(&metric_families, &mut buffer)
+ .ok_or_internal_error("Could not serialize metrics")?;
+
+ Ok(Response::builder()
+ .status(StatusCode::OK)
+ .header(http::header::CONTENT_TYPE, encoder.format_type())
+ .body(bytes_body(buffer.into()))?)
+ }
+ #[cfg(not(feature = "metrics"))]
+ Err(Error::bad_request(
+ "Garage was built without the metrics feature".to_string(),
+ ))
+ }
+}
+
+impl RequestHandler for HealthRequest {
+ type Response = Response<ResBody>;
+
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<Response<ResBody>, Error> {
+ let health = garage.system.health();
+
+ let (status, status_str) = match health.status {
+ ClusterHealthStatus::Healthy => (StatusCode::OK, "Garage is fully operational"),
+ ClusterHealthStatus::Degraded => (
+ StatusCode::OK,
+ "Garage is operational but some storage nodes are unavailable",
+ ),
+ ClusterHealthStatus::Unavailable => (
+ StatusCode::SERVICE_UNAVAILABLE,
+ "Quorum is not available for some/all partitions, reads and writes will fail",
+ ),
+ };
+ let status_str = format!(
+ "{}\nConsult the full health check API endpoint at /v2/GetClusterHealth for more details\n",
+ status_str
+ );
+
+ Ok(Response::builder()
+ .status(status)
+ .header(http::header::CONTENT_TYPE, "text/plain")
+ .body(string_body(status_str))?)
+ }
+}
+
+impl RequestHandler for CheckDomainRequest {
+ type Response = Response<ResBody>;
+
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<Response<ResBody>, Error> {
+ if check_domain(garage, &self.domain).await? {
+ Ok(Response::builder()
+ .status(StatusCode::OK)
+ .body(string_body(format!(
+ "Domain '{}' is managed by Garage",
+ self.domain
+ )))?)
+ } else {
+ Err(Error::bad_request(format!(
+ "Domain '{}' is not managed by Garage",
+ self.domain
+ )))
+ }
+ }
+}
+
+async fn check_domain(garage: &Arc<Garage>, domain: &str) -> Result<bool, Error> {
+ // Resolve bucket from domain name, inferring if the website must be activated for the
+ // domain to be valid.
+ let (bucket_name, must_check_website) = if let Some(bname) = garage
+ .config
+ .s3_api
+ .root_domain
+ .as_ref()
+ .and_then(|rd| host_to_bucket(domain, rd))
+ {
+ (bname.to_string(), false)
+ } else if let Some(bname) = garage
+ .config
+ .s3_web
+ .as_ref()
+ .and_then(|sw| host_to_bucket(domain, sw.root_domain.as_str()))
+ {
+ (bname.to_string(), true)
+ } else {
+ (domain.to_string(), true)
+ };
+
+ let bucket_id = match garage
+ .bucket_helper()
+ .resolve_global_bucket_name(&bucket_name)
+ .await?
+ {
+ Some(bucket_id) => bucket_id,
+ None => return Ok(false),
+ };
+
+ if !must_check_website {
+ return Ok(true);
+ }
+
+ let bucket = garage
+ .bucket_helper()
+ .get_existing_bucket(bucket_id)
+ .await?;
+
+ let bucket_state = bucket.state.as_option().unwrap();
+ let bucket_website_config = bucket_state.website_config.get();
+
+ match bucket_website_config {
+ Some(_v) => Ok(true),
+ None => Ok(false),
+ }
+}
diff --git a/src/api/admin/worker.rs b/src/api/admin/worker.rs
new file mode 100644
index 00000000..b3f4537b
--- /dev/null
+++ b/src/api/admin/worker.rs
@@ -0,0 +1,118 @@
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use garage_util::background::*;
+use garage_util::time::now_msec;
+
+use garage_model::garage::Garage;
+
+use crate::api::*;
+use crate::error::Error;
+use crate::{Admin, RequestHandler};
+
+impl RequestHandler for LocalListWorkersRequest {
+ type Response = LocalListWorkersResponse;
+
+ async fn handle(
+ self,
+ _garage: &Arc<Garage>,
+ admin: &Admin,
+ ) -> Result<LocalListWorkersResponse, Error> {
+ let workers = admin.background.get_worker_info();
+ let info = workers
+ .into_iter()
+ .filter(|(_, w)| {
+ (!self.busy_only
+ || matches!(w.state, WorkerState::Busy | WorkerState::Throttled(_)))
+ && (!self.error_only || w.errors > 0)
+ })
+ .map(|(id, w)| worker_info_to_api(id as u64, w))
+ .collect::<Vec<_>>();
+ Ok(LocalListWorkersResponse(info))
+ }
+}
+
+impl RequestHandler for LocalGetWorkerInfoRequest {
+ type Response = LocalGetWorkerInfoResponse;
+
+ async fn handle(
+ self,
+ _garage: &Arc<Garage>,
+ admin: &Admin,
+ ) -> Result<LocalGetWorkerInfoResponse, Error> {
+ let info = admin
+ .background
+ .get_worker_info()
+ .get(&(self.id as usize))
+ .ok_or(Error::NoSuchWorker(self.id))?
+ .clone();
+ Ok(LocalGetWorkerInfoResponse(worker_info_to_api(
+ self.id, info,
+ )))
+ }
+}
+
+impl RequestHandler for LocalGetWorkerVariableRequest {
+ type Response = LocalGetWorkerVariableResponse;
+
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<LocalGetWorkerVariableResponse, Error> {
+ let mut res = HashMap::new();
+ if let Some(k) = self.variable {
+ res.insert(k.clone(), garage.bg_vars.get(&k)?);
+ } else {
+ let vars = garage.bg_vars.get_all();
+ for (k, v) in vars.iter() {
+ res.insert(k.to_string(), v.to_string());
+ }
+ }
+ Ok(LocalGetWorkerVariableResponse(res))
+ }
+}
+
+impl RequestHandler for LocalSetWorkerVariableRequest {
+ type Response = LocalSetWorkerVariableResponse;
+
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<LocalSetWorkerVariableResponse, Error> {
+ garage.bg_vars.set(&self.variable, &self.value)?;
+
+ Ok(LocalSetWorkerVariableResponse {
+ variable: self.variable,
+ value: self.value,
+ })
+ }
+}
+
+// ---- helper functions ----
+
+fn worker_info_to_api(id: u64, info: WorkerInfo) -> WorkerInfoResp {
+ WorkerInfoResp {
+ id,
+ name: info.name,
+ state: match info.state {
+ WorkerState::Busy => WorkerStateResp::Busy,
+ WorkerState::Throttled(t) => WorkerStateResp::Throttled { duration_secs: t },
+ WorkerState::Idle => WorkerStateResp::Idle,
+ WorkerState::Done => WorkerStateResp::Done,
+ },
+ errors: info.errors as u64,
+ consecutive_errors: info.consecutive_errors as u64,
+ last_error: info.last_error.map(|(message, t)| WorkerLastError {
+ message,
+ secs_ago: now_msec().saturating_sub(t) / 1000,
+ }),
+
+ tranquility: info.status.tranquility,
+ progress: info.status.progress,
+ queue_length: info.status.queue_length,
+ persistent_errors: info.status.persistent_errors,
+ freeform: info.status.freeform,
+ }
+}
diff --git a/src/api/common/generic_server.rs b/src/api/common/generic_server.rs
index 6ddc2ff2..d7ee5692 100644
--- a/src/api/common/generic_server.rs
+++ b/src/api/common/generic_server.rs
@@ -1,3 +1,4 @@
+use std::borrow::Cow;
use std::convert::Infallible;
use std::fs::{self, Permissions};
use std::os::unix::fs::PermissionsExt;
@@ -35,7 +36,7 @@ use garage_util::socket_address::UnixOrTCPSocketAddress;
use crate::helpers::{BoxBody, ErrorBody};
pub trait ApiEndpoint: Send + Sync + 'static {
- fn name(&self) -> &'static str;
+ fn name(&self) -> Cow<'static, str>;
fn add_span_attributes(&self, span: SpanRef<'_>);
}
diff --git a/src/api/common/router_macros.rs b/src/api/common/router_macros.rs
index d9fe86db..f4a93c67 100644
--- a/src/api/common/router_macros.rs
+++ b/src/api/common/router_macros.rs
@@ -45,6 +45,68 @@ macro_rules! router_match {
}
}
}};
+ (@gen_path_parser_v2 ($method:expr, $reqpath:expr, $pathprefix:literal, $query:expr, $req:expr)
+ [
+ $(@special $spec_meth:ident $spec_path:pat => $spec_api:ident $spec_params:tt,)*
+ $($meth:ident $api:ident $params:tt,)*
+ ]) => {{
+ {
+ #[allow(unused_parens)]
+ match ($method, $reqpath) {
+ $(
+ (&Method::$spec_meth, $spec_path) => AdminApiRequest::$spec_api (
+ router_match!(@@gen_parse_request $spec_api, $spec_params, $query, $req)
+ ),
+ )*
+ $(
+ (&Method::$meth, concat!($pathprefix, stringify!($api)))
+ => AdminApiRequest::$api (
+ router_match!(@@gen_parse_request $api, $params, $query, $req)
+ ),
+ )*
+ (m, p) => {
+ return Err(Error::bad_request(format!(
+ "Unknown API endpoint: {} {}",
+ m, p
+ )))
+ }
+ }
+ }
+ }};
+ (@@gen_parse_request $api:ident, (), $query: expr, $req:expr) => {{
+ paste!(
+ [< $api Request >]
+ )
+ }};
+ (@@gen_parse_request $api:ident, (body), $query: expr, $req:expr) => {{
+ paste!({
+ parse_json_body::< [<$api Request>], _, Error>($req).await?
+ })
+ }};
+ (@@gen_parse_request $api:ident, (body_field, $($conv:ident $(($conv_arg:expr))? :: $param:ident),*), $query: expr, $req:expr)
+ =>
+ {{
+ paste!({
+ let body = parse_json_body::< [<$api RequestBody>], _, Error>($req).await?;
+ [< $api Request >] {
+ body,
+ $(
+ $param: router_match!(@@parse_param $query, $conv $(($conv_arg))?, $param),
+ )+
+ }
+ })
+ }};
+ (@@gen_parse_request $api:ident, ($($conv:ident $(($conv_arg:expr))? :: $param:ident),*), $query: expr, $req:expr)
+ =>
+ {{
+ paste!({
+ [< $api Request >] {
+ $(
+ $param: router_match!(@@parse_param $query, $conv $(($conv_arg))?, $param),
+ )+
+ }
+ })
+ }};
(@gen_parser ($keyword:expr, $key:ident, $query:expr, $header:expr),
key: [$($kw_k:ident $(if $required_k:ident)? $(header $header_k:expr)? => $api_k:ident $(($($conv_k:ident :: $param_k:ident),*))?,)*],
no_key: [$($kw_nk:ident $(if $required_nk:ident)? $(if_header $header_nk:expr)? => $api_nk:ident $(($($conv_nk:ident :: $param_nk:ident),*))?,)*]) => {{
@@ -79,13 +141,19 @@ macro_rules! router_match {
}
}};
+ (@@parse_param $query:expr, default, $param:ident) => {{
+ Default::default()
+ }};
(@@parse_param $query:expr, query_opt, $param:ident) => {{
// extract optional query parameter
$query.$param.take().map(|param| param.into_owned())
}};
(@@parse_param $query:expr, query, $param:ident) => {{
// extract mendatory query parameter
- $query.$param.take().ok_or_bad_request("Missing argument for endpoint")?.into_owned()
+ $query.$param.take()
+ .ok_or_bad_request(
+ format!("Missing argument `{}` for endpoint", stringify!($param))
+ )?.into_owned()
}};
(@@parse_param $query:expr, opt_parse, $param:ident) => {{
// extract and parse optional query parameter
@@ -99,10 +167,22 @@ macro_rules! router_match {
(@@parse_param $query:expr, parse, $param:ident) => {{
// extract and parse mandatory query parameter
// both missing and un-parseable parameters are reported as errors
- $query.$param.take().ok_or_bad_request("Missing argument for endpoint")?
+ $query.$param.take()
+ .ok_or_bad_request(
+ format!("Missing argument `{}` for endpoint", stringify!($param))
+ )?
.parse()
.map_err(|_| Error::bad_request("Failed to parse query parameter"))?
}};
+ (@@parse_param $query:expr, parse_default($default:expr), $param:ident) => {{
+ // extract and parse optional query parameter
+ // using provided value as default if paramter is missing
+ $query.$param.take().map(|x| x
+ .parse()
+ .map_err(|_| Error::bad_request("Failed to parse query parameter")))
+ .transpose()?
+ .unwrap_or($default)
+ }};
(@func
$(#[$doc:meta])*
pub enum Endpoint {
@@ -187,6 +267,7 @@ macro_rules! generateQueryParameters {
},
)*
$(
+ // FIXME: remove if !v.is_empty() ?
$f_param => if !v.is_empty() {
if res.$f_name.replace(v).is_some() {
return Err(Error::bad_request(format!(
diff --git a/src/api/k2v/api_server.rs b/src/api/k2v/api_server.rs
index eb276f5b..015fd687 100644
--- a/src/api/k2v/api_server.rs
+++ b/src/api/k2v/api_server.rs
@@ -1,3 +1,4 @@
+use std::borrow::Cow;
use std::sync::Arc;
use hyper::{body::Incoming as IncomingBody, Method, Request, Response};
@@ -177,8 +178,8 @@ impl ApiHandler for K2VApiServer {
}
impl ApiEndpoint for K2VApiEndpoint {
- fn name(&self) -> &'static str {
- self.endpoint.name()
+ fn name(&self) -> Cow<'static, str> {
+ Cow::Borrowed(self.endpoint.name())
}
fn add_span_attributes(&self, span: SpanRef<'_>) {
diff --git a/src/api/s3/api_server.rs b/src/api/s3/api_server.rs
index bf48bba1..c8c28f3d 100644
--- a/src/api/s3/api_server.rs
+++ b/src/api/s3/api_server.rs
@@ -1,3 +1,4 @@
+use std::borrow::Cow;
use std::sync::Arc;
use hyper::header;
@@ -353,8 +354,8 @@ impl ApiHandler for S3ApiServer {
}
impl ApiEndpoint for S3ApiEndpoint {
- fn name(&self) -> &'static str {
- self.endpoint.name()
+ fn name(&self) -> Cow<'static, str> {
+ Cow::Borrowed(self.endpoint.name())
}
fn add_span_attributes(&self, span: SpanRef<'_>) {