diff options
Diffstat (limited to 'src/api')
-rw-r--r-- | src/api/admin/Cargo.toml | 4 | ||||
-rw-r--r-- | src/api/admin/api.rs | 876 | ||||
-rw-r--r-- | src/api/admin/api_server.rs | 393 | ||||
-rw-r--r-- | src/api/admin/block.rs | 274 | ||||
-rw-r--r-- | src/api/admin/bucket.rs | 769 | ||||
-rw-r--r-- | src/api/admin/cluster.rs | 557 | ||||
-rw-r--r-- | src/api/admin/error.rs | 16 | ||||
-rw-r--r-- | src/api/admin/key.rs | 293 | ||||
-rw-r--r-- | src/api/admin/lib.rs | 32 | ||||
-rw-r--r-- | src/api/admin/macros.rs | 219 | ||||
-rw-r--r-- | src/api/admin/node.rs | 216 | ||||
-rw-r--r-- | src/api/admin/repair.rs | 403 | ||||
-rw-r--r-- | src/api/admin/router_v1.rs | 15 | ||||
-rw-r--r-- | src/api/admin/router_v2.rs | 268 | ||||
-rw-r--r-- | src/api/admin/special.rs | 179 | ||||
-rw-r--r-- | src/api/admin/worker.rs | 118 | ||||
-rw-r--r-- | src/api/common/generic_server.rs | 3 | ||||
-rw-r--r-- | src/api/common/router_macros.rs | 85 | ||||
-rw-r--r-- | src/api/k2v/api_server.rs | 5 | ||||
-rw-r--r-- | src/api/s3/api_server.rs | 5 |
20 files changed, 3581 insertions, 1149 deletions
diff --git a/src/api/admin/Cargo.toml b/src/api/admin/Cargo.toml index adddf306..9ac099e8 100644 --- a/src/api/admin/Cargo.toml +++ b/src/api/admin/Cargo.toml @@ -14,7 +14,9 @@ path = "lib.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +format_table.workspace = true garage_model.workspace = true +garage_block.workspace = true garage_table.workspace = true garage_util.workspace = true garage_rpc.workspace = true @@ -22,8 +24,10 @@ garage_api_common.workspace = true argon2.workspace = true async-trait.workspace = true +bytesize.workspace = true err-derive.workspace = true hex.workspace = true +paste.workspace = true tracing.workspace = true futures.workspace = true diff --git a/src/api/admin/api.rs b/src/api/admin/api.rs new file mode 100644 index 00000000..97cde158 --- /dev/null +++ b/src/api/admin/api.rs @@ -0,0 +1,876 @@ +use std::collections::HashMap; +use std::convert::TryFrom; +use std::net::SocketAddr; +use std::sync::Arc; + +use paste::paste; +use serde::{Deserialize, Serialize}; + +use garage_rpc::*; + +use garage_model::garage::Garage; + +use garage_api_common::common_error::CommonErrorDerivative; +use garage_api_common::helpers::is_default; + +use crate::api_server::{AdminRpc, AdminRpcResponse}; +use crate::error::Error; +use crate::macros::*; +use crate::{Admin, RequestHandler}; + +// This generates the following: +// +// - An enum AdminApiRequest that contains a variant for all endpoints +// +// - An enum AdminApiResponse that contains a variant for all non-special endpoints. +// This enum is serialized in api_server.rs, without the enum tag, +// which gives directly the JSON response corresponding to the API call. +// This enum does not implement Deserialize as its meaning can be ambiguous. +// +// - An enum TaggedAdminApiResponse that contains the same variants, but +// serializes as a tagged enum. This allows it to be transmitted through +// Garage RPC and deserialized correctly upon receival. +// Conversion from untagged to tagged can be done using the `.tagged()` method. +// +// - AdminApiRequest::name() that returns the name of the endpoint +// +// - impl EndpointHandler for AdminApiHandler, that uses the impl EndpointHandler +// of each request type below for non-special endpoints +admin_endpoints![ + // Special endpoints of the Admin API + @special Options, + @special CheckDomain, + @special Health, + @special Metrics, + + // Cluster operations + GetClusterStatus, + GetClusterHealth, + ConnectClusterNodes, + GetClusterLayout, + UpdateClusterLayout, + ApplyClusterLayout, + RevertClusterLayout, + + // Access key operations + ListKeys, + GetKeyInfo, + CreateKey, + ImportKey, + UpdateKey, + DeleteKey, + + // Bucket operations + ListBuckets, + GetBucketInfo, + CreateBucket, + UpdateBucket, + DeleteBucket, + CleanupIncompleteUploads, + + // Operations on permissions for keys on buckets + AllowBucketKey, + DenyBucketKey, + + // Operations on bucket aliases + AddBucketAlias, + RemoveBucketAlias, + + // Node operations + CreateMetadataSnapshot, + GetNodeStatistics, + GetClusterStatistics, + LaunchRepairOperation, + + // Worker operations + ListWorkers, + GetWorkerInfo, + GetWorkerVariable, + SetWorkerVariable, + + // Block operations + ListBlockErrors, + GetBlockInfo, + RetryBlockResync, + PurgeBlocks, +]; + +local_admin_endpoints![ + // Node operations + CreateMetadataSnapshot, + GetNodeStatistics, + LaunchRepairOperation, + // Background workers + ListWorkers, + GetWorkerInfo, + GetWorkerVariable, + SetWorkerVariable, + // Block operations + ListBlockErrors, + GetBlockInfo, + RetryBlockResync, + PurgeBlocks, +]; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct MultiRequest<RB> { + pub node: String, + pub body: RB, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct MultiResponse<RB> { + pub success: HashMap<String, RB>, + pub error: HashMap<String, String>, +} + +// ********************************************** +// Special endpoints +// +// These endpoints don't have associated *Response structs +// because they directly produce an http::Response +// ********************************************** + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct OptionsRequest; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CheckDomainRequest { + pub domain: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct HealthRequest; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct MetricsRequest; + +// ********************************************** +// Cluster operations +// ********************************************** + +// ---- GetClusterStatus ---- + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct GetClusterStatusRequest; + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct GetClusterStatusResponse { + pub node: String, + pub garage_version: String, + pub garage_features: Option<Vec<String>>, + pub rust_version: String, + pub db_engine: String, + pub layout_version: u64, + pub nodes: Vec<NodeResp>, +} + +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +#[serde(rename_all = "camelCase")] +pub struct NodeResp { + pub id: String, + pub role: Option<NodeRoleResp>, + pub addr: Option<SocketAddr>, + pub hostname: Option<String>, + pub is_up: bool, + pub last_seen_secs_ago: Option<u64>, + pub draining: bool, + #[serde(skip_serializing_if = "Option::is_none")] + pub data_partition: Option<FreeSpaceResp>, + #[serde(skip_serializing_if = "Option::is_none")] + pub metadata_partition: Option<FreeSpaceResp>, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct NodeRoleResp { + pub id: String, + pub zone: String, + pub capacity: Option<u64>, + pub tags: Vec<String>, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct FreeSpaceResp { + pub available: u64, + pub total: u64, +} + +// ---- GetClusterHealth ---- + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct GetClusterHealthRequest; + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct GetClusterHealthResponse { + pub status: String, + pub known_nodes: usize, + pub connected_nodes: usize, + pub storage_nodes: usize, + pub storage_nodes_ok: usize, + pub partitions: usize, + pub partitions_quorum: usize, + pub partitions_all_ok: usize, +} + +// ---- ConnectClusterNodes ---- + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ConnectClusterNodesRequest(pub Vec<String>); + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ConnectClusterNodesResponse(pub Vec<ConnectNodeResponse>); + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ConnectNodeResponse { + pub success: bool, + pub error: Option<String>, +} + +// ---- GetClusterLayout ---- + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct GetClusterLayoutRequest; + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct GetClusterLayoutResponse { + pub version: u64, + pub roles: Vec<NodeRoleResp>, + pub staged_role_changes: Vec<NodeRoleChange>, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct NodeRoleChange { + pub id: String, + #[serde(flatten)] + pub action: NodeRoleChangeEnum, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(untagged)] +pub enum NodeRoleChangeEnum { + #[serde(rename_all = "camelCase")] + Remove { remove: bool }, + #[serde(rename_all = "camelCase")] + Update { + zone: String, + capacity: Option<u64>, + tags: Vec<String>, + }, +} + +// ---- UpdateClusterLayout ---- + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct UpdateClusterLayoutRequest(pub Vec<NodeRoleChange>); + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct UpdateClusterLayoutResponse(pub GetClusterLayoutResponse); + +// ---- ApplyClusterLayout ---- + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ApplyClusterLayoutRequest { + pub version: u64, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ApplyClusterLayoutResponse { + pub message: Vec<String>, + pub layout: GetClusterLayoutResponse, +} + +// ---- RevertClusterLayout ---- + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RevertClusterLayoutRequest; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RevertClusterLayoutResponse(pub GetClusterLayoutResponse); + +// ********************************************** +// Access key operations +// ********************************************** + +// ---- ListKeys ---- + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ListKeysRequest; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ListKeysResponse(pub Vec<ListKeysResponseItem>); + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ListKeysResponseItem { + pub id: String, + pub name: String, +} + +// ---- GetKeyInfo ---- + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct GetKeyInfoRequest { + pub id: Option<String>, + pub search: Option<String>, + pub show_secret_key: bool, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct GetKeyInfoResponse { + pub name: String, + pub access_key_id: String, + #[serde(skip_serializing_if = "is_default")] + pub secret_access_key: Option<String>, + pub permissions: KeyPerm, + pub buckets: Vec<KeyInfoBucketResponse>, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct KeyPerm { + #[serde(default)] + pub create_bucket: bool, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct KeyInfoBucketResponse { + pub id: String, + pub global_aliases: Vec<String>, + pub local_aliases: Vec<String>, + pub permissions: ApiBucketKeyPerm, +} + +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +#[serde(rename_all = "camelCase")] +pub struct ApiBucketKeyPerm { + #[serde(default)] + pub read: bool, + #[serde(default)] + pub write: bool, + #[serde(default)] + pub owner: bool, +} + +// ---- CreateKey ---- + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct CreateKeyRequest { + pub name: Option<String>, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CreateKeyResponse(pub GetKeyInfoResponse); + +// ---- ImportKey ---- + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ImportKeyRequest { + pub access_key_id: String, + pub secret_access_key: String, + pub name: Option<String>, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ImportKeyResponse(pub GetKeyInfoResponse); + +// ---- UpdateKey ---- + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct UpdateKeyRequest { + pub id: String, + pub body: UpdateKeyRequestBody, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct UpdateKeyResponse(pub GetKeyInfoResponse); + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct UpdateKeyRequestBody { + pub name: Option<String>, + pub allow: Option<KeyPerm>, + pub deny: Option<KeyPerm>, +} + +// ---- DeleteKey ---- + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DeleteKeyRequest { + pub id: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DeleteKeyResponse; + +// ********************************************** +// Bucket operations +// ********************************************** + +// ---- ListBuckets ---- + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ListBucketsRequest; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ListBucketsResponse(pub Vec<ListBucketsResponseItem>); + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ListBucketsResponseItem { + pub id: String, + pub global_aliases: Vec<String>, + pub local_aliases: Vec<BucketLocalAlias>, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct BucketLocalAlias { + pub access_key_id: String, + pub alias: String, +} + +// ---- GetBucketInfo ---- + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct GetBucketInfoRequest { + pub id: Option<String>, + pub global_alias: Option<String>, + pub search: Option<String>, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct GetBucketInfoResponse { + pub id: String, + pub global_aliases: Vec<String>, + pub website_access: bool, + #[serde(default)] + pub website_config: Option<GetBucketInfoWebsiteResponse>, + pub keys: Vec<GetBucketInfoKey>, + pub objects: i64, + pub bytes: i64, + pub unfinished_uploads: i64, + pub unfinished_multipart_uploads: i64, + pub unfinished_multipart_upload_parts: i64, + pub unfinished_multipart_upload_bytes: i64, + pub quotas: ApiBucketQuotas, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct GetBucketInfoWebsiteResponse { + pub index_document: String, + pub error_document: Option<String>, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct GetBucketInfoKey { + pub access_key_id: String, + pub name: String, + pub permissions: ApiBucketKeyPerm, + pub bucket_local_aliases: Vec<String>, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ApiBucketQuotas { + pub max_size: Option<u64>, + pub max_objects: Option<u64>, +} + +// ---- CreateBucket ---- + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct CreateBucketRequest { + pub global_alias: Option<String>, + pub local_alias: Option<CreateBucketLocalAlias>, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CreateBucketResponse(pub GetBucketInfoResponse); + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct CreateBucketLocalAlias { + pub access_key_id: String, + pub alias: String, + #[serde(default)] + pub allow: ApiBucketKeyPerm, +} + +// ---- UpdateBucket ---- + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct UpdateBucketRequest { + pub id: String, + pub body: UpdateBucketRequestBody, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct UpdateBucketResponse(pub GetBucketInfoResponse); + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct UpdateBucketRequestBody { + pub website_access: Option<UpdateBucketWebsiteAccess>, + pub quotas: Option<ApiBucketQuotas>, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct UpdateBucketWebsiteAccess { + pub enabled: bool, + pub index_document: Option<String>, + pub error_document: Option<String>, +} + +// ---- DeleteBucket ---- + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DeleteBucketRequest { + pub id: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DeleteBucketResponse; + +// ---- CleanupIncompleteUploads ---- + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CleanupIncompleteUploadsRequest { + pub bucket_id: String, + pub older_than_secs: u64, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CleanupIncompleteUploadsResponse { + pub uploads_deleted: u64, +} + +// ********************************************** +// Operations on permissions for keys on buckets +// ********************************************** + +// ---- AllowBucketKey ---- + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AllowBucketKeyRequest(pub BucketKeyPermChangeRequest); + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AllowBucketKeyResponse(pub GetBucketInfoResponse); + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct BucketKeyPermChangeRequest { + pub bucket_id: String, + pub access_key_id: String, + pub permissions: ApiBucketKeyPerm, +} + +// ---- DenyBucketKey ---- + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DenyBucketKeyRequest(pub BucketKeyPermChangeRequest); + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DenyBucketKeyResponse(pub GetBucketInfoResponse); + +// ********************************************** +// Operations on bucket aliases +// ********************************************** + +// ---- AddBucketAlias ---- + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct AddBucketAliasRequest { + pub bucket_id: String, + #[serde(flatten)] + pub alias: BucketAliasEnum, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AddBucketAliasResponse(pub GetBucketInfoResponse); + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(untagged)] +pub enum BucketAliasEnum { + #[serde(rename_all = "camelCase")] + Global { global_alias: String }, + #[serde(rename_all = "camelCase")] + Local { + local_alias: String, + access_key_id: String, + }, +} + +// ---- RemoveBucketAlias ---- + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct RemoveBucketAliasRequest { + pub bucket_id: String, + #[serde(flatten)] + pub alias: BucketAliasEnum, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RemoveBucketAliasResponse(pub GetBucketInfoResponse); + +// ********************************************** +// Node operations +// ********************************************** + +// ---- CreateMetadataSnapshot ---- + +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub struct LocalCreateMetadataSnapshotRequest; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LocalCreateMetadataSnapshotResponse; + +// ---- GetNodeStatistics ---- + +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub struct LocalGetNodeStatisticsRequest; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LocalGetNodeStatisticsResponse { + pub freeform: String, +} + +// ---- GetClusterStatistics ---- + +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub struct GetClusterStatisticsRequest; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct GetClusterStatisticsResponse { + pub freeform: String, +} + +// ---- LaunchRepairOperation ---- + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LocalLaunchRepairOperationRequest { + pub repair_type: RepairType, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum RepairType { + Tables, + Blocks, + Versions, + MultipartUploads, + BlockRefs, + BlockRc, + Rebalance, + Scrub(ScrubCommand), +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum ScrubCommand { + Start, + Pause, + Resume, + Cancel, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LocalLaunchRepairOperationResponse; + +// ********************************************** +// Worker operations +// ********************************************** + +// ---- GetWorkerList ---- + +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +#[serde(rename_all = "camelCase")] +pub struct LocalListWorkersRequest { + #[serde(default)] + pub busy_only: bool, + #[serde(default)] + pub error_only: bool, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LocalListWorkersResponse(pub Vec<WorkerInfoResp>); + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct WorkerInfoResp { + pub id: u64, + pub name: String, + pub state: WorkerStateResp, + pub errors: u64, + pub consecutive_errors: u64, + pub last_error: Option<WorkerLastError>, + pub tranquility: Option<u32>, + pub progress: Option<String>, + pub queue_length: Option<u64>, + pub persistent_errors: Option<u64>, + pub freeform: Vec<String>, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum WorkerStateResp { + Busy, + Throttled { duration_secs: f32 }, + Idle, + Done, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct WorkerLastError { + pub message: String, + pub secs_ago: u64, +} + +// ---- GetWorkerList ---- + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LocalGetWorkerInfoRequest { + pub id: u64, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LocalGetWorkerInfoResponse(pub WorkerInfoResp); + +// ---- GetWorkerVariable ---- + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LocalGetWorkerVariableRequest { + pub variable: Option<String>, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LocalGetWorkerVariableResponse(pub HashMap<String, String>); + +// ---- SetWorkerVariable ---- + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LocalSetWorkerVariableRequest { + pub variable: String, + pub value: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LocalSetWorkerVariableResponse { + pub variable: String, + pub value: String, +} + +// ********************************************** +// Block operations +// ********************************************** + +// ---- ListBlockErrors ---- + +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub struct LocalListBlockErrorsRequest; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LocalListBlockErrorsResponse(pub Vec<BlockError>); + +#[derive(Serialize, Deserialize, Clone, Debug)] +#[serde(rename_all = "camelCase")] +pub struct BlockError { + pub block_hash: String, + pub refcount: u64, + pub error_count: u64, + pub last_try_secs_ago: u64, + pub next_try_in_secs: u64, +} + +// ---- GetBlockInfo ---- + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct LocalGetBlockInfoRequest { + pub block_hash: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct LocalGetBlockInfoResponse { + pub block_hash: String, + pub refcount: u64, + pub versions: Vec<BlockVersion>, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct BlockVersion { + pub version_id: String, + pub deleted: bool, + pub garbage_collected: bool, + pub backlink: Option<BlockVersionBacklink>, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum BlockVersionBacklink { + Object { + bucket_id: String, + key: String, + }, + Upload { + upload_id: String, + upload_deleted: bool, + upload_garbage_collected: bool, + bucket_id: Option<String>, + key: Option<String>, + }, +} + +// ---- RetryBlockResync ---- + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(untagged)] +pub enum LocalRetryBlockResyncRequest { + #[serde(rename_all = "camelCase")] + All { all: bool }, + #[serde(rename_all = "camelCase")] + Blocks { block_hashes: Vec<String> }, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct LocalRetryBlockResyncResponse { + pub count: u64, +} + +// ---- PurgeBlocks ---- + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct LocalPurgeBlocksRequest(pub Vec<String>); + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct LocalPurgeBlocksResponse { + pub blocks_purged: u64, + pub objects_deleted: u64, + pub uploads_deleted: u64, + pub versions_deleted: u64, +} diff --git a/src/api/admin/api_server.rs b/src/api/admin/api_server.rs index 6f0c474f..37574dcf 100644 --- a/src/api/admin/api_server.rs +++ b/src/api/admin/api_server.rs @@ -1,333 +1,234 @@ -use std::collections::HashMap; +use std::borrow::Cow; use std::sync::Arc; use argon2::password_hash::PasswordHash; -use http::header::{ACCESS_CONTROL_ALLOW_METHODS, ACCESS_CONTROL_ALLOW_ORIGIN, ALLOW}; -use hyper::{body::Incoming as IncomingBody, Request, Response, StatusCode}; +use http::header::{HeaderValue, ACCESS_CONTROL_ALLOW_ORIGIN, AUTHORIZATION}; +use hyper::{body::Incoming as IncomingBody, Request, Response}; +use serde::{Deserialize, Serialize}; use tokio::sync::watch; use opentelemetry::trace::SpanRef; #[cfg(feature = "metrics")] use opentelemetry_prometheus::PrometheusExporter; -#[cfg(feature = "metrics")] -use prometheus::{Encoder, TextEncoder}; use garage_model::garage::Garage; -use garage_rpc::system::ClusterHealthStatus; +use garage_rpc::{Endpoint as RpcEndpoint, *}; +use garage_util::background::BackgroundRunner; use garage_util::error::Error as GarageError; use garage_util::socket_address::UnixOrTCPSocketAddress; use garage_api_common::generic_server::*; use garage_api_common::helpers::*; -use crate::bucket::*; -use crate::cluster::*; +use crate::api::*; use crate::error::*; -use crate::key::*; use crate::router_v0; -use crate::router_v1::{Authorization, Endpoint}; +use crate::router_v1; +use crate::Authorization; +use crate::RequestHandler; + +// ---- FOR RPC ---- + +pub const ADMIN_RPC_PATH: &str = "garage_api/admin/rpc.rs/Rpc"; + +#[derive(Debug, Serialize, Deserialize)] +pub enum AdminRpc { + Proxy(AdminApiRequest), + Internal(LocalAdminApiRequest), +} + +#[derive(Debug, Serialize, Deserialize)] +pub enum AdminRpcResponse { + ProxyApiOkResponse(TaggedAdminApiResponse), + InternalApiOkResponse(LocalAdminApiResponse), + ApiErrorResponse { + http_code: u16, + error_code: String, + message: String, + }, +} + +impl Rpc for AdminRpc { + type Response = Result<AdminRpcResponse, GarageError>; +} + +impl EndpointHandler<AdminRpc> for AdminApiServer { + async fn handle( + self: &Arc<Self>, + message: &AdminRpc, + _from: NodeID, + ) -> Result<AdminRpcResponse, GarageError> { + match message { + AdminRpc::Proxy(req) => { + info!("Proxied admin API request: {}", req.name()); + let res = req.clone().handle(&self.garage, &self).await; + match res { + Ok(res) => Ok(AdminRpcResponse::ProxyApiOkResponse(res.tagged())), + Err(e) => Ok(AdminRpcResponse::ApiErrorResponse { + http_code: e.http_status_code().as_u16(), + error_code: e.code().to_string(), + message: e.to_string(), + }), + } + } + AdminRpc::Internal(req) => { + info!("Internal admin API request: {}", req.name()); + let res = req.clone().handle(&self.garage, &self).await; + match res { + Ok(res) => Ok(AdminRpcResponse::InternalApiOkResponse(res)), + Err(e) => Ok(AdminRpcResponse::ApiErrorResponse { + http_code: e.http_status_code().as_u16(), + error_code: e.code().to_string(), + message: e.to_string(), + }), + } + } + } + } +} + +// ---- FOR HTTP ---- pub type ResBody = BoxBody<Error>; pub struct AdminApiServer { garage: Arc<Garage>, #[cfg(feature = "metrics")] - exporter: PrometheusExporter, + pub(crate) exporter: PrometheusExporter, metrics_token: Option<String>, admin_token: Option<String>, + pub(crate) background: Arc<BackgroundRunner>, + pub(crate) endpoint: Arc<RpcEndpoint<AdminRpc, Self>>, +} + +pub enum HttpEndpoint { + Old(router_v1::Endpoint), + New(String), } impl AdminApiServer { pub fn new( garage: Arc<Garage>, + background: Arc<BackgroundRunner>, #[cfg(feature = "metrics")] exporter: PrometheusExporter, - ) -> Self { + ) -> Arc<Self> { let cfg = &garage.config.admin; let metrics_token = cfg.metrics_token.as_deref().map(hash_bearer_token); let admin_token = cfg.admin_token.as_deref().map(hash_bearer_token); - Self { + + let endpoint = garage.system.netapp.endpoint(ADMIN_RPC_PATH.into()); + let admin = Arc::new(Self { garage, #[cfg(feature = "metrics")] exporter, metrics_token, admin_token, - } + background, + endpoint, + }); + admin.endpoint.set_handler(admin.clone()); + admin } pub async fn run( - self, + self: Arc<Self>, bind_addr: UnixOrTCPSocketAddress, must_exit: watch::Receiver<bool>, ) -> Result<(), GarageError> { let region = self.garage.config.s3_api.s3_region.clone(); - ApiServer::new(region, self) + ApiServer::new(region, ArcAdminApiServer(self)) .run_server(bind_addr, Some(0o220), must_exit) .await } - fn handle_options(&self, _req: &Request<IncomingBody>) -> Result<Response<ResBody>, Error> { - Ok(Response::builder() - .status(StatusCode::NO_CONTENT) - .header(ALLOW, "OPTIONS, GET, POST") - .header(ACCESS_CONTROL_ALLOW_METHODS, "OPTIONS, GET, POST") - .header(ACCESS_CONTROL_ALLOW_ORIGIN, "*") - .body(empty_body())?) - } - - async fn handle_check_domain( + async fn handle_http_api( &self, req: Request<IncomingBody>, + endpoint: HttpEndpoint, ) -> Result<Response<ResBody>, Error> { - let query_params: HashMap<String, String> = req - .uri() - .query() - .map(|v| { - url::form_urlencoded::parse(v.as_bytes()) - .into_owned() - .collect() - }) - .unwrap_or_else(HashMap::new); - - let has_domain_key = query_params.contains_key("domain"); - - if !has_domain_key { - return Err(Error::bad_request("No domain query string found")); - } - - let domain = query_params - .get("domain") - .ok_or_internal_error("Could not parse domain query string")?; - - if self.check_domain(domain).await? { - Ok(Response::builder() - .status(StatusCode::OK) - .body(string_body(format!( - "Domain '{domain}' is managed by Garage" - )))?) - } else { - Err(Error::bad_request(format!( - "Domain '{domain}' is not managed by Garage" - ))) - } - } - - async fn check_domain(&self, domain: &str) -> Result<bool, Error> { - // Resolve bucket from domain name, inferring if the website must be activated for the - // domain to be valid. - let (bucket_name, must_check_website) = if let Some(bname) = self - .garage - .config - .s3_api - .root_domain - .as_ref() - .and_then(|rd| host_to_bucket(domain, rd)) - { - (bname.to_string(), false) - } else if let Some(bname) = self - .garage - .config - .s3_web - .as_ref() - .and_then(|sw| host_to_bucket(domain, sw.root_domain.as_str())) - { - (bname.to_string(), true) - } else { - (domain.to_string(), true) - }; + let auth_header = req.headers().get(AUTHORIZATION).cloned(); - let bucket_id = match self - .garage - .bucket_helper() - .resolve_global_bucket_name(&bucket_name) - .await? - { - Some(bucket_id) => bucket_id, - None => return Ok(false), + let request = match endpoint { + HttpEndpoint::Old(endpoint_v1) => AdminApiRequest::from_v1(endpoint_v1, req).await?, + HttpEndpoint::New(_) => AdminApiRequest::from_request(req).await?, }; - if !must_check_website { - return Ok(true); - } - - let bucket = self - .garage - .bucket_helper() - .get_existing_bucket(bucket_id) - .await?; - - let bucket_state = bucket.state.as_option().unwrap(); - let bucket_website_config = bucket_state.website_config.get(); + let required_auth_hash = + match request.authorization_type() { + Authorization::None => None, + Authorization::MetricsToken => self.metrics_token.as_deref(), + Authorization::AdminToken => match self.admin_token.as_deref() { + None => return Err(Error::forbidden( + "Admin token isn't configured, admin API access is disabled for security.", + )), + Some(t) => Some(t), + }, + }; - match bucket_website_config { - Some(_v) => Ok(true), - None => Ok(false), + if let Some(password_hash) = required_auth_hash { + match auth_header { + None => return Err(Error::forbidden("Authorization token must be provided")), + Some(authorization) => { + verify_bearer_token(&authorization, password_hash)?; + } + } } - } - - fn handle_health(&self) -> Result<Response<ResBody>, Error> { - let health = self.garage.system.health(); - - let (status, status_str) = match health.status { - ClusterHealthStatus::Healthy => (StatusCode::OK, "Garage is fully operational"), - ClusterHealthStatus::Degraded => ( - StatusCode::OK, - "Garage is operational but some storage nodes are unavailable", - ), - ClusterHealthStatus::Unavailable => ( - StatusCode::SERVICE_UNAVAILABLE, - "Quorum is not available for some/all partitions, reads and writes will fail", - ), - }; - let status_str = format!( - "{}\nConsult the full health check API endpoint at /v1/health for more details\n", - status_str - ); - - Ok(Response::builder() - .status(status) - .header(http::header::CONTENT_TYPE, "text/plain") - .body(string_body(status_str))?) - } - fn handle_metrics(&self) -> Result<Response<ResBody>, Error> { - #[cfg(feature = "metrics")] - { - use opentelemetry::trace::Tracer; - - let mut buffer = vec![]; - let encoder = TextEncoder::new(); - - let tracer = opentelemetry::global::tracer("garage"); - let metric_families = tracer.in_span("admin/gather_metrics", |_| { - self.exporter.registry().gather() - }); - - encoder - .encode(&metric_families, &mut buffer) - .ok_or_internal_error("Could not serialize metrics")?; - - Ok(Response::builder() - .status(StatusCode::OK) - .header(http::header::CONTENT_TYPE, encoder.format_type()) - .body(bytes_body(buffer.into()))?) + match request { + AdminApiRequest::Options(req) => req.handle(&self.garage, &self).await, + AdminApiRequest::CheckDomain(req) => req.handle(&self.garage, &self).await, + AdminApiRequest::Health(req) => req.handle(&self.garage, &self).await, + AdminApiRequest::Metrics(req) => req.handle(&self.garage, &self).await, + req => { + let res = req.handle(&self.garage, &self).await?; + let mut res = json_ok_response(&res)?; + res.headers_mut() + .insert(ACCESS_CONTROL_ALLOW_ORIGIN, HeaderValue::from_static("*")); + Ok(res) + } } - #[cfg(not(feature = "metrics"))] - Err(Error::bad_request( - "Garage was built without the metrics feature".to_string(), - )) } } -impl ApiHandler for AdminApiServer { +struct ArcAdminApiServer(Arc<AdminApiServer>); + +impl ApiHandler for ArcAdminApiServer { const API_NAME: &'static str = "admin"; const API_NAME_DISPLAY: &'static str = "Admin"; - type Endpoint = Endpoint; + type Endpoint = HttpEndpoint; type Error = Error; - fn parse_endpoint(&self, req: &Request<IncomingBody>) -> Result<Endpoint, Error> { + fn parse_endpoint(&self, req: &Request<IncomingBody>) -> Result<HttpEndpoint, Error> { if req.uri().path().starts_with("/v0/") { let endpoint_v0 = router_v0::Endpoint::from_request(req)?; - Endpoint::from_v0(endpoint_v0) + let endpoint_v1 = router_v1::Endpoint::from_v0(endpoint_v0)?; + Ok(HttpEndpoint::Old(endpoint_v1)) + } else if req.uri().path().starts_with("/v1/") { + let endpoint_v1 = router_v1::Endpoint::from_request(req)?; + Ok(HttpEndpoint::Old(endpoint_v1)) } else { - Endpoint::from_request(req) + Ok(HttpEndpoint::New(req.uri().path().to_string())) } } async fn handle( &self, req: Request<IncomingBody>, - endpoint: Endpoint, + endpoint: HttpEndpoint, ) -> Result<Response<ResBody>, Error> { - let required_auth_hash = - match endpoint.authorization_type() { - Authorization::None => None, - Authorization::MetricsToken => self.metrics_token.as_deref(), - Authorization::AdminToken => match self.admin_token.as_deref() { - None => return Err(Error::forbidden( - "Admin token isn't configured, admin API access is disabled for security.", - )), - Some(t) => Some(t), - }, - }; - - if let Some(password_hash) = required_auth_hash { - match req.headers().get("Authorization") { - None => return Err(Error::forbidden("Authorization token must be provided")), - Some(authorization) => { - verify_bearer_token(&authorization, password_hash)?; - } - } - } - - match endpoint { - Endpoint::Options => self.handle_options(&req), - Endpoint::CheckDomain => self.handle_check_domain(req).await, - Endpoint::Health => self.handle_health(), - Endpoint::Metrics => self.handle_metrics(), - Endpoint::GetClusterStatus => handle_get_cluster_status(&self.garage).await, - Endpoint::GetClusterHealth => handle_get_cluster_health(&self.garage).await, - Endpoint::ConnectClusterNodes => handle_connect_cluster_nodes(&self.garage, req).await, - // Layout - Endpoint::GetClusterLayout => handle_get_cluster_layout(&self.garage).await, - Endpoint::UpdateClusterLayout => handle_update_cluster_layout(&self.garage, req).await, - Endpoint::ApplyClusterLayout => handle_apply_cluster_layout(&self.garage, req).await, - Endpoint::RevertClusterLayout => handle_revert_cluster_layout(&self.garage).await, - // Keys - Endpoint::ListKeys => handle_list_keys(&self.garage).await, - Endpoint::GetKeyInfo { - id, - search, - show_secret_key, - } => { - let show_secret_key = show_secret_key.map(|x| x == "true").unwrap_or(false); - handle_get_key_info(&self.garage, id, search, show_secret_key).await - } - Endpoint::CreateKey => handle_create_key(&self.garage, req).await, - Endpoint::ImportKey => handle_import_key(&self.garage, req).await, - Endpoint::UpdateKey { id } => handle_update_key(&self.garage, id, req).await, - Endpoint::DeleteKey { id } => handle_delete_key(&self.garage, id).await, - // Buckets - Endpoint::ListBuckets => handle_list_buckets(&self.garage).await, - Endpoint::GetBucketInfo { id, global_alias } => { - handle_get_bucket_info(&self.garage, id, global_alias).await - } - Endpoint::CreateBucket => handle_create_bucket(&self.garage, req).await, - Endpoint::DeleteBucket { id } => handle_delete_bucket(&self.garage, id).await, - Endpoint::UpdateBucket { id } => handle_update_bucket(&self.garage, id, req).await, - // Bucket-key permissions - Endpoint::BucketAllowKey => { - handle_bucket_change_key_perm(&self.garage, req, true).await - } - Endpoint::BucketDenyKey => { - handle_bucket_change_key_perm(&self.garage, req, false).await - } - // Bucket aliasing - Endpoint::GlobalAliasBucket { id, alias } => { - handle_global_alias_bucket(&self.garage, id, alias).await - } - Endpoint::GlobalUnaliasBucket { id, alias } => { - handle_global_unalias_bucket(&self.garage, id, alias).await - } - Endpoint::LocalAliasBucket { - id, - access_key_id, - alias, - } => handle_local_alias_bucket(&self.garage, id, access_key_id, alias).await, - Endpoint::LocalUnaliasBucket { - id, - access_key_id, - alias, - } => handle_local_unalias_bucket(&self.garage, id, access_key_id, alias).await, - } + self.0.handle_http_api(req, endpoint).await } } -impl ApiEndpoint for Endpoint { - fn name(&self) -> &'static str { - Endpoint::name(self) +impl ApiEndpoint for HttpEndpoint { + fn name(&self) -> Cow<'static, str> { + match self { + Self::Old(endpoint_v1) => Cow::Borrowed(endpoint_v1.name()), + Self::New(path) => Cow::Owned(path.clone()), + } } fn add_span_attributes(&self, _span: SpanRef<'_>) {} diff --git a/src/api/admin/block.rs b/src/api/admin/block.rs new file mode 100644 index 00000000..73d186a6 --- /dev/null +++ b/src/api/admin/block.rs @@ -0,0 +1,274 @@ +use std::sync::Arc; + +use garage_util::data::*; +use garage_util::error::Error as GarageError; +use garage_util::time::now_msec; + +use garage_table::EmptyKey; + +use garage_model::garage::Garage; +use garage_model::s3::object_table::*; +use garage_model::s3::version_table::*; + +use garage_api_common::common_error::CommonErrorDerivative; + +use crate::api::*; +use crate::error::*; +use crate::{Admin, RequestHandler}; + +impl RequestHandler for LocalListBlockErrorsRequest { + type Response = LocalListBlockErrorsResponse; + + async fn handle( + self, + garage: &Arc<Garage>, + _admin: &Admin, + ) -> Result<LocalListBlockErrorsResponse, Error> { + let errors = garage.block_manager.list_resync_errors()?; + let now = now_msec(); + let errors = errors + .into_iter() + .map(|e| BlockError { + block_hash: hex::encode(&e.hash), + refcount: e.refcount, + error_count: e.error_count, + last_try_secs_ago: now.saturating_sub(e.last_try) / 1000, + next_try_in_secs: e.next_try.saturating_sub(now) / 1000, + }) + .collect(); + Ok(LocalListBlockErrorsResponse(errors)) + } +} + +impl RequestHandler for LocalGetBlockInfoRequest { + type Response = LocalGetBlockInfoResponse; + + async fn handle( + self, + garage: &Arc<Garage>, + _admin: &Admin, + ) -> Result<LocalGetBlockInfoResponse, Error> { + let hash = find_block_hash_by_prefix(garage, &self.block_hash)?; + let refcount = garage.block_manager.get_block_rc(&hash)?; + let block_refs = garage + .block_ref_table + .get_range(&hash, None, None, 10000, Default::default()) + .await?; + let mut versions = vec![]; + for br in block_refs { + if let Some(v) = garage.version_table.get(&br.version, &EmptyKey).await? { + let bl = match &v.backlink { + VersionBacklink::MultipartUpload { upload_id } => { + if let Some(u) = garage.mpu_table.get(upload_id, &EmptyKey).await? { + BlockVersionBacklink::Upload { + upload_id: hex::encode(&upload_id), + upload_deleted: u.deleted.get(), + upload_garbage_collected: false, + bucket_id: Some(hex::encode(&u.bucket_id)), + key: Some(u.key.to_string()), + } + } else { + BlockVersionBacklink::Upload { + upload_id: hex::encode(&upload_id), + upload_deleted: true, + upload_garbage_collected: true, + bucket_id: None, + key: None, + } + } + } + VersionBacklink::Object { bucket_id, key } => BlockVersionBacklink::Object { + bucket_id: hex::encode(&bucket_id), + key: key.to_string(), + }, + }; + versions.push(BlockVersion { + version_id: hex::encode(&br.version), + deleted: v.deleted.get(), + garbage_collected: false, + backlink: Some(bl), + }); + } else { + versions.push(BlockVersion { + version_id: hex::encode(&br.version), + deleted: true, + garbage_collected: true, + backlink: None, + }); + } + } + Ok(LocalGetBlockInfoResponse { + block_hash: hex::encode(&hash), + refcount, + versions, + }) + } +} + +impl RequestHandler for LocalRetryBlockResyncRequest { + type Response = LocalRetryBlockResyncResponse; + + async fn handle( + self, + garage: &Arc<Garage>, + _admin: &Admin, + ) -> Result<LocalRetryBlockResyncResponse, Error> { + match self { + Self::All { all: true } => { + let blocks = garage.block_manager.list_resync_errors()?; + for b in blocks.iter() { + garage.block_manager.resync.clear_backoff(&b.hash)?; + } + Ok(LocalRetryBlockResyncResponse { + count: blocks.len() as u64, + }) + } + Self::All { all: false } => Err(Error::bad_request("nonsense")), + Self::Blocks { block_hashes } => { + for hash in block_hashes.iter() { + let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?; + let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?; + garage.block_manager.resync.clear_backoff(&hash)?; + } + Ok(LocalRetryBlockResyncResponse { + count: block_hashes.len() as u64, + }) + } + } + } +} + +impl RequestHandler for LocalPurgeBlocksRequest { + type Response = LocalPurgeBlocksResponse; + + async fn handle( + self, + garage: &Arc<Garage>, + _admin: &Admin, + ) -> Result<LocalPurgeBlocksResponse, Error> { + let mut obj_dels = 0; + let mut mpu_dels = 0; + let mut ver_dels = 0; + + for hash in self.0.iter() { + let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?; + let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?; + let block_refs = garage + .block_ref_table + .get_range(&hash, None, None, 10000, Default::default()) + .await?; + + for br in block_refs { + if let Some(version) = garage.version_table.get(&br.version, &EmptyKey).await? { + handle_block_purge_version_backlink( + garage, + &version, + &mut obj_dels, + &mut mpu_dels, + ) + .await?; + + if !version.deleted.get() { + let deleted_version = Version::new(version.uuid, version.backlink, true); + garage.version_table.insert(&deleted_version).await?; + ver_dels += 1; + } + } + } + } + + Ok(LocalPurgeBlocksResponse { + blocks_purged: self.0.len() as u64, + versions_deleted: ver_dels, + objects_deleted: obj_dels, + uploads_deleted: mpu_dels, + }) + } +} + +fn find_block_hash_by_prefix(garage: &Arc<Garage>, prefix: &str) -> Result<Hash, Error> { + if prefix.len() < 4 { + return Err(Error::bad_request( + "Please specify at least 4 characters of the block hash", + )); + } + + let prefix_bin = hex::decode(&prefix[..prefix.len() & !1]).ok_or_bad_request("invalid hash")?; + + let iter = garage + .block_ref_table + .data + .store + .range(&prefix_bin[..]..) + .map_err(GarageError::from)?; + let mut found = None; + for item in iter { + let (k, _v) = item.map_err(GarageError::from)?; + let hash = Hash::try_from(&k[..32]).unwrap(); + if &hash.as_slice()[..prefix_bin.len()] != prefix_bin { + break; + } + if hex::encode(hash.as_slice()).starts_with(prefix) { + match &found { + Some(x) if *x == hash => (), + Some(_) => { + return Err(Error::bad_request(format!( + "Several blocks match prefix `{}`", + prefix + ))); + } + None => { + found = Some(hash); + } + } + } + } + + found.ok_or_else(|| Error::NoSuchBlock(prefix.to_string())) +} + +async fn handle_block_purge_version_backlink( + garage: &Arc<Garage>, + version: &Version, + obj_dels: &mut u64, + mpu_dels: &mut u64, +) -> Result<(), Error> { + let (bucket_id, key, ov_id) = match &version.backlink { + VersionBacklink::Object { bucket_id, key } => (*bucket_id, key.clone(), version.uuid), + VersionBacklink::MultipartUpload { upload_id } => { + if let Some(mut mpu) = garage.mpu_table.get(upload_id, &EmptyKey).await? { + if !mpu.deleted.get() { + mpu.parts.clear(); + mpu.deleted.set(); + garage.mpu_table.insert(&mpu).await?; + *mpu_dels += 1; + } + (mpu.bucket_id, mpu.key.clone(), *upload_id) + } else { + return Ok(()); + } + } + }; + + if let Some(object) = garage.object_table.get(&bucket_id, &key).await? { + let ov = object.versions().iter().rev().find(|v| v.is_complete()); + if let Some(ov) = ov { + if ov.uuid == ov_id { + let del_uuid = gen_uuid(); + let deleted_object = Object::new( + bucket_id, + key, + vec![ObjectVersion { + uuid: del_uuid, + timestamp: ov.timestamp + 1, + state: ObjectVersionState::Complete(ObjectVersionData::DeleteMarker), + }], + ); + garage.object_table.insert(&deleted_object).await?; + *obj_dels += 1; + } + } + } + + Ok(()) +} diff --git a/src/api/admin/bucket.rs b/src/api/admin/bucket.rs index 2537bfc9..d2bb62e0 100644 --- a/src/api/admin/bucket.rs +++ b/src/api/admin/bucket.rs @@ -1,8 +1,6 @@ use std::collections::HashMap; use std::sync::Arc; - -use hyper::{body::Incoming as IncomingBody, Request, Response, StatusCode}; -use serde::{Deserialize, Serialize}; +use std::time::Duration; use garage_util::crdt::*; use garage_util::data::*; @@ -18,102 +16,97 @@ use garage_model::s3::mpu_table; use garage_model::s3::object_table::*; use garage_api_common::common_error::CommonError; -use garage_api_common::helpers::*; -use crate::api_server::ResBody; +use crate::api::*; use crate::error::*; -use crate::key::ApiBucketKeyPerm; - -pub async fn handle_list_buckets(garage: &Arc<Garage>) -> Result<Response<ResBody>, Error> { - let buckets = garage - .bucket_table - .get_range( - &EmptyKey, - None, - Some(DeletedFilter::NotDeleted), - 10000, - EnumerationOrder::Forward, - ) - .await?; - - let res = buckets - .into_iter() - .map(|b| { - let state = b.state.as_option().unwrap(); - ListBucketResultItem { - id: hex::encode(b.id), - global_aliases: state - .aliases - .items() - .iter() - .filter(|(_, _, a)| *a) - .map(|(n, _, _)| n.to_string()) - .collect::<Vec<_>>(), - local_aliases: state - .local_aliases - .items() - .iter() - .filter(|(_, _, a)| *a) - .map(|((k, n), _, _)| BucketLocalAlias { - access_key_id: k.to_string(), - alias: n.to_string(), - }) - .collect::<Vec<_>>(), - } - }) - .collect::<Vec<_>>(); - - Ok(json_ok_response(&res)?) -} - -#[derive(Serialize)] -#[serde(rename_all = "camelCase")] -struct ListBucketResultItem { - id: String, - global_aliases: Vec<String>, - local_aliases: Vec<BucketLocalAlias>, -} +use crate::{Admin, RequestHandler}; + +impl RequestHandler for ListBucketsRequest { + type Response = ListBucketsResponse; + + async fn handle( + self, + garage: &Arc<Garage>, + _admin: &Admin, + ) -> Result<ListBucketsResponse, Error> { + let buckets = garage + .bucket_table + .get_range( + &EmptyKey, + None, + Some(DeletedFilter::NotDeleted), + 10000, + EnumerationOrder::Forward, + ) + .await?; -#[derive(Serialize)] -#[serde(rename_all = "camelCase")] -struct BucketLocalAlias { - access_key_id: String, - alias: String, -} + let res = buckets + .into_iter() + .map(|b| { + let state = b.state.as_option().unwrap(); + ListBucketsResponseItem { + id: hex::encode(b.id), + global_aliases: state + .aliases + .items() + .iter() + .filter(|(_, _, a)| *a) + .map(|(n, _, _)| n.to_string()) + .collect::<Vec<_>>(), + local_aliases: state + .local_aliases + .items() + .iter() + .filter(|(_, _, a)| *a) + .map(|((k, n), _, _)| BucketLocalAlias { + access_key_id: k.to_string(), + alias: n.to_string(), + }) + .collect::<Vec<_>>(), + } + }) + .collect::<Vec<_>>(); -#[derive(Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -struct ApiBucketQuotas { - max_size: Option<u64>, - max_objects: Option<u64>, + Ok(ListBucketsResponse(res)) + } } -pub async fn handle_get_bucket_info( - garage: &Arc<Garage>, - id: Option<String>, - global_alias: Option<String>, -) -> Result<Response<ResBody>, Error> { - let bucket_id = match (id, global_alias) { - (Some(id), None) => parse_bucket_id(&id)?, - (None, Some(ga)) => garage - .bucket_helper() - .resolve_global_bucket_name(&ga) - .await? - .ok_or_else(|| HelperError::NoSuchBucket(ga.to_string()))?, - _ => { - return Err(Error::bad_request( - "Either id or globalAlias must be provided (but not both)", - )); - } - }; +impl RequestHandler for GetBucketInfoRequest { + type Response = GetBucketInfoResponse; + + async fn handle( + self, + garage: &Arc<Garage>, + _admin: &Admin, + ) -> Result<GetBucketInfoResponse, Error> { + let bucket_id = match (self.id, self.global_alias, self.search) { + (Some(id), None, None) => parse_bucket_id(&id)?, + (None, Some(ga), None) => garage + .bucket_helper() + .resolve_global_bucket_name(&ga) + .await? + .ok_or_else(|| HelperError::NoSuchBucket(ga.to_string()))?, + (None, None, Some(search)) => { + garage + .bucket_helper() + .admin_get_existing_matching_bucket(&search) + .await? + } + _ => { + return Err(Error::bad_request( + "Either id, globalAlias or search must be provided (but not several of them)", + )); + } + }; - bucket_info_results(garage, bucket_id).await + bucket_info_results(garage, bucket_id).await + } } async fn bucket_info_results( garage: &Arc<Garage>, bucket_id: Uuid, -) -> Result<Response<ResBody>, Error> { +) -> Result<GetBucketInfoResponse, Error> { let bucket = garage .bucket_helper() .get_existing_bucket(bucket_id) @@ -176,301 +169,295 @@ async fn bucket_info_results( let state = bucket.state.as_option().unwrap(); let quotas = state.quotas.get(); - let res = - GetBucketInfoResult { - id: hex::encode(bucket.id), - global_aliases: state - .aliases - .items() - .iter() - .filter(|(_, _, a)| *a) - .map(|(n, _, _)| n.to_string()) - .collect::<Vec<_>>(), - website_access: state.website_config.get().is_some(), - website_config: state.website_config.get().clone().map(|wsc| { - GetBucketInfoWebsiteResult { - index_document: wsc.index_document, - error_document: wsc.error_document, + let res = GetBucketInfoResponse { + id: hex::encode(bucket.id), + global_aliases: state + .aliases + .items() + .iter() + .filter(|(_, _, a)| *a) + .map(|(n, _, _)| n.to_string()) + .collect::<Vec<_>>(), + website_access: state.website_config.get().is_some(), + website_config: state.website_config.get().clone().map(|wsc| { + GetBucketInfoWebsiteResponse { + index_document: wsc.index_document, + error_document: wsc.error_document, + } + }), + keys: relevant_keys + .into_values() + .map(|key| { + let p = key.state.as_option().unwrap(); + GetBucketInfoKey { + access_key_id: key.key_id, + name: p.name.get().to_string(), + permissions: p + .authorized_buckets + .get(&bucket.id) + .map(|p| ApiBucketKeyPerm { + read: p.allow_read, + write: p.allow_write, + owner: p.allow_owner, + }) + .unwrap_or_default(), + bucket_local_aliases: p + .local_aliases + .items() + .iter() + .filter(|(_, _, b)| *b == Some(bucket.id)) + .map(|(n, _, _)| n.to_string()) + .collect::<Vec<_>>(), } - }), - keys: relevant_keys - .into_values() - .map(|key| { - let p = key.state.as_option().unwrap(); - GetBucketInfoKey { - access_key_id: key.key_id, - name: p.name.get().to_string(), - permissions: p - .authorized_buckets - .get(&bucket.id) - .map(|p| ApiBucketKeyPerm { - read: p.allow_read, - write: p.allow_write, - owner: p.allow_owner, - }) - .unwrap_or_default(), - bucket_local_aliases: p - .local_aliases - .items() - .iter() - .filter(|(_, _, b)| *b == Some(bucket.id)) - .map(|(n, _, _)| n.to_string()) - .collect::<Vec<_>>(), - } - }) - .collect::<Vec<_>>(), - objects: *counters.get(OBJECTS).unwrap_or(&0), - bytes: *counters.get(BYTES).unwrap_or(&0), - unfinished_uploads: *counters.get(UNFINISHED_UPLOADS).unwrap_or(&0), - unfinished_multipart_uploads: *mpu_counters.get(mpu_table::UPLOADS).unwrap_or(&0), - unfinished_multipart_upload_parts: *mpu_counters.get(mpu_table::PARTS).unwrap_or(&0), - unfinished_multipart_upload_bytes: *mpu_counters.get(mpu_table::BYTES).unwrap_or(&0), - quotas: ApiBucketQuotas { - max_size: quotas.max_size, - max_objects: quotas.max_objects, - }, - }; - - Ok(json_ok_response(&res)?) -} - -#[derive(Serialize)] -#[serde(rename_all = "camelCase")] -struct GetBucketInfoResult { - id: String, - global_aliases: Vec<String>, - website_access: bool, - #[serde(default)] - website_config: Option<GetBucketInfoWebsiteResult>, - keys: Vec<GetBucketInfoKey>, - objects: i64, - bytes: i64, - unfinished_uploads: i64, - unfinished_multipart_uploads: i64, - unfinished_multipart_upload_parts: i64, - unfinished_multipart_upload_bytes: i64, - quotas: ApiBucketQuotas, -} - -#[derive(Serialize)] -#[serde(rename_all = "camelCase")] -struct GetBucketInfoWebsiteResult { - index_document: String, - error_document: Option<String>, -} + }) + .collect::<Vec<_>>(), + objects: *counters.get(OBJECTS).unwrap_or(&0), + bytes: *counters.get(BYTES).unwrap_or(&0), + unfinished_uploads: *counters.get(UNFINISHED_UPLOADS).unwrap_or(&0), + unfinished_multipart_uploads: *mpu_counters.get(mpu_table::UPLOADS).unwrap_or(&0), + unfinished_multipart_upload_parts: *mpu_counters.get(mpu_table::PARTS).unwrap_or(&0), + unfinished_multipart_upload_bytes: *mpu_counters.get(mpu_table::BYTES).unwrap_or(&0), + quotas: ApiBucketQuotas { + max_size: quotas.max_size, + max_objects: quotas.max_objects, + }, + }; -#[derive(Serialize)] -#[serde(rename_all = "camelCase")] -struct GetBucketInfoKey { - access_key_id: String, - name: String, - permissions: ApiBucketKeyPerm, - bucket_local_aliases: Vec<String>, + Ok(res) } -pub async fn handle_create_bucket( - garage: &Arc<Garage>, - req: Request<IncomingBody>, -) -> Result<Response<ResBody>, Error> { - let req = parse_json_body::<CreateBucketRequest, _, Error>(req).await?; - - let helper = garage.locked_helper().await; - - if let Some(ga) = &req.global_alias { - if !is_valid_bucket_name(ga) { - return Err(Error::bad_request(format!( - "{}: {}", - ga, INVALID_BUCKET_NAME_MESSAGE - ))); - } +impl RequestHandler for CreateBucketRequest { + type Response = CreateBucketResponse; + + async fn handle( + self, + garage: &Arc<Garage>, + _admin: &Admin, + ) -> Result<CreateBucketResponse, Error> { + let helper = garage.locked_helper().await; + + if let Some(ga) = &self.global_alias { + if !is_valid_bucket_name(ga) { + return Err(Error::bad_request(format!( + "{}: {}", + ga, INVALID_BUCKET_NAME_MESSAGE + ))); + } - if let Some(alias) = garage.bucket_alias_table.get(&EmptyKey, ga).await? { - if alias.state.get().is_some() { - return Err(CommonError::BucketAlreadyExists.into()); + if let Some(alias) = garage.bucket_alias_table.get(&EmptyKey, ga).await? { + if alias.state.get().is_some() { + return Err(CommonError::BucketAlreadyExists.into()); + } } } - } - if let Some(la) = &req.local_alias { - if !is_valid_bucket_name(&la.alias) { - return Err(Error::bad_request(format!( - "{}: {}", - la.alias, INVALID_BUCKET_NAME_MESSAGE - ))); - } + if let Some(la) = &self.local_alias { + if !is_valid_bucket_name(&la.alias) { + return Err(Error::bad_request(format!( + "{}: {}", + la.alias, INVALID_BUCKET_NAME_MESSAGE + ))); + } - let key = helper.key().get_existing_key(&la.access_key_id).await?; - let state = key.state.as_option().unwrap(); - if matches!(state.local_aliases.get(&la.alias), Some(_)) { - return Err(Error::bad_request("Local alias already exists")); + let key = helper.key().get_existing_key(&la.access_key_id).await?; + let state = key.state.as_option().unwrap(); + if matches!(state.local_aliases.get(&la.alias), Some(_)) { + return Err(Error::bad_request("Local alias already exists")); + } } - } - let bucket = Bucket::new(); - garage.bucket_table.insert(&bucket).await?; - - if let Some(ga) = &req.global_alias { - helper.set_global_bucket_alias(bucket.id, ga).await?; - } + let bucket = Bucket::new(); + garage.bucket_table.insert(&bucket).await?; - if let Some(la) = &req.local_alias { - helper - .set_local_bucket_alias(bucket.id, &la.access_key_id, &la.alias) - .await?; + if let Some(ga) = &self.global_alias { + helper.set_global_bucket_alias(bucket.id, ga).await?; + } - if la.allow.read || la.allow.write || la.allow.owner { + if let Some(la) = &self.local_alias { helper - .set_bucket_key_permissions( - bucket.id, - &la.access_key_id, - BucketKeyPerm { - timestamp: now_msec(), - allow_read: la.allow.read, - allow_write: la.allow.write, - allow_owner: la.allow.owner, - }, - ) + .set_local_bucket_alias(bucket.id, &la.access_key_id, &la.alias) .await?; - } - } - bucket_info_results(garage, bucket.id).await -} + if la.allow.read || la.allow.write || la.allow.owner { + helper + .set_bucket_key_permissions( + bucket.id, + &la.access_key_id, + BucketKeyPerm { + timestamp: now_msec(), + allow_read: la.allow.read, + allow_write: la.allow.write, + allow_owner: la.allow.owner, + }, + ) + .await?; + } + } -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -struct CreateBucketRequest { - global_alias: Option<String>, - local_alias: Option<CreateBucketLocalAlias>, + Ok(CreateBucketResponse( + bucket_info_results(garage, bucket.id).await?, + )) + } } -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -struct CreateBucketLocalAlias { - access_key_id: String, - alias: String, - #[serde(default)] - allow: ApiBucketKeyPerm, -} +impl RequestHandler for DeleteBucketRequest { + type Response = DeleteBucketResponse; -pub async fn handle_delete_bucket( - garage: &Arc<Garage>, - id: String, -) -> Result<Response<ResBody>, Error> { - let helper = garage.locked_helper().await; + async fn handle( + self, + garage: &Arc<Garage>, + _admin: &Admin, + ) -> Result<DeleteBucketResponse, Error> { + let helper = garage.locked_helper().await; - let bucket_id = parse_bucket_id(&id)?; + let bucket_id = parse_bucket_id(&self.id)?; - let mut bucket = helper.bucket().get_existing_bucket(bucket_id).await?; - let state = bucket.state.as_option().unwrap(); + let mut bucket = helper.bucket().get_existing_bucket(bucket_id).await?; + let state = bucket.state.as_option().unwrap(); - // Check bucket is empty - if !helper.bucket().is_bucket_empty(bucket_id).await? { - return Err(CommonError::BucketNotEmpty.into()); - } + // Check bucket is empty + if !helper.bucket().is_bucket_empty(bucket_id).await? { + return Err(CommonError::BucketNotEmpty.into()); + } - // --- done checking, now commit --- - // 1. delete authorization from keys that had access - for (key_id, perm) in bucket.authorized_keys() { - if perm.is_any() { - helper - .set_bucket_key_permissions(bucket.id, key_id, BucketKeyPerm::NO_PERMISSIONS) - .await?; + // --- done checking, now commit --- + // 1. delete authorization from keys that had access + for (key_id, perm) in bucket.authorized_keys() { + if perm.is_any() { + helper + .set_bucket_key_permissions(bucket.id, key_id, BucketKeyPerm::NO_PERMISSIONS) + .await?; + } } - } - // 2. delete all local aliases - for ((key_id, alias), _, active) in state.local_aliases.items().iter() { - if *active { - helper - .unset_local_bucket_alias(bucket.id, key_id, alias) - .await?; + // 2. delete all local aliases + for ((key_id, alias), _, active) in state.local_aliases.items().iter() { + if *active { + helper + .unset_local_bucket_alias(bucket.id, key_id, alias) + .await?; + } } - } - // 3. delete all global aliases - for (alias, _, active) in state.aliases.items().iter() { - if *active { - helper.purge_global_bucket_alias(bucket.id, alias).await?; + // 3. delete all global aliases + for (alias, _, active) in state.aliases.items().iter() { + if *active { + helper.purge_global_bucket_alias(bucket.id, alias).await?; + } } - } - // 4. delete bucket - bucket.state = Deletable::delete(); - garage.bucket_table.insert(&bucket).await?; + // 4. delete bucket + bucket.state = Deletable::delete(); + garage.bucket_table.insert(&bucket).await?; - Ok(Response::builder() - .status(StatusCode::NO_CONTENT) - .body(empty_body())?) + Ok(DeleteBucketResponse) + } } -pub async fn handle_update_bucket( - garage: &Arc<Garage>, - id: String, - req: Request<IncomingBody>, -) -> Result<Response<ResBody>, Error> { - let req = parse_json_body::<UpdateBucketRequest, _, Error>(req).await?; - let bucket_id = parse_bucket_id(&id)?; +impl RequestHandler for UpdateBucketRequest { + type Response = UpdateBucketResponse; - let mut bucket = garage - .bucket_helper() - .get_existing_bucket(bucket_id) - .await?; + async fn handle( + self, + garage: &Arc<Garage>, + _admin: &Admin, + ) -> Result<UpdateBucketResponse, Error> { + let bucket_id = parse_bucket_id(&self.id)?; - let state = bucket.state.as_option_mut().unwrap(); - - if let Some(wa) = req.website_access { - if wa.enabled { - state.website_config.update(Some(WebsiteConfig { - index_document: wa.index_document.ok_or_bad_request( - "Please specify indexDocument when enabling website access.", - )?, - error_document: wa.error_document, - })); - } else { - if wa.index_document.is_some() || wa.error_document.is_some() { - return Err(Error::bad_request( - "Cannot specify indexDocument or errorDocument when disabling website access.", - )); + let mut bucket = garage + .bucket_helper() + .get_existing_bucket(bucket_id) + .await?; + + let state = bucket.state.as_option_mut().unwrap(); + + if let Some(wa) = self.body.website_access { + if wa.enabled { + state.website_config.update(Some(WebsiteConfig { + index_document: wa.index_document.ok_or_bad_request( + "Please specify indexDocument when enabling website access.", + )?, + error_document: wa.error_document, + })); + } else { + if wa.index_document.is_some() || wa.error_document.is_some() { + return Err(Error::bad_request( + "Cannot specify indexDocument or errorDocument when disabling website access.", + )); + } + state.website_config.update(None); } - state.website_config.update(None); } - } - if let Some(q) = req.quotas { - state.quotas.update(BucketQuotas { - max_size: q.max_size, - max_objects: q.max_objects, - }); - } + if let Some(q) = self.body.quotas { + state.quotas.update(BucketQuotas { + max_size: q.max_size, + max_objects: q.max_objects, + }); + } - garage.bucket_table.insert(&bucket).await?; + garage.bucket_table.insert(&bucket).await?; - bucket_info_results(garage, bucket_id).await + Ok(UpdateBucketResponse( + bucket_info_results(garage, bucket_id).await?, + )) + } } -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -struct UpdateBucketRequest { - website_access: Option<UpdateBucketWebsiteAccess>, - quotas: Option<ApiBucketQuotas>, -} +impl RequestHandler for CleanupIncompleteUploadsRequest { + type Response = CleanupIncompleteUploadsResponse; + + async fn handle( + self, + garage: &Arc<Garage>, + _admin: &Admin, + ) -> Result<CleanupIncompleteUploadsResponse, Error> { + let duration = Duration::from_secs(self.older_than_secs); + + let bucket_id = parse_bucket_id(&self.bucket_id)?; + + let count = garage + .bucket_helper() + .cleanup_incomplete_uploads(&bucket_id, duration) + .await?; -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -struct UpdateBucketWebsiteAccess { - enabled: bool, - index_document: Option<String>, - error_document: Option<String>, + Ok(CleanupIncompleteUploadsResponse { + uploads_deleted: count as u64, + }) + } } // ---- BUCKET/KEY PERMISSIONS ---- +impl RequestHandler for AllowBucketKeyRequest { + type Response = AllowBucketKeyResponse; + + async fn handle( + self, + garage: &Arc<Garage>, + _admin: &Admin, + ) -> Result<AllowBucketKeyResponse, Error> { + let res = handle_bucket_change_key_perm(garage, self.0, true).await?; + Ok(AllowBucketKeyResponse(res)) + } +} + +impl RequestHandler for DenyBucketKeyRequest { + type Response = DenyBucketKeyResponse; + + async fn handle( + self, + garage: &Arc<Garage>, + _admin: &Admin, + ) -> Result<DenyBucketKeyResponse, Error> { + let res = handle_bucket_change_key_perm(garage, self.0, false).await?; + Ok(DenyBucketKeyResponse(res)) + } +} + pub async fn handle_bucket_change_key_perm( garage: &Arc<Garage>, - req: Request<IncomingBody>, + req: BucketKeyPermChangeRequest, new_perm_flag: bool, -) -> Result<Response<ResBody>, Error> { - let req = parse_json_body::<BucketKeyPermChangeRequest, _, Error>(req).await?; - +) -> Result<GetBucketInfoResponse, Error> { let helper = garage.locked_helper().await; let bucket_id = parse_bucket_id(&req.bucket_id)?; @@ -503,76 +490,74 @@ pub async fn handle_bucket_change_key_perm( bucket_info_results(garage, bucket.id).await } -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -struct BucketKeyPermChangeRequest { - bucket_id: String, - access_key_id: String, - permissions: ApiBucketKeyPerm, -} - // ---- BUCKET ALIASES ---- -pub async fn handle_global_alias_bucket( - garage: &Arc<Garage>, - bucket_id: String, - alias: String, -) -> Result<Response<ResBody>, Error> { - let bucket_id = parse_bucket_id(&bucket_id)?; - - let helper = garage.locked_helper().await; +impl RequestHandler for AddBucketAliasRequest { + type Response = AddBucketAliasResponse; - helper.set_global_bucket_alias(bucket_id, &alias).await?; + async fn handle( + self, + garage: &Arc<Garage>, + _admin: &Admin, + ) -> Result<AddBucketAliasResponse, Error> { + let bucket_id = parse_bucket_id(&self.bucket_id)?; - bucket_info_results(garage, bucket_id).await -} + let helper = garage.locked_helper().await; -pub async fn handle_global_unalias_bucket( - garage: &Arc<Garage>, - bucket_id: String, - alias: String, -) -> Result<Response<ResBody>, Error> { - let bucket_id = parse_bucket_id(&bucket_id)?; - - let helper = garage.locked_helper().await; - - helper.unset_global_bucket_alias(bucket_id, &alias).await?; + match self.alias { + BucketAliasEnum::Global { global_alias } => { + helper + .set_global_bucket_alias(bucket_id, &global_alias) + .await?; + } + BucketAliasEnum::Local { + local_alias, + access_key_id, + } => { + helper + .set_local_bucket_alias(bucket_id, &access_key_id, &local_alias) + .await?; + } + } - bucket_info_results(garage, bucket_id).await + Ok(AddBucketAliasResponse( + bucket_info_results(garage, bucket_id).await?, + )) + } } -pub async fn handle_local_alias_bucket( - garage: &Arc<Garage>, - bucket_id: String, - access_key_id: String, - alias: String, -) -> Result<Response<ResBody>, Error> { - let bucket_id = parse_bucket_id(&bucket_id)?; - - let helper = garage.locked_helper().await; - - helper - .set_local_bucket_alias(bucket_id, &access_key_id, &alias) - .await?; - - bucket_info_results(garage, bucket_id).await -} +impl RequestHandler for RemoveBucketAliasRequest { + type Response = RemoveBucketAliasResponse; -pub async fn handle_local_unalias_bucket( - garage: &Arc<Garage>, - bucket_id: String, - access_key_id: String, - alias: String, -) -> Result<Response<ResBody>, Error> { - let bucket_id = parse_bucket_id(&bucket_id)?; + async fn handle( + self, + garage: &Arc<Garage>, + _admin: &Admin, + ) -> Result<RemoveBucketAliasResponse, Error> { + let bucket_id = parse_bucket_id(&self.bucket_id)?; - let helper = garage.locked_helper().await; + let helper = garage.locked_helper().await; - helper - .unset_local_bucket_alias(bucket_id, &access_key_id, &alias) - .await?; + match self.alias { + BucketAliasEnum::Global { global_alias } => { + helper + .unset_global_bucket_alias(bucket_id, &global_alias) + .await?; + } + BucketAliasEnum::Local { + local_alias, + access_key_id, + } => { + helper + .unset_local_bucket_alias(bucket_id, &access_key_id, &local_alias) + .await?; + } + } - bucket_info_results(garage, bucket_id).await + Ok(RemoveBucketAliasResponse( + bucket_info_results(garage, bucket_id).await?, + )) + } } // ---- HELPER ---- diff --git a/src/api/admin/cluster.rs b/src/api/admin/cluster.rs index ffa0fa71..cb1fa493 100644 --- a/src/api/admin/cluster.rs +++ b/src/api/admin/cluster.rs @@ -1,10 +1,6 @@ use std::collections::HashMap; -use std::net::SocketAddr; use std::sync::Arc; -use hyper::{body::Incoming as IncomingBody, Request, Response}; -use serde::{Deserialize, Serialize}; - use garage_util::crdt::*; use garage_util::data::*; @@ -12,158 +8,182 @@ use garage_rpc::layout; use garage_model::garage::Garage; -use garage_api_common::helpers::{json_ok_response, parse_json_body}; - -use crate::api_server::ResBody; +use crate::api::*; use crate::error::*; - -pub async fn handle_get_cluster_status(garage: &Arc<Garage>) -> Result<Response<ResBody>, Error> { - let layout = garage.system.cluster_layout(); - let mut nodes = garage - .system - .get_known_nodes() - .into_iter() - .map(|i| { - ( - i.id, - NodeResp { - id: hex::encode(i.id), - addr: i.addr, - hostname: i.status.hostname, - is_up: i.is_up, - last_seen_secs_ago: i.last_seen_secs_ago, - data_partition: i - .status - .data_disk_avail - .map(|(avail, total)| FreeSpaceResp { - available: avail, - total, +use crate::{Admin, RequestHandler}; + +impl RequestHandler for GetClusterStatusRequest { + type Response = GetClusterStatusResponse; + + async fn handle( + self, + garage: &Arc<Garage>, + _admin: &Admin, + ) -> Result<GetClusterStatusResponse, Error> { + let layout = garage.system.cluster_layout(); + let mut nodes = garage + .system + .get_known_nodes() + .into_iter() + .map(|i| { + ( + i.id, + NodeResp { + id: hex::encode(i.id), + addr: i.addr, + hostname: i.status.hostname, + is_up: i.is_up, + last_seen_secs_ago: i.last_seen_secs_ago, + data_partition: i.status.data_disk_avail.map(|(avail, total)| { + FreeSpaceResp { + available: avail, + total, + } }), - metadata_partition: i.status.meta_disk_avail.map(|(avail, total)| { - FreeSpaceResp { - available: avail, - total, - } - }), - ..Default::default() - }, - ) - }) - .collect::<HashMap<_, _>>(); - - for (id, _, role) in layout.current().roles.items().iter() { - if let layout::NodeRoleV(Some(r)) = role { - let role = NodeRoleResp { - id: hex::encode(id), - zone: r.zone.to_string(), - capacity: r.capacity, - tags: r.tags.clone(), - }; - match nodes.get_mut(id) { - None => { - nodes.insert( - *id, - NodeResp { - id: hex::encode(id), - role: Some(role), - ..Default::default() - }, - ); - } - Some(n) => { - n.role = Some(role); - } - } - } - } + metadata_partition: i.status.meta_disk_avail.map(|(avail, total)| { + FreeSpaceResp { + available: avail, + total, + } + }), + ..Default::default() + }, + ) + }) + .collect::<HashMap<_, _>>(); - for ver in layout.versions().iter().rev().skip(1) { - for (id, _, role) in ver.roles.items().iter() { + for (id, _, role) in layout.current().roles.items().iter() { if let layout::NodeRoleV(Some(r)) = role { - if r.capacity.is_some() { - if let Some(n) = nodes.get_mut(id) { - if n.role.is_none() { - n.draining = true; - } - } else { + let role = NodeRoleResp { + id: hex::encode(id), + zone: r.zone.to_string(), + capacity: r.capacity, + tags: r.tags.clone(), + }; + match nodes.get_mut(id) { + None => { nodes.insert( *id, NodeResp { id: hex::encode(id), - draining: true, + role: Some(role), ..Default::default() }, ); } + Some(n) => { + n.role = Some(role); + } } } } - } - - let mut nodes = nodes.into_values().collect::<Vec<_>>(); - nodes.sort_by(|x, y| x.id.cmp(&y.id)); - let res = GetClusterStatusResponse { - node: hex::encode(garage.system.id), - garage_version: garage_util::version::garage_version(), - garage_features: garage_util::version::garage_features(), - rust_version: garage_util::version::rust_version(), - db_engine: garage.db.engine(), - layout_version: layout.current().version, - nodes, - }; + for ver in layout.versions().iter().rev().skip(1) { + for (id, _, role) in ver.roles.items().iter() { + if let layout::NodeRoleV(Some(r)) = role { + if r.capacity.is_some() { + if let Some(n) = nodes.get_mut(id) { + if n.role.is_none() { + n.draining = true; + } + } else { + nodes.insert( + *id, + NodeResp { + id: hex::encode(id), + draining: true, + ..Default::default() + }, + ); + } + } + } + } + } - Ok(json_ok_response(&res)?) + let mut nodes = nodes.into_values().collect::<Vec<_>>(); + nodes.sort_by(|x, y| x.id.cmp(&y.id)); + + Ok(GetClusterStatusResponse { + node: hex::encode(garage.system.id), + garage_version: garage_util::version::garage_version().to_string(), + garage_features: garage_util::version::garage_features() + .map(|features| features.iter().map(ToString::to_string).collect()), + rust_version: garage_util::version::rust_version().to_string(), + db_engine: garage.db.engine(), + layout_version: layout.current().version, + nodes, + }) + } } -pub async fn handle_get_cluster_health(garage: &Arc<Garage>) -> Result<Response<ResBody>, Error> { - use garage_rpc::system::ClusterHealthStatus; - let health = garage.system.health(); - let health = ClusterHealth { - status: match health.status { - ClusterHealthStatus::Healthy => "healthy", - ClusterHealthStatus::Degraded => "degraded", - ClusterHealthStatus::Unavailable => "unavailable", - }, - known_nodes: health.known_nodes, - connected_nodes: health.connected_nodes, - storage_nodes: health.storage_nodes, - storage_nodes_ok: health.storage_nodes_ok, - partitions: health.partitions, - partitions_quorum: health.partitions_quorum, - partitions_all_ok: health.partitions_all_ok, - }; - Ok(json_ok_response(&health)?) +impl RequestHandler for GetClusterHealthRequest { + type Response = GetClusterHealthResponse; + + async fn handle( + self, + garage: &Arc<Garage>, + _admin: &Admin, + ) -> Result<GetClusterHealthResponse, Error> { + use garage_rpc::system::ClusterHealthStatus; + let health = garage.system.health(); + let health = GetClusterHealthResponse { + status: match health.status { + ClusterHealthStatus::Healthy => "healthy", + ClusterHealthStatus::Degraded => "degraded", + ClusterHealthStatus::Unavailable => "unavailable", + } + .to_string(), + known_nodes: health.known_nodes, + connected_nodes: health.connected_nodes, + storage_nodes: health.storage_nodes, + storage_nodes_ok: health.storage_nodes_ok, + partitions: health.partitions, + partitions_quorum: health.partitions_quorum, + partitions_all_ok: health.partitions_all_ok, + }; + Ok(health) + } } -pub async fn handle_connect_cluster_nodes( - garage: &Arc<Garage>, - req: Request<IncomingBody>, -) -> Result<Response<ResBody>, Error> { - let req = parse_json_body::<Vec<String>, _, Error>(req).await?; - - let res = futures::future::join_all(req.iter().map(|node| garage.system.connect(node))) - .await - .into_iter() - .map(|r| match r { - Ok(()) => ConnectClusterNodesResponse { - success: true, - error: None, - }, - Err(e) => ConnectClusterNodesResponse { - success: false, - error: Some(format!("{}", e)), - }, - }) - .collect::<Vec<_>>(); - - Ok(json_ok_response(&res)?) +impl RequestHandler for ConnectClusterNodesRequest { + type Response = ConnectClusterNodesResponse; + + async fn handle( + self, + garage: &Arc<Garage>, + _admin: &Admin, + ) -> Result<ConnectClusterNodesResponse, Error> { + let res = futures::future::join_all(self.0.iter().map(|node| garage.system.connect(node))) + .await + .into_iter() + .map(|r| match r { + Ok(()) => ConnectNodeResponse { + success: true, + error: None, + }, + Err(e) => ConnectNodeResponse { + success: false, + error: Some(format!("{}", e)), + }, + }) + .collect::<Vec<_>>(); + Ok(ConnectClusterNodesResponse(res)) + } } -pub async fn handle_get_cluster_layout(garage: &Arc<Garage>) -> Result<Response<ResBody>, Error> { - let res = format_cluster_layout(garage.system.cluster_layout().inner()); - - Ok(json_ok_response(&res)?) +impl RequestHandler for GetClusterLayoutRequest { + type Response = GetClusterLayoutResponse; + + async fn handle( + self, + garage: &Arc<Garage>, + _admin: &Admin, + ) -> Result<GetClusterLayoutResponse, Error> { + Ok(format_cluster_layout( + garage.system.cluster_layout().inner(), + )) + } } fn format_cluster_layout(layout: &layout::LayoutHistory) -> GetClusterLayoutResponse { @@ -213,199 +233,98 @@ fn format_cluster_layout(layout: &layout::LayoutHistory) -> GetClusterLayoutResp // ---- -#[derive(Debug, Clone, Copy, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct ClusterHealth { - status: &'static str, - known_nodes: usize, - connected_nodes: usize, - storage_nodes: usize, - storage_nodes_ok: usize, - partitions: usize, - partitions_quorum: usize, - partitions_all_ok: usize, -} - -#[derive(Serialize)] -#[serde(rename_all = "camelCase")] -struct GetClusterStatusResponse { - node: String, - garage_version: &'static str, - garage_features: Option<&'static [&'static str]>, - rust_version: &'static str, - db_engine: String, - layout_version: u64, - nodes: Vec<NodeResp>, -} - -#[derive(Serialize)] -#[serde(rename_all = "camelCase")] -struct ApplyClusterLayoutResponse { - message: Vec<String>, - layout: GetClusterLayoutResponse, -} - -#[derive(Serialize)] -#[serde(rename_all = "camelCase")] -struct ConnectClusterNodesResponse { - success: bool, - error: Option<String>, -} - -#[derive(Serialize)] -#[serde(rename_all = "camelCase")] -struct GetClusterLayoutResponse { - version: u64, - roles: Vec<NodeRoleResp>, - staged_role_changes: Vec<NodeRoleChange>, -} - -#[derive(Serialize)] -#[serde(rename_all = "camelCase")] -struct NodeRoleResp { - id: String, - zone: String, - capacity: Option<u64>, - tags: Vec<String>, -} - -#[derive(Serialize, Default)] -#[serde(rename_all = "camelCase")] -struct FreeSpaceResp { - available: u64, - total: u64, -} - -#[derive(Serialize, Default)] -#[serde(rename_all = "camelCase")] -struct NodeResp { - id: String, - role: Option<NodeRoleResp>, - addr: Option<SocketAddr>, - hostname: Option<String>, - is_up: bool, - last_seen_secs_ago: Option<u64>, - draining: bool, - #[serde(skip_serializing_if = "Option::is_none")] - data_partition: Option<FreeSpaceResp>, - #[serde(skip_serializing_if = "Option::is_none")] - metadata_partition: Option<FreeSpaceResp>, -} - // ---- update functions ---- -pub async fn handle_update_cluster_layout( - garage: &Arc<Garage>, - req: Request<IncomingBody>, -) -> Result<Response<ResBody>, Error> { - let updates = parse_json_body::<UpdateClusterLayoutRequest, _, Error>(req).await?; - - let mut layout = garage.system.cluster_layout().inner().clone(); - - let mut roles = layout.current().roles.clone(); - roles.merge(&layout.staging.get().roles); - - for change in updates { - let node = hex::decode(&change.id).ok_or_bad_request("Invalid node identifier")?; - let node = Uuid::try_from(&node).ok_or_bad_request("Invalid node identifier")?; - - let new_role = match change.action { - NodeRoleChangeEnum::Remove { remove: true } => None, - NodeRoleChangeEnum::Update { - zone, - capacity, - tags, - } => Some(layout::NodeRole { - zone, - capacity, - tags, - }), - _ => return Err(Error::bad_request("Invalid layout change")), - }; - - layout - .staging - .get_mut() - .roles - .merge(&roles.update_mutator(node, layout::NodeRoleV(new_role))); - } - - garage - .system - .layout_manager - .update_cluster_layout(&layout) - .await?; - - let res = format_cluster_layout(&layout); - Ok(json_ok_response(&res)?) -} - -pub async fn handle_apply_cluster_layout( - garage: &Arc<Garage>, - req: Request<IncomingBody>, -) -> Result<Response<ResBody>, Error> { - let param = parse_json_body::<ApplyLayoutRequest, _, Error>(req).await?; - - let layout = garage.system.cluster_layout().inner().clone(); - let (layout, msg) = layout.apply_staged_changes(Some(param.version))?; - - garage - .system - .layout_manager - .update_cluster_layout(&layout) - .await?; - - let res = ApplyClusterLayoutResponse { - message: msg, - layout: format_cluster_layout(&layout), - }; - Ok(json_ok_response(&res)?) -} - -pub async fn handle_revert_cluster_layout( - garage: &Arc<Garage>, -) -> Result<Response<ResBody>, Error> { - let layout = garage.system.cluster_layout().inner().clone(); - let layout = layout.revert_staged_changes()?; - garage - .system - .layout_manager - .update_cluster_layout(&layout) - .await?; - - let res = format_cluster_layout(&layout); - Ok(json_ok_response(&res)?) -} +impl RequestHandler for UpdateClusterLayoutRequest { + type Response = UpdateClusterLayoutResponse; + + async fn handle( + self, + garage: &Arc<Garage>, + _admin: &Admin, + ) -> Result<UpdateClusterLayoutResponse, Error> { + let mut layout = garage.system.cluster_layout().inner().clone(); + + let mut roles = layout.current().roles.clone(); + roles.merge(&layout.staging.get().roles); + + for change in self.0 { + let node = hex::decode(&change.id).ok_or_bad_request("Invalid node identifier")?; + let node = Uuid::try_from(&node).ok_or_bad_request("Invalid node identifier")?; + + let new_role = match change.action { + NodeRoleChangeEnum::Remove { remove: true } => None, + NodeRoleChangeEnum::Update { + zone, + capacity, + tags, + } => Some(layout::NodeRole { + zone, + capacity, + tags, + }), + _ => return Err(Error::bad_request("Invalid layout change")), + }; -// ---- + layout + .staging + .get_mut() + .roles + .merge(&roles.update_mutator(node, layout::NodeRoleV(new_role))); + } -type UpdateClusterLayoutRequest = Vec<NodeRoleChange>; + garage + .system + .layout_manager + .update_cluster_layout(&layout) + .await?; -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -struct ApplyLayoutRequest { - version: u64, + let res = format_cluster_layout(&layout); + Ok(UpdateClusterLayoutResponse(res)) + } } -// ---- - -#[derive(Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -struct NodeRoleChange { - id: String, - #[serde(flatten)] - action: NodeRoleChangeEnum, +impl RequestHandler for ApplyClusterLayoutRequest { + type Response = ApplyClusterLayoutResponse; + + async fn handle( + self, + garage: &Arc<Garage>, + _admin: &Admin, + ) -> Result<ApplyClusterLayoutResponse, Error> { + let layout = garage.system.cluster_layout().inner().clone(); + let (layout, msg) = layout.apply_staged_changes(Some(self.version))?; + + garage + .system + .layout_manager + .update_cluster_layout(&layout) + .await?; + + Ok(ApplyClusterLayoutResponse { + message: msg, + layout: format_cluster_layout(&layout), + }) + } } -#[derive(Serialize, Deserialize)] -#[serde(untagged)] -enum NodeRoleChangeEnum { - #[serde(rename_all = "camelCase")] - Remove { remove: bool }, - #[serde(rename_all = "camelCase")] - Update { - zone: String, - capacity: Option<u64>, - tags: Vec<String>, - }, +impl RequestHandler for RevertClusterLayoutRequest { + type Response = RevertClusterLayoutResponse; + + async fn handle( + self, + garage: &Arc<Garage>, + _admin: &Admin, + ) -> Result<RevertClusterLayoutResponse, Error> { + let layout = garage.system.cluster_layout().inner().clone(); + let layout = layout.revert_staged_changes()?; + garage + .system + .layout_manager + .update_cluster_layout(&layout) + .await?; + + let res = format_cluster_layout(&layout); + Ok(RevertClusterLayoutResponse(res)) + } } diff --git a/src/api/admin/error.rs b/src/api/admin/error.rs index 201f9b40..d7ea7dc9 100644 --- a/src/api/admin/error.rs +++ b/src/api/admin/error.rs @@ -25,6 +25,14 @@ pub enum Error { #[error(display = "Access key not found: {}", _0)] NoSuchAccessKey(String), + /// The requested block does not exist + #[error(display = "Block not found: {}", _0)] + NoSuchBlock(String), + + /// The requested worker does not exist + #[error(display = "Worker not found: {}", _0)] + NoSuchWorker(u64), + /// In Import key, the key already exists #[error( display = "Key {} already exists in data store. Even if it is deleted, we can't let you create a new key with the same ID. Sorry.", @@ -49,10 +57,12 @@ impl From<HelperError> for Error { } impl Error { - fn code(&self) -> &'static str { + pub fn code(&self) -> &'static str { match self { Error::Common(c) => c.aws_code(), Error::NoSuchAccessKey(_) => "NoSuchAccessKey", + Error::NoSuchWorker(_) => "NoSuchWorker", + Error::NoSuchBlock(_) => "NoSuchBlock", Error::KeyAlreadyExists(_) => "KeyAlreadyExists", } } @@ -63,7 +73,9 @@ impl ApiError for Error { fn http_status_code(&self) -> StatusCode { match self { Error::Common(c) => c.http_status_code(), - Error::NoSuchAccessKey(_) => StatusCode::NOT_FOUND, + Error::NoSuchAccessKey(_) | Error::NoSuchWorker(_) | Error::NoSuchBlock(_) => { + StatusCode::NOT_FOUND + } Error::KeyAlreadyExists(_) => StatusCode::CONFLICT, } } diff --git a/src/api/admin/key.rs b/src/api/admin/key.rs index bebf3063..dc6ae4e9 100644 --- a/src/api/admin/key.rs +++ b/src/api/admin/key.rs @@ -1,173 +1,168 @@ use std::collections::HashMap; use std::sync::Arc; -use hyper::{body::Incoming as IncomingBody, Request, Response, StatusCode}; -use serde::{Deserialize, Serialize}; - use garage_table::*; use garage_model::garage::Garage; use garage_model::key_table::*; -use garage_api_common::helpers::*; - -use crate::api_server::ResBody; +use crate::api::*; use crate::error::*; - -pub async fn handle_list_keys(garage: &Arc<Garage>) -> Result<Response<ResBody>, Error> { - let res = garage - .key_table - .get_range( - &EmptyKey, - None, - Some(KeyFilter::Deleted(DeletedFilter::NotDeleted)), - 10000, - EnumerationOrder::Forward, - ) - .await? - .iter() - .map(|k| ListKeyResultItem { - id: k.key_id.to_string(), - name: k.params().unwrap().name.get().clone(), - }) - .collect::<Vec<_>>(); - - Ok(json_ok_response(&res)?) -} - -#[derive(Serialize)] -#[serde(rename_all = "camelCase")] -struct ListKeyResultItem { - id: String, - name: String, -} - -pub async fn handle_get_key_info( - garage: &Arc<Garage>, - id: Option<String>, - search: Option<String>, - show_secret_key: bool, -) -> Result<Response<ResBody>, Error> { - let key = if let Some(id) = id { - garage.key_helper().get_existing_key(&id).await? - } else if let Some(search) = search { - garage - .key_helper() - .get_existing_matching_key(&search) +use crate::{Admin, RequestHandler}; + +impl RequestHandler for ListKeysRequest { + type Response = ListKeysResponse; + + async fn handle(self, garage: &Arc<Garage>, _admin: &Admin) -> Result<ListKeysResponse, Error> { + let res = garage + .key_table + .get_range( + &EmptyKey, + None, + Some(KeyFilter::Deleted(DeletedFilter::NotDeleted)), + 10000, + EnumerationOrder::Forward, + ) .await? - } else { - unreachable!(); - }; + .iter() + .map(|k| ListKeysResponseItem { + id: k.key_id.to_string(), + name: k.params().unwrap().name.get().clone(), + }) + .collect::<Vec<_>>(); - key_info_results(garage, key, show_secret_key).await + Ok(ListKeysResponse(res)) + } } -pub async fn handle_create_key( - garage: &Arc<Garage>, - req: Request<IncomingBody>, -) -> Result<Response<ResBody>, Error> { - let req = parse_json_body::<CreateKeyRequest, _, Error>(req).await?; - - let key = Key::new(req.name.as_deref().unwrap_or("Unnamed key")); - garage.key_table.insert(&key).await?; +impl RequestHandler for GetKeyInfoRequest { + type Response = GetKeyInfoResponse; + + async fn handle( + self, + garage: &Arc<Garage>, + _admin: &Admin, + ) -> Result<GetKeyInfoResponse, Error> { + let key = match (self.id, self.search) { + (Some(id), None) => garage.key_helper().get_existing_key(&id).await?, + (None, Some(search)) => { + garage + .key_helper() + .get_existing_matching_key(&search) + .await? + } + _ => { + return Err(Error::bad_request( + "Either id or search must be provided (but not both)", + )); + } + }; - key_info_results(garage, key, true).await + Ok(key_info_results(garage, key, self.show_secret_key).await?) + } } -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -struct CreateKeyRequest { - name: Option<String>, -} +impl RequestHandler for CreateKeyRequest { + type Response = CreateKeyResponse; -pub async fn handle_import_key( - garage: &Arc<Garage>, - req: Request<IncomingBody>, -) -> Result<Response<ResBody>, Error> { - let req = parse_json_body::<ImportKeyRequest, _, Error>(req).await?; + async fn handle( + self, + garage: &Arc<Garage>, + _admin: &Admin, + ) -> Result<CreateKeyResponse, Error> { + let key = Key::new(self.name.as_deref().unwrap_or("Unnamed key")); + garage.key_table.insert(&key).await?; - let prev_key = garage.key_table.get(&EmptyKey, &req.access_key_id).await?; - if prev_key.is_some() { - return Err(Error::KeyAlreadyExists(req.access_key_id.to_string())); + Ok(CreateKeyResponse( + key_info_results(garage, key, true).await?, + )) } +} - let imported_key = Key::import( - &req.access_key_id, - &req.secret_access_key, - req.name.as_deref().unwrap_or("Imported key"), - ) - .ok_or_bad_request("Invalid key format")?; - garage.key_table.insert(&imported_key).await?; +impl RequestHandler for ImportKeyRequest { + type Response = ImportKeyResponse; + + async fn handle( + self, + garage: &Arc<Garage>, + _admin: &Admin, + ) -> Result<ImportKeyResponse, Error> { + let prev_key = garage.key_table.get(&EmptyKey, &self.access_key_id).await?; + if prev_key.is_some() { + return Err(Error::KeyAlreadyExists(self.access_key_id.to_string())); + } - key_info_results(garage, imported_key, false).await -} + let imported_key = Key::import( + &self.access_key_id, + &self.secret_access_key, + self.name.as_deref().unwrap_or("Imported key"), + ) + .ok_or_bad_request("Invalid key format")?; + garage.key_table.insert(&imported_key).await?; -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -struct ImportKeyRequest { - access_key_id: String, - secret_access_key: String, - name: Option<String>, + Ok(ImportKeyResponse( + key_info_results(garage, imported_key, false).await?, + )) + } } -pub async fn handle_update_key( - garage: &Arc<Garage>, - id: String, - req: Request<IncomingBody>, -) -> Result<Response<ResBody>, Error> { - let req = parse_json_body::<UpdateKeyRequest, _, Error>(req).await?; +impl RequestHandler for UpdateKeyRequest { + type Response = UpdateKeyResponse; - let mut key = garage.key_helper().get_existing_key(&id).await?; + async fn handle( + self, + garage: &Arc<Garage>, + _admin: &Admin, + ) -> Result<UpdateKeyResponse, Error> { + let mut key = garage.key_helper().get_existing_key(&self.id).await?; - let key_state = key.state.as_option_mut().unwrap(); + let key_state = key.state.as_option_mut().unwrap(); - if let Some(new_name) = req.name { - key_state.name.update(new_name); - } - if let Some(allow) = req.allow { - if allow.create_bucket { - key_state.allow_create_bucket.update(true); + if let Some(new_name) = self.body.name { + key_state.name.update(new_name); } - } - if let Some(deny) = req.deny { - if deny.create_bucket { - key_state.allow_create_bucket.update(false); + if let Some(allow) = self.body.allow { + if allow.create_bucket { + key_state.allow_create_bucket.update(true); + } + } + if let Some(deny) = self.body.deny { + if deny.create_bucket { + key_state.allow_create_bucket.update(false); + } } - } - garage.key_table.insert(&key).await?; + garage.key_table.insert(&key).await?; - key_info_results(garage, key, false).await + Ok(UpdateKeyResponse( + key_info_results(garage, key, false).await?, + )) + } } -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -struct UpdateKeyRequest { - name: Option<String>, - allow: Option<KeyPerm>, - deny: Option<KeyPerm>, -} +impl RequestHandler for DeleteKeyRequest { + type Response = DeleteKeyResponse; -pub async fn handle_delete_key( - garage: &Arc<Garage>, - id: String, -) -> Result<Response<ResBody>, Error> { - let helper = garage.locked_helper().await; + async fn handle( + self, + garage: &Arc<Garage>, + _admin: &Admin, + ) -> Result<DeleteKeyResponse, Error> { + let helper = garage.locked_helper().await; - let mut key = helper.key().get_existing_key(&id).await?; + let mut key = helper.key().get_existing_key(&self.id).await?; - helper.delete_key(&mut key).await?; + helper.delete_key(&mut key).await?; - Ok(Response::builder() - .status(StatusCode::NO_CONTENT) - .body(empty_body())?) + Ok(DeleteKeyResponse) + } } async fn key_info_results( garage: &Arc<Garage>, key: Key, show_secret: bool, -) -> Result<Response<ResBody>, Error> { +) -> Result<GetKeyInfoResponse, Error> { let mut relevant_buckets = HashMap::new(); let key_state = key.state.as_option().unwrap(); @@ -193,7 +188,7 @@ async fn key_info_results( } } - let res = GetKeyInfoResult { + let res = GetKeyInfoResponse { name: key_state.name.get().clone(), access_key_id: key.key_id.clone(), secret_access_key: if show_secret { @@ -208,7 +203,7 @@ async fn key_info_results( .into_values() .map(|bucket| { let state = bucket.state.as_option().unwrap(); - KeyInfoBucketResult { + KeyInfoBucketResponse { id: hex::encode(bucket.id), global_aliases: state .aliases @@ -238,43 +233,5 @@ async fn key_info_results( .collect::<Vec<_>>(), }; - Ok(json_ok_response(&res)?) -} - -#[derive(Serialize)] -#[serde(rename_all = "camelCase")] -struct GetKeyInfoResult { - name: String, - access_key_id: String, - #[serde(skip_serializing_if = "is_default")] - secret_access_key: Option<String>, - permissions: KeyPerm, - buckets: Vec<KeyInfoBucketResult>, -} - -#[derive(Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -struct KeyPerm { - #[serde(default)] - create_bucket: bool, -} - -#[derive(Serialize)] -#[serde(rename_all = "camelCase")] -struct KeyInfoBucketResult { - id: String, - global_aliases: Vec<String>, - local_aliases: Vec<String>, - permissions: ApiBucketKeyPerm, -} - -#[derive(Serialize, Deserialize, Default)] -#[serde(rename_all = "camelCase")] -pub(crate) struct ApiBucketKeyPerm { - #[serde(default)] - pub(crate) read: bool, - #[serde(default)] - pub(crate) write: bool, - #[serde(default)] - pub(crate) owner: bool, + Ok(res) } diff --git a/src/api/admin/lib.rs b/src/api/admin/lib.rs index 599e9b44..dd9b7ffd 100644 --- a/src/api/admin/lib.rs +++ b/src/api/admin/lib.rs @@ -3,9 +3,41 @@ extern crate tracing; pub mod api_server; mod error; +mod macros; + +pub mod api; mod router_v0; mod router_v1; +mod router_v2; mod bucket; mod cluster; mod key; +mod special; + +mod block; +mod node; +mod repair; +mod worker; + +use std::sync::Arc; + +use garage_model::garage::Garage; + +pub use api_server::AdminApiServer as Admin; + +pub enum Authorization { + None, + MetricsToken, + AdminToken, +} + +pub trait RequestHandler { + type Response; + + fn handle( + self, + garage: &Arc<Garage>, + admin: &Admin, + ) -> impl std::future::Future<Output = Result<Self::Response, error::Error>> + Send; +} diff --git a/src/api/admin/macros.rs b/src/api/admin/macros.rs new file mode 100644 index 00000000..df2762fe --- /dev/null +++ b/src/api/admin/macros.rs @@ -0,0 +1,219 @@ +macro_rules! admin_endpoints { + [ + $(@special $special_endpoint:ident,)* + $($endpoint:ident,)* + ] => { + paste! { + #[derive(Debug, Clone, Serialize, Deserialize)] + pub enum AdminApiRequest { + $( + $special_endpoint( [<$special_endpoint Request>] ), + )* + $( + $endpoint( [<$endpoint Request>] ), + )* + } + + #[derive(Debug, Clone, Serialize)] + #[serde(untagged)] + pub enum AdminApiResponse { + $( + $endpoint( [<$endpoint Response>] ), + )* + } + + #[derive(Debug, Clone, Serialize, Deserialize)] + pub enum TaggedAdminApiResponse { + $( + $endpoint( [<$endpoint Response>] ), + )* + } + + impl AdminApiRequest { + pub fn name(&self) -> &'static str { + match self { + $( + Self::$special_endpoint(_) => stringify!($special_endpoint), + )* + $( + Self::$endpoint(_) => stringify!($endpoint), + )* + } + } + } + + impl AdminApiResponse { + pub fn tagged(self) -> TaggedAdminApiResponse { + match self { + $( + Self::$endpoint(res) => TaggedAdminApiResponse::$endpoint(res), + )* + } + } + } + + $( + impl From< [< $endpoint Request >] > for AdminApiRequest { + fn from(req: [< $endpoint Request >]) -> AdminApiRequest { + AdminApiRequest::$endpoint(req) + } + } + + impl TryFrom<TaggedAdminApiResponse> for [< $endpoint Response >] { + type Error = TaggedAdminApiResponse; + fn try_from(resp: TaggedAdminApiResponse) -> Result< [< $endpoint Response >], TaggedAdminApiResponse> { + match resp { + TaggedAdminApiResponse::$endpoint(v) => Ok(v), + x => Err(x), + } + } + } + )* + + impl RequestHandler for AdminApiRequest { + type Response = AdminApiResponse; + + async fn handle(self, garage: &Arc<Garage>, admin: &Admin) -> Result<AdminApiResponse, Error> { + Ok(match self { + $( + AdminApiRequest::$special_endpoint(_) => panic!( + concat!(stringify!($special_endpoint), " needs to go through a special handler") + ), + )* + $( + AdminApiRequest::$endpoint(req) => AdminApiResponse::$endpoint(req.handle(garage, admin).await?), + )* + }) + } + } + } + }; +} + +macro_rules! local_admin_endpoints { + [ + $($endpoint:ident,)* + ] => { + paste! { + #[derive(Debug, Clone, Serialize, Deserialize)] + pub enum LocalAdminApiRequest { + $( + $endpoint( [<Local $endpoint Request>] ), + )* + } + + #[derive(Debug, Clone, Serialize, Deserialize)] + pub enum LocalAdminApiResponse { + $( + $endpoint( [<Local $endpoint Response>] ), + )* + } + + $( + pub type [< $endpoint Request >] = MultiRequest< [< Local $endpoint Request >] >; + + pub type [< $endpoint RequestBody >] = [< Local $endpoint Request >]; + + pub type [< $endpoint Response >] = MultiResponse< [< Local $endpoint Response >] >; + + impl From< [< Local $endpoint Request >] > for LocalAdminApiRequest { + fn from(req: [< Local $endpoint Request >]) -> LocalAdminApiRequest { + LocalAdminApiRequest::$endpoint(req) + } + } + + impl TryFrom<LocalAdminApiResponse> for [< Local $endpoint Response >] { + type Error = LocalAdminApiResponse; + fn try_from(resp: LocalAdminApiResponse) -> Result< [< Local $endpoint Response >], LocalAdminApiResponse> { + match resp { + LocalAdminApiResponse::$endpoint(v) => Ok(v), + x => Err(x), + } + } + } + + impl RequestHandler for [< $endpoint Request >] { + type Response = [< $endpoint Response >]; + + async fn handle(self, garage: &Arc<Garage>, admin: &Admin) -> Result<Self::Response, Error> { + let to = match self.node.as_str() { + "*" => garage.system.cluster_layout().all_nodes().to_vec(), + id => { + let nodes = garage.system.cluster_layout().all_nodes() + .iter() + .filter(|x| hex::encode(x).starts_with(id)) + .cloned() + .collect::<Vec<_>>(); + if nodes.len() != 1 { + return Err(Error::bad_request(format!("Zero or multiple nodes matching {}: {:?}", id, nodes))); + } + nodes + } + }; + + let resps = garage.system.rpc_helper().call_many(&admin.endpoint, + &to, + AdminRpc::Internal(self.body.into()), + RequestStrategy::with_priority(PRIO_NORMAL), + ).await?; + + let mut ret = [< $endpoint Response >] { + success: HashMap::new(), + error: HashMap::new(), + }; + for (node, resp) in resps { + match resp { + Ok(AdminRpcResponse::InternalApiOkResponse(r)) => { + match [< Local $endpoint Response >]::try_from(r) { + Ok(r) => { + ret.success.insert(hex::encode(node), r); + } + Err(_) => { + ret.error.insert(hex::encode(node), "returned invalid value".to_string()); + } + } + } + Ok(AdminRpcResponse::ApiErrorResponse{error_code, http_code, message}) => { + ret.error.insert(hex::encode(node), format!("{} ({}): {}", error_code, http_code, message)); + } + Ok(_) => { + ret.error.insert(hex::encode(node), "returned invalid value".to_string()); + } + Err(e) => { + ret.error.insert(hex::encode(node), e.to_string()); + } + } + } + + Ok(ret) + } + } + )* + + impl LocalAdminApiRequest { + pub fn name(&self) -> &'static str { + match self { + $( + Self::$endpoint(_) => stringify!($endpoint), + )* + } + } + } + + impl RequestHandler for LocalAdminApiRequest { + type Response = LocalAdminApiResponse; + + async fn handle(self, garage: &Arc<Garage>, admin: &Admin) -> Result<LocalAdminApiResponse, Error> { + Ok(match self { + $( + LocalAdminApiRequest::$endpoint(req) => LocalAdminApiResponse::$endpoint(req.handle(garage, admin).await?), + )* + }) + } + } + } + }; +} + +pub(crate) use admin_endpoints; +pub(crate) use local_admin_endpoints; diff --git a/src/api/admin/node.rs b/src/api/admin/node.rs new file mode 100644 index 00000000..f6f43d95 --- /dev/null +++ b/src/api/admin/node.rs @@ -0,0 +1,216 @@ +use std::collections::HashMap; +use std::fmt::Write; +use std::sync::Arc; + +use format_table::format_table_to_string; + +use garage_util::data::*; +use garage_util::error::Error as GarageError; + +use garage_table::replication::*; +use garage_table::*; + +use garage_rpc::layout::PARTITION_BITS; + +use garage_model::garage::Garage; + +use crate::api::*; +use crate::error::Error; +use crate::{Admin, RequestHandler}; + +impl RequestHandler for LocalCreateMetadataSnapshotRequest { + type Response = LocalCreateMetadataSnapshotResponse; + + async fn handle( + self, + garage: &Arc<Garage>, + _admin: &Admin, + ) -> Result<LocalCreateMetadataSnapshotResponse, Error> { + garage_model::snapshot::async_snapshot_metadata(garage).await?; + Ok(LocalCreateMetadataSnapshotResponse) + } +} + +impl RequestHandler for LocalGetNodeStatisticsRequest { + type Response = LocalGetNodeStatisticsResponse; + + // FIXME: return this as a JSON struct instead of text + async fn handle( + self, + garage: &Arc<Garage>, + _admin: &Admin, + ) -> Result<LocalGetNodeStatisticsResponse, Error> { + let mut ret = String::new(); + writeln!( + &mut ret, + "Garage version: {} [features: {}]\nRust compiler version: {}", + garage_util::version::garage_version(), + garage_util::version::garage_features() + .map(|list| list.join(", ")) + .unwrap_or_else(|| "(unknown)".into()), + garage_util::version::rust_version(), + ) + .unwrap(); + + writeln!(&mut ret, "\nDatabase engine: {}", garage.db.engine()).unwrap(); + + // Gather table statistics + let mut table = vec![" Table\tItems\tMklItems\tMklTodo\tGcTodo".into()]; + table.push(gather_table_stats(&garage.bucket_table)?); + table.push(gather_table_stats(&garage.key_table)?); + table.push(gather_table_stats(&garage.object_table)?); + table.push(gather_table_stats(&garage.version_table)?); + table.push(gather_table_stats(&garage.block_ref_table)?); + write!( + &mut ret, + "\nTable stats:\n{}", + format_table_to_string(table) + ) + .unwrap(); + + // Gather block manager statistics + writeln!(&mut ret, "\nBlock manager stats:").unwrap(); + let rc_len = garage.block_manager.rc_len()?.to_string(); + + writeln!( + &mut ret, + " number of RC entries (~= number of blocks): {}", + rc_len + ) + .unwrap(); + writeln!( + &mut ret, + " resync queue length: {}", + garage.block_manager.resync.queue_len()? + ) + .unwrap(); + writeln!( + &mut ret, + " blocks with resync errors: {}", + garage.block_manager.resync.errors_len()? + ) + .unwrap(); + + Ok(LocalGetNodeStatisticsResponse { freeform: ret }) + } +} + +impl RequestHandler for GetClusterStatisticsRequest { + type Response = GetClusterStatisticsResponse; + + // FIXME: return this as a JSON struct instead of text + async fn handle( + self, + garage: &Arc<Garage>, + _admin: &Admin, + ) -> Result<GetClusterStatisticsResponse, Error> { + let mut ret = String::new(); + + // Gather storage node and free space statistics for current nodes + let layout = &garage.system.cluster_layout(); + let mut node_partition_count = HashMap::<Uuid, u64>::new(); + for short_id in layout.current().ring_assignment_data.iter() { + let id = layout.current().node_id_vec[*short_id as usize]; + *node_partition_count.entry(id).or_default() += 1; + } + let node_info = garage + .system + .get_known_nodes() + .into_iter() + .map(|n| (n.id, n)) + .collect::<HashMap<_, _>>(); + + let mut table = vec![" ID\tHostname\tZone\tCapacity\tPart.\tDataAvail\tMetaAvail".into()]; + for (id, parts) in node_partition_count.iter() { + let info = node_info.get(id); + let status = info.map(|x| &x.status); + let role = layout.current().roles.get(id).and_then(|x| x.0.as_ref()); + let hostname = status.and_then(|x| x.hostname.as_deref()).unwrap_or("?"); + let zone = role.map(|x| x.zone.as_str()).unwrap_or("?"); + let capacity = role + .map(|x| x.capacity_string()) + .unwrap_or_else(|| "?".into()); + let avail_str = |x| match x { + Some((avail, total)) => { + let pct = (avail as f64) / (total as f64) * 100.; + let avail = bytesize::ByteSize::b(avail); + let total = bytesize::ByteSize::b(total); + format!("{}/{} ({:.1}%)", avail, total, pct) + } + None => "?".into(), + }; + let data_avail = avail_str(status.and_then(|x| x.data_disk_avail)); + let meta_avail = avail_str(status.and_then(|x| x.meta_disk_avail)); + table.push(format!( + " {:?}\t{}\t{}\t{}\t{}\t{}\t{}", + id, hostname, zone, capacity, parts, data_avail, meta_avail + )); + } + write!( + &mut ret, + "Storage nodes:\n{}", + format_table_to_string(table) + ) + .unwrap(); + + let meta_part_avail = node_partition_count + .iter() + .filter_map(|(id, parts)| { + node_info + .get(id) + .and_then(|x| x.status.meta_disk_avail) + .map(|c| c.0 / *parts) + }) + .collect::<Vec<_>>(); + let data_part_avail = node_partition_count + .iter() + .filter_map(|(id, parts)| { + node_info + .get(id) + .and_then(|x| x.status.data_disk_avail) + .map(|c| c.0 / *parts) + }) + .collect::<Vec<_>>(); + if !meta_part_avail.is_empty() && !data_part_avail.is_empty() { + let meta_avail = + bytesize::ByteSize(meta_part_avail.iter().min().unwrap() * (1 << PARTITION_BITS)); + let data_avail = + bytesize::ByteSize(data_part_avail.iter().min().unwrap() * (1 << PARTITION_BITS)); + writeln!( + &mut ret, + "\nEstimated available storage space cluster-wide (might be lower in practice):" + ) + .unwrap(); + if meta_part_avail.len() < node_partition_count.len() + || data_part_avail.len() < node_partition_count.len() + { + writeln!(&mut ret, " data: < {}", data_avail).unwrap(); + writeln!(&mut ret, " metadata: < {}", meta_avail).unwrap(); + writeln!(&mut ret, "A precise estimate could not be given as information is missing for some storage nodes.").unwrap(); + } else { + writeln!(&mut ret, " data: {}", data_avail).unwrap(); + writeln!(&mut ret, " metadata: {}", meta_avail).unwrap(); + } + } + + Ok(GetClusterStatisticsResponse { freeform: ret }) + } +} + +fn gather_table_stats<F, R>(t: &Arc<Table<F, R>>) -> Result<String, Error> +where + F: TableSchema + 'static, + R: TableReplication + 'static, +{ + let data_len = t.data.store.len().map_err(GarageError::from)?.to_string(); + let mkl_len = t.merkle_updater.merkle_tree_len()?.to_string(); + + Ok(format!( + " {}\t{}\t{}\t{}\t{}", + F::TABLE_NAME, + data_len, + mkl_len, + t.merkle_updater.todo_len()?, + t.data.gc_todo_len()? + )) +} diff --git a/src/api/admin/repair.rs b/src/api/admin/repair.rs new file mode 100644 index 00000000..a9b8c36a --- /dev/null +++ b/src/api/admin/repair.rs @@ -0,0 +1,403 @@ +use std::future::Future; +use std::sync::Arc; +use std::time::Duration; + +use async_trait::async_trait; +use tokio::sync::watch; + +use garage_util::background::*; +use garage_util::data::*; +use garage_util::error::{Error as GarageError, OkOrMessage}; +use garage_util::migrate::Migrate; + +use garage_table::replication::*; +use garage_table::*; + +use garage_block::manager::BlockManager; +use garage_block::repair::ScrubWorkerCommand; + +use garage_model::garage::Garage; +use garage_model::s3::block_ref_table::*; +use garage_model::s3::mpu_table::*; +use garage_model::s3::object_table::*; +use garage_model::s3::version_table::*; + +use crate::api::*; +use crate::error::Error; +use crate::{Admin, RequestHandler}; + +const RC_REPAIR_ITER_COUNT: usize = 64; + +impl RequestHandler for LocalLaunchRepairOperationRequest { + type Response = LocalLaunchRepairOperationResponse; + + async fn handle( + self, + garage: &Arc<Garage>, + admin: &Admin, + ) -> Result<LocalLaunchRepairOperationResponse, Error> { + let bg = &admin.background; + match self.repair_type { + RepairType::Tables => { + info!("Launching a full sync of tables"); + garage.bucket_table.syncer.add_full_sync()?; + garage.object_table.syncer.add_full_sync()?; + garage.version_table.syncer.add_full_sync()?; + garage.block_ref_table.syncer.add_full_sync()?; + garage.key_table.syncer.add_full_sync()?; + } + RepairType::Versions => { + info!("Repairing the versions table"); + bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairVersions)); + } + RepairType::MultipartUploads => { + info!("Repairing the multipart uploads table"); + bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairMpu)); + } + RepairType::BlockRefs => { + info!("Repairing the block refs table"); + bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairBlockRefs)); + } + RepairType::BlockRc => { + info!("Repairing the block reference counters"); + bg.spawn_worker(BlockRcRepair::new( + garage.block_manager.clone(), + garage.block_ref_table.clone(), + )); + } + RepairType::Blocks => { + info!("Repairing the stored blocks"); + bg.spawn_worker(garage_block::repair::RepairWorker::new( + garage.block_manager.clone(), + )); + } + RepairType::Scrub(cmd) => { + let cmd = match cmd { + ScrubCommand::Start => ScrubWorkerCommand::Start, + ScrubCommand::Pause => { + ScrubWorkerCommand::Pause(Duration::from_secs(3600 * 24)) + } + ScrubCommand::Resume => ScrubWorkerCommand::Resume, + ScrubCommand::Cancel => ScrubWorkerCommand::Cancel, + }; + info!("Sending command to scrub worker: {:?}", cmd); + garage.block_manager.send_scrub_command(cmd).await?; + } + RepairType::Rebalance => { + info!("Rebalancing the stored blocks among storage locations"); + bg.spawn_worker(garage_block::repair::RebalanceWorker::new( + garage.block_manager.clone(), + )); + } + } + Ok(LocalLaunchRepairOperationResponse) + } +} + +// ---- + +trait TableRepair: Send + Sync + 'static { + type T: TableSchema; + + fn table(garage: &Garage) -> &Table<Self::T, TableShardedReplication>; + + fn process( + &mut self, + garage: &Garage, + entry: <<Self as TableRepair>::T as TableSchema>::E, + ) -> impl Future<Output = Result<bool, GarageError>> + Send; +} + +struct TableRepairWorker<T: TableRepair> { + garage: Arc<Garage>, + pos: Vec<u8>, + counter: usize, + repairs: usize, + inner: T, +} + +impl<R: TableRepair> TableRepairWorker<R> { + fn new(garage: Arc<Garage>, inner: R) -> Self { + Self { + garage, + inner, + pos: vec![], + counter: 0, + repairs: 0, + } + } +} + +#[async_trait] +impl<R: TableRepair> Worker for TableRepairWorker<R> { + fn name(&self) -> String { + format!("{} repair worker", R::T::TABLE_NAME) + } + + fn status(&self) -> WorkerStatus { + WorkerStatus { + progress: Some(format!("{} ({})", self.counter, self.repairs)), + ..Default::default() + } + } + + async fn work( + &mut self, + _must_exit: &mut watch::Receiver<bool>, + ) -> Result<WorkerState, GarageError> { + let (item_bytes, next_pos) = match R::table(&self.garage).data.store.get_gt(&self.pos)? { + Some((k, v)) => (v, k), + None => { + info!( + "{}: finished, done {}, fixed {}", + self.name(), + self.counter, + self.repairs + ); + return Ok(WorkerState::Done); + } + }; + + let entry = <R::T as TableSchema>::E::decode(&item_bytes) + .ok_or_message("Cannot decode table entry")?; + if self.inner.process(&self.garage, entry).await? { + self.repairs += 1; + } + + self.counter += 1; + self.pos = next_pos; + + Ok(WorkerState::Busy) + } + + async fn wait_for_work(&mut self) -> WorkerState { + unreachable!() + } +} + +// ---- + +struct RepairVersions; + +impl TableRepair for RepairVersions { + type T = VersionTable; + + fn table(garage: &Garage) -> &Table<Self::T, TableShardedReplication> { + &garage.version_table + } + + async fn process(&mut self, garage: &Garage, version: Version) -> Result<bool, GarageError> { + if !version.deleted.get() { + let ref_exists = match &version.backlink { + VersionBacklink::Object { bucket_id, key } => garage + .object_table + .get(bucket_id, key) + .await? + .map(|o| { + o.versions().iter().any(|x| { + x.uuid == version.uuid && x.state != ObjectVersionState::Aborted + }) + }) + .unwrap_or(false), + VersionBacklink::MultipartUpload { upload_id } => garage + .mpu_table + .get(upload_id, &EmptyKey) + .await? + .map(|u| !u.deleted.get()) + .unwrap_or(false), + }; + + if !ref_exists { + info!("Repair versions: marking version as deleted: {:?}", version); + garage + .version_table + .insert(&Version::new(version.uuid, version.backlink, true)) + .await?; + return Ok(true); + } + } + + Ok(false) + } +} + +// ---- + +struct RepairBlockRefs; + +impl TableRepair for RepairBlockRefs { + type T = BlockRefTable; + + fn table(garage: &Garage) -> &Table<Self::T, TableShardedReplication> { + &garage.block_ref_table + } + + async fn process( + &mut self, + garage: &Garage, + mut block_ref: BlockRef, + ) -> Result<bool, GarageError> { + if !block_ref.deleted.get() { + let ref_exists = garage + .version_table + .get(&block_ref.version, &EmptyKey) + .await? + .map(|v| !v.deleted.get()) + .unwrap_or(false); + + if !ref_exists { + info!( + "Repair block ref: marking block_ref as deleted: {:?}", + block_ref + ); + block_ref.deleted.set(); + garage.block_ref_table.insert(&block_ref).await?; + return Ok(true); + } + } + + Ok(false) + } +} + +// ---- + +struct RepairMpu; + +impl TableRepair for RepairMpu { + type T = MultipartUploadTable; + + fn table(garage: &Garage) -> &Table<Self::T, TableShardedReplication> { + &garage.mpu_table + } + + async fn process( + &mut self, + garage: &Garage, + mut mpu: MultipartUpload, + ) -> Result<bool, GarageError> { + if !mpu.deleted.get() { + let ref_exists = garage + .object_table + .get(&mpu.bucket_id, &mpu.key) + .await? + .map(|o| { + o.versions() + .iter() + .any(|x| x.uuid == mpu.upload_id && x.is_uploading(Some(true))) + }) + .unwrap_or(false); + + if !ref_exists { + info!( + "Repair multipart uploads: marking mpu as deleted: {:?}", + mpu + ); + mpu.parts.clear(); + mpu.deleted.set(); + garage.mpu_table.insert(&mpu).await?; + return Ok(true); + } + } + + Ok(false) + } +} + +// ===== block reference counter repair ===== + +pub struct BlockRcRepair { + block_manager: Arc<BlockManager>, + block_ref_table: Arc<Table<BlockRefTable, TableShardedReplication>>, + cursor: Hash, + counter: u64, + repairs: u64, +} + +impl BlockRcRepair { + fn new( + block_manager: Arc<BlockManager>, + block_ref_table: Arc<Table<BlockRefTable, TableShardedReplication>>, + ) -> Self { + Self { + block_manager, + block_ref_table, + cursor: [0u8; 32].into(), + counter: 0, + repairs: 0, + } + } +} + +#[async_trait] +impl Worker for BlockRcRepair { + fn name(&self) -> String { + format!("Block refcount repair worker") + } + + fn status(&self) -> WorkerStatus { + WorkerStatus { + progress: Some(format!("{} ({})", self.counter, self.repairs)), + ..Default::default() + } + } + + async fn work( + &mut self, + _must_exit: &mut watch::Receiver<bool>, + ) -> Result<WorkerState, GarageError> { + for _i in 0..RC_REPAIR_ITER_COUNT { + let next1 = self + .block_manager + .rc + .rc_table + .range(self.cursor.as_slice()..)? + .next() + .transpose()? + .map(|(k, _)| Hash::try_from(k.as_slice()).unwrap()); + let next2 = self + .block_ref_table + .data + .store + .range(self.cursor.as_slice()..)? + .next() + .transpose()? + .map(|(k, _)| Hash::try_from(&k[..32]).unwrap()); + let next = match (next1, next2) { + (Some(k1), Some(k2)) => std::cmp::min(k1, k2), + (Some(k), None) | (None, Some(k)) => k, + (None, None) => { + info!( + "{}: finished, done {}, fixed {}", + self.name(), + self.counter, + self.repairs + ); + return Ok(WorkerState::Done); + } + }; + + if self.block_manager.rc.recalculate_rc(&next)?.1 { + self.repairs += 1; + } + self.counter += 1; + if let Some(next_incr) = next.increment() { + self.cursor = next_incr; + } else { + info!( + "{}: finished, done {}, fixed {}", + self.name(), + self.counter, + self.repairs + ); + return Ok(WorkerState::Done); + } + } + + Ok(WorkerState::Busy) + } + + async fn wait_for_work(&mut self) -> WorkerState { + unreachable!() + } +} diff --git a/src/api/admin/router_v1.rs b/src/api/admin/router_v1.rs index 0b4901ea..138a801d 100644 --- a/src/api/admin/router_v1.rs +++ b/src/api/admin/router_v1.rs @@ -7,12 +7,6 @@ use garage_api_common::router_macros::*; use crate::error::*; use crate::router_v0; -pub enum Authorization { - None, - MetricsToken, - AdminToken, -} - router_match! {@func /// List of all Admin API endpoints. @@ -211,15 +205,6 @@ impl Endpoint { ))), } } - /// Get the kind of authorization which is required to perform the operation. - pub fn authorization_type(&self) -> Authorization { - match self { - Self::Health => Authorization::None, - Self::CheckDomain => Authorization::None, - Self::Metrics => Authorization::MetricsToken, - _ => Authorization::AdminToken, - } - } } generateQueryParameters! { diff --git a/src/api/admin/router_v2.rs b/src/api/admin/router_v2.rs new file mode 100644 index 00000000..4d5c015e --- /dev/null +++ b/src/api/admin/router_v2.rs @@ -0,0 +1,268 @@ +use std::borrow::Cow; + +use hyper::body::Incoming as IncomingBody; +use hyper::{Method, Request}; +use paste::paste; + +use garage_api_common::helpers::*; +use garage_api_common::router_macros::*; + +use crate::api::*; +use crate::error::*; +use crate::router_v1; +use crate::Authorization; + +impl AdminApiRequest { + /// Determine which S3 endpoint a request is for using the request, and a bucket which was + /// possibly extracted from the Host header. + /// Returns Self plus bucket name, if endpoint is not Endpoint::ListBuckets + pub async fn from_request(req: Request<IncomingBody>) -> Result<Self, Error> { + let uri = req.uri().clone(); + let path = uri.path(); + let query = uri.query(); + + let method = req.method().clone(); + + let mut query = QueryParameters::from_query(query.unwrap_or_default())?; + + let res = router_match!(@gen_path_parser_v2 (&method, path, "/v2/", query, req) [ + @special OPTIONS _ => Options (), + @special GET "/check" => CheckDomain (query::domain), + @special GET "/health" => Health (), + @special GET "/metrics" => Metrics (), + // Cluster endpoints + GET GetClusterStatus (), + GET GetClusterHealth (), + POST ConnectClusterNodes (body), + // Layout endpoints + GET GetClusterLayout (), + POST UpdateClusterLayout (body), + POST ApplyClusterLayout (body), + POST RevertClusterLayout (), + // API key endpoints + GET GetKeyInfo (query_opt::id, query_opt::search, parse_default(false)::show_secret_key), + POST UpdateKey (body_field, query::id), + POST CreateKey (body), + POST ImportKey (body), + POST DeleteKey (query::id), + GET ListKeys (), + // Bucket endpoints + GET GetBucketInfo (query_opt::id, query_opt::global_alias, query_opt::search), + GET ListBuckets (), + POST CreateBucket (body), + POST DeleteBucket (query::id), + POST UpdateBucket (body_field, query::id), + POST CleanupIncompleteUploads (body), + // Bucket-key permissions + POST AllowBucketKey (body), + POST DenyBucketKey (body), + // Bucket aliases + POST AddBucketAlias (body), + POST RemoveBucketAlias (body), + // Node APIs + POST CreateMetadataSnapshot (default::body, query::node), + GET GetNodeStatistics (default::body, query::node), + GET GetClusterStatistics (), + POST LaunchRepairOperation (body_field, query::node), + // Worker APIs + POST ListWorkers (body_field, query::node), + POST GetWorkerInfo (body_field, query::node), + POST GetWorkerVariable (body_field, query::node), + POST SetWorkerVariable (body_field, query::node), + // Block APIs + GET ListBlockErrors (default::body, query::node), + POST GetBlockInfo (body_field, query::node), + POST RetryBlockResync (body_field, query::node), + POST PurgeBlocks (body_field, query::node), + ]); + + if let Some(message) = query.nonempty_message() { + debug!("Unused query parameter: {}", message) + } + + Ok(res) + } + + /// Some endpoints work exactly the same in their v2/ version as they did in their v1/ version. + /// For these endpoints, we can convert a v1/ call to its equivalent as if it was made using + /// its v2/ URL. + pub async fn from_v1( + v1_endpoint: router_v1::Endpoint, + req: Request<IncomingBody>, + ) -> Result<Self, Error> { + use router_v1::Endpoint; + + match v1_endpoint { + Endpoint::GetClusterStatus => { + Ok(AdminApiRequest::GetClusterStatus(GetClusterStatusRequest)) + } + Endpoint::GetClusterHealth => { + Ok(AdminApiRequest::GetClusterHealth(GetClusterHealthRequest)) + } + Endpoint::ConnectClusterNodes => { + let req = parse_json_body::<ConnectClusterNodesRequest, _, Error>(req).await?; + Ok(AdminApiRequest::ConnectClusterNodes(req)) + } + + // Layout + Endpoint::GetClusterLayout => { + Ok(AdminApiRequest::GetClusterLayout(GetClusterLayoutRequest)) + } + Endpoint::UpdateClusterLayout => { + let updates = parse_json_body::<UpdateClusterLayoutRequest, _, Error>(req).await?; + Ok(AdminApiRequest::UpdateClusterLayout(updates)) + } + Endpoint::ApplyClusterLayout => { + let param = parse_json_body::<ApplyClusterLayoutRequest, _, Error>(req).await?; + Ok(AdminApiRequest::ApplyClusterLayout(param)) + } + Endpoint::RevertClusterLayout => Ok(AdminApiRequest::RevertClusterLayout( + RevertClusterLayoutRequest, + )), + + // Keys + Endpoint::ListKeys => Ok(AdminApiRequest::ListKeys(ListKeysRequest)), + Endpoint::GetKeyInfo { + id, + search, + show_secret_key, + } => { + let show_secret_key = show_secret_key.map(|x| x == "true").unwrap_or(false); + Ok(AdminApiRequest::GetKeyInfo(GetKeyInfoRequest { + id, + search, + show_secret_key, + })) + } + Endpoint::CreateKey => { + let req = parse_json_body::<CreateKeyRequest, _, Error>(req).await?; + Ok(AdminApiRequest::CreateKey(req)) + } + Endpoint::ImportKey => { + let req = parse_json_body::<ImportKeyRequest, _, Error>(req).await?; + Ok(AdminApiRequest::ImportKey(req)) + } + Endpoint::UpdateKey { id } => { + let body = parse_json_body::<UpdateKeyRequestBody, _, Error>(req).await?; + Ok(AdminApiRequest::UpdateKey(UpdateKeyRequest { id, body })) + } + + // DeleteKey semantics changed: + // - in v1/ : HTTP DELETE => HTTP 204 No Content + // - in v2/ : HTTP POST => HTTP 200 Ok + // Endpoint::DeleteKey { id } => Ok(AdminApiRequest::DeleteKey(DeleteKeyRequest { id })), + + // Buckets + Endpoint::ListBuckets => Ok(AdminApiRequest::ListBuckets(ListBucketsRequest)), + Endpoint::GetBucketInfo { id, global_alias } => { + Ok(AdminApiRequest::GetBucketInfo(GetBucketInfoRequest { + id, + global_alias, + search: None, + })) + } + Endpoint::CreateBucket => { + let req = parse_json_body::<CreateBucketRequest, _, Error>(req).await?; + Ok(AdminApiRequest::CreateBucket(req)) + } + + // DeleteBucket semantics changed:: + // - in v1/ : HTTP DELETE => HTTP 204 No Content + // - in v2/ : HTTP POST => HTTP 200 Ok + // Endpoint::DeleteBucket { id } => { + // Ok(AdminApiRequest::DeleteBucket(DeleteBucketRequest { id })) + // } + Endpoint::UpdateBucket { id } => { + let body = parse_json_body::<UpdateBucketRequestBody, _, Error>(req).await?; + Ok(AdminApiRequest::UpdateBucket(UpdateBucketRequest { + id, + body, + })) + } + + // Bucket-key permissions + Endpoint::BucketAllowKey => { + let req = parse_json_body::<BucketKeyPermChangeRequest, _, Error>(req).await?; + Ok(AdminApiRequest::AllowBucketKey(AllowBucketKeyRequest(req))) + } + Endpoint::BucketDenyKey => { + let req = parse_json_body::<BucketKeyPermChangeRequest, _, Error>(req).await?; + Ok(AdminApiRequest::DenyBucketKey(DenyBucketKeyRequest(req))) + } + // Bucket aliasing + Endpoint::GlobalAliasBucket { id, alias } => { + Ok(AdminApiRequest::AddBucketAlias(AddBucketAliasRequest { + bucket_id: id, + alias: BucketAliasEnum::Global { + global_alias: alias, + }, + })) + } + Endpoint::GlobalUnaliasBucket { id, alias } => Ok(AdminApiRequest::RemoveBucketAlias( + RemoveBucketAliasRequest { + bucket_id: id, + alias: BucketAliasEnum::Global { + global_alias: alias, + }, + }, + )), + Endpoint::LocalAliasBucket { + id, + access_key_id, + alias, + } => Ok(AdminApiRequest::AddBucketAlias(AddBucketAliasRequest { + bucket_id: id, + alias: BucketAliasEnum::Local { + local_alias: alias, + access_key_id, + }, + })), + Endpoint::LocalUnaliasBucket { + id, + access_key_id, + alias, + } => Ok(AdminApiRequest::RemoveBucketAlias( + RemoveBucketAliasRequest { + bucket_id: id, + alias: BucketAliasEnum::Local { + local_alias: alias, + access_key_id, + }, + }, + )), + + // For endpoints that have different body content syntax, issue + // deprecation warning + _ => Err(Error::bad_request(format!( + "v1/ endpoint is no longer supported: {}", + v1_endpoint.name() + ))), + } + } + + /// Get the kind of authorization which is required to perform the operation. + pub fn authorization_type(&self) -> Authorization { + match self { + Self::Options(_) => Authorization::None, + Self::Health(_) => Authorization::None, + Self::CheckDomain(_) => Authorization::None, + Self::Metrics(_) => Authorization::MetricsToken, + _ => Authorization::AdminToken, + } + } +} + +generateQueryParameters! { + keywords: [], + fields: [ + "node" => node, + "domain" => domain, + "format" => format, + "id" => id, + "search" => search, + "globalAlias" => global_alias, + "alias" => alias, + "accessKeyId" => access_key_id, + "showSecretKey" => show_secret_key + ] +} diff --git a/src/api/admin/special.rs b/src/api/admin/special.rs new file mode 100644 index 00000000..0ecf82bc --- /dev/null +++ b/src/api/admin/special.rs @@ -0,0 +1,179 @@ +use std::sync::Arc; + +use http::header::{ + ACCESS_CONTROL_ALLOW_HEADERS, ACCESS_CONTROL_ALLOW_METHODS, ACCESS_CONTROL_ALLOW_ORIGIN, ALLOW, +}; +use hyper::{Response, StatusCode}; + +#[cfg(feature = "metrics")] +use prometheus::{Encoder, TextEncoder}; + +use garage_model::garage::Garage; +use garage_rpc::system::ClusterHealthStatus; + +use garage_api_common::helpers::*; + +use crate::api::{CheckDomainRequest, HealthRequest, MetricsRequest, OptionsRequest}; +use crate::api_server::ResBody; +use crate::error::*; +use crate::{Admin, RequestHandler}; + +impl RequestHandler for OptionsRequest { + type Response = Response<ResBody>; + + async fn handle( + self, + _garage: &Arc<Garage>, + _admin: &Admin, + ) -> Result<Response<ResBody>, Error> { + Ok(Response::builder() + .status(StatusCode::OK) + .header(ALLOW, "OPTIONS,GET,POST") + .header(ACCESS_CONTROL_ALLOW_METHODS, "OPTIONS,GET,POST") + .header(ACCESS_CONTROL_ALLOW_HEADERS, "authorization,content-type") + .header(ACCESS_CONTROL_ALLOW_ORIGIN, "*") + .body(empty_body())?) + } +} + +impl RequestHandler for MetricsRequest { + type Response = Response<ResBody>; + + async fn handle( + self, + _garage: &Arc<Garage>, + admin: &Admin, + ) -> Result<Response<ResBody>, Error> { + #[cfg(feature = "metrics")] + { + use opentelemetry::trace::Tracer; + + let mut buffer = vec![]; + let encoder = TextEncoder::new(); + + let tracer = opentelemetry::global::tracer("garage"); + let metric_families = tracer.in_span("admin/gather_metrics", |_| { + admin.exporter.registry().gather() + }); + + encoder + .encode(&metric_families, &mut buffer) + .ok_or_internal_error("Could not serialize metrics")?; + + Ok(Response::builder() + .status(StatusCode::OK) + .header(http::header::CONTENT_TYPE, encoder.format_type()) + .body(bytes_body(buffer.into()))?) + } + #[cfg(not(feature = "metrics"))] + Err(Error::bad_request( + "Garage was built without the metrics feature".to_string(), + )) + } +} + +impl RequestHandler for HealthRequest { + type Response = Response<ResBody>; + + async fn handle( + self, + garage: &Arc<Garage>, + _admin: &Admin, + ) -> Result<Response<ResBody>, Error> { + let health = garage.system.health(); + + let (status, status_str) = match health.status { + ClusterHealthStatus::Healthy => (StatusCode::OK, "Garage is fully operational"), + ClusterHealthStatus::Degraded => ( + StatusCode::OK, + "Garage is operational but some storage nodes are unavailable", + ), + ClusterHealthStatus::Unavailable => ( + StatusCode::SERVICE_UNAVAILABLE, + "Quorum is not available for some/all partitions, reads and writes will fail", + ), + }; + let status_str = format!( + "{}\nConsult the full health check API endpoint at /v2/GetClusterHealth for more details\n", + status_str + ); + + Ok(Response::builder() + .status(status) + .header(http::header::CONTENT_TYPE, "text/plain") + .body(string_body(status_str))?) + } +} + +impl RequestHandler for CheckDomainRequest { + type Response = Response<ResBody>; + + async fn handle( + self, + garage: &Arc<Garage>, + _admin: &Admin, + ) -> Result<Response<ResBody>, Error> { + if check_domain(garage, &self.domain).await? { + Ok(Response::builder() + .status(StatusCode::OK) + .body(string_body(format!( + "Domain '{}' is managed by Garage", + self.domain + )))?) + } else { + Err(Error::bad_request(format!( + "Domain '{}' is not managed by Garage", + self.domain + ))) + } + } +} + +async fn check_domain(garage: &Arc<Garage>, domain: &str) -> Result<bool, Error> { + // Resolve bucket from domain name, inferring if the website must be activated for the + // domain to be valid. + let (bucket_name, must_check_website) = if let Some(bname) = garage + .config + .s3_api + .root_domain + .as_ref() + .and_then(|rd| host_to_bucket(domain, rd)) + { + (bname.to_string(), false) + } else if let Some(bname) = garage + .config + .s3_web + .as_ref() + .and_then(|sw| host_to_bucket(domain, sw.root_domain.as_str())) + { + (bname.to_string(), true) + } else { + (domain.to_string(), true) + }; + + let bucket_id = match garage + .bucket_helper() + .resolve_global_bucket_name(&bucket_name) + .await? + { + Some(bucket_id) => bucket_id, + None => return Ok(false), + }; + + if !must_check_website { + return Ok(true); + } + + let bucket = garage + .bucket_helper() + .get_existing_bucket(bucket_id) + .await?; + + let bucket_state = bucket.state.as_option().unwrap(); + let bucket_website_config = bucket_state.website_config.get(); + + match bucket_website_config { + Some(_v) => Ok(true), + None => Ok(false), + } +} diff --git a/src/api/admin/worker.rs b/src/api/admin/worker.rs new file mode 100644 index 00000000..b3f4537b --- /dev/null +++ b/src/api/admin/worker.rs @@ -0,0 +1,118 @@ +use std::collections::HashMap; +use std::sync::Arc; + +use garage_util::background::*; +use garage_util::time::now_msec; + +use garage_model::garage::Garage; + +use crate::api::*; +use crate::error::Error; +use crate::{Admin, RequestHandler}; + +impl RequestHandler for LocalListWorkersRequest { + type Response = LocalListWorkersResponse; + + async fn handle( + self, + _garage: &Arc<Garage>, + admin: &Admin, + ) -> Result<LocalListWorkersResponse, Error> { + let workers = admin.background.get_worker_info(); + let info = workers + .into_iter() + .filter(|(_, w)| { + (!self.busy_only + || matches!(w.state, WorkerState::Busy | WorkerState::Throttled(_))) + && (!self.error_only || w.errors > 0) + }) + .map(|(id, w)| worker_info_to_api(id as u64, w)) + .collect::<Vec<_>>(); + Ok(LocalListWorkersResponse(info)) + } +} + +impl RequestHandler for LocalGetWorkerInfoRequest { + type Response = LocalGetWorkerInfoResponse; + + async fn handle( + self, + _garage: &Arc<Garage>, + admin: &Admin, + ) -> Result<LocalGetWorkerInfoResponse, Error> { + let info = admin + .background + .get_worker_info() + .get(&(self.id as usize)) + .ok_or(Error::NoSuchWorker(self.id))? + .clone(); + Ok(LocalGetWorkerInfoResponse(worker_info_to_api( + self.id, info, + ))) + } +} + +impl RequestHandler for LocalGetWorkerVariableRequest { + type Response = LocalGetWorkerVariableResponse; + + async fn handle( + self, + garage: &Arc<Garage>, + _admin: &Admin, + ) -> Result<LocalGetWorkerVariableResponse, Error> { + let mut res = HashMap::new(); + if let Some(k) = self.variable { + res.insert(k.clone(), garage.bg_vars.get(&k)?); + } else { + let vars = garage.bg_vars.get_all(); + for (k, v) in vars.iter() { + res.insert(k.to_string(), v.to_string()); + } + } + Ok(LocalGetWorkerVariableResponse(res)) + } +} + +impl RequestHandler for LocalSetWorkerVariableRequest { + type Response = LocalSetWorkerVariableResponse; + + async fn handle( + self, + garage: &Arc<Garage>, + _admin: &Admin, + ) -> Result<LocalSetWorkerVariableResponse, Error> { + garage.bg_vars.set(&self.variable, &self.value)?; + + Ok(LocalSetWorkerVariableResponse { + variable: self.variable, + value: self.value, + }) + } +} + +// ---- helper functions ---- + +fn worker_info_to_api(id: u64, info: WorkerInfo) -> WorkerInfoResp { + WorkerInfoResp { + id, + name: info.name, + state: match info.state { + WorkerState::Busy => WorkerStateResp::Busy, + WorkerState::Throttled(t) => WorkerStateResp::Throttled { duration_secs: t }, + WorkerState::Idle => WorkerStateResp::Idle, + WorkerState::Done => WorkerStateResp::Done, + }, + errors: info.errors as u64, + consecutive_errors: info.consecutive_errors as u64, + last_error: info.last_error.map(|(message, t)| WorkerLastError { + message, + secs_ago: now_msec().saturating_sub(t) / 1000, + }), + + tranquility: info.status.tranquility, + progress: info.status.progress, + queue_length: info.status.queue_length, + persistent_errors: info.status.persistent_errors, + freeform: info.status.freeform, + } +} diff --git a/src/api/common/generic_server.rs b/src/api/common/generic_server.rs index 6ddc2ff2..d7ee5692 100644 --- a/src/api/common/generic_server.rs +++ b/src/api/common/generic_server.rs @@ -1,3 +1,4 @@ +use std::borrow::Cow; use std::convert::Infallible; use std::fs::{self, Permissions}; use std::os::unix::fs::PermissionsExt; @@ -35,7 +36,7 @@ use garage_util::socket_address::UnixOrTCPSocketAddress; use crate::helpers::{BoxBody, ErrorBody}; pub trait ApiEndpoint: Send + Sync + 'static { - fn name(&self) -> &'static str; + fn name(&self) -> Cow<'static, str>; fn add_span_attributes(&self, span: SpanRef<'_>); } diff --git a/src/api/common/router_macros.rs b/src/api/common/router_macros.rs index d9fe86db..f4a93c67 100644 --- a/src/api/common/router_macros.rs +++ b/src/api/common/router_macros.rs @@ -45,6 +45,68 @@ macro_rules! router_match { } } }}; + (@gen_path_parser_v2 ($method:expr, $reqpath:expr, $pathprefix:literal, $query:expr, $req:expr) + [ + $(@special $spec_meth:ident $spec_path:pat => $spec_api:ident $spec_params:tt,)* + $($meth:ident $api:ident $params:tt,)* + ]) => {{ + { + #[allow(unused_parens)] + match ($method, $reqpath) { + $( + (&Method::$spec_meth, $spec_path) => AdminApiRequest::$spec_api ( + router_match!(@@gen_parse_request $spec_api, $spec_params, $query, $req) + ), + )* + $( + (&Method::$meth, concat!($pathprefix, stringify!($api))) + => AdminApiRequest::$api ( + router_match!(@@gen_parse_request $api, $params, $query, $req) + ), + )* + (m, p) => { + return Err(Error::bad_request(format!( + "Unknown API endpoint: {} {}", + m, p + ))) + } + } + } + }}; + (@@gen_parse_request $api:ident, (), $query: expr, $req:expr) => {{ + paste!( + [< $api Request >] + ) + }}; + (@@gen_parse_request $api:ident, (body), $query: expr, $req:expr) => {{ + paste!({ + parse_json_body::< [<$api Request>], _, Error>($req).await? + }) + }}; + (@@gen_parse_request $api:ident, (body_field, $($conv:ident $(($conv_arg:expr))? :: $param:ident),*), $query: expr, $req:expr) + => + {{ + paste!({ + let body = parse_json_body::< [<$api RequestBody>], _, Error>($req).await?; + [< $api Request >] { + body, + $( + $param: router_match!(@@parse_param $query, $conv $(($conv_arg))?, $param), + )+ + } + }) + }}; + (@@gen_parse_request $api:ident, ($($conv:ident $(($conv_arg:expr))? :: $param:ident),*), $query: expr, $req:expr) + => + {{ + paste!({ + [< $api Request >] { + $( + $param: router_match!(@@parse_param $query, $conv $(($conv_arg))?, $param), + )+ + } + }) + }}; (@gen_parser ($keyword:expr, $key:ident, $query:expr, $header:expr), key: [$($kw_k:ident $(if $required_k:ident)? $(header $header_k:expr)? => $api_k:ident $(($($conv_k:ident :: $param_k:ident),*))?,)*], no_key: [$($kw_nk:ident $(if $required_nk:ident)? $(if_header $header_nk:expr)? => $api_nk:ident $(($($conv_nk:ident :: $param_nk:ident),*))?,)*]) => {{ @@ -79,13 +141,19 @@ macro_rules! router_match { } }}; + (@@parse_param $query:expr, default, $param:ident) => {{ + Default::default() + }}; (@@parse_param $query:expr, query_opt, $param:ident) => {{ // extract optional query parameter $query.$param.take().map(|param| param.into_owned()) }}; (@@parse_param $query:expr, query, $param:ident) => {{ // extract mendatory query parameter - $query.$param.take().ok_or_bad_request("Missing argument for endpoint")?.into_owned() + $query.$param.take() + .ok_or_bad_request( + format!("Missing argument `{}` for endpoint", stringify!($param)) + )?.into_owned() }}; (@@parse_param $query:expr, opt_parse, $param:ident) => {{ // extract and parse optional query parameter @@ -99,10 +167,22 @@ macro_rules! router_match { (@@parse_param $query:expr, parse, $param:ident) => {{ // extract and parse mandatory query parameter // both missing and un-parseable parameters are reported as errors - $query.$param.take().ok_or_bad_request("Missing argument for endpoint")? + $query.$param.take() + .ok_or_bad_request( + format!("Missing argument `{}` for endpoint", stringify!($param)) + )? .parse() .map_err(|_| Error::bad_request("Failed to parse query parameter"))? }}; + (@@parse_param $query:expr, parse_default($default:expr), $param:ident) => {{ + // extract and parse optional query parameter + // using provided value as default if paramter is missing + $query.$param.take().map(|x| x + .parse() + .map_err(|_| Error::bad_request("Failed to parse query parameter"))) + .transpose()? + .unwrap_or($default) + }}; (@func $(#[$doc:meta])* pub enum Endpoint { @@ -187,6 +267,7 @@ macro_rules! generateQueryParameters { }, )* $( + // FIXME: remove if !v.is_empty() ? $f_param => if !v.is_empty() { if res.$f_name.replace(v).is_some() { return Err(Error::bad_request(format!( diff --git a/src/api/k2v/api_server.rs b/src/api/k2v/api_server.rs index eb276f5b..015fd687 100644 --- a/src/api/k2v/api_server.rs +++ b/src/api/k2v/api_server.rs @@ -1,3 +1,4 @@ +use std::borrow::Cow; use std::sync::Arc; use hyper::{body::Incoming as IncomingBody, Method, Request, Response}; @@ -177,8 +178,8 @@ impl ApiHandler for K2VApiServer { } impl ApiEndpoint for K2VApiEndpoint { - fn name(&self) -> &'static str { - self.endpoint.name() + fn name(&self) -> Cow<'static, str> { + Cow::Borrowed(self.endpoint.name()) } fn add_span_attributes(&self, span: SpanRef<'_>) { diff --git a/src/api/s3/api_server.rs b/src/api/s3/api_server.rs index bf48bba1..c8c28f3d 100644 --- a/src/api/s3/api_server.rs +++ b/src/api/s3/api_server.rs @@ -1,3 +1,4 @@ +use std::borrow::Cow; use std::sync::Arc; use hyper::header; @@ -353,8 +354,8 @@ impl ApiHandler for S3ApiServer { } impl ApiEndpoint for S3ApiEndpoint { - fn name(&self) -> &'static str { - self.endpoint.name() + fn name(&self) -> Cow<'static, str> { + Cow::Borrowed(self.endpoint.name()) } fn add_span_attributes(&self, span: SpanRef<'_>) { |