diff options
Diffstat (limited to 'src/api/admin')
-rw-r--r-- | src/api/admin/Cargo.toml | 3 | ||||
-rw-r--r-- | src/api/admin/api.rs | 312 | ||||
-rw-r--r-- | src/api/admin/api_server.rs | 196 | ||||
-rw-r--r-- | src/api/admin/block.rs | 274 | ||||
-rw-r--r-- | src/api/admin/bucket.rs | 109 | ||||
-rw-r--r-- | src/api/admin/cluster.rs | 67 | ||||
-rw-r--r-- | src/api/admin/error.rs | 14 | ||||
-rw-r--r-- | src/api/admin/key.rs | 54 | ||||
-rw-r--r-- | src/api/admin/lib.rs | 18 | ||||
-rw-r--r-- | src/api/admin/macros.rs | 133 | ||||
-rw-r--r-- | src/api/admin/node.rs | 216 | ||||
-rw-r--r-- | src/api/admin/repair.rs | 403 | ||||
-rw-r--r-- | src/api/admin/router_v2.rs | 17 | ||||
-rw-r--r-- | src/api/admin/special.rs | 126 | ||||
-rw-r--r-- | src/api/admin/worker.rs | 118 |
15 files changed, 1861 insertions, 199 deletions
diff --git a/src/api/admin/Cargo.toml b/src/api/admin/Cargo.toml index 94a321a6..9ac099e8 100644 --- a/src/api/admin/Cargo.toml +++ b/src/api/admin/Cargo.toml @@ -14,7 +14,9 @@ path = "lib.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +format_table.workspace = true garage_model.workspace = true +garage_block.workspace = true garage_table.workspace = true garage_util.workspace = true garage_rpc.workspace = true @@ -22,6 +24,7 @@ garage_api_common.workspace = true argon2.workspace = true async-trait.workspace = true +bytesize.workspace = true err-derive.workspace = true hex.workspace = true paste.workspace = true diff --git a/src/api/admin/api.rs b/src/api/admin/api.rs index 99832564..97cde158 100644 --- a/src/api/admin/api.rs +++ b/src/api/admin/api.rs @@ -1,18 +1,22 @@ +use std::collections::HashMap; use std::convert::TryFrom; use std::net::SocketAddr; use std::sync::Arc; -use async_trait::async_trait; use paste::paste; use serde::{Deserialize, Serialize}; +use garage_rpc::*; + use garage_model::garage::Garage; +use garage_api_common::common_error::CommonErrorDerivative; use garage_api_common::helpers::is_default; +use crate::api_server::{AdminRpc, AdminRpcResponse}; use crate::error::Error; use crate::macros::*; -use crate::EndpointHandler; +use crate::{Admin, RequestHandler}; // This generates the following: // @@ -62,6 +66,7 @@ admin_endpoints![ CreateBucket, UpdateBucket, DeleteBucket, + CleanupIncompleteUploads, // Operations on permissions for keys on buckets AllowBucketKey, @@ -70,8 +75,55 @@ admin_endpoints![ // Operations on bucket aliases AddBucketAlias, RemoveBucketAlias, + + // Node operations + CreateMetadataSnapshot, + GetNodeStatistics, + GetClusterStatistics, + LaunchRepairOperation, + + // Worker operations + ListWorkers, + GetWorkerInfo, + GetWorkerVariable, + SetWorkerVariable, + + // Block operations + ListBlockErrors, + GetBlockInfo, + RetryBlockResync, + PurgeBlocks, ]; +local_admin_endpoints![ + // Node operations + CreateMetadataSnapshot, + GetNodeStatistics, + LaunchRepairOperation, + // Background workers + ListWorkers, + GetWorkerInfo, + GetWorkerVariable, + SetWorkerVariable, + // Block operations + ListBlockErrors, + GetBlockInfo, + RetryBlockResync, + PurgeBlocks, +]; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct MultiRequest<RB> { + pub node: String, + pub body: RB, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct MultiResponse<RB> { + pub success: HashMap<String, RB>, + pub error: HashMap<String, String>, +} + // ********************************************** // Special endpoints // @@ -497,6 +549,19 @@ pub struct DeleteBucketRequest { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct DeleteBucketResponse; +// ---- CleanupIncompleteUploads ---- + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CleanupIncompleteUploadsRequest { + pub bucket_id: String, + pub older_than_secs: u64, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CleanupIncompleteUploadsResponse { + pub uploads_deleted: u64, +} + // ********************************************** // Operations on permissions for keys on buckets // ********************************************** @@ -566,3 +631,246 @@ pub struct RemoveBucketAliasRequest { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct RemoveBucketAliasResponse(pub GetBucketInfoResponse); + +// ********************************************** +// Node operations +// ********************************************** + +// ---- CreateMetadataSnapshot ---- + +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub struct LocalCreateMetadataSnapshotRequest; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LocalCreateMetadataSnapshotResponse; + +// ---- GetNodeStatistics ---- + +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub struct LocalGetNodeStatisticsRequest; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LocalGetNodeStatisticsResponse { + pub freeform: String, +} + +// ---- GetClusterStatistics ---- + +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub struct GetClusterStatisticsRequest; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct GetClusterStatisticsResponse { + pub freeform: String, +} + +// ---- LaunchRepairOperation ---- + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LocalLaunchRepairOperationRequest { + pub repair_type: RepairType, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum RepairType { + Tables, + Blocks, + Versions, + MultipartUploads, + BlockRefs, + BlockRc, + Rebalance, + Scrub(ScrubCommand), +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum ScrubCommand { + Start, + Pause, + Resume, + Cancel, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LocalLaunchRepairOperationResponse; + +// ********************************************** +// Worker operations +// ********************************************** + +// ---- GetWorkerList ---- + +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +#[serde(rename_all = "camelCase")] +pub struct LocalListWorkersRequest { + #[serde(default)] + pub busy_only: bool, + #[serde(default)] + pub error_only: bool, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LocalListWorkersResponse(pub Vec<WorkerInfoResp>); + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct WorkerInfoResp { + pub id: u64, + pub name: String, + pub state: WorkerStateResp, + pub errors: u64, + pub consecutive_errors: u64, + pub last_error: Option<WorkerLastError>, + pub tranquility: Option<u32>, + pub progress: Option<String>, + pub queue_length: Option<u64>, + pub persistent_errors: Option<u64>, + pub freeform: Vec<String>, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum WorkerStateResp { + Busy, + Throttled { duration_secs: f32 }, + Idle, + Done, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct WorkerLastError { + pub message: String, + pub secs_ago: u64, +} + +// ---- GetWorkerList ---- + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LocalGetWorkerInfoRequest { + pub id: u64, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LocalGetWorkerInfoResponse(pub WorkerInfoResp); + +// ---- GetWorkerVariable ---- + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LocalGetWorkerVariableRequest { + pub variable: Option<String>, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LocalGetWorkerVariableResponse(pub HashMap<String, String>); + +// ---- SetWorkerVariable ---- + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LocalSetWorkerVariableRequest { + pub variable: String, + pub value: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LocalSetWorkerVariableResponse { + pub variable: String, + pub value: String, +} + +// ********************************************** +// Block operations +// ********************************************** + +// ---- ListBlockErrors ---- + +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub struct LocalListBlockErrorsRequest; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LocalListBlockErrorsResponse(pub Vec<BlockError>); + +#[derive(Serialize, Deserialize, Clone, Debug)] +#[serde(rename_all = "camelCase")] +pub struct BlockError { + pub block_hash: String, + pub refcount: u64, + pub error_count: u64, + pub last_try_secs_ago: u64, + pub next_try_in_secs: u64, +} + +// ---- GetBlockInfo ---- + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct LocalGetBlockInfoRequest { + pub block_hash: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct LocalGetBlockInfoResponse { + pub block_hash: String, + pub refcount: u64, + pub versions: Vec<BlockVersion>, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct BlockVersion { + pub version_id: String, + pub deleted: bool, + pub garbage_collected: bool, + pub backlink: Option<BlockVersionBacklink>, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum BlockVersionBacklink { + Object { + bucket_id: String, + key: String, + }, + Upload { + upload_id: String, + upload_deleted: bool, + upload_garbage_collected: bool, + bucket_id: Option<String>, + key: Option<String>, + }, +} + +// ---- RetryBlockResync ---- + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(untagged)] +pub enum LocalRetryBlockResyncRequest { + #[serde(rename_all = "camelCase")] + All { all: bool }, + #[serde(rename_all = "camelCase")] + Blocks { block_hashes: Vec<String> }, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct LocalRetryBlockResyncResponse { + pub count: u64, +} + +// ---- PurgeBlocks ---- + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct LocalPurgeBlocksRequest(pub Vec<String>); + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct LocalPurgeBlocksResponse { + pub blocks_purged: u64, + pub objects_deleted: u64, + pub uploads_deleted: u64, + pub versions_deleted: u64, +} diff --git a/src/api/admin/api_server.rs b/src/api/admin/api_server.rs index be29e617..37574dcf 100644 --- a/src/api/admin/api_server.rs +++ b/src/api/admin/api_server.rs @@ -2,20 +2,20 @@ use std::borrow::Cow; use std::sync::Arc; use argon2::password_hash::PasswordHash; -use async_trait::async_trait; use http::header::{HeaderValue, ACCESS_CONTROL_ALLOW_ORIGIN, AUTHORIZATION}; -use hyper::{body::Incoming as IncomingBody, Request, Response, StatusCode}; +use hyper::{body::Incoming as IncomingBody, Request, Response}; +use serde::{Deserialize, Serialize}; use tokio::sync::watch; use opentelemetry::trace::SpanRef; #[cfg(feature = "metrics")] use opentelemetry_prometheus::PrometheusExporter; -#[cfg(feature = "metrics")] -use prometheus::{Encoder, TextEncoder}; use garage_model::garage::Garage; +use garage_rpc::{Endpoint as RpcEndpoint, *}; +use garage_util::background::BackgroundRunner; use garage_util::error::Error as GarageError; use garage_util::socket_address::UnixOrTCPSocketAddress; @@ -27,19 +27,83 @@ use crate::error::*; use crate::router_v0; use crate::router_v1; use crate::Authorization; -use crate::EndpointHandler; +use crate::RequestHandler; + +// ---- FOR RPC ---- + +pub const ADMIN_RPC_PATH: &str = "garage_api/admin/rpc.rs/Rpc"; + +#[derive(Debug, Serialize, Deserialize)] +pub enum AdminRpc { + Proxy(AdminApiRequest), + Internal(LocalAdminApiRequest), +} + +#[derive(Debug, Serialize, Deserialize)] +pub enum AdminRpcResponse { + ProxyApiOkResponse(TaggedAdminApiResponse), + InternalApiOkResponse(LocalAdminApiResponse), + ApiErrorResponse { + http_code: u16, + error_code: String, + message: String, + }, +} + +impl Rpc for AdminRpc { + type Response = Result<AdminRpcResponse, GarageError>; +} + +impl EndpointHandler<AdminRpc> for AdminApiServer { + async fn handle( + self: &Arc<Self>, + message: &AdminRpc, + _from: NodeID, + ) -> Result<AdminRpcResponse, GarageError> { + match message { + AdminRpc::Proxy(req) => { + info!("Proxied admin API request: {}", req.name()); + let res = req.clone().handle(&self.garage, &self).await; + match res { + Ok(res) => Ok(AdminRpcResponse::ProxyApiOkResponse(res.tagged())), + Err(e) => Ok(AdminRpcResponse::ApiErrorResponse { + http_code: e.http_status_code().as_u16(), + error_code: e.code().to_string(), + message: e.to_string(), + }), + } + } + AdminRpc::Internal(req) => { + info!("Internal admin API request: {}", req.name()); + let res = req.clone().handle(&self.garage, &self).await; + match res { + Ok(res) => Ok(AdminRpcResponse::InternalApiOkResponse(res)), + Err(e) => Ok(AdminRpcResponse::ApiErrorResponse { + http_code: e.http_status_code().as_u16(), + error_code: e.code().to_string(), + message: e.to_string(), + }), + } + } + } + } +} + +// ---- FOR HTTP ---- pub type ResBody = BoxBody<Error>; pub struct AdminApiServer { garage: Arc<Garage>, #[cfg(feature = "metrics")] - exporter: PrometheusExporter, + pub(crate) exporter: PrometheusExporter, metrics_token: Option<String>, admin_token: Option<String>, + pub(crate) background: Arc<BackgroundRunner>, + pub(crate) endpoint: Arc<RpcEndpoint<AdminRpc, Self>>, } -pub enum Endpoint { +pub enum HttpEndpoint { Old(router_v1::Endpoint), New(String), } @@ -47,91 +111,48 @@ pub enum Endpoint { impl AdminApiServer { pub fn new( garage: Arc<Garage>, + background: Arc<BackgroundRunner>, #[cfg(feature = "metrics")] exporter: PrometheusExporter, - ) -> Self { + ) -> Arc<Self> { let cfg = &garage.config.admin; let metrics_token = cfg.metrics_token.as_deref().map(hash_bearer_token); let admin_token = cfg.admin_token.as_deref().map(hash_bearer_token); - Self { + + let endpoint = garage.system.netapp.endpoint(ADMIN_RPC_PATH.into()); + let admin = Arc::new(Self { garage, #[cfg(feature = "metrics")] exporter, metrics_token, admin_token, - } + background, + endpoint, + }); + admin.endpoint.set_handler(admin.clone()); + admin } pub async fn run( - self, + self: Arc<Self>, bind_addr: UnixOrTCPSocketAddress, must_exit: watch::Receiver<bool>, ) -> Result<(), GarageError> { let region = self.garage.config.s3_api.s3_region.clone(); - ApiServer::new(region, self) + ApiServer::new(region, ArcAdminApiServer(self)) .run_server(bind_addr, Some(0o220), must_exit) .await } - fn handle_metrics(&self) -> Result<Response<ResBody>, Error> { - #[cfg(feature = "metrics")] - { - use opentelemetry::trace::Tracer; - - let mut buffer = vec![]; - let encoder = TextEncoder::new(); - - let tracer = opentelemetry::global::tracer("garage"); - let metric_families = tracer.in_span("admin/gather_metrics", |_| { - self.exporter.registry().gather() - }); - - encoder - .encode(&metric_families, &mut buffer) - .ok_or_internal_error("Could not serialize metrics")?; - - Ok(Response::builder() - .status(StatusCode::OK) - .header(http::header::CONTENT_TYPE, encoder.format_type()) - .body(bytes_body(buffer.into()))?) - } - #[cfg(not(feature = "metrics"))] - Err(Error::bad_request( - "Garage was built without the metrics feature".to_string(), - )) - } -} - -#[async_trait] -impl ApiHandler for AdminApiServer { - const API_NAME: &'static str = "admin"; - const API_NAME_DISPLAY: &'static str = "Admin"; - - type Endpoint = Endpoint; - type Error = Error; - - fn parse_endpoint(&self, req: &Request<IncomingBody>) -> Result<Endpoint, Error> { - if req.uri().path().starts_with("/v0/") { - let endpoint_v0 = router_v0::Endpoint::from_request(req)?; - let endpoint_v1 = router_v1::Endpoint::from_v0(endpoint_v0)?; - Ok(Endpoint::Old(endpoint_v1)) - } else if req.uri().path().starts_with("/v1/") { - let endpoint_v1 = router_v1::Endpoint::from_request(req)?; - Ok(Endpoint::Old(endpoint_v1)) - } else { - Ok(Endpoint::New(req.uri().path().to_string())) - } - } - - async fn handle( + async fn handle_http_api( &self, req: Request<IncomingBody>, - endpoint: Endpoint, + endpoint: HttpEndpoint, ) -> Result<Response<ResBody>, Error> { let auth_header = req.headers().get(AUTHORIZATION).cloned(); let request = match endpoint { - Endpoint::Old(endpoint_v1) => AdminApiRequest::from_v1(endpoint_v1, req).await?, - Endpoint::New(_) => AdminApiRequest::from_request(req).await?, + HttpEndpoint::Old(endpoint_v1) => AdminApiRequest::from_v1(endpoint_v1, req).await?, + HttpEndpoint::New(_) => AdminApiRequest::from_request(req).await?, }; let required_auth_hash = @@ -156,12 +177,12 @@ impl ApiHandler for AdminApiServer { } match request { - AdminApiRequest::Options(req) => req.handle(&self.garage).await, - AdminApiRequest::CheckDomain(req) => req.handle(&self.garage).await, - AdminApiRequest::Health(req) => req.handle(&self.garage).await, - AdminApiRequest::Metrics(_req) => self.handle_metrics(), + AdminApiRequest::Options(req) => req.handle(&self.garage, &self).await, + AdminApiRequest::CheckDomain(req) => req.handle(&self.garage, &self).await, + AdminApiRequest::Health(req) => req.handle(&self.garage, &self).await, + AdminApiRequest::Metrics(req) => req.handle(&self.garage, &self).await, req => { - let res = req.handle(&self.garage).await?; + let res = req.handle(&self.garage, &self).await?; let mut res = json_ok_response(&res)?; res.headers_mut() .insert(ACCESS_CONTROL_ALLOW_ORIGIN, HeaderValue::from_static("*")); @@ -171,7 +192,38 @@ impl ApiHandler for AdminApiServer { } } -impl ApiEndpoint for Endpoint { +struct ArcAdminApiServer(Arc<AdminApiServer>); + +impl ApiHandler for ArcAdminApiServer { + const API_NAME: &'static str = "admin"; + const API_NAME_DISPLAY: &'static str = "Admin"; + + type Endpoint = HttpEndpoint; + type Error = Error; + + fn parse_endpoint(&self, req: &Request<IncomingBody>) -> Result<HttpEndpoint, Error> { + if req.uri().path().starts_with("/v0/") { + let endpoint_v0 = router_v0::Endpoint::from_request(req)?; + let endpoint_v1 = router_v1::Endpoint::from_v0(endpoint_v0)?; + Ok(HttpEndpoint::Old(endpoint_v1)) + } else if req.uri().path().starts_with("/v1/") { + let endpoint_v1 = router_v1::Endpoint::from_request(req)?; + Ok(HttpEndpoint::Old(endpoint_v1)) + } else { + Ok(HttpEndpoint::New(req.uri().path().to_string())) + } + } + + async fn handle( + &self, + req: Request<IncomingBody>, + endpoint: HttpEndpoint, + ) -> Result<Response<ResBody>, Error> { + self.0.handle_http_api(req, endpoint).await + } +} + +impl ApiEndpoint for HttpEndpoint { fn name(&self) -> Cow<'static, str> { match self { Self::Old(endpoint_v1) => Cow::Borrowed(endpoint_v1.name()), diff --git a/src/api/admin/block.rs b/src/api/admin/block.rs new file mode 100644 index 00000000..73d186a6 --- /dev/null +++ b/src/api/admin/block.rs @@ -0,0 +1,274 @@ +use std::sync::Arc; + +use garage_util::data::*; +use garage_util::error::Error as GarageError; +use garage_util::time::now_msec; + +use garage_table::EmptyKey; + +use garage_model::garage::Garage; +use garage_model::s3::object_table::*; +use garage_model::s3::version_table::*; + +use garage_api_common::common_error::CommonErrorDerivative; + +use crate::api::*; +use crate::error::*; +use crate::{Admin, RequestHandler}; + +impl RequestHandler for LocalListBlockErrorsRequest { + type Response = LocalListBlockErrorsResponse; + + async fn handle( + self, + garage: &Arc<Garage>, + _admin: &Admin, + ) -> Result<LocalListBlockErrorsResponse, Error> { + let errors = garage.block_manager.list_resync_errors()?; + let now = now_msec(); + let errors = errors + .into_iter() + .map(|e| BlockError { + block_hash: hex::encode(&e.hash), + refcount: e.refcount, + error_count: e.error_count, + last_try_secs_ago: now.saturating_sub(e.last_try) / 1000, + next_try_in_secs: e.next_try.saturating_sub(now) / 1000, + }) + .collect(); + Ok(LocalListBlockErrorsResponse(errors)) + } +} + +impl RequestHandler for LocalGetBlockInfoRequest { + type Response = LocalGetBlockInfoResponse; + + async fn handle( + self, + garage: &Arc<Garage>, + _admin: &Admin, + ) -> Result<LocalGetBlockInfoResponse, Error> { + let hash = find_block_hash_by_prefix(garage, &self.block_hash)?; + let refcount = garage.block_manager.get_block_rc(&hash)?; + let block_refs = garage + .block_ref_table + .get_range(&hash, None, None, 10000, Default::default()) + .await?; + let mut versions = vec![]; + for br in block_refs { + if let Some(v) = garage.version_table.get(&br.version, &EmptyKey).await? { + let bl = match &v.backlink { + VersionBacklink::MultipartUpload { upload_id } => { + if let Some(u) = garage.mpu_table.get(upload_id, &EmptyKey).await? { + BlockVersionBacklink::Upload { + upload_id: hex::encode(&upload_id), + upload_deleted: u.deleted.get(), + upload_garbage_collected: false, + bucket_id: Some(hex::encode(&u.bucket_id)), + key: Some(u.key.to_string()), + } + } else { + BlockVersionBacklink::Upload { + upload_id: hex::encode(&upload_id), + upload_deleted: true, + upload_garbage_collected: true, + bucket_id: None, + key: None, + } + } + } + VersionBacklink::Object { bucket_id, key } => BlockVersionBacklink::Object { + bucket_id: hex::encode(&bucket_id), + key: key.to_string(), + }, + }; + versions.push(BlockVersion { + version_id: hex::encode(&br.version), + deleted: v.deleted.get(), + garbage_collected: false, + backlink: Some(bl), + }); + } else { + versions.push(BlockVersion { + version_id: hex::encode(&br.version), + deleted: true, + garbage_collected: true, + backlink: None, + }); + } + } + Ok(LocalGetBlockInfoResponse { + block_hash: hex::encode(&hash), + refcount, + versions, + }) + } +} + +impl RequestHandler for LocalRetryBlockResyncRequest { + type Response = LocalRetryBlockResyncResponse; + + async fn handle( + self, + garage: &Arc<Garage>, + _admin: &Admin, + ) -> Result<LocalRetryBlockResyncResponse, Error> { + match self { + Self::All { all: true } => { + let blocks = garage.block_manager.list_resync_errors()?; + for b in blocks.iter() { + garage.block_manager.resync.clear_backoff(&b.hash)?; + } + Ok(LocalRetryBlockResyncResponse { + count: blocks.len() as u64, + }) + } + Self::All { all: false } => Err(Error::bad_request("nonsense")), + Self::Blocks { block_hashes } => { + for hash in block_hashes.iter() { + let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?; + let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?; + garage.block_manager.resync.clear_backoff(&hash)?; + } + Ok(LocalRetryBlockResyncResponse { + count: block_hashes.len() as u64, + }) + } + } + } +} + +impl RequestHandler for LocalPurgeBlocksRequest { + type Response = LocalPurgeBlocksResponse; + + async fn handle( + self, + garage: &Arc<Garage>, + _admin: &Admin, + ) -> Result<LocalPurgeBlocksResponse, Error> { + let mut obj_dels = 0; + let mut mpu_dels = 0; + let mut ver_dels = 0; + + for hash in self.0.iter() { + let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?; + let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?; + let block_refs = garage + .block_ref_table + .get_range(&hash, None, None, 10000, Default::default()) + .await?; + + for br in block_refs { + if let Some(version) = garage.version_table.get(&br.version, &EmptyKey).await? { + handle_block_purge_version_backlink( + garage, + &version, + &mut obj_dels, + &mut mpu_dels, + ) + .await?; + + if !version.deleted.get() { + let deleted_version = Version::new(version.uuid, version.backlink, true); + garage.version_table.insert(&deleted_version).await?; + ver_dels += 1; + } + } + } + } + + Ok(LocalPurgeBlocksResponse { + blocks_purged: self.0.len() as u64, + versions_deleted: ver_dels, + objects_deleted: obj_dels, + uploads_deleted: mpu_dels, + }) + } +} + +fn find_block_hash_by_prefix(garage: &Arc<Garage>, prefix: &str) -> Result<Hash, Error> { + if prefix.len() < 4 { + return Err(Error::bad_request( + "Please specify at least 4 characters of the block hash", + )); + } + + let prefix_bin = hex::decode(&prefix[..prefix.len() & !1]).ok_or_bad_request("invalid hash")?; + + let iter = garage + .block_ref_table + .data + .store + .range(&prefix_bin[..]..) + .map_err(GarageError::from)?; + let mut found = None; + for item in iter { + let (k, _v) = item.map_err(GarageError::from)?; + let hash = Hash::try_from(&k[..32]).unwrap(); + if &hash.as_slice()[..prefix_bin.len()] != prefix_bin { + break; + } + if hex::encode(hash.as_slice()).starts_with(prefix) { + match &found { + Some(x) if *x == hash => (), + Some(_) => { + return Err(Error::bad_request(format!( + "Several blocks match prefix `{}`", + prefix + ))); + } + None => { + found = Some(hash); + } + } + } + } + + found.ok_or_else(|| Error::NoSuchBlock(prefix.to_string())) +} + +async fn handle_block_purge_version_backlink( + garage: &Arc<Garage>, + version: &Version, + obj_dels: &mut u64, + mpu_dels: &mut u64, +) -> Result<(), Error> { + let (bucket_id, key, ov_id) = match &version.backlink { + VersionBacklink::Object { bucket_id, key } => (*bucket_id, key.clone(), version.uuid), + VersionBacklink::MultipartUpload { upload_id } => { + if let Some(mut mpu) = garage.mpu_table.get(upload_id, &EmptyKey).await? { + if !mpu.deleted.get() { + mpu.parts.clear(); + mpu.deleted.set(); + garage.mpu_table.insert(&mpu).await?; + *mpu_dels += 1; + } + (mpu.bucket_id, mpu.key.clone(), *upload_id) + } else { + return Ok(()); + } + } + }; + + if let Some(object) = garage.object_table.get(&bucket_id, &key).await? { + let ov = object.versions().iter().rev().find(|v| v.is_complete()); + if let Some(ov) = ov { + if ov.uuid == ov_id { + let del_uuid = gen_uuid(); + let deleted_object = Object::new( + bucket_id, + key, + vec![ObjectVersion { + uuid: del_uuid, + timestamp: ov.timestamp + 1, + state: ObjectVersionState::Complete(ObjectVersionData::DeleteMarker), + }], + ); + garage.object_table.insert(&deleted_object).await?; + *obj_dels += 1; + } + } + } + + Ok(()) +} diff --git a/src/api/admin/bucket.rs b/src/api/admin/bucket.rs index 123956ca..d2bb62e0 100644 --- a/src/api/admin/bucket.rs +++ b/src/api/admin/bucket.rs @@ -1,7 +1,6 @@ use std::collections::HashMap; use std::sync::Arc; - -use async_trait::async_trait; +use std::time::Duration; use garage_util::crdt::*; use garage_util::data::*; @@ -20,13 +19,16 @@ use garage_api_common::common_error::CommonError; use crate::api::*; use crate::error::*; -use crate::EndpointHandler; +use crate::{Admin, RequestHandler}; -#[async_trait] -impl EndpointHandler for ListBucketsRequest { +impl RequestHandler for ListBucketsRequest { type Response = ListBucketsResponse; - async fn handle(self, garage: &Arc<Garage>) -> Result<ListBucketsResponse, Error> { + async fn handle( + self, + garage: &Arc<Garage>, + _admin: &Admin, + ) -> Result<ListBucketsResponse, Error> { let buckets = garage .bucket_table .get_range( @@ -69,11 +71,14 @@ impl EndpointHandler for ListBucketsRequest { } } -#[async_trait] -impl EndpointHandler for GetBucketInfoRequest { +impl RequestHandler for GetBucketInfoRequest { type Response = GetBucketInfoResponse; - async fn handle(self, garage: &Arc<Garage>) -> Result<GetBucketInfoResponse, Error> { + async fn handle( + self, + garage: &Arc<Garage>, + _admin: &Admin, + ) -> Result<GetBucketInfoResponse, Error> { let bucket_id = match (self.id, self.global_alias, self.search) { (Some(id), None, None) => parse_bucket_id(&id)?, (None, Some(ga), None) => garage @@ -221,11 +226,14 @@ async fn bucket_info_results( Ok(res) } -#[async_trait] -impl EndpointHandler for CreateBucketRequest { +impl RequestHandler for CreateBucketRequest { type Response = CreateBucketResponse; - async fn handle(self, garage: &Arc<Garage>) -> Result<CreateBucketResponse, Error> { + async fn handle( + self, + garage: &Arc<Garage>, + _admin: &Admin, + ) -> Result<CreateBucketResponse, Error> { let helper = garage.locked_helper().await; if let Some(ga) = &self.global_alias { @@ -292,11 +300,14 @@ impl EndpointHandler for CreateBucketRequest { } } -#[async_trait] -impl EndpointHandler for DeleteBucketRequest { +impl RequestHandler for DeleteBucketRequest { type Response = DeleteBucketResponse; - async fn handle(self, garage: &Arc<Garage>) -> Result<DeleteBucketResponse, Error> { + async fn handle( + self, + garage: &Arc<Garage>, + _admin: &Admin, + ) -> Result<DeleteBucketResponse, Error> { let helper = garage.locked_helper().await; let bucket_id = parse_bucket_id(&self.id)?; @@ -341,11 +352,14 @@ impl EndpointHandler for DeleteBucketRequest { } } -#[async_trait] -impl EndpointHandler for UpdateBucketRequest { +impl RequestHandler for UpdateBucketRequest { type Response = UpdateBucketResponse; - async fn handle(self, garage: &Arc<Garage>) -> Result<UpdateBucketResponse, Error> { + async fn handle( + self, + garage: &Arc<Garage>, + _admin: &Admin, + ) -> Result<UpdateBucketResponse, Error> { let bucket_id = parse_bucket_id(&self.id)?; let mut bucket = garage @@ -388,23 +402,52 @@ impl EndpointHandler for UpdateBucketRequest { } } +impl RequestHandler for CleanupIncompleteUploadsRequest { + type Response = CleanupIncompleteUploadsResponse; + + async fn handle( + self, + garage: &Arc<Garage>, + _admin: &Admin, + ) -> Result<CleanupIncompleteUploadsResponse, Error> { + let duration = Duration::from_secs(self.older_than_secs); + + let bucket_id = parse_bucket_id(&self.bucket_id)?; + + let count = garage + .bucket_helper() + .cleanup_incomplete_uploads(&bucket_id, duration) + .await?; + + Ok(CleanupIncompleteUploadsResponse { + uploads_deleted: count as u64, + }) + } +} + // ---- BUCKET/KEY PERMISSIONS ---- -#[async_trait] -impl EndpointHandler for AllowBucketKeyRequest { +impl RequestHandler for AllowBucketKeyRequest { type Response = AllowBucketKeyResponse; - async fn handle(self, garage: &Arc<Garage>) -> Result<AllowBucketKeyResponse, Error> { + async fn handle( + self, + garage: &Arc<Garage>, + _admin: &Admin, + ) -> Result<AllowBucketKeyResponse, Error> { let res = handle_bucket_change_key_perm(garage, self.0, true).await?; Ok(AllowBucketKeyResponse(res)) } } -#[async_trait] -impl EndpointHandler for DenyBucketKeyRequest { +impl RequestHandler for DenyBucketKeyRequest { type Response = DenyBucketKeyResponse; - async fn handle(self, garage: &Arc<Garage>) -> Result<DenyBucketKeyResponse, Error> { + async fn handle( + self, + garage: &Arc<Garage>, + _admin: &Admin, + ) -> Result<DenyBucketKeyResponse, Error> { let res = handle_bucket_change_key_perm(garage, self.0, false).await?; Ok(DenyBucketKeyResponse(res)) } @@ -449,11 +492,14 @@ pub async fn handle_bucket_change_key_perm( // ---- BUCKET ALIASES ---- -#[async_trait] -impl EndpointHandler for AddBucketAliasRequest { +impl RequestHandler for AddBucketAliasRequest { type Response = AddBucketAliasResponse; - async fn handle(self, garage: &Arc<Garage>) -> Result<AddBucketAliasResponse, Error> { + async fn handle( + self, + garage: &Arc<Garage>, + _admin: &Admin, + ) -> Result<AddBucketAliasResponse, Error> { let bucket_id = parse_bucket_id(&self.bucket_id)?; let helper = garage.locked_helper().await; @@ -480,11 +526,14 @@ impl EndpointHandler for AddBucketAliasRequest { } } -#[async_trait] -impl EndpointHandler for RemoveBucketAliasRequest { +impl RequestHandler for RemoveBucketAliasRequest { type Response = RemoveBucketAliasResponse; - async fn handle(self, garage: &Arc<Garage>) -> Result<RemoveBucketAliasResponse, Error> { + async fn handle( + self, + garage: &Arc<Garage>, + _admin: &Admin, + ) -> Result<RemoveBucketAliasResponse, Error> { let bucket_id = parse_bucket_id(&self.bucket_id)?; let helper = garage.locked_helper().await; diff --git a/src/api/admin/cluster.rs b/src/api/admin/cluster.rs index dc16bd50..cb1fa493 100644 --- a/src/api/admin/cluster.rs +++ b/src/api/admin/cluster.rs @@ -1,8 +1,6 @@ use std::collections::HashMap; use std::sync::Arc; -use async_trait::async_trait; - use garage_util::crdt::*; use garage_util::data::*; @@ -12,13 +10,16 @@ use garage_model::garage::Garage; use crate::api::*; use crate::error::*; -use crate::EndpointHandler; +use crate::{Admin, RequestHandler}; -#[async_trait] -impl EndpointHandler for GetClusterStatusRequest { +impl RequestHandler for GetClusterStatusRequest { type Response = GetClusterStatusResponse; - async fn handle(self, garage: &Arc<Garage>) -> Result<GetClusterStatusResponse, Error> { + async fn handle( + self, + garage: &Arc<Garage>, + _admin: &Admin, + ) -> Result<GetClusterStatusResponse, Error> { let layout = garage.system.cluster_layout(); let mut nodes = garage .system @@ -116,11 +117,14 @@ impl EndpointHandler for GetClusterStatusRequest { } } -#[async_trait] -impl EndpointHandler for GetClusterHealthRequest { +impl RequestHandler for GetClusterHealthRequest { type Response = GetClusterHealthResponse; - async fn handle(self, garage: &Arc<Garage>) -> Result<GetClusterHealthResponse, Error> { + async fn handle( + self, + garage: &Arc<Garage>, + _admin: &Admin, + ) -> Result<GetClusterHealthResponse, Error> { use garage_rpc::system::ClusterHealthStatus; let health = garage.system.health(); let health = GetClusterHealthResponse { @@ -142,11 +146,14 @@ impl EndpointHandler for GetClusterHealthRequest { } } -#[async_trait] -impl EndpointHandler for ConnectClusterNodesRequest { +impl RequestHandler for ConnectClusterNodesRequest { type Response = ConnectClusterNodesResponse; - async fn handle(self, garage: &Arc<Garage>) -> Result<ConnectClusterNodesResponse, Error> { + async fn handle( + self, + garage: &Arc<Garage>, + _admin: &Admin, + ) -> Result<ConnectClusterNodesResponse, Error> { let res = futures::future::join_all(self.0.iter().map(|node| garage.system.connect(node))) .await .into_iter() @@ -165,11 +172,14 @@ impl EndpointHandler for ConnectClusterNodesRequest { } } -#[async_trait] -impl EndpointHandler for GetClusterLayoutRequest { +impl RequestHandler for GetClusterLayoutRequest { type Response = GetClusterLayoutResponse; - async fn handle(self, garage: &Arc<Garage>) -> Result<GetClusterLayoutResponse, Error> { + async fn handle( + self, + garage: &Arc<Garage>, + _admin: &Admin, + ) -> Result<GetClusterLayoutResponse, Error> { Ok(format_cluster_layout( garage.system.cluster_layout().inner(), )) @@ -225,11 +235,14 @@ fn format_cluster_layout(layout: &layout::LayoutHistory) -> GetClusterLayoutResp // ---- update functions ---- -#[async_trait] -impl EndpointHandler for UpdateClusterLayoutRequest { +impl RequestHandler for UpdateClusterLayoutRequest { type Response = UpdateClusterLayoutResponse; - async fn handle(self, garage: &Arc<Garage>) -> Result<UpdateClusterLayoutResponse, Error> { + async fn handle( + self, + garage: &Arc<Garage>, + _admin: &Admin, + ) -> Result<UpdateClusterLayoutResponse, Error> { let mut layout = garage.system.cluster_layout().inner().clone(); let mut roles = layout.current().roles.clone(); @@ -271,11 +284,14 @@ impl EndpointHandler for UpdateClusterLayoutRequest { } } -#[async_trait] -impl EndpointHandler for ApplyClusterLayoutRequest { +impl RequestHandler for ApplyClusterLayoutRequest { type Response = ApplyClusterLayoutResponse; - async fn handle(self, garage: &Arc<Garage>) -> Result<ApplyClusterLayoutResponse, Error> { + async fn handle( + self, + garage: &Arc<Garage>, + _admin: &Admin, + ) -> Result<ApplyClusterLayoutResponse, Error> { let layout = garage.system.cluster_layout().inner().clone(); let (layout, msg) = layout.apply_staged_changes(Some(self.version))?; @@ -292,11 +308,14 @@ impl EndpointHandler for ApplyClusterLayoutRequest { } } -#[async_trait] -impl EndpointHandler for RevertClusterLayoutRequest { +impl RequestHandler for RevertClusterLayoutRequest { type Response = RevertClusterLayoutResponse; - async fn handle(self, garage: &Arc<Garage>) -> Result<RevertClusterLayoutResponse, Error> { + async fn handle( + self, + garage: &Arc<Garage>, + _admin: &Admin, + ) -> Result<RevertClusterLayoutResponse, Error> { let layout = garage.system.cluster_layout().inner().clone(); let layout = layout.revert_staged_changes()?; garage diff --git a/src/api/admin/error.rs b/src/api/admin/error.rs index 3712ee7d..d7ea7dc9 100644 --- a/src/api/admin/error.rs +++ b/src/api/admin/error.rs @@ -25,6 +25,14 @@ pub enum Error { #[error(display = "Access key not found: {}", _0)] NoSuchAccessKey(String), + /// The requested block does not exist + #[error(display = "Block not found: {}", _0)] + NoSuchBlock(String), + + /// The requested worker does not exist + #[error(display = "Worker not found: {}", _0)] + NoSuchWorker(u64), + /// In Import key, the key already exists #[error( display = "Key {} already exists in data store. Even if it is deleted, we can't let you create a new key with the same ID. Sorry.", @@ -53,6 +61,8 @@ impl Error { match self { Error::Common(c) => c.aws_code(), Error::NoSuchAccessKey(_) => "NoSuchAccessKey", + Error::NoSuchWorker(_) => "NoSuchWorker", + Error::NoSuchBlock(_) => "NoSuchBlock", Error::KeyAlreadyExists(_) => "KeyAlreadyExists", } } @@ -63,7 +73,9 @@ impl ApiError for Error { fn http_status_code(&self) -> StatusCode { match self { Error::Common(c) => c.http_status_code(), - Error::NoSuchAccessKey(_) => StatusCode::NOT_FOUND, + Error::NoSuchAccessKey(_) | Error::NoSuchWorker(_) | Error::NoSuchBlock(_) => { + StatusCode::NOT_FOUND + } Error::KeyAlreadyExists(_) => StatusCode::CONFLICT, } } diff --git a/src/api/admin/key.rs b/src/api/admin/key.rs index 5b7de075..dc6ae4e9 100644 --- a/src/api/admin/key.rs +++ b/src/api/admin/key.rs @@ -1,8 +1,6 @@ use std::collections::HashMap; use std::sync::Arc; -use async_trait::async_trait; - use garage_table::*; use garage_model::garage::Garage; @@ -10,13 +8,12 @@ use garage_model::key_table::*; use crate::api::*; use crate::error::*; -use crate::EndpointHandler; +use crate::{Admin, RequestHandler}; -#[async_trait] -impl EndpointHandler for ListKeysRequest { +impl RequestHandler for ListKeysRequest { type Response = ListKeysResponse; - async fn handle(self, garage: &Arc<Garage>) -> Result<ListKeysResponse, Error> { + async fn handle(self, garage: &Arc<Garage>, _admin: &Admin) -> Result<ListKeysResponse, Error> { let res = garage .key_table .get_range( @@ -38,11 +35,14 @@ impl EndpointHandler for ListKeysRequest { } } -#[async_trait] -impl EndpointHandler for GetKeyInfoRequest { +impl RequestHandler for GetKeyInfoRequest { type Response = GetKeyInfoResponse; - async fn handle(self, garage: &Arc<Garage>) -> Result<GetKeyInfoResponse, Error> { + async fn handle( + self, + garage: &Arc<Garage>, + _admin: &Admin, + ) -> Result<GetKeyInfoResponse, Error> { let key = match (self.id, self.search) { (Some(id), None) => garage.key_helper().get_existing_key(&id).await?, (None, Some(search)) => { @@ -62,11 +62,14 @@ impl EndpointHandler for GetKeyInfoRequest { } } -#[async_trait] -impl EndpointHandler for CreateKeyRequest { +impl RequestHandler for CreateKeyRequest { type Response = CreateKeyResponse; - async fn handle(self, garage: &Arc<Garage>) -> Result<CreateKeyResponse, Error> { + async fn handle( + self, + garage: &Arc<Garage>, + _admin: &Admin, + ) -> Result<CreateKeyResponse, Error> { let key = Key::new(self.name.as_deref().unwrap_or("Unnamed key")); garage.key_table.insert(&key).await?; @@ -76,11 +79,14 @@ impl EndpointHandler for CreateKeyRequest { } } -#[async_trait] -impl EndpointHandler for ImportKeyRequest { +impl RequestHandler for ImportKeyRequest { type Response = ImportKeyResponse; - async fn handle(self, garage: &Arc<Garage>) -> Result<ImportKeyResponse, Error> { + async fn handle( + self, + garage: &Arc<Garage>, + _admin: &Admin, + ) -> Result<ImportKeyResponse, Error> { let prev_key = garage.key_table.get(&EmptyKey, &self.access_key_id).await?; if prev_key.is_some() { return Err(Error::KeyAlreadyExists(self.access_key_id.to_string())); @@ -100,11 +106,14 @@ impl EndpointHandler for ImportKeyRequest { } } -#[async_trait] -impl EndpointHandler for UpdateKeyRequest { +impl RequestHandler for UpdateKeyRequest { type Response = UpdateKeyResponse; - async fn handle(self, garage: &Arc<Garage>) -> Result<UpdateKeyResponse, Error> { + async fn handle( + self, + garage: &Arc<Garage>, + _admin: &Admin, + ) -> Result<UpdateKeyResponse, Error> { let mut key = garage.key_helper().get_existing_key(&self.id).await?; let key_state = key.state.as_option_mut().unwrap(); @@ -131,11 +140,14 @@ impl EndpointHandler for UpdateKeyRequest { } } -#[async_trait] -impl EndpointHandler for DeleteKeyRequest { +impl RequestHandler for DeleteKeyRequest { type Response = DeleteKeyResponse; - async fn handle(self, garage: &Arc<Garage>) -> Result<DeleteKeyResponse, Error> { + async fn handle( + self, + garage: &Arc<Garage>, + _admin: &Admin, + ) -> Result<DeleteKeyResponse, Error> { let helper = garage.locked_helper().await; let mut key = helper.key().get_existing_key(&self.id).await?; diff --git a/src/api/admin/lib.rs b/src/api/admin/lib.rs index 31b3874d..dd9b7ffd 100644 --- a/src/api/admin/lib.rs +++ b/src/api/admin/lib.rs @@ -15,21 +15,29 @@ mod cluster; mod key; mod special; -use std::sync::Arc; +mod block; +mod node; +mod repair; +mod worker; -use async_trait::async_trait; +use std::sync::Arc; use garage_model::garage::Garage; +pub use api_server::AdminApiServer as Admin; + pub enum Authorization { None, MetricsToken, AdminToken, } -#[async_trait] -pub trait EndpointHandler { +pub trait RequestHandler { type Response; - async fn handle(self, garage: &Arc<Garage>) -> Result<Self::Response, error::Error>; + fn handle( + self, + garage: &Arc<Garage>, + admin: &Admin, + ) -> impl std::future::Future<Output = Result<Self::Response, error::Error>> + Send; } diff --git a/src/api/admin/macros.rs b/src/api/admin/macros.rs index 9521616e..df2762fe 100644 --- a/src/api/admin/macros.rs +++ b/src/api/admin/macros.rs @@ -70,11 +70,10 @@ macro_rules! admin_endpoints { } )* - #[async_trait] - impl EndpointHandler for AdminApiRequest { + impl RequestHandler for AdminApiRequest { type Response = AdminApiResponse; - async fn handle(self, garage: &Arc<Garage>) -> Result<AdminApiResponse, Error> { + async fn handle(self, garage: &Arc<Garage>, admin: &Admin) -> Result<AdminApiResponse, Error> { Ok(match self { $( AdminApiRequest::$special_endpoint(_) => panic!( @@ -82,7 +81,132 @@ macro_rules! admin_endpoints { ), )* $( - AdminApiRequest::$endpoint(req) => AdminApiResponse::$endpoint(req.handle(garage).await?), + AdminApiRequest::$endpoint(req) => AdminApiResponse::$endpoint(req.handle(garage, admin).await?), + )* + }) + } + } + } + }; +} + +macro_rules! local_admin_endpoints { + [ + $($endpoint:ident,)* + ] => { + paste! { + #[derive(Debug, Clone, Serialize, Deserialize)] + pub enum LocalAdminApiRequest { + $( + $endpoint( [<Local $endpoint Request>] ), + )* + } + + #[derive(Debug, Clone, Serialize, Deserialize)] + pub enum LocalAdminApiResponse { + $( + $endpoint( [<Local $endpoint Response>] ), + )* + } + + $( + pub type [< $endpoint Request >] = MultiRequest< [< Local $endpoint Request >] >; + + pub type [< $endpoint RequestBody >] = [< Local $endpoint Request >]; + + pub type [< $endpoint Response >] = MultiResponse< [< Local $endpoint Response >] >; + + impl From< [< Local $endpoint Request >] > for LocalAdminApiRequest { + fn from(req: [< Local $endpoint Request >]) -> LocalAdminApiRequest { + LocalAdminApiRequest::$endpoint(req) + } + } + + impl TryFrom<LocalAdminApiResponse> for [< Local $endpoint Response >] { + type Error = LocalAdminApiResponse; + fn try_from(resp: LocalAdminApiResponse) -> Result< [< Local $endpoint Response >], LocalAdminApiResponse> { + match resp { + LocalAdminApiResponse::$endpoint(v) => Ok(v), + x => Err(x), + } + } + } + + impl RequestHandler for [< $endpoint Request >] { + type Response = [< $endpoint Response >]; + + async fn handle(self, garage: &Arc<Garage>, admin: &Admin) -> Result<Self::Response, Error> { + let to = match self.node.as_str() { + "*" => garage.system.cluster_layout().all_nodes().to_vec(), + id => { + let nodes = garage.system.cluster_layout().all_nodes() + .iter() + .filter(|x| hex::encode(x).starts_with(id)) + .cloned() + .collect::<Vec<_>>(); + if nodes.len() != 1 { + return Err(Error::bad_request(format!("Zero or multiple nodes matching {}: {:?}", id, nodes))); + } + nodes + } + }; + + let resps = garage.system.rpc_helper().call_many(&admin.endpoint, + &to, + AdminRpc::Internal(self.body.into()), + RequestStrategy::with_priority(PRIO_NORMAL), + ).await?; + + let mut ret = [< $endpoint Response >] { + success: HashMap::new(), + error: HashMap::new(), + }; + for (node, resp) in resps { + match resp { + Ok(AdminRpcResponse::InternalApiOkResponse(r)) => { + match [< Local $endpoint Response >]::try_from(r) { + Ok(r) => { + ret.success.insert(hex::encode(node), r); + } + Err(_) => { + ret.error.insert(hex::encode(node), "returned invalid value".to_string()); + } + } + } + Ok(AdminRpcResponse::ApiErrorResponse{error_code, http_code, message}) => { + ret.error.insert(hex::encode(node), format!("{} ({}): {}", error_code, http_code, message)); + } + Ok(_) => { + ret.error.insert(hex::encode(node), "returned invalid value".to_string()); + } + Err(e) => { + ret.error.insert(hex::encode(node), e.to_string()); + } + } + } + + Ok(ret) + } + } + )* + + impl LocalAdminApiRequest { + pub fn name(&self) -> &'static str { + match self { + $( + Self::$endpoint(_) => stringify!($endpoint), + )* + } + } + } + + impl RequestHandler for LocalAdminApiRequest { + type Response = LocalAdminApiResponse; + + async fn handle(self, garage: &Arc<Garage>, admin: &Admin) -> Result<LocalAdminApiResponse, Error> { + Ok(match self { + $( + LocalAdminApiRequest::$endpoint(req) => LocalAdminApiResponse::$endpoint(req.handle(garage, admin).await?), )* }) } @@ -92,3 +216,4 @@ macro_rules! admin_endpoints { } pub(crate) use admin_endpoints; +pub(crate) use local_admin_endpoints; diff --git a/src/api/admin/node.rs b/src/api/admin/node.rs new file mode 100644 index 00000000..f6f43d95 --- /dev/null +++ b/src/api/admin/node.rs @@ -0,0 +1,216 @@ +use std::collections::HashMap; +use std::fmt::Write; +use std::sync::Arc; + +use format_table::format_table_to_string; + +use garage_util::data::*; +use garage_util::error::Error as GarageError; + +use garage_table::replication::*; +use garage_table::*; + +use garage_rpc::layout::PARTITION_BITS; + +use garage_model::garage::Garage; + +use crate::api::*; +use crate::error::Error; +use crate::{Admin, RequestHandler}; + +impl RequestHandler for LocalCreateMetadataSnapshotRequest { + type Response = LocalCreateMetadataSnapshotResponse; + + async fn handle( + self, + garage: &Arc<Garage>, + _admin: &Admin, + ) -> Result<LocalCreateMetadataSnapshotResponse, Error> { + garage_model::snapshot::async_snapshot_metadata(garage).await?; + Ok(LocalCreateMetadataSnapshotResponse) + } +} + +impl RequestHandler for LocalGetNodeStatisticsRequest { + type Response = LocalGetNodeStatisticsResponse; + + // FIXME: return this as a JSON struct instead of text + async fn handle( + self, + garage: &Arc<Garage>, + _admin: &Admin, + ) -> Result<LocalGetNodeStatisticsResponse, Error> { + let mut ret = String::new(); + writeln!( + &mut ret, + "Garage version: {} [features: {}]\nRust compiler version: {}", + garage_util::version::garage_version(), + garage_util::version::garage_features() + .map(|list| list.join(", ")) + .unwrap_or_else(|| "(unknown)".into()), + garage_util::version::rust_version(), + ) + .unwrap(); + + writeln!(&mut ret, "\nDatabase engine: {}", garage.db.engine()).unwrap(); + + // Gather table statistics + let mut table = vec![" Table\tItems\tMklItems\tMklTodo\tGcTodo".into()]; + table.push(gather_table_stats(&garage.bucket_table)?); + table.push(gather_table_stats(&garage.key_table)?); + table.push(gather_table_stats(&garage.object_table)?); + table.push(gather_table_stats(&garage.version_table)?); + table.push(gather_table_stats(&garage.block_ref_table)?); + write!( + &mut ret, + "\nTable stats:\n{}", + format_table_to_string(table) + ) + .unwrap(); + + // Gather block manager statistics + writeln!(&mut ret, "\nBlock manager stats:").unwrap(); + let rc_len = garage.block_manager.rc_len()?.to_string(); + + writeln!( + &mut ret, + " number of RC entries (~= number of blocks): {}", + rc_len + ) + .unwrap(); + writeln!( + &mut ret, + " resync queue length: {}", + garage.block_manager.resync.queue_len()? + ) + .unwrap(); + writeln!( + &mut ret, + " blocks with resync errors: {}", + garage.block_manager.resync.errors_len()? + ) + .unwrap(); + + Ok(LocalGetNodeStatisticsResponse { freeform: ret }) + } +} + +impl RequestHandler for GetClusterStatisticsRequest { + type Response = GetClusterStatisticsResponse; + + // FIXME: return this as a JSON struct instead of text + async fn handle( + self, + garage: &Arc<Garage>, + _admin: &Admin, + ) -> Result<GetClusterStatisticsResponse, Error> { + let mut ret = String::new(); + + // Gather storage node and free space statistics for current nodes + let layout = &garage.system.cluster_layout(); + let mut node_partition_count = HashMap::<Uuid, u64>::new(); + for short_id in layout.current().ring_assignment_data.iter() { + let id = layout.current().node_id_vec[*short_id as usize]; + *node_partition_count.entry(id).or_default() += 1; + } + let node_info = garage + .system + .get_known_nodes() + .into_iter() + .map(|n| (n.id, n)) + .collect::<HashMap<_, _>>(); + + let mut table = vec![" ID\tHostname\tZone\tCapacity\tPart.\tDataAvail\tMetaAvail".into()]; + for (id, parts) in node_partition_count.iter() { + let info = node_info.get(id); + let status = info.map(|x| &x.status); + let role = layout.current().roles.get(id).and_then(|x| x.0.as_ref()); + let hostname = status.and_then(|x| x.hostname.as_deref()).unwrap_or("?"); + let zone = role.map(|x| x.zone.as_str()).unwrap_or("?"); + let capacity = role + .map(|x| x.capacity_string()) + .unwrap_or_else(|| "?".into()); + let avail_str = |x| match x { + Some((avail, total)) => { + let pct = (avail as f64) / (total as f64) * 100.; + let avail = bytesize::ByteSize::b(avail); + let total = bytesize::ByteSize::b(total); + format!("{}/{} ({:.1}%)", avail, total, pct) + } + None => "?".into(), + }; + let data_avail = avail_str(status.and_then(|x| x.data_disk_avail)); + let meta_avail = avail_str(status.and_then(|x| x.meta_disk_avail)); + table.push(format!( + " {:?}\t{}\t{}\t{}\t{}\t{}\t{}", + id, hostname, zone, capacity, parts, data_avail, meta_avail + )); + } + write!( + &mut ret, + "Storage nodes:\n{}", + format_table_to_string(table) + ) + .unwrap(); + + let meta_part_avail = node_partition_count + .iter() + .filter_map(|(id, parts)| { + node_info + .get(id) + .and_then(|x| x.status.meta_disk_avail) + .map(|c| c.0 / *parts) + }) + .collect::<Vec<_>>(); + let data_part_avail = node_partition_count + .iter() + .filter_map(|(id, parts)| { + node_info + .get(id) + .and_then(|x| x.status.data_disk_avail) + .map(|c| c.0 / *parts) + }) + .collect::<Vec<_>>(); + if !meta_part_avail.is_empty() && !data_part_avail.is_empty() { + let meta_avail = + bytesize::ByteSize(meta_part_avail.iter().min().unwrap() * (1 << PARTITION_BITS)); + let data_avail = + bytesize::ByteSize(data_part_avail.iter().min().unwrap() * (1 << PARTITION_BITS)); + writeln!( + &mut ret, + "\nEstimated available storage space cluster-wide (might be lower in practice):" + ) + .unwrap(); + if meta_part_avail.len() < node_partition_count.len() + || data_part_avail.len() < node_partition_count.len() + { + writeln!(&mut ret, " data: < {}", data_avail).unwrap(); + writeln!(&mut ret, " metadata: < {}", meta_avail).unwrap(); + writeln!(&mut ret, "A precise estimate could not be given as information is missing for some storage nodes.").unwrap(); + } else { + writeln!(&mut ret, " data: {}", data_avail).unwrap(); + writeln!(&mut ret, " metadata: {}", meta_avail).unwrap(); + } + } + + Ok(GetClusterStatisticsResponse { freeform: ret }) + } +} + +fn gather_table_stats<F, R>(t: &Arc<Table<F, R>>) -> Result<String, Error> +where + F: TableSchema + 'static, + R: TableReplication + 'static, +{ + let data_len = t.data.store.len().map_err(GarageError::from)?.to_string(); + let mkl_len = t.merkle_updater.merkle_tree_len()?.to_string(); + + Ok(format!( + " {}\t{}\t{}\t{}\t{}", + F::TABLE_NAME, + data_len, + mkl_len, + t.merkle_updater.todo_len()?, + t.data.gc_todo_len()? + )) +} diff --git a/src/api/admin/repair.rs b/src/api/admin/repair.rs new file mode 100644 index 00000000..a9b8c36a --- /dev/null +++ b/src/api/admin/repair.rs @@ -0,0 +1,403 @@ +use std::future::Future; +use std::sync::Arc; +use std::time::Duration; + +use async_trait::async_trait; +use tokio::sync::watch; + +use garage_util::background::*; +use garage_util::data::*; +use garage_util::error::{Error as GarageError, OkOrMessage}; +use garage_util::migrate::Migrate; + +use garage_table::replication::*; +use garage_table::*; + +use garage_block::manager::BlockManager; +use garage_block::repair::ScrubWorkerCommand; + +use garage_model::garage::Garage; +use garage_model::s3::block_ref_table::*; +use garage_model::s3::mpu_table::*; +use garage_model::s3::object_table::*; +use garage_model::s3::version_table::*; + +use crate::api::*; +use crate::error::Error; +use crate::{Admin, RequestHandler}; + +const RC_REPAIR_ITER_COUNT: usize = 64; + +impl RequestHandler for LocalLaunchRepairOperationRequest { + type Response = LocalLaunchRepairOperationResponse; + + async fn handle( + self, + garage: &Arc<Garage>, + admin: &Admin, + ) -> Result<LocalLaunchRepairOperationResponse, Error> { + let bg = &admin.background; + match self.repair_type { + RepairType::Tables => { + info!("Launching a full sync of tables"); + garage.bucket_table.syncer.add_full_sync()?; + garage.object_table.syncer.add_full_sync()?; + garage.version_table.syncer.add_full_sync()?; + garage.block_ref_table.syncer.add_full_sync()?; + garage.key_table.syncer.add_full_sync()?; + } + RepairType::Versions => { + info!("Repairing the versions table"); + bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairVersions)); + } + RepairType::MultipartUploads => { + info!("Repairing the multipart uploads table"); + bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairMpu)); + } + RepairType::BlockRefs => { + info!("Repairing the block refs table"); + bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairBlockRefs)); + } + RepairType::BlockRc => { + info!("Repairing the block reference counters"); + bg.spawn_worker(BlockRcRepair::new( + garage.block_manager.clone(), + garage.block_ref_table.clone(), + )); + } + RepairType::Blocks => { + info!("Repairing the stored blocks"); + bg.spawn_worker(garage_block::repair::RepairWorker::new( + garage.block_manager.clone(), + )); + } + RepairType::Scrub(cmd) => { + let cmd = match cmd { + ScrubCommand::Start => ScrubWorkerCommand::Start, + ScrubCommand::Pause => { + ScrubWorkerCommand::Pause(Duration::from_secs(3600 * 24)) + } + ScrubCommand::Resume => ScrubWorkerCommand::Resume, + ScrubCommand::Cancel => ScrubWorkerCommand::Cancel, + }; + info!("Sending command to scrub worker: {:?}", cmd); + garage.block_manager.send_scrub_command(cmd).await?; + } + RepairType::Rebalance => { + info!("Rebalancing the stored blocks among storage locations"); + bg.spawn_worker(garage_block::repair::RebalanceWorker::new( + garage.block_manager.clone(), + )); + } + } + Ok(LocalLaunchRepairOperationResponse) + } +} + +// ---- + +trait TableRepair: Send + Sync + 'static { + type T: TableSchema; + + fn table(garage: &Garage) -> &Table<Self::T, TableShardedReplication>; + + fn process( + &mut self, + garage: &Garage, + entry: <<Self as TableRepair>::T as TableSchema>::E, + ) -> impl Future<Output = Result<bool, GarageError>> + Send; +} + +struct TableRepairWorker<T: TableRepair> { + garage: Arc<Garage>, + pos: Vec<u8>, + counter: usize, + repairs: usize, + inner: T, +} + +impl<R: TableRepair> TableRepairWorker<R> { + fn new(garage: Arc<Garage>, inner: R) -> Self { + Self { + garage, + inner, + pos: vec![], + counter: 0, + repairs: 0, + } + } +} + +#[async_trait] +impl<R: TableRepair> Worker for TableRepairWorker<R> { + fn name(&self) -> String { + format!("{} repair worker", R::T::TABLE_NAME) + } + + fn status(&self) -> WorkerStatus { + WorkerStatus { + progress: Some(format!("{} ({})", self.counter, self.repairs)), + ..Default::default() + } + } + + async fn work( + &mut self, + _must_exit: &mut watch::Receiver<bool>, + ) -> Result<WorkerState, GarageError> { + let (item_bytes, next_pos) = match R::table(&self.garage).data.store.get_gt(&self.pos)? { + Some((k, v)) => (v, k), + None => { + info!( + "{}: finished, done {}, fixed {}", + self.name(), + self.counter, + self.repairs + ); + return Ok(WorkerState::Done); + } + }; + + let entry = <R::T as TableSchema>::E::decode(&item_bytes) + .ok_or_message("Cannot decode table entry")?; + if self.inner.process(&self.garage, entry).await? { + self.repairs += 1; + } + + self.counter += 1; + self.pos = next_pos; + + Ok(WorkerState::Busy) + } + + async fn wait_for_work(&mut self) -> WorkerState { + unreachable!() + } +} + +// ---- + +struct RepairVersions; + +impl TableRepair for RepairVersions { + type T = VersionTable; + + fn table(garage: &Garage) -> &Table<Self::T, TableShardedReplication> { + &garage.version_table + } + + async fn process(&mut self, garage: &Garage, version: Version) -> Result<bool, GarageError> { + if !version.deleted.get() { + let ref_exists = match &version.backlink { + VersionBacklink::Object { bucket_id, key } => garage + .object_table + .get(bucket_id, key) + .await? + .map(|o| { + o.versions().iter().any(|x| { + x.uuid == version.uuid && x.state != ObjectVersionState::Aborted + }) + }) + .unwrap_or(false), + VersionBacklink::MultipartUpload { upload_id } => garage + .mpu_table + .get(upload_id, &EmptyKey) + .await? + .map(|u| !u.deleted.get()) + .unwrap_or(false), + }; + + if !ref_exists { + info!("Repair versions: marking version as deleted: {:?}", version); + garage + .version_table + .insert(&Version::new(version.uuid, version.backlink, true)) + .await?; + return Ok(true); + } + } + + Ok(false) + } +} + +// ---- + +struct RepairBlockRefs; + +impl TableRepair for RepairBlockRefs { + type T = BlockRefTable; + + fn table(garage: &Garage) -> &Table<Self::T, TableShardedReplication> { + &garage.block_ref_table + } + + async fn process( + &mut self, + garage: &Garage, + mut block_ref: BlockRef, + ) -> Result<bool, GarageError> { + if !block_ref.deleted.get() { + let ref_exists = garage + .version_table + .get(&block_ref.version, &EmptyKey) + .await? + .map(|v| !v.deleted.get()) + .unwrap_or(false); + + if !ref_exists { + info!( + "Repair block ref: marking block_ref as deleted: {:?}", + block_ref + ); + block_ref.deleted.set(); + garage.block_ref_table.insert(&block_ref).await?; + return Ok(true); + } + } + + Ok(false) + } +} + +// ---- + +struct RepairMpu; + +impl TableRepair for RepairMpu { + type T = MultipartUploadTable; + + fn table(garage: &Garage) -> &Table<Self::T, TableShardedReplication> { + &garage.mpu_table + } + + async fn process( + &mut self, + garage: &Garage, + mut mpu: MultipartUpload, + ) -> Result<bool, GarageError> { + if !mpu.deleted.get() { + let ref_exists = garage + .object_table + .get(&mpu.bucket_id, &mpu.key) + .await? + .map(|o| { + o.versions() + .iter() + .any(|x| x.uuid == mpu.upload_id && x.is_uploading(Some(true))) + }) + .unwrap_or(false); + + if !ref_exists { + info!( + "Repair multipart uploads: marking mpu as deleted: {:?}", + mpu + ); + mpu.parts.clear(); + mpu.deleted.set(); + garage.mpu_table.insert(&mpu).await?; + return Ok(true); + } + } + + Ok(false) + } +} + +// ===== block reference counter repair ===== + +pub struct BlockRcRepair { + block_manager: Arc<BlockManager>, + block_ref_table: Arc<Table<BlockRefTable, TableShardedReplication>>, + cursor: Hash, + counter: u64, + repairs: u64, +} + +impl BlockRcRepair { + fn new( + block_manager: Arc<BlockManager>, + block_ref_table: Arc<Table<BlockRefTable, TableShardedReplication>>, + ) -> Self { + Self { + block_manager, + block_ref_table, + cursor: [0u8; 32].into(), + counter: 0, + repairs: 0, + } + } +} + +#[async_trait] +impl Worker for BlockRcRepair { + fn name(&self) -> String { + format!("Block refcount repair worker") + } + + fn status(&self) -> WorkerStatus { + WorkerStatus { + progress: Some(format!("{} ({})", self.counter, self.repairs)), + ..Default::default() + } + } + + async fn work( + &mut self, + _must_exit: &mut watch::Receiver<bool>, + ) -> Result<WorkerState, GarageError> { + for _i in 0..RC_REPAIR_ITER_COUNT { + let next1 = self + .block_manager + .rc + .rc_table + .range(self.cursor.as_slice()..)? + .next() + .transpose()? + .map(|(k, _)| Hash::try_from(k.as_slice()).unwrap()); + let next2 = self + .block_ref_table + .data + .store + .range(self.cursor.as_slice()..)? + .next() + .transpose()? + .map(|(k, _)| Hash::try_from(&k[..32]).unwrap()); + let next = match (next1, next2) { + (Some(k1), Some(k2)) => std::cmp::min(k1, k2), + (Some(k), None) | (None, Some(k)) => k, + (None, None) => { + info!( + "{}: finished, done {}, fixed {}", + self.name(), + self.counter, + self.repairs + ); + return Ok(WorkerState::Done); + } + }; + + if self.block_manager.rc.recalculate_rc(&next)?.1 { + self.repairs += 1; + } + self.counter += 1; + if let Some(next_incr) = next.increment() { + self.cursor = next_incr; + } else { + info!( + "{}: finished, done {}, fixed {}", + self.name(), + self.counter, + self.repairs + ); + return Ok(WorkerState::Done); + } + } + + Ok(WorkerState::Busy) + } + + async fn wait_for_work(&mut self) -> WorkerState { + unreachable!() + } +} diff --git a/src/api/admin/router_v2.rs b/src/api/admin/router_v2.rs index b36bca34..4d5c015e 100644 --- a/src/api/admin/router_v2.rs +++ b/src/api/admin/router_v2.rs @@ -52,12 +52,28 @@ impl AdminApiRequest { POST CreateBucket (body), POST DeleteBucket (query::id), POST UpdateBucket (body_field, query::id), + POST CleanupIncompleteUploads (body), // Bucket-key permissions POST AllowBucketKey (body), POST DenyBucketKey (body), // Bucket aliases POST AddBucketAlias (body), POST RemoveBucketAlias (body), + // Node APIs + POST CreateMetadataSnapshot (default::body, query::node), + GET GetNodeStatistics (default::body, query::node), + GET GetClusterStatistics (), + POST LaunchRepairOperation (body_field, query::node), + // Worker APIs + POST ListWorkers (body_field, query::node), + POST GetWorkerInfo (body_field, query::node), + POST GetWorkerVariable (body_field, query::node), + POST SetWorkerVariable (body_field, query::node), + // Block APIs + GET ListBlockErrors (default::body, query::node), + POST GetBlockInfo (body_field, query::node), + POST RetryBlockResync (body_field, query::node), + POST PurgeBlocks (body_field, query::node), ]); if let Some(message) = query.nonempty_message() { @@ -239,6 +255,7 @@ impl AdminApiRequest { generateQueryParameters! { keywords: [], fields: [ + "node" => node, "domain" => domain, "format" => format, "id" => id, diff --git a/src/api/admin/special.rs b/src/api/admin/special.rs index 0b26fe32..0ecf82bc 100644 --- a/src/api/admin/special.rs +++ b/src/api/admin/special.rs @@ -1,27 +1,31 @@ use std::sync::Arc; -use async_trait::async_trait; - use http::header::{ ACCESS_CONTROL_ALLOW_HEADERS, ACCESS_CONTROL_ALLOW_METHODS, ACCESS_CONTROL_ALLOW_ORIGIN, ALLOW, }; use hyper::{Response, StatusCode}; +#[cfg(feature = "metrics")] +use prometheus::{Encoder, TextEncoder}; + use garage_model::garage::Garage; use garage_rpc::system::ClusterHealthStatus; use garage_api_common::helpers::*; -use crate::api::{CheckDomainRequest, HealthRequest, OptionsRequest}; +use crate::api::{CheckDomainRequest, HealthRequest, MetricsRequest, OptionsRequest}; use crate::api_server::ResBody; use crate::error::*; -use crate::EndpointHandler; +use crate::{Admin, RequestHandler}; -#[async_trait] -impl EndpointHandler for OptionsRequest { +impl RequestHandler for OptionsRequest { type Response = Response<ResBody>; - async fn handle(self, _garage: &Arc<Garage>) -> Result<Response<ResBody>, Error> { + async fn handle( + self, + _garage: &Arc<Garage>, + _admin: &Admin, + ) -> Result<Response<ResBody>, Error> { Ok(Response::builder() .status(StatusCode::OK) .header(ALLOW, "OPTIONS,GET,POST") @@ -32,11 +36,83 @@ impl EndpointHandler for OptionsRequest { } } -#[async_trait] -impl EndpointHandler for CheckDomainRequest { +impl RequestHandler for MetricsRequest { + type Response = Response<ResBody>; + + async fn handle( + self, + _garage: &Arc<Garage>, + admin: &Admin, + ) -> Result<Response<ResBody>, Error> { + #[cfg(feature = "metrics")] + { + use opentelemetry::trace::Tracer; + + let mut buffer = vec![]; + let encoder = TextEncoder::new(); + + let tracer = opentelemetry::global::tracer("garage"); + let metric_families = tracer.in_span("admin/gather_metrics", |_| { + admin.exporter.registry().gather() + }); + + encoder + .encode(&metric_families, &mut buffer) + .ok_or_internal_error("Could not serialize metrics")?; + + Ok(Response::builder() + .status(StatusCode::OK) + .header(http::header::CONTENT_TYPE, encoder.format_type()) + .body(bytes_body(buffer.into()))?) + } + #[cfg(not(feature = "metrics"))] + Err(Error::bad_request( + "Garage was built without the metrics feature".to_string(), + )) + } +} + +impl RequestHandler for HealthRequest { + type Response = Response<ResBody>; + + async fn handle( + self, + garage: &Arc<Garage>, + _admin: &Admin, + ) -> Result<Response<ResBody>, Error> { + let health = garage.system.health(); + + let (status, status_str) = match health.status { + ClusterHealthStatus::Healthy => (StatusCode::OK, "Garage is fully operational"), + ClusterHealthStatus::Degraded => ( + StatusCode::OK, + "Garage is operational but some storage nodes are unavailable", + ), + ClusterHealthStatus::Unavailable => ( + StatusCode::SERVICE_UNAVAILABLE, + "Quorum is not available for some/all partitions, reads and writes will fail", + ), + }; + let status_str = format!( + "{}\nConsult the full health check API endpoint at /v2/GetClusterHealth for more details\n", + status_str + ); + + Ok(Response::builder() + .status(status) + .header(http::header::CONTENT_TYPE, "text/plain") + .body(string_body(status_str))?) + } +} + +impl RequestHandler for CheckDomainRequest { type Response = Response<ResBody>; - async fn handle(self, garage: &Arc<Garage>) -> Result<Response<ResBody>, Error> { + async fn handle( + self, + garage: &Arc<Garage>, + _admin: &Admin, + ) -> Result<Response<ResBody>, Error> { if check_domain(garage, &self.domain).await? { Ok(Response::builder() .status(StatusCode::OK) @@ -101,33 +177,3 @@ async fn check_domain(garage: &Arc<Garage>, domain: &str) -> Result<bool, Error> None => Ok(false), } } - -#[async_trait] -impl EndpointHandler for HealthRequest { - type Response = Response<ResBody>; - - async fn handle(self, garage: &Arc<Garage>) -> Result<Response<ResBody>, Error> { - let health = garage.system.health(); - - let (status, status_str) = match health.status { - ClusterHealthStatus::Healthy => (StatusCode::OK, "Garage is fully operational"), - ClusterHealthStatus::Degraded => ( - StatusCode::OK, - "Garage is operational but some storage nodes are unavailable", - ), - ClusterHealthStatus::Unavailable => ( - StatusCode::SERVICE_UNAVAILABLE, - "Quorum is not available for some/all partitions, reads and writes will fail", - ), - }; - let status_str = format!( - "{}\nConsult the full health check API endpoint at /v2/GetClusterHealth for more details\n", - status_str - ); - - Ok(Response::builder() - .status(status) - .header(http::header::CONTENT_TYPE, "text/plain") - .body(string_body(status_str))?) - } -} diff --git a/src/api/admin/worker.rs b/src/api/admin/worker.rs new file mode 100644 index 00000000..b3f4537b --- /dev/null +++ b/src/api/admin/worker.rs @@ -0,0 +1,118 @@ +use std::collections::HashMap; +use std::sync::Arc; + +use garage_util::background::*; +use garage_util::time::now_msec; + +use garage_model::garage::Garage; + +use crate::api::*; +use crate::error::Error; +use crate::{Admin, RequestHandler}; + +impl RequestHandler for LocalListWorkersRequest { + type Response = LocalListWorkersResponse; + + async fn handle( + self, + _garage: &Arc<Garage>, + admin: &Admin, + ) -> Result<LocalListWorkersResponse, Error> { + let workers = admin.background.get_worker_info(); + let info = workers + .into_iter() + .filter(|(_, w)| { + (!self.busy_only + || matches!(w.state, WorkerState::Busy | WorkerState::Throttled(_))) + && (!self.error_only || w.errors > 0) + }) + .map(|(id, w)| worker_info_to_api(id as u64, w)) + .collect::<Vec<_>>(); + Ok(LocalListWorkersResponse(info)) + } +} + +impl RequestHandler for LocalGetWorkerInfoRequest { + type Response = LocalGetWorkerInfoResponse; + + async fn handle( + self, + _garage: &Arc<Garage>, + admin: &Admin, + ) -> Result<LocalGetWorkerInfoResponse, Error> { + let info = admin + .background + .get_worker_info() + .get(&(self.id as usize)) + .ok_or(Error::NoSuchWorker(self.id))? + .clone(); + Ok(LocalGetWorkerInfoResponse(worker_info_to_api( + self.id, info, + ))) + } +} + +impl RequestHandler for LocalGetWorkerVariableRequest { + type Response = LocalGetWorkerVariableResponse; + + async fn handle( + self, + garage: &Arc<Garage>, + _admin: &Admin, + ) -> Result<LocalGetWorkerVariableResponse, Error> { + let mut res = HashMap::new(); + if let Some(k) = self.variable { + res.insert(k.clone(), garage.bg_vars.get(&k)?); + } else { + let vars = garage.bg_vars.get_all(); + for (k, v) in vars.iter() { + res.insert(k.to_string(), v.to_string()); + } + } + Ok(LocalGetWorkerVariableResponse(res)) + } +} + +impl RequestHandler for LocalSetWorkerVariableRequest { + type Response = LocalSetWorkerVariableResponse; + + async fn handle( + self, + garage: &Arc<Garage>, + _admin: &Admin, + ) -> Result<LocalSetWorkerVariableResponse, Error> { + garage.bg_vars.set(&self.variable, &self.value)?; + + Ok(LocalSetWorkerVariableResponse { + variable: self.variable, + value: self.value, + }) + } +} + +// ---- helper functions ---- + +fn worker_info_to_api(id: u64, info: WorkerInfo) -> WorkerInfoResp { + WorkerInfoResp { + id, + name: info.name, + state: match info.state { + WorkerState::Busy => WorkerStateResp::Busy, + WorkerState::Throttled(t) => WorkerStateResp::Throttled { duration_secs: t }, + WorkerState::Idle => WorkerStateResp::Idle, + WorkerState::Done => WorkerStateResp::Done, + }, + errors: info.errors as u64, + consecutive_errors: info.consecutive_errors as u64, + last_error: info.last_error.map(|(message, t)| WorkerLastError { + message, + secs_ago: now_msec().saturating_sub(t) / 1000, + }), + + tranquility: info.status.tranquility, + progress: info.status.progress, + queue_length: info.status.queue_length, + persistent_errors: info.status.persistent_errors, + freeform: info.status.freeform, + } +} |