aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/api/admin/Cargo.toml47
-rw-r--r--src/api/admin/api.rs319
-rw-r--r--src/api/admin/api_server.rs210
-rw-r--r--src/api/admin/block.rs274
-rw-r--r--src/api/admin/bucket.rs116
-rw-r--r--src/api/admin/cluster.rs71
-rw-r--r--src/api/admin/error.rs37
-rw-r--r--src/api/admin/key.rs58
-rw-r--r--src/api/admin/lib.rs43
-rw-r--r--src/api/admin/macros.rs133
-rw-r--r--src/api/admin/mod.rs32
-rw-r--r--src/api/admin/node.rs216
-rw-r--r--src/api/admin/repair.rs (renamed from src/garage/repair/online.rs)177
-rw-r--r--src/api/admin/router_v0.rs5
-rw-r--r--src/api/admin/router_v1.rs7
-rw-r--r--src/api/admin/router_v2.rs30
-rw-r--r--src/api/admin/special.rs133
-rw-r--r--src/api/admin/worker.rs118
-rw-r--r--src/api/common/Cargo.toml44
-rw-r--r--src/api/common/common_error.rs (renamed from src/api/common_error.rs)33
-rw-r--r--src/api/common/cors.rs170
-rw-r--r--src/api/common/encoding.rs (renamed from src/api/encoding.rs)0
-rw-r--r--src/api/common/generic_server.rs (renamed from src/api/generic_server.rs)18
-rw-r--r--src/api/common/helpers.rs (renamed from src/api/helpers.rs)10
-rw-r--r--src/api/common/lib.rs12
-rw-r--r--src/api/common/router_macros.rs (renamed from src/api/router_macros.rs)9
-rw-r--r--src/api/common/signature/error.rs (renamed from src/api/signature/error.rs)0
-rw-r--r--src/api/common/signature/mod.rs (renamed from src/api/signature/mod.rs)0
-rw-r--r--src/api/common/signature/payload.rs (renamed from src/api/signature/payload.rs)2
-rw-r--r--src/api/common/signature/streaming.rs (renamed from src/api/signature/streaming.rs)0
-rw-r--r--src/api/k2v/Cargo.toml37
-rw-r--r--src/api/k2v/api_server.rs26
-rw-r--r--src/api/k2v/batch.rs11
-rw-r--r--src/api/k2v/error.rs27
-rw-r--r--src/api/k2v/index.rs9
-rw-r--r--src/api/k2v/item.rs7
-rw-r--r--src/api/k2v/lib.rs (renamed from src/api/k2v/mod.rs)3
-rw-r--r--src/api/k2v/range.rs5
-rw-r--r--src/api/k2v/router.rs6
-rw-r--r--src/api/lib.rs17
-rw-r--r--src/api/s3/Cargo.toml (renamed from src/api/Cargo.toml)19
-rw-r--r--src/api/s3/api_server.rs45
-rw-r--r--src/api/s3/bucket.rs13
-rw-r--r--src/api/s3/checksum.rs2
-rw-r--r--src/api/s3/copy.rs21
-rw-r--r--src/api/s3/cors.rs183
-rw-r--r--src/api/s3/delete.rs13
-rw-r--r--src/api/s3/encryption.rs7
-rw-r--r--src/api/s3/error.rs35
-rw-r--r--src/api/s3/get.rs11
-rw-r--r--src/api/s3/lib.rs (renamed from src/api/s3/mod.rs)3
-rw-r--r--src/api/s3/lifecycle.rs11
-rw-r--r--src/api/s3/list.rs15
-rw-r--r--src/api/s3/multipart.rs17
-rw-r--r--src/api/s3/post_object.rs19
-rw-r--r--src/api/s3/put.rs11
-rw-r--r--src/api/s3/router.rs7
-rw-r--r--src/api/s3/website.rs13
-rw-r--r--src/api/s3/xml.rs2
-rw-r--r--src/block/Cargo.toml2
-rw-r--r--src/block/manager.rs2
-rw-r--r--src/db/Cargo.toml1
-rw-r--r--src/garage/Cargo.toml19
-rw-r--r--src/garage/admin/block.rs235
-rw-r--r--src/garage/admin/bucket.rs53
-rw-r--r--src/garage/admin/mod.rs545
-rw-r--r--src/garage/cli/cmd.rs60
-rw-r--r--src/garage/cli/layout.rs15
-rw-r--r--src/garage/cli/mod.rs11
-rw-r--r--src/garage/cli/repair.rs (renamed from src/garage/repair/offline.rs)0
-rw-r--r--src/garage/cli/structs.rs64
-rw-r--r--src/garage/cli/util.rs216
-rw-r--r--src/garage/cli_v2/block.rs145
-rw-r--r--src/garage/cli_v2/bucket.rs48
-rw-r--r--src/garage/cli_v2/cluster.rs2
-rw-r--r--src/garage/cli_v2/key.rs2
-rw-r--r--src/garage/cli_v2/layout.rs2
-rw-r--r--src/garage/cli_v2/mod.rs104
-rw-r--r--src/garage/cli_v2/node.rs113
-rw-r--r--src/garage/cli_v2/worker.rs213
-rw-r--r--src/garage/main.rs17
-rw-r--r--src/garage/repair/mod.rs2
-rw-r--r--src/garage/server.rs11
-rw-r--r--src/garage/tests/common/custom_requester.rs2
-rw-r--r--src/k2v-client/Cargo.toml3
-rw-r--r--src/model/Cargo.toml3
-rw-r--r--src/model/k2v/rpc.rs2
-rw-r--r--src/net/Cargo.toml1
-rw-r--r--src/net/client.rs2
-rw-r--r--src/net/endpoint.rs32
-rw-r--r--src/net/netapp.rs2
-rw-r--r--src/net/peering.rs3
-rw-r--r--src/net/recv.rs2
-rw-r--r--src/net/send.rs2
-rw-r--r--src/net/server.rs2
-rw-r--r--src/rpc/Cargo.toml4
-rw-r--r--src/rpc/system.rs2
-rw-r--r--src/table/Cargo.toml1
-rw-r--r--src/table/gc.rs2
-rw-r--r--src/table/sync.rs1
-rw-r--r--src/table/table.rs6
-rw-r--r--src/util/Cargo.toml2
-rw-r--r--src/util/background/mod.rs5
-rw-r--r--src/util/background/worker.rs14
-rw-r--r--src/web/Cargo.toml6
-rw-r--r--src/web/error.rs8
-rw-r--r--src/web/web_server.rs12
107 files changed, 2834 insertions, 2086 deletions
diff --git a/src/api/admin/Cargo.toml b/src/api/admin/Cargo.toml
new file mode 100644
index 00000000..9ac099e8
--- /dev/null
+++ b/src/api/admin/Cargo.toml
@@ -0,0 +1,47 @@
+[package]
+name = "garage_api_admin"
+version = "1.0.1"
+authors = ["Alex Auvolat <alex@adnab.me>"]
+edition = "2018"
+license = "AGPL-3.0"
+description = "Admin API server crate for the Garage object store"
+repository = "https://git.deuxfleurs.fr/Deuxfleurs/garage"
+readme = "../../README.md"
+
+[lib]
+path = "lib.rs"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+format_table.workspace = true
+garage_model.workspace = true
+garage_block.workspace = true
+garage_table.workspace = true
+garage_util.workspace = true
+garage_rpc.workspace = true
+garage_api_common.workspace = true
+
+argon2.workspace = true
+async-trait.workspace = true
+bytesize.workspace = true
+err-derive.workspace = true
+hex.workspace = true
+paste.workspace = true
+tracing.workspace = true
+
+futures.workspace = true
+tokio.workspace = true
+http.workspace = true
+hyper = { workspace = true, default-features = false, features = ["server", "http1"] }
+url.workspace = true
+
+serde.workspace = true
+serde_json.workspace = true
+
+opentelemetry.workspace = true
+opentelemetry-prometheus = { workspace = true, optional = true }
+prometheus = { workspace = true, optional = true }
+
+[features]
+metrics = [ "opentelemetry-prometheus", "prometheus" ]
diff --git a/src/api/admin/api.rs b/src/api/admin/api.rs
index 21133f10..97cde158 100644
--- a/src/api/admin/api.rs
+++ b/src/api/admin/api.rs
@@ -1,17 +1,22 @@
+use std::collections::HashMap;
use std::convert::TryFrom;
use std::net::SocketAddr;
use std::sync::Arc;
-use async_trait::async_trait;
use paste::paste;
use serde::{Deserialize, Serialize};
+use garage_rpc::*;
+
use garage_model::garage::Garage;
-use crate::admin::error::Error;
-use crate::admin::macros::*;
-use crate::admin::EndpointHandler;
-use crate::helpers::is_default;
+use garage_api_common::common_error::CommonErrorDerivative;
+use garage_api_common::helpers::is_default;
+
+use crate::api_server::{AdminRpc, AdminRpcResponse};
+use crate::error::Error;
+use crate::macros::*;
+use crate::{Admin, RequestHandler};
// This generates the following:
//
@@ -61,6 +66,7 @@ admin_endpoints![
CreateBucket,
UpdateBucket,
DeleteBucket,
+ CleanupIncompleteUploads,
// Operations on permissions for keys on buckets
AllowBucketKey,
@@ -69,8 +75,55 @@ admin_endpoints![
// Operations on bucket aliases
AddBucketAlias,
RemoveBucketAlias,
+
+ // Node operations
+ CreateMetadataSnapshot,
+ GetNodeStatistics,
+ GetClusterStatistics,
+ LaunchRepairOperation,
+
+ // Worker operations
+ ListWorkers,
+ GetWorkerInfo,
+ GetWorkerVariable,
+ SetWorkerVariable,
+
+ // Block operations
+ ListBlockErrors,
+ GetBlockInfo,
+ RetryBlockResync,
+ PurgeBlocks,
+];
+
+local_admin_endpoints![
+ // Node operations
+ CreateMetadataSnapshot,
+ GetNodeStatistics,
+ LaunchRepairOperation,
+ // Background workers
+ ListWorkers,
+ GetWorkerInfo,
+ GetWorkerVariable,
+ SetWorkerVariable,
+ // Block operations
+ ListBlockErrors,
+ GetBlockInfo,
+ RetryBlockResync,
+ PurgeBlocks,
];
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct MultiRequest<RB> {
+ pub node: String,
+ pub body: RB,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct MultiResponse<RB> {
+ pub success: HashMap<String, RB>,
+ pub error: HashMap<String, String>,
+}
+
// **********************************************
// Special endpoints
//
@@ -496,6 +549,19 @@ pub struct DeleteBucketRequest {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeleteBucketResponse;
+// ---- CleanupIncompleteUploads ----
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct CleanupIncompleteUploadsRequest {
+ pub bucket_id: String,
+ pub older_than_secs: u64,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct CleanupIncompleteUploadsResponse {
+ pub uploads_deleted: u64,
+}
+
// **********************************************
// Operations on permissions for keys on buckets
// **********************************************
@@ -565,3 +631,246 @@ pub struct RemoveBucketAliasRequest {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RemoveBucketAliasResponse(pub GetBucketInfoResponse);
+
+// **********************************************
+// Node operations
+// **********************************************
+
+// ---- CreateMetadataSnapshot ----
+
+#[derive(Debug, Clone, Serialize, Deserialize, Default)]
+pub struct LocalCreateMetadataSnapshotRequest;
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct LocalCreateMetadataSnapshotResponse;
+
+// ---- GetNodeStatistics ----
+
+#[derive(Debug, Clone, Serialize, Deserialize, Default)]
+pub struct LocalGetNodeStatisticsRequest;
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct LocalGetNodeStatisticsResponse {
+ pub freeform: String,
+}
+
+// ---- GetClusterStatistics ----
+
+#[derive(Debug, Clone, Serialize, Deserialize, Default)]
+pub struct GetClusterStatisticsRequest;
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct GetClusterStatisticsResponse {
+ pub freeform: String,
+}
+
+// ---- LaunchRepairOperation ----
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct LocalLaunchRepairOperationRequest {
+ pub repair_type: RepairType,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub enum RepairType {
+ Tables,
+ Blocks,
+ Versions,
+ MultipartUploads,
+ BlockRefs,
+ BlockRc,
+ Rebalance,
+ Scrub(ScrubCommand),
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub enum ScrubCommand {
+ Start,
+ Pause,
+ Resume,
+ Cancel,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct LocalLaunchRepairOperationResponse;
+
+// **********************************************
+// Worker operations
+// **********************************************
+
+// ---- GetWorkerList ----
+
+#[derive(Debug, Clone, Serialize, Deserialize, Default)]
+#[serde(rename_all = "camelCase")]
+pub struct LocalListWorkersRequest {
+ #[serde(default)]
+ pub busy_only: bool,
+ #[serde(default)]
+ pub error_only: bool,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct LocalListWorkersResponse(pub Vec<WorkerInfoResp>);
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct WorkerInfoResp {
+ pub id: u64,
+ pub name: String,
+ pub state: WorkerStateResp,
+ pub errors: u64,
+ pub consecutive_errors: u64,
+ pub last_error: Option<WorkerLastError>,
+ pub tranquility: Option<u32>,
+ pub progress: Option<String>,
+ pub queue_length: Option<u64>,
+ pub persistent_errors: Option<u64>,
+ pub freeform: Vec<String>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub enum WorkerStateResp {
+ Busy,
+ Throttled { duration_secs: f32 },
+ Idle,
+ Done,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct WorkerLastError {
+ pub message: String,
+ pub secs_ago: u64,
+}
+
+// ---- GetWorkerList ----
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct LocalGetWorkerInfoRequest {
+ pub id: u64,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct LocalGetWorkerInfoResponse(pub WorkerInfoResp);
+
+// ---- GetWorkerVariable ----
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct LocalGetWorkerVariableRequest {
+ pub variable: Option<String>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct LocalGetWorkerVariableResponse(pub HashMap<String, String>);
+
+// ---- SetWorkerVariable ----
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct LocalSetWorkerVariableRequest {
+ pub variable: String,
+ pub value: String,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct LocalSetWorkerVariableResponse {
+ pub variable: String,
+ pub value: String,
+}
+
+// **********************************************
+// Block operations
+// **********************************************
+
+// ---- ListBlockErrors ----
+
+#[derive(Debug, Clone, Serialize, Deserialize, Default)]
+pub struct LocalListBlockErrorsRequest;
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct LocalListBlockErrorsResponse(pub Vec<BlockError>);
+
+#[derive(Serialize, Deserialize, Clone, Debug)]
+#[serde(rename_all = "camelCase")]
+pub struct BlockError {
+ pub block_hash: String,
+ pub refcount: u64,
+ pub error_count: u64,
+ pub last_try_secs_ago: u64,
+ pub next_try_in_secs: u64,
+}
+
+// ---- GetBlockInfo ----
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct LocalGetBlockInfoRequest {
+ pub block_hash: String,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct LocalGetBlockInfoResponse {
+ pub block_hash: String,
+ pub refcount: u64,
+ pub versions: Vec<BlockVersion>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct BlockVersion {
+ pub version_id: String,
+ pub deleted: bool,
+ pub garbage_collected: bool,
+ pub backlink: Option<BlockVersionBacklink>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub enum BlockVersionBacklink {
+ Object {
+ bucket_id: String,
+ key: String,
+ },
+ Upload {
+ upload_id: String,
+ upload_deleted: bool,
+ upload_garbage_collected: bool,
+ bucket_id: Option<String>,
+ key: Option<String>,
+ },
+}
+
+// ---- RetryBlockResync ----
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(untagged)]
+pub enum LocalRetryBlockResyncRequest {
+ #[serde(rename_all = "camelCase")]
+ All { all: bool },
+ #[serde(rename_all = "camelCase")]
+ Blocks { block_hashes: Vec<String> },
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct LocalRetryBlockResyncResponse {
+ pub count: u64,
+}
+
+// ---- PurgeBlocks ----
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct LocalPurgeBlocksRequest(pub Vec<String>);
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct LocalPurgeBlocksResponse {
+ pub blocks_purged: u64,
+ pub objects_deleted: u64,
+ pub uploads_deleted: u64,
+ pub versions_deleted: u64,
+}
diff --git a/src/api/admin/api_server.rs b/src/api/admin/api_server.rs
index d66714db..37574dcf 100644
--- a/src/api/admin/api_server.rs
+++ b/src/api/admin/api_server.rs
@@ -2,44 +2,108 @@ use std::borrow::Cow;
use std::sync::Arc;
use argon2::password_hash::PasswordHash;
-use async_trait::async_trait;
use http::header::{HeaderValue, ACCESS_CONTROL_ALLOW_ORIGIN, AUTHORIZATION};
-use hyper::{body::Incoming as IncomingBody, Request, Response, StatusCode};
+use hyper::{body::Incoming as IncomingBody, Request, Response};
+use serde::{Deserialize, Serialize};
use tokio::sync::watch;
use opentelemetry::trace::SpanRef;
#[cfg(feature = "metrics")]
use opentelemetry_prometheus::PrometheusExporter;
-#[cfg(feature = "metrics")]
-use prometheus::{Encoder, TextEncoder};
use garage_model::garage::Garage;
+use garage_rpc::{Endpoint as RpcEndpoint, *};
+use garage_util::background::BackgroundRunner;
use garage_util::error::Error as GarageError;
use garage_util::socket_address::UnixOrTCPSocketAddress;
-use crate::generic_server::*;
+use garage_api_common::generic_server::*;
+use garage_api_common::helpers::*;
+
+use crate::api::*;
+use crate::error::*;
+use crate::router_v0;
+use crate::router_v1;
+use crate::Authorization;
+use crate::RequestHandler;
+
+// ---- FOR RPC ----
-use crate::admin::api::*;
-use crate::admin::error::*;
-use crate::admin::router_v0;
-use crate::admin::router_v1;
-use crate::admin::Authorization;
-use crate::admin::EndpointHandler;
-use crate::helpers::*;
+pub const ADMIN_RPC_PATH: &str = "garage_api/admin/rpc.rs/Rpc";
+
+#[derive(Debug, Serialize, Deserialize)]
+pub enum AdminRpc {
+ Proxy(AdminApiRequest),
+ Internal(LocalAdminApiRequest),
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub enum AdminRpcResponse {
+ ProxyApiOkResponse(TaggedAdminApiResponse),
+ InternalApiOkResponse(LocalAdminApiResponse),
+ ApiErrorResponse {
+ http_code: u16,
+ error_code: String,
+ message: String,
+ },
+}
+
+impl Rpc for AdminRpc {
+ type Response = Result<AdminRpcResponse, GarageError>;
+}
+
+impl EndpointHandler<AdminRpc> for AdminApiServer {
+ async fn handle(
+ self: &Arc<Self>,
+ message: &AdminRpc,
+ _from: NodeID,
+ ) -> Result<AdminRpcResponse, GarageError> {
+ match message {
+ AdminRpc::Proxy(req) => {
+ info!("Proxied admin API request: {}", req.name());
+ let res = req.clone().handle(&self.garage, &self).await;
+ match res {
+ Ok(res) => Ok(AdminRpcResponse::ProxyApiOkResponse(res.tagged())),
+ Err(e) => Ok(AdminRpcResponse::ApiErrorResponse {
+ http_code: e.http_status_code().as_u16(),
+ error_code: e.code().to_string(),
+ message: e.to_string(),
+ }),
+ }
+ }
+ AdminRpc::Internal(req) => {
+ info!("Internal admin API request: {}", req.name());
+ let res = req.clone().handle(&self.garage, &self).await;
+ match res {
+ Ok(res) => Ok(AdminRpcResponse::InternalApiOkResponse(res)),
+ Err(e) => Ok(AdminRpcResponse::ApiErrorResponse {
+ http_code: e.http_status_code().as_u16(),
+ error_code: e.code().to_string(),
+ message: e.to_string(),
+ }),
+ }
+ }
+ }
+ }
+}
+
+// ---- FOR HTTP ----
pub type ResBody = BoxBody<Error>;
pub struct AdminApiServer {
garage: Arc<Garage>,
#[cfg(feature = "metrics")]
- exporter: PrometheusExporter,
+ pub(crate) exporter: PrometheusExporter,
metrics_token: Option<String>,
admin_token: Option<String>,
+ pub(crate) background: Arc<BackgroundRunner>,
+ pub(crate) endpoint: Arc<RpcEndpoint<AdminRpc, Self>>,
}
-pub enum Endpoint {
+pub enum HttpEndpoint {
Old(router_v1::Endpoint),
New(String),
}
@@ -47,91 +111,48 @@ pub enum Endpoint {
impl AdminApiServer {
pub fn new(
garage: Arc<Garage>,
+ background: Arc<BackgroundRunner>,
#[cfg(feature = "metrics")] exporter: PrometheusExporter,
- ) -> Self {
+ ) -> Arc<Self> {
let cfg = &garage.config.admin;
let metrics_token = cfg.metrics_token.as_deref().map(hash_bearer_token);
let admin_token = cfg.admin_token.as_deref().map(hash_bearer_token);
- Self {
+
+ let endpoint = garage.system.netapp.endpoint(ADMIN_RPC_PATH.into());
+ let admin = Arc::new(Self {
garage,
#[cfg(feature = "metrics")]
exporter,
metrics_token,
admin_token,
- }
+ background,
+ endpoint,
+ });
+ admin.endpoint.set_handler(admin.clone());
+ admin
}
pub async fn run(
- self,
+ self: Arc<Self>,
bind_addr: UnixOrTCPSocketAddress,
must_exit: watch::Receiver<bool>,
) -> Result<(), GarageError> {
let region = self.garage.config.s3_api.s3_region.clone();
- ApiServer::new(region, self)
+ ApiServer::new(region, ArcAdminApiServer(self))
.run_server(bind_addr, Some(0o220), must_exit)
.await
}
- fn handle_metrics(&self) -> Result<Response<ResBody>, Error> {
- #[cfg(feature = "metrics")]
- {
- use opentelemetry::trace::Tracer;
-
- let mut buffer = vec![];
- let encoder = TextEncoder::new();
-
- let tracer = opentelemetry::global::tracer("garage");
- let metric_families = tracer.in_span("admin/gather_metrics", |_| {
- self.exporter.registry().gather()
- });
-
- encoder
- .encode(&metric_families, &mut buffer)
- .ok_or_internal_error("Could not serialize metrics")?;
-
- Ok(Response::builder()
- .status(StatusCode::OK)
- .header(http::header::CONTENT_TYPE, encoder.format_type())
- .body(bytes_body(buffer.into()))?)
- }
- #[cfg(not(feature = "metrics"))]
- Err(Error::bad_request(
- "Garage was built without the metrics feature".to_string(),
- ))
- }
-}
-
-#[async_trait]
-impl ApiHandler for AdminApiServer {
- const API_NAME: &'static str = "admin";
- const API_NAME_DISPLAY: &'static str = "Admin";
-
- type Endpoint = Endpoint;
- type Error = Error;
-
- fn parse_endpoint(&self, req: &Request<IncomingBody>) -> Result<Endpoint, Error> {
- if req.uri().path().starts_with("/v0/") {
- let endpoint_v0 = router_v0::Endpoint::from_request(req)?;
- let endpoint_v1 = router_v1::Endpoint::from_v0(endpoint_v0)?;
- Ok(Endpoint::Old(endpoint_v1))
- } else if req.uri().path().starts_with("/v1/") {
- let endpoint_v1 = router_v1::Endpoint::from_request(req)?;
- Ok(Endpoint::Old(endpoint_v1))
- } else {
- Ok(Endpoint::New(req.uri().path().to_string()))
- }
- }
-
- async fn handle(
+ async fn handle_http_api(
&self,
req: Request<IncomingBody>,
- endpoint: Endpoint,
+ endpoint: HttpEndpoint,
) -> Result<Response<ResBody>, Error> {
let auth_header = req.headers().get(AUTHORIZATION).cloned();
let request = match endpoint {
- Endpoint::Old(endpoint_v1) => AdminApiRequest::from_v1(endpoint_v1, req).await?,
- Endpoint::New(_) => AdminApiRequest::from_request(req).await?,
+ HttpEndpoint::Old(endpoint_v1) => AdminApiRequest::from_v1(endpoint_v1, req).await?,
+ HttpEndpoint::New(_) => AdminApiRequest::from_request(req).await?,
};
let required_auth_hash =
@@ -156,12 +177,12 @@ impl ApiHandler for AdminApiServer {
}
match request {
- AdminApiRequest::Options(req) => req.handle(&self.garage).await,
- AdminApiRequest::CheckDomain(req) => req.handle(&self.garage).await,
- AdminApiRequest::Health(req) => req.handle(&self.garage).await,
- AdminApiRequest::Metrics(_req) => self.handle_metrics(),
+ AdminApiRequest::Options(req) => req.handle(&self.garage, &self).await,
+ AdminApiRequest::CheckDomain(req) => req.handle(&self.garage, &self).await,
+ AdminApiRequest::Health(req) => req.handle(&self.garage, &self).await,
+ AdminApiRequest::Metrics(req) => req.handle(&self.garage, &self).await,
req => {
- let res = req.handle(&self.garage).await?;
+ let res = req.handle(&self.garage, &self).await?;
let mut res = json_ok_response(&res)?;
res.headers_mut()
.insert(ACCESS_CONTROL_ALLOW_ORIGIN, HeaderValue::from_static("*"));
@@ -171,7 +192,38 @@ impl ApiHandler for AdminApiServer {
}
}
-impl ApiEndpoint for Endpoint {
+struct ArcAdminApiServer(Arc<AdminApiServer>);
+
+impl ApiHandler for ArcAdminApiServer {
+ const API_NAME: &'static str = "admin";
+ const API_NAME_DISPLAY: &'static str = "Admin";
+
+ type Endpoint = HttpEndpoint;
+ type Error = Error;
+
+ fn parse_endpoint(&self, req: &Request<IncomingBody>) -> Result<HttpEndpoint, Error> {
+ if req.uri().path().starts_with("/v0/") {
+ let endpoint_v0 = router_v0::Endpoint::from_request(req)?;
+ let endpoint_v1 = router_v1::Endpoint::from_v0(endpoint_v0)?;
+ Ok(HttpEndpoint::Old(endpoint_v1))
+ } else if req.uri().path().starts_with("/v1/") {
+ let endpoint_v1 = router_v1::Endpoint::from_request(req)?;
+ Ok(HttpEndpoint::Old(endpoint_v1))
+ } else {
+ Ok(HttpEndpoint::New(req.uri().path().to_string()))
+ }
+ }
+
+ async fn handle(
+ &self,
+ req: Request<IncomingBody>,
+ endpoint: HttpEndpoint,
+ ) -> Result<Response<ResBody>, Error> {
+ self.0.handle_http_api(req, endpoint).await
+ }
+}
+
+impl ApiEndpoint for HttpEndpoint {
fn name(&self) -> Cow<'static, str> {
match self {
Self::Old(endpoint_v1) => Cow::Borrowed(endpoint_v1.name()),
diff --git a/src/api/admin/block.rs b/src/api/admin/block.rs
new file mode 100644
index 00000000..73d186a6
--- /dev/null
+++ b/src/api/admin/block.rs
@@ -0,0 +1,274 @@
+use std::sync::Arc;
+
+use garage_util::data::*;
+use garage_util::error::Error as GarageError;
+use garage_util::time::now_msec;
+
+use garage_table::EmptyKey;
+
+use garage_model::garage::Garage;
+use garage_model::s3::object_table::*;
+use garage_model::s3::version_table::*;
+
+use garage_api_common::common_error::CommonErrorDerivative;
+
+use crate::api::*;
+use crate::error::*;
+use crate::{Admin, RequestHandler};
+
+impl RequestHandler for LocalListBlockErrorsRequest {
+ type Response = LocalListBlockErrorsResponse;
+
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<LocalListBlockErrorsResponse, Error> {
+ let errors = garage.block_manager.list_resync_errors()?;
+ let now = now_msec();
+ let errors = errors
+ .into_iter()
+ .map(|e| BlockError {
+ block_hash: hex::encode(&e.hash),
+ refcount: e.refcount,
+ error_count: e.error_count,
+ last_try_secs_ago: now.saturating_sub(e.last_try) / 1000,
+ next_try_in_secs: e.next_try.saturating_sub(now) / 1000,
+ })
+ .collect();
+ Ok(LocalListBlockErrorsResponse(errors))
+ }
+}
+
+impl RequestHandler for LocalGetBlockInfoRequest {
+ type Response = LocalGetBlockInfoResponse;
+
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<LocalGetBlockInfoResponse, Error> {
+ let hash = find_block_hash_by_prefix(garage, &self.block_hash)?;
+ let refcount = garage.block_manager.get_block_rc(&hash)?;
+ let block_refs = garage
+ .block_ref_table
+ .get_range(&hash, None, None, 10000, Default::default())
+ .await?;
+ let mut versions = vec![];
+ for br in block_refs {
+ if let Some(v) = garage.version_table.get(&br.version, &EmptyKey).await? {
+ let bl = match &v.backlink {
+ VersionBacklink::MultipartUpload { upload_id } => {
+ if let Some(u) = garage.mpu_table.get(upload_id, &EmptyKey).await? {
+ BlockVersionBacklink::Upload {
+ upload_id: hex::encode(&upload_id),
+ upload_deleted: u.deleted.get(),
+ upload_garbage_collected: false,
+ bucket_id: Some(hex::encode(&u.bucket_id)),
+ key: Some(u.key.to_string()),
+ }
+ } else {
+ BlockVersionBacklink::Upload {
+ upload_id: hex::encode(&upload_id),
+ upload_deleted: true,
+ upload_garbage_collected: true,
+ bucket_id: None,
+ key: None,
+ }
+ }
+ }
+ VersionBacklink::Object { bucket_id, key } => BlockVersionBacklink::Object {
+ bucket_id: hex::encode(&bucket_id),
+ key: key.to_string(),
+ },
+ };
+ versions.push(BlockVersion {
+ version_id: hex::encode(&br.version),
+ deleted: v.deleted.get(),
+ garbage_collected: false,
+ backlink: Some(bl),
+ });
+ } else {
+ versions.push(BlockVersion {
+ version_id: hex::encode(&br.version),
+ deleted: true,
+ garbage_collected: true,
+ backlink: None,
+ });
+ }
+ }
+ Ok(LocalGetBlockInfoResponse {
+ block_hash: hex::encode(&hash),
+ refcount,
+ versions,
+ })
+ }
+}
+
+impl RequestHandler for LocalRetryBlockResyncRequest {
+ type Response = LocalRetryBlockResyncResponse;
+
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<LocalRetryBlockResyncResponse, Error> {
+ match self {
+ Self::All { all: true } => {
+ let blocks = garage.block_manager.list_resync_errors()?;
+ for b in blocks.iter() {
+ garage.block_manager.resync.clear_backoff(&b.hash)?;
+ }
+ Ok(LocalRetryBlockResyncResponse {
+ count: blocks.len() as u64,
+ })
+ }
+ Self::All { all: false } => Err(Error::bad_request("nonsense")),
+ Self::Blocks { block_hashes } => {
+ for hash in block_hashes.iter() {
+ let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?;
+ let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?;
+ garage.block_manager.resync.clear_backoff(&hash)?;
+ }
+ Ok(LocalRetryBlockResyncResponse {
+ count: block_hashes.len() as u64,
+ })
+ }
+ }
+ }
+}
+
+impl RequestHandler for LocalPurgeBlocksRequest {
+ type Response = LocalPurgeBlocksResponse;
+
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<LocalPurgeBlocksResponse, Error> {
+ let mut obj_dels = 0;
+ let mut mpu_dels = 0;
+ let mut ver_dels = 0;
+
+ for hash in self.0.iter() {
+ let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?;
+ let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?;
+ let block_refs = garage
+ .block_ref_table
+ .get_range(&hash, None, None, 10000, Default::default())
+ .await?;
+
+ for br in block_refs {
+ if let Some(version) = garage.version_table.get(&br.version, &EmptyKey).await? {
+ handle_block_purge_version_backlink(
+ garage,
+ &version,
+ &mut obj_dels,
+ &mut mpu_dels,
+ )
+ .await?;
+
+ if !version.deleted.get() {
+ let deleted_version = Version::new(version.uuid, version.backlink, true);
+ garage.version_table.insert(&deleted_version).await?;
+ ver_dels += 1;
+ }
+ }
+ }
+ }
+
+ Ok(LocalPurgeBlocksResponse {
+ blocks_purged: self.0.len() as u64,
+ versions_deleted: ver_dels,
+ objects_deleted: obj_dels,
+ uploads_deleted: mpu_dels,
+ })
+ }
+}
+
+fn find_block_hash_by_prefix(garage: &Arc<Garage>, prefix: &str) -> Result<Hash, Error> {
+ if prefix.len() < 4 {
+ return Err(Error::bad_request(
+ "Please specify at least 4 characters of the block hash",
+ ));
+ }
+
+ let prefix_bin = hex::decode(&prefix[..prefix.len() & !1]).ok_or_bad_request("invalid hash")?;
+
+ let iter = garage
+ .block_ref_table
+ .data
+ .store
+ .range(&prefix_bin[..]..)
+ .map_err(GarageError::from)?;
+ let mut found = None;
+ for item in iter {
+ let (k, _v) = item.map_err(GarageError::from)?;
+ let hash = Hash::try_from(&k[..32]).unwrap();
+ if &hash.as_slice()[..prefix_bin.len()] != prefix_bin {
+ break;
+ }
+ if hex::encode(hash.as_slice()).starts_with(prefix) {
+ match &found {
+ Some(x) if *x == hash => (),
+ Some(_) => {
+ return Err(Error::bad_request(format!(
+ "Several blocks match prefix `{}`",
+ prefix
+ )));
+ }
+ None => {
+ found = Some(hash);
+ }
+ }
+ }
+ }
+
+ found.ok_or_else(|| Error::NoSuchBlock(prefix.to_string()))
+}
+
+async fn handle_block_purge_version_backlink(
+ garage: &Arc<Garage>,
+ version: &Version,
+ obj_dels: &mut u64,
+ mpu_dels: &mut u64,
+) -> Result<(), Error> {
+ let (bucket_id, key, ov_id) = match &version.backlink {
+ VersionBacklink::Object { bucket_id, key } => (*bucket_id, key.clone(), version.uuid),
+ VersionBacklink::MultipartUpload { upload_id } => {
+ if let Some(mut mpu) = garage.mpu_table.get(upload_id, &EmptyKey).await? {
+ if !mpu.deleted.get() {
+ mpu.parts.clear();
+ mpu.deleted.set();
+ garage.mpu_table.insert(&mpu).await?;
+ *mpu_dels += 1;
+ }
+ (mpu.bucket_id, mpu.key.clone(), *upload_id)
+ } else {
+ return Ok(());
+ }
+ }
+ };
+
+ if let Some(object) = garage.object_table.get(&bucket_id, &key).await? {
+ let ov = object.versions().iter().rev().find(|v| v.is_complete());
+ if let Some(ov) = ov {
+ if ov.uuid == ov_id {
+ let del_uuid = gen_uuid();
+ let deleted_object = Object::new(
+ bucket_id,
+ key,
+ vec![ObjectVersion {
+ uuid: del_uuid,
+ timestamp: ov.timestamp + 1,
+ state: ObjectVersionState::Complete(ObjectVersionData::DeleteMarker),
+ }],
+ );
+ garage.object_table.insert(&deleted_object).await?;
+ *obj_dels += 1;
+ }
+ }
+ }
+
+ Ok(())
+}
diff --git a/src/api/admin/bucket.rs b/src/api/admin/bucket.rs
index d2d75fc0..d2bb62e0 100644
--- a/src/api/admin/bucket.rs
+++ b/src/api/admin/bucket.rs
@@ -1,7 +1,6 @@
use std::collections::HashMap;
use std::sync::Arc;
-
-use async_trait::async_trait;
+use std::time::Duration;
use garage_util::crdt::*;
use garage_util::data::*;
@@ -16,16 +15,20 @@ use garage_model::permission::*;
use garage_model::s3::mpu_table;
use garage_model::s3::object_table::*;
-use crate::admin::api::*;
-use crate::admin::error::*;
-use crate::admin::EndpointHandler;
-use crate::common_error::CommonError;
+use garage_api_common::common_error::CommonError;
+
+use crate::api::*;
+use crate::error::*;
+use crate::{Admin, RequestHandler};
-#[async_trait]
-impl EndpointHandler for ListBucketsRequest {
+impl RequestHandler for ListBucketsRequest {
type Response = ListBucketsResponse;
- async fn handle(self, garage: &Arc<Garage>) -> Result<ListBucketsResponse, Error> {
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<ListBucketsResponse, Error> {
let buckets = garage
.bucket_table
.get_range(
@@ -68,11 +71,14 @@ impl EndpointHandler for ListBucketsRequest {
}
}
-#[async_trait]
-impl EndpointHandler for GetBucketInfoRequest {
+impl RequestHandler for GetBucketInfoRequest {
type Response = GetBucketInfoResponse;
- async fn handle(self, garage: &Arc<Garage>) -> Result<GetBucketInfoResponse, Error> {
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<GetBucketInfoResponse, Error> {
let bucket_id = match (self.id, self.global_alias, self.search) {
(Some(id), None, None) => parse_bucket_id(&id)?,
(None, Some(ga), None) => garage
@@ -220,11 +226,14 @@ async fn bucket_info_results(
Ok(res)
}
-#[async_trait]
-impl EndpointHandler for CreateBucketRequest {
+impl RequestHandler for CreateBucketRequest {
type Response = CreateBucketResponse;
- async fn handle(self, garage: &Arc<Garage>) -> Result<CreateBucketResponse, Error> {
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<CreateBucketResponse, Error> {
let helper = garage.locked_helper().await;
if let Some(ga) = &self.global_alias {
@@ -291,11 +300,14 @@ impl EndpointHandler for CreateBucketRequest {
}
}
-#[async_trait]
-impl EndpointHandler for DeleteBucketRequest {
+impl RequestHandler for DeleteBucketRequest {
type Response = DeleteBucketResponse;
- async fn handle(self, garage: &Arc<Garage>) -> Result<DeleteBucketResponse, Error> {
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<DeleteBucketResponse, Error> {
let helper = garage.locked_helper().await;
let bucket_id = parse_bucket_id(&self.id)?;
@@ -340,11 +352,14 @@ impl EndpointHandler for DeleteBucketRequest {
}
}
-#[async_trait]
-impl EndpointHandler for UpdateBucketRequest {
+impl RequestHandler for UpdateBucketRequest {
type Response = UpdateBucketResponse;
- async fn handle(self, garage: &Arc<Garage>) -> Result<UpdateBucketResponse, Error> {
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<UpdateBucketResponse, Error> {
let bucket_id = parse_bucket_id(&self.id)?;
let mut bucket = garage
@@ -387,23 +402,52 @@ impl EndpointHandler for UpdateBucketRequest {
}
}
+impl RequestHandler for CleanupIncompleteUploadsRequest {
+ type Response = CleanupIncompleteUploadsResponse;
+
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<CleanupIncompleteUploadsResponse, Error> {
+ let duration = Duration::from_secs(self.older_than_secs);
+
+ let bucket_id = parse_bucket_id(&self.bucket_id)?;
+
+ let count = garage
+ .bucket_helper()
+ .cleanup_incomplete_uploads(&bucket_id, duration)
+ .await?;
+
+ Ok(CleanupIncompleteUploadsResponse {
+ uploads_deleted: count as u64,
+ })
+ }
+}
+
// ---- BUCKET/KEY PERMISSIONS ----
-#[async_trait]
-impl EndpointHandler for AllowBucketKeyRequest {
+impl RequestHandler for AllowBucketKeyRequest {
type Response = AllowBucketKeyResponse;
- async fn handle(self, garage: &Arc<Garage>) -> Result<AllowBucketKeyResponse, Error> {
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<AllowBucketKeyResponse, Error> {
let res = handle_bucket_change_key_perm(garage, self.0, true).await?;
Ok(AllowBucketKeyResponse(res))
}
}
-#[async_trait]
-impl EndpointHandler for DenyBucketKeyRequest {
+impl RequestHandler for DenyBucketKeyRequest {
type Response = DenyBucketKeyResponse;
- async fn handle(self, garage: &Arc<Garage>) -> Result<DenyBucketKeyResponse, Error> {
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<DenyBucketKeyResponse, Error> {
let res = handle_bucket_change_key_perm(garage, self.0, false).await?;
Ok(DenyBucketKeyResponse(res))
}
@@ -448,11 +492,14 @@ pub async fn handle_bucket_change_key_perm(
// ---- BUCKET ALIASES ----
-#[async_trait]
-impl EndpointHandler for AddBucketAliasRequest {
+impl RequestHandler for AddBucketAliasRequest {
type Response = AddBucketAliasResponse;
- async fn handle(self, garage: &Arc<Garage>) -> Result<AddBucketAliasResponse, Error> {
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<AddBucketAliasResponse, Error> {
let bucket_id = parse_bucket_id(&self.bucket_id)?;
let helper = garage.locked_helper().await;
@@ -479,11 +526,14 @@ impl EndpointHandler for AddBucketAliasRequest {
}
}
-#[async_trait]
-impl EndpointHandler for RemoveBucketAliasRequest {
+impl RequestHandler for RemoveBucketAliasRequest {
type Response = RemoveBucketAliasResponse;
- async fn handle(self, garage: &Arc<Garage>) -> Result<RemoveBucketAliasResponse, Error> {
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<RemoveBucketAliasResponse, Error> {
let bucket_id = parse_bucket_id(&self.bucket_id)?;
let helper = garage.locked_helper().await;
diff --git a/src/api/admin/cluster.rs b/src/api/admin/cluster.rs
index 0cfd744a..cb1fa493 100644
--- a/src/api/admin/cluster.rs
+++ b/src/api/admin/cluster.rs
@@ -1,8 +1,6 @@
use std::collections::HashMap;
use std::sync::Arc;
-use async_trait::async_trait;
-
use garage_util::crdt::*;
use garage_util::data::*;
@@ -10,15 +8,18 @@ use garage_rpc::layout;
use garage_model::garage::Garage;
-use crate::admin::api::*;
-use crate::admin::error::*;
-use crate::admin::EndpointHandler;
+use crate::api::*;
+use crate::error::*;
+use crate::{Admin, RequestHandler};
-#[async_trait]
-impl EndpointHandler for GetClusterStatusRequest {
+impl RequestHandler for GetClusterStatusRequest {
type Response = GetClusterStatusResponse;
- async fn handle(self, garage: &Arc<Garage>) -> Result<GetClusterStatusResponse, Error> {
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<GetClusterStatusResponse, Error> {
let layout = garage.system.cluster_layout();
let mut nodes = garage
.system
@@ -116,11 +117,14 @@ impl EndpointHandler for GetClusterStatusRequest {
}
}
-#[async_trait]
-impl EndpointHandler for GetClusterHealthRequest {
+impl RequestHandler for GetClusterHealthRequest {
type Response = GetClusterHealthResponse;
- async fn handle(self, garage: &Arc<Garage>) -> Result<GetClusterHealthResponse, Error> {
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<GetClusterHealthResponse, Error> {
use garage_rpc::system::ClusterHealthStatus;
let health = garage.system.health();
let health = GetClusterHealthResponse {
@@ -142,11 +146,14 @@ impl EndpointHandler for GetClusterHealthRequest {
}
}
-#[async_trait]
-impl EndpointHandler for ConnectClusterNodesRequest {
+impl RequestHandler for ConnectClusterNodesRequest {
type Response = ConnectClusterNodesResponse;
- async fn handle(self, garage: &Arc<Garage>) -> Result<ConnectClusterNodesResponse, Error> {
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<ConnectClusterNodesResponse, Error> {
let res = futures::future::join_all(self.0.iter().map(|node| garage.system.connect(node)))
.await
.into_iter()
@@ -165,11 +172,14 @@ impl EndpointHandler for ConnectClusterNodesRequest {
}
}
-#[async_trait]
-impl EndpointHandler for GetClusterLayoutRequest {
+impl RequestHandler for GetClusterLayoutRequest {
type Response = GetClusterLayoutResponse;
- async fn handle(self, garage: &Arc<Garage>) -> Result<GetClusterLayoutResponse, Error> {
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<GetClusterLayoutResponse, Error> {
Ok(format_cluster_layout(
garage.system.cluster_layout().inner(),
))
@@ -225,11 +235,14 @@ fn format_cluster_layout(layout: &layout::LayoutHistory) -> GetClusterLayoutResp
// ---- update functions ----
-#[async_trait]
-impl EndpointHandler for UpdateClusterLayoutRequest {
+impl RequestHandler for UpdateClusterLayoutRequest {
type Response = UpdateClusterLayoutResponse;
- async fn handle(self, garage: &Arc<Garage>) -> Result<UpdateClusterLayoutResponse, Error> {
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<UpdateClusterLayoutResponse, Error> {
let mut layout = garage.system.cluster_layout().inner().clone();
let mut roles = layout.current().roles.clone();
@@ -271,11 +284,14 @@ impl EndpointHandler for UpdateClusterLayoutRequest {
}
}
-#[async_trait]
-impl EndpointHandler for ApplyClusterLayoutRequest {
+impl RequestHandler for ApplyClusterLayoutRequest {
type Response = ApplyClusterLayoutResponse;
- async fn handle(self, garage: &Arc<Garage>) -> Result<ApplyClusterLayoutResponse, Error> {
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<ApplyClusterLayoutResponse, Error> {
let layout = garage.system.cluster_layout().inner().clone();
let (layout, msg) = layout.apply_staged_changes(Some(self.version))?;
@@ -292,11 +308,14 @@ impl EndpointHandler for ApplyClusterLayoutRequest {
}
}
-#[async_trait]
-impl EndpointHandler for RevertClusterLayoutRequest {
+impl RequestHandler for RevertClusterLayoutRequest {
type Response = RevertClusterLayoutResponse;
- async fn handle(self, garage: &Arc<Garage>) -> Result<RevertClusterLayoutResponse, Error> {
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<RevertClusterLayoutResponse, Error> {
let layout = garage.system.cluster_layout().inner().clone();
let layout = layout.revert_staged_changes()?;
garage
diff --git a/src/api/admin/error.rs b/src/api/admin/error.rs
index 205fc314..d7ea7dc9 100644
--- a/src/api/admin/error.rs
+++ b/src/api/admin/error.rs
@@ -6,23 +6,33 @@ use hyper::{HeaderMap, StatusCode};
pub use garage_model::helper::error::Error as HelperError;
-use crate::common_error::CommonError;
-pub use crate::common_error::{CommonErrorDerivative, OkOrBadRequest, OkOrInternalError};
-use crate::generic_server::ApiError;
-use crate::helpers::*;
+use garage_api_common::common_error::{commonErrorDerivative, CommonError};
+pub use garage_api_common::common_error::{
+ CommonErrorDerivative, OkOrBadRequest, OkOrInternalError,
+};
+use garage_api_common::generic_server::ApiError;
+use garage_api_common::helpers::*;
/// Errors of this crate
#[derive(Debug, Error)]
pub enum Error {
#[error(display = "{}", _0)]
/// Error from common error
- Common(CommonError),
+ Common(#[error(source)] CommonError),
// Category: cannot process
/// The API access key does not exist
#[error(display = "Access key not found: {}", _0)]
NoSuchAccessKey(String),
+ /// The requested block does not exist
+ #[error(display = "Block not found: {}", _0)]
+ NoSuchBlock(String),
+
+ /// The requested worker does not exist
+ #[error(display = "Worker not found: {}", _0)]
+ NoSuchWorker(u64),
+
/// In Import key, the key already exists
#[error(
display = "Key {} already exists in data store. Even if it is deleted, we can't let you create a new key with the same ID. Sorry.",
@@ -31,14 +41,7 @@ pub enum Error {
KeyAlreadyExists(String),
}
-impl<T> From<T> for Error
-where
- CommonError: From<T>,
-{
- fn from(err: T) -> Self {
- Error::Common(CommonError::from(err))
- }
-}
+commonErrorDerivative!(Error);
/// FIXME: helper errors are transformed into their corresponding variants
/// in the Error struct, but in many case a helper error should be considered
@@ -53,13 +56,13 @@ impl From<HelperError> for Error {
}
}
-impl CommonErrorDerivative for Error {}
-
impl Error {
pub fn code(&self) -> &'static str {
match self {
Error::Common(c) => c.aws_code(),
Error::NoSuchAccessKey(_) => "NoSuchAccessKey",
+ Error::NoSuchWorker(_) => "NoSuchWorker",
+ Error::NoSuchBlock(_) => "NoSuchBlock",
Error::KeyAlreadyExists(_) => "KeyAlreadyExists",
}
}
@@ -70,7 +73,9 @@ impl ApiError for Error {
fn http_status_code(&self) -> StatusCode {
match self {
Error::Common(c) => c.http_status_code(),
- Error::NoSuchAccessKey(_) => StatusCode::NOT_FOUND,
+ Error::NoSuchAccessKey(_) | Error::NoSuchWorker(_) | Error::NoSuchBlock(_) => {
+ StatusCode::NOT_FOUND
+ }
Error::KeyAlreadyExists(_) => StatusCode::CONFLICT,
}
}
diff --git a/src/api/admin/key.rs b/src/api/admin/key.rs
index d2f449ed..dc6ae4e9 100644
--- a/src/api/admin/key.rs
+++ b/src/api/admin/key.rs
@@ -1,22 +1,19 @@
use std::collections::HashMap;
use std::sync::Arc;
-use async_trait::async_trait;
-
use garage_table::*;
use garage_model::garage::Garage;
use garage_model::key_table::*;
-use crate::admin::api::*;
-use crate::admin::error::*;
-use crate::admin::EndpointHandler;
+use crate::api::*;
+use crate::error::*;
+use crate::{Admin, RequestHandler};
-#[async_trait]
-impl EndpointHandler for ListKeysRequest {
+impl RequestHandler for ListKeysRequest {
type Response = ListKeysResponse;
- async fn handle(self, garage: &Arc<Garage>) -> Result<ListKeysResponse, Error> {
+ async fn handle(self, garage: &Arc<Garage>, _admin: &Admin) -> Result<ListKeysResponse, Error> {
let res = garage
.key_table
.get_range(
@@ -38,11 +35,14 @@ impl EndpointHandler for ListKeysRequest {
}
}
-#[async_trait]
-impl EndpointHandler for GetKeyInfoRequest {
+impl RequestHandler for GetKeyInfoRequest {
type Response = GetKeyInfoResponse;
- async fn handle(self, garage: &Arc<Garage>) -> Result<GetKeyInfoResponse, Error> {
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<GetKeyInfoResponse, Error> {
let key = match (self.id, self.search) {
(Some(id), None) => garage.key_helper().get_existing_key(&id).await?,
(None, Some(search)) => {
@@ -62,11 +62,14 @@ impl EndpointHandler for GetKeyInfoRequest {
}
}
-#[async_trait]
-impl EndpointHandler for CreateKeyRequest {
+impl RequestHandler for CreateKeyRequest {
type Response = CreateKeyResponse;
- async fn handle(self, garage: &Arc<Garage>) -> Result<CreateKeyResponse, Error> {
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<CreateKeyResponse, Error> {
let key = Key::new(self.name.as_deref().unwrap_or("Unnamed key"));
garage.key_table.insert(&key).await?;
@@ -76,11 +79,14 @@ impl EndpointHandler for CreateKeyRequest {
}
}
-#[async_trait]
-impl EndpointHandler for ImportKeyRequest {
+impl RequestHandler for ImportKeyRequest {
type Response = ImportKeyResponse;
- async fn handle(self, garage: &Arc<Garage>) -> Result<ImportKeyResponse, Error> {
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<ImportKeyResponse, Error> {
let prev_key = garage.key_table.get(&EmptyKey, &self.access_key_id).await?;
if prev_key.is_some() {
return Err(Error::KeyAlreadyExists(self.access_key_id.to_string()));
@@ -100,11 +106,14 @@ impl EndpointHandler for ImportKeyRequest {
}
}
-#[async_trait]
-impl EndpointHandler for UpdateKeyRequest {
+impl RequestHandler for UpdateKeyRequest {
type Response = UpdateKeyResponse;
- async fn handle(self, garage: &Arc<Garage>) -> Result<UpdateKeyResponse, Error> {
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<UpdateKeyResponse, Error> {
let mut key = garage.key_helper().get_existing_key(&self.id).await?;
let key_state = key.state.as_option_mut().unwrap();
@@ -131,11 +140,14 @@ impl EndpointHandler for UpdateKeyRequest {
}
}
-#[async_trait]
-impl EndpointHandler for DeleteKeyRequest {
+impl RequestHandler for DeleteKeyRequest {
type Response = DeleteKeyResponse;
- async fn handle(self, garage: &Arc<Garage>) -> Result<DeleteKeyResponse, Error> {
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<DeleteKeyResponse, Error> {
let helper = garage.locked_helper().await;
let mut key = helper.key().get_existing_key(&self.id).await?;
diff --git a/src/api/admin/lib.rs b/src/api/admin/lib.rs
new file mode 100644
index 00000000..dd9b7ffd
--- /dev/null
+++ b/src/api/admin/lib.rs
@@ -0,0 +1,43 @@
+#[macro_use]
+extern crate tracing;
+
+pub mod api_server;
+mod error;
+mod macros;
+
+pub mod api;
+mod router_v0;
+mod router_v1;
+mod router_v2;
+
+mod bucket;
+mod cluster;
+mod key;
+mod special;
+
+mod block;
+mod node;
+mod repair;
+mod worker;
+
+use std::sync::Arc;
+
+use garage_model::garage::Garage;
+
+pub use api_server::AdminApiServer as Admin;
+
+pub enum Authorization {
+ None,
+ MetricsToken,
+ AdminToken,
+}
+
+pub trait RequestHandler {
+ type Response;
+
+ fn handle(
+ self,
+ garage: &Arc<Garage>,
+ admin: &Admin,
+ ) -> impl std::future::Future<Output = Result<Self::Response, error::Error>> + Send;
+}
diff --git a/src/api/admin/macros.rs b/src/api/admin/macros.rs
index 9521616e..df2762fe 100644
--- a/src/api/admin/macros.rs
+++ b/src/api/admin/macros.rs
@@ -70,11 +70,10 @@ macro_rules! admin_endpoints {
}
)*
- #[async_trait]
- impl EndpointHandler for AdminApiRequest {
+ impl RequestHandler for AdminApiRequest {
type Response = AdminApiResponse;
- async fn handle(self, garage: &Arc<Garage>) -> Result<AdminApiResponse, Error> {
+ async fn handle(self, garage: &Arc<Garage>, admin: &Admin) -> Result<AdminApiResponse, Error> {
Ok(match self {
$(
AdminApiRequest::$special_endpoint(_) => panic!(
@@ -82,7 +81,132 @@ macro_rules! admin_endpoints {
),
)*
$(
- AdminApiRequest::$endpoint(req) => AdminApiResponse::$endpoint(req.handle(garage).await?),
+ AdminApiRequest::$endpoint(req) => AdminApiResponse::$endpoint(req.handle(garage, admin).await?),
+ )*
+ })
+ }
+ }
+ }
+ };
+}
+
+macro_rules! local_admin_endpoints {
+ [
+ $($endpoint:ident,)*
+ ] => {
+ paste! {
+ #[derive(Debug, Clone, Serialize, Deserialize)]
+ pub enum LocalAdminApiRequest {
+ $(
+ $endpoint( [<Local $endpoint Request>] ),
+ )*
+ }
+
+ #[derive(Debug, Clone, Serialize, Deserialize)]
+ pub enum LocalAdminApiResponse {
+ $(
+ $endpoint( [<Local $endpoint Response>] ),
+ )*
+ }
+
+ $(
+ pub type [< $endpoint Request >] = MultiRequest< [< Local $endpoint Request >] >;
+
+ pub type [< $endpoint RequestBody >] = [< Local $endpoint Request >];
+
+ pub type [< $endpoint Response >] = MultiResponse< [< Local $endpoint Response >] >;
+
+ impl From< [< Local $endpoint Request >] > for LocalAdminApiRequest {
+ fn from(req: [< Local $endpoint Request >]) -> LocalAdminApiRequest {
+ LocalAdminApiRequest::$endpoint(req)
+ }
+ }
+
+ impl TryFrom<LocalAdminApiResponse> for [< Local $endpoint Response >] {
+ type Error = LocalAdminApiResponse;
+ fn try_from(resp: LocalAdminApiResponse) -> Result< [< Local $endpoint Response >], LocalAdminApiResponse> {
+ match resp {
+ LocalAdminApiResponse::$endpoint(v) => Ok(v),
+ x => Err(x),
+ }
+ }
+ }
+
+ impl RequestHandler for [< $endpoint Request >] {
+ type Response = [< $endpoint Response >];
+
+ async fn handle(self, garage: &Arc<Garage>, admin: &Admin) -> Result<Self::Response, Error> {
+ let to = match self.node.as_str() {
+ "*" => garage.system.cluster_layout().all_nodes().to_vec(),
+ id => {
+ let nodes = garage.system.cluster_layout().all_nodes()
+ .iter()
+ .filter(|x| hex::encode(x).starts_with(id))
+ .cloned()
+ .collect::<Vec<_>>();
+ if nodes.len() != 1 {
+ return Err(Error::bad_request(format!("Zero or multiple nodes matching {}: {:?}", id, nodes)));
+ }
+ nodes
+ }
+ };
+
+ let resps = garage.system.rpc_helper().call_many(&admin.endpoint,
+ &to,
+ AdminRpc::Internal(self.body.into()),
+ RequestStrategy::with_priority(PRIO_NORMAL),
+ ).await?;
+
+ let mut ret = [< $endpoint Response >] {
+ success: HashMap::new(),
+ error: HashMap::new(),
+ };
+ for (node, resp) in resps {
+ match resp {
+ Ok(AdminRpcResponse::InternalApiOkResponse(r)) => {
+ match [< Local $endpoint Response >]::try_from(r) {
+ Ok(r) => {
+ ret.success.insert(hex::encode(node), r);
+ }
+ Err(_) => {
+ ret.error.insert(hex::encode(node), "returned invalid value".to_string());
+ }
+ }
+ }
+ Ok(AdminRpcResponse::ApiErrorResponse{error_code, http_code, message}) => {
+ ret.error.insert(hex::encode(node), format!("{} ({}): {}", error_code, http_code, message));
+ }
+ Ok(_) => {
+ ret.error.insert(hex::encode(node), "returned invalid value".to_string());
+ }
+ Err(e) => {
+ ret.error.insert(hex::encode(node), e.to_string());
+ }
+ }
+ }
+
+ Ok(ret)
+ }
+ }
+ )*
+
+ impl LocalAdminApiRequest {
+ pub fn name(&self) -> &'static str {
+ match self {
+ $(
+ Self::$endpoint(_) => stringify!($endpoint),
+ )*
+ }
+ }
+ }
+
+ impl RequestHandler for LocalAdminApiRequest {
+ type Response = LocalAdminApiResponse;
+
+ async fn handle(self, garage: &Arc<Garage>, admin: &Admin) -> Result<LocalAdminApiResponse, Error> {
+ Ok(match self {
+ $(
+ LocalAdminApiRequest::$endpoint(req) => LocalAdminApiResponse::$endpoint(req.handle(garage, admin).await?),
)*
})
}
@@ -92,3 +216,4 @@ macro_rules! admin_endpoints {
}
pub(crate) use admin_endpoints;
+pub(crate) use local_admin_endpoints;
diff --git a/src/api/admin/mod.rs b/src/api/admin/mod.rs
deleted file mode 100644
index 86f5bcac..00000000
--- a/src/api/admin/mod.rs
+++ /dev/null
@@ -1,32 +0,0 @@
-pub mod api_server;
-mod error;
-mod macros;
-
-pub mod api;
-mod router_v0;
-mod router_v1;
-mod router_v2;
-
-mod bucket;
-mod cluster;
-mod key;
-mod special;
-
-use std::sync::Arc;
-
-use async_trait::async_trait;
-
-use garage_model::garage::Garage;
-
-pub enum Authorization {
- None,
- MetricsToken,
- AdminToken,
-}
-
-#[async_trait]
-pub trait EndpointHandler {
- type Response;
-
- async fn handle(self, garage: &Arc<Garage>) -> Result<Self::Response, error::Error>;
-}
diff --git a/src/api/admin/node.rs b/src/api/admin/node.rs
new file mode 100644
index 00000000..f6f43d95
--- /dev/null
+++ b/src/api/admin/node.rs
@@ -0,0 +1,216 @@
+use std::collections::HashMap;
+use std::fmt::Write;
+use std::sync::Arc;
+
+use format_table::format_table_to_string;
+
+use garage_util::data::*;
+use garage_util::error::Error as GarageError;
+
+use garage_table::replication::*;
+use garage_table::*;
+
+use garage_rpc::layout::PARTITION_BITS;
+
+use garage_model::garage::Garage;
+
+use crate::api::*;
+use crate::error::Error;
+use crate::{Admin, RequestHandler};
+
+impl RequestHandler for LocalCreateMetadataSnapshotRequest {
+ type Response = LocalCreateMetadataSnapshotResponse;
+
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<LocalCreateMetadataSnapshotResponse, Error> {
+ garage_model::snapshot::async_snapshot_metadata(garage).await?;
+ Ok(LocalCreateMetadataSnapshotResponse)
+ }
+}
+
+impl RequestHandler for LocalGetNodeStatisticsRequest {
+ type Response = LocalGetNodeStatisticsResponse;
+
+ // FIXME: return this as a JSON struct instead of text
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<LocalGetNodeStatisticsResponse, Error> {
+ let mut ret = String::new();
+ writeln!(
+ &mut ret,
+ "Garage version: {} [features: {}]\nRust compiler version: {}",
+ garage_util::version::garage_version(),
+ garage_util::version::garage_features()
+ .map(|list| list.join(", "))
+ .unwrap_or_else(|| "(unknown)".into()),
+ garage_util::version::rust_version(),
+ )
+ .unwrap();
+
+ writeln!(&mut ret, "\nDatabase engine: {}", garage.db.engine()).unwrap();
+
+ // Gather table statistics
+ let mut table = vec![" Table\tItems\tMklItems\tMklTodo\tGcTodo".into()];
+ table.push(gather_table_stats(&garage.bucket_table)?);
+ table.push(gather_table_stats(&garage.key_table)?);
+ table.push(gather_table_stats(&garage.object_table)?);
+ table.push(gather_table_stats(&garage.version_table)?);
+ table.push(gather_table_stats(&garage.block_ref_table)?);
+ write!(
+ &mut ret,
+ "\nTable stats:\n{}",
+ format_table_to_string(table)
+ )
+ .unwrap();
+
+ // Gather block manager statistics
+ writeln!(&mut ret, "\nBlock manager stats:").unwrap();
+ let rc_len = garage.block_manager.rc_len()?.to_string();
+
+ writeln!(
+ &mut ret,
+ " number of RC entries (~= number of blocks): {}",
+ rc_len
+ )
+ .unwrap();
+ writeln!(
+ &mut ret,
+ " resync queue length: {}",
+ garage.block_manager.resync.queue_len()?
+ )
+ .unwrap();
+ writeln!(
+ &mut ret,
+ " blocks with resync errors: {}",
+ garage.block_manager.resync.errors_len()?
+ )
+ .unwrap();
+
+ Ok(LocalGetNodeStatisticsResponse { freeform: ret })
+ }
+}
+
+impl RequestHandler for GetClusterStatisticsRequest {
+ type Response = GetClusterStatisticsResponse;
+
+ // FIXME: return this as a JSON struct instead of text
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<GetClusterStatisticsResponse, Error> {
+ let mut ret = String::new();
+
+ // Gather storage node and free space statistics for current nodes
+ let layout = &garage.system.cluster_layout();
+ let mut node_partition_count = HashMap::<Uuid, u64>::new();
+ for short_id in layout.current().ring_assignment_data.iter() {
+ let id = layout.current().node_id_vec[*short_id as usize];
+ *node_partition_count.entry(id).or_default() += 1;
+ }
+ let node_info = garage
+ .system
+ .get_known_nodes()
+ .into_iter()
+ .map(|n| (n.id, n))
+ .collect::<HashMap<_, _>>();
+
+ let mut table = vec![" ID\tHostname\tZone\tCapacity\tPart.\tDataAvail\tMetaAvail".into()];
+ for (id, parts) in node_partition_count.iter() {
+ let info = node_info.get(id);
+ let status = info.map(|x| &x.status);
+ let role = layout.current().roles.get(id).and_then(|x| x.0.as_ref());
+ let hostname = status.and_then(|x| x.hostname.as_deref()).unwrap_or("?");
+ let zone = role.map(|x| x.zone.as_str()).unwrap_or("?");
+ let capacity = role
+ .map(|x| x.capacity_string())
+ .unwrap_or_else(|| "?".into());
+ let avail_str = |x| match x {
+ Some((avail, total)) => {
+ let pct = (avail as f64) / (total as f64) * 100.;
+ let avail = bytesize::ByteSize::b(avail);
+ let total = bytesize::ByteSize::b(total);
+ format!("{}/{} ({:.1}%)", avail, total, pct)
+ }
+ None => "?".into(),
+ };
+ let data_avail = avail_str(status.and_then(|x| x.data_disk_avail));
+ let meta_avail = avail_str(status.and_then(|x| x.meta_disk_avail));
+ table.push(format!(
+ " {:?}\t{}\t{}\t{}\t{}\t{}\t{}",
+ id, hostname, zone, capacity, parts, data_avail, meta_avail
+ ));
+ }
+ write!(
+ &mut ret,
+ "Storage nodes:\n{}",
+ format_table_to_string(table)
+ )
+ .unwrap();
+
+ let meta_part_avail = node_partition_count
+ .iter()
+ .filter_map(|(id, parts)| {
+ node_info
+ .get(id)
+ .and_then(|x| x.status.meta_disk_avail)
+ .map(|c| c.0 / *parts)
+ })
+ .collect::<Vec<_>>();
+ let data_part_avail = node_partition_count
+ .iter()
+ .filter_map(|(id, parts)| {
+ node_info
+ .get(id)
+ .and_then(|x| x.status.data_disk_avail)
+ .map(|c| c.0 / *parts)
+ })
+ .collect::<Vec<_>>();
+ if !meta_part_avail.is_empty() && !data_part_avail.is_empty() {
+ let meta_avail =
+ bytesize::ByteSize(meta_part_avail.iter().min().unwrap() * (1 << PARTITION_BITS));
+ let data_avail =
+ bytesize::ByteSize(data_part_avail.iter().min().unwrap() * (1 << PARTITION_BITS));
+ writeln!(
+ &mut ret,
+ "\nEstimated available storage space cluster-wide (might be lower in practice):"
+ )
+ .unwrap();
+ if meta_part_avail.len() < node_partition_count.len()
+ || data_part_avail.len() < node_partition_count.len()
+ {
+ writeln!(&mut ret, " data: < {}", data_avail).unwrap();
+ writeln!(&mut ret, " metadata: < {}", meta_avail).unwrap();
+ writeln!(&mut ret, "A precise estimate could not be given as information is missing for some storage nodes.").unwrap();
+ } else {
+ writeln!(&mut ret, " data: {}", data_avail).unwrap();
+ writeln!(&mut ret, " metadata: {}", meta_avail).unwrap();
+ }
+ }
+
+ Ok(GetClusterStatisticsResponse { freeform: ret })
+ }
+}
+
+fn gather_table_stats<F, R>(t: &Arc<Table<F, R>>) -> Result<String, Error>
+where
+ F: TableSchema + 'static,
+ R: TableReplication + 'static,
+{
+ let data_len = t.data.store.len().map_err(GarageError::from)?.to_string();
+ let mkl_len = t.merkle_updater.merkle_tree_len()?.to_string();
+
+ Ok(format!(
+ " {}\t{}\t{}\t{}\t{}",
+ F::TABLE_NAME,
+ data_len,
+ mkl_len,
+ t.merkle_updater.todo_len()?,
+ t.data.gc_todo_len()?
+ ))
+}
diff --git a/src/garage/repair/online.rs b/src/api/admin/repair.rs
index 2c5227d2..a9b8c36a 100644
--- a/src/garage/repair/online.rs
+++ b/src/api/admin/repair.rs
@@ -1,9 +1,18 @@
+use std::future::Future;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use tokio::sync::watch;
+use garage_util::background::*;
+use garage_util::data::*;
+use garage_util::error::{Error as GarageError, OkOrMessage};
+use garage_util::migrate::Migrate;
+
+use garage_table::replication::*;
+use garage_table::*;
+
use garage_block::manager::BlockManager;
use garage_block::repair::ScrubWorkerCommand;
@@ -13,97 +22,90 @@ use garage_model::s3::mpu_table::*;
use garage_model::s3::object_table::*;
use garage_model::s3::version_table::*;
-use garage_table::replication::*;
-use garage_table::*;
-
-use garage_util::background::*;
-use garage_util::data::*;
-use garage_util::error::Error;
-use garage_util::migrate::Migrate;
-
-use crate::*;
+use crate::api::*;
+use crate::error::Error;
+use crate::{Admin, RequestHandler};
const RC_REPAIR_ITER_COUNT: usize = 64;
-pub async fn launch_online_repair(
- garage: &Arc<Garage>,
- bg: &BackgroundRunner,
- opt: RepairOpt,
-) -> Result<(), Error> {
- match opt.what {
- RepairWhat::Tables => {
- info!("Launching a full sync of tables");
- garage.bucket_table.syncer.add_full_sync()?;
- garage.object_table.syncer.add_full_sync()?;
- garage.version_table.syncer.add_full_sync()?;
- garage.block_ref_table.syncer.add_full_sync()?;
- garage.key_table.syncer.add_full_sync()?;
- }
- RepairWhat::Versions => {
- info!("Repairing the versions table");
- bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairVersions));
- }
- RepairWhat::MultipartUploads => {
- info!("Repairing the multipart uploads table");
- bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairMpu));
- }
- RepairWhat::BlockRefs => {
- info!("Repairing the block refs table");
- bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairBlockRefs));
- }
- RepairWhat::BlockRc => {
- info!("Repairing the block reference counters");
- bg.spawn_worker(BlockRcRepair::new(
- garage.block_manager.clone(),
- garage.block_ref_table.clone(),
- ));
- }
- RepairWhat::Blocks => {
- info!("Repairing the stored blocks");
- bg.spawn_worker(garage_block::repair::RepairWorker::new(
- garage.block_manager.clone(),
- ));
- }
- RepairWhat::Scrub { cmd } => {
- let cmd = match cmd {
- ScrubCmd::Start => ScrubWorkerCommand::Start,
- ScrubCmd::Pause => ScrubWorkerCommand::Pause(Duration::from_secs(3600 * 24)),
- ScrubCmd::Resume => ScrubWorkerCommand::Resume,
- ScrubCmd::Cancel => ScrubWorkerCommand::Cancel,
- ScrubCmd::SetTranquility { tranquility } => {
- garage
- .block_manager
- .scrub_persister
- .set_with(|x| x.tranquility = tranquility)?;
- return Ok(());
- }
- };
- info!("Sending command to scrub worker: {:?}", cmd);
- garage.block_manager.send_scrub_command(cmd).await?;
- }
- RepairWhat::Rebalance => {
- info!("Rebalancing the stored blocks among storage locations");
- bg.spawn_worker(garage_block::repair::RebalanceWorker::new(
- garage.block_manager.clone(),
- ));
+impl RequestHandler for LocalLaunchRepairOperationRequest {
+ type Response = LocalLaunchRepairOperationResponse;
+
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ admin: &Admin,
+ ) -> Result<LocalLaunchRepairOperationResponse, Error> {
+ let bg = &admin.background;
+ match self.repair_type {
+ RepairType::Tables => {
+ info!("Launching a full sync of tables");
+ garage.bucket_table.syncer.add_full_sync()?;
+ garage.object_table.syncer.add_full_sync()?;
+ garage.version_table.syncer.add_full_sync()?;
+ garage.block_ref_table.syncer.add_full_sync()?;
+ garage.key_table.syncer.add_full_sync()?;
+ }
+ RepairType::Versions => {
+ info!("Repairing the versions table");
+ bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairVersions));
+ }
+ RepairType::MultipartUploads => {
+ info!("Repairing the multipart uploads table");
+ bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairMpu));
+ }
+ RepairType::BlockRefs => {
+ info!("Repairing the block refs table");
+ bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairBlockRefs));
+ }
+ RepairType::BlockRc => {
+ info!("Repairing the block reference counters");
+ bg.spawn_worker(BlockRcRepair::new(
+ garage.block_manager.clone(),
+ garage.block_ref_table.clone(),
+ ));
+ }
+ RepairType::Blocks => {
+ info!("Repairing the stored blocks");
+ bg.spawn_worker(garage_block::repair::RepairWorker::new(
+ garage.block_manager.clone(),
+ ));
+ }
+ RepairType::Scrub(cmd) => {
+ let cmd = match cmd {
+ ScrubCommand::Start => ScrubWorkerCommand::Start,
+ ScrubCommand::Pause => {
+ ScrubWorkerCommand::Pause(Duration::from_secs(3600 * 24))
+ }
+ ScrubCommand::Resume => ScrubWorkerCommand::Resume,
+ ScrubCommand::Cancel => ScrubWorkerCommand::Cancel,
+ };
+ info!("Sending command to scrub worker: {:?}", cmd);
+ garage.block_manager.send_scrub_command(cmd).await?;
+ }
+ RepairType::Rebalance => {
+ info!("Rebalancing the stored blocks among storage locations");
+ bg.spawn_worker(garage_block::repair::RebalanceWorker::new(
+ garage.block_manager.clone(),
+ ));
+ }
}
+ Ok(LocalLaunchRepairOperationResponse)
}
- Ok(())
}
// ----
-#[async_trait]
trait TableRepair: Send + Sync + 'static {
type T: TableSchema;
fn table(garage: &Garage) -> &Table<Self::T, TableShardedReplication>;
- async fn process(
+ fn process(
&mut self,
garage: &Garage,
entry: <<Self as TableRepair>::T as TableSchema>::E,
- ) -> Result<bool, Error>;
+ ) -> impl Future<Output = Result<bool, GarageError>> + Send;
}
struct TableRepairWorker<T: TableRepair> {
@@ -139,7 +141,10 @@ impl<R: TableRepair> Worker for TableRepairWorker<R> {
}
}
- async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
+ async fn work(
+ &mut self,
+ _must_exit: &mut watch::Receiver<bool>,
+ ) -> Result<WorkerState, GarageError> {
let (item_bytes, next_pos) = match R::table(&self.garage).data.store.get_gt(&self.pos)? {
Some((k, v)) => (v, k),
None => {
@@ -174,7 +179,6 @@ impl<R: TableRepair> Worker for TableRepairWorker<R> {
struct RepairVersions;
-#[async_trait]
impl TableRepair for RepairVersions {
type T = VersionTable;
@@ -182,7 +186,7 @@ impl TableRepair for RepairVersions {
&garage.version_table
}
- async fn process(&mut self, garage: &Garage, version: Version) -> Result<bool, Error> {
+ async fn process(&mut self, garage: &Garage, version: Version) -> Result<bool, GarageError> {
if !version.deleted.get() {
let ref_exists = match &version.backlink {
VersionBacklink::Object { bucket_id, key } => garage
@@ -221,7 +225,6 @@ impl TableRepair for RepairVersions {
struct RepairBlockRefs;
-#[async_trait]
impl TableRepair for RepairBlockRefs {
type T = BlockRefTable;
@@ -229,7 +232,11 @@ impl TableRepair for RepairBlockRefs {
&garage.block_ref_table
}
- async fn process(&mut self, garage: &Garage, mut block_ref: BlockRef) -> Result<bool, Error> {
+ async fn process(
+ &mut self,
+ garage: &Garage,
+ mut block_ref: BlockRef,
+ ) -> Result<bool, GarageError> {
if !block_ref.deleted.get() {
let ref_exists = garage
.version_table
@@ -257,7 +264,6 @@ impl TableRepair for RepairBlockRefs {
struct RepairMpu;
-#[async_trait]
impl TableRepair for RepairMpu {
type T = MultipartUploadTable;
@@ -265,7 +271,11 @@ impl TableRepair for RepairMpu {
&garage.mpu_table
}
- async fn process(&mut self, garage: &Garage, mut mpu: MultipartUpload) -> Result<bool, Error> {
+ async fn process(
+ &mut self,
+ garage: &Garage,
+ mut mpu: MultipartUpload,
+ ) -> Result<bool, GarageError> {
if !mpu.deleted.get() {
let ref_exists = garage
.object_table
@@ -332,7 +342,10 @@ impl Worker for BlockRcRepair {
}
}
- async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
+ async fn work(
+ &mut self,
+ _must_exit: &mut watch::Receiver<bool>,
+ ) -> Result<WorkerState, GarageError> {
for _i in 0..RC_REPAIR_ITER_COUNT {
let next1 = self
.block_manager
diff --git a/src/api/admin/router_v0.rs b/src/api/admin/router_v0.rs
index 68676445..9dd742ba 100644
--- a/src/api/admin/router_v0.rs
+++ b/src/api/admin/router_v0.rs
@@ -2,8 +2,9 @@ use std::borrow::Cow;
use hyper::{Method, Request};
-use crate::admin::error::*;
-use crate::router_macros::*;
+use garage_api_common::router_macros::*;
+
+use crate::error::*;
router_match! {@func
diff --git a/src/api/admin/router_v1.rs b/src/api/admin/router_v1.rs
index 7e738145..138a801d 100644
--- a/src/api/admin/router_v1.rs
+++ b/src/api/admin/router_v1.rs
@@ -2,9 +2,10 @@ use std::borrow::Cow;
use hyper::{Method, Request};
-use crate::admin::error::*;
-use crate::admin::router_v0;
-use crate::router_macros::*;
+use garage_api_common::router_macros::*;
+
+use crate::error::*;
+use crate::router_v0;
router_match! {@func
diff --git a/src/api/admin/router_v2.rs b/src/api/admin/router_v2.rs
index 9d60b312..4d5c015e 100644
--- a/src/api/admin/router_v2.rs
+++ b/src/api/admin/router_v2.rs
@@ -4,12 +4,13 @@ use hyper::body::Incoming as IncomingBody;
use hyper::{Method, Request};
use paste::paste;
-use crate::admin::api::*;
-use crate::admin::error::*;
-use crate::admin::router_v1;
-use crate::admin::Authorization;
-use crate::helpers::*;
-use crate::router_macros::*;
+use garage_api_common::helpers::*;
+use garage_api_common::router_macros::*;
+
+use crate::api::*;
+use crate::error::*;
+use crate::router_v1;
+use crate::Authorization;
impl AdminApiRequest {
/// Determine which S3 endpoint a request is for using the request, and a bucket which was
@@ -51,12 +52,28 @@ impl AdminApiRequest {
POST CreateBucket (body),
POST DeleteBucket (query::id),
POST UpdateBucket (body_field, query::id),
+ POST CleanupIncompleteUploads (body),
// Bucket-key permissions
POST AllowBucketKey (body),
POST DenyBucketKey (body),
// Bucket aliases
POST AddBucketAlias (body),
POST RemoveBucketAlias (body),
+ // Node APIs
+ POST CreateMetadataSnapshot (default::body, query::node),
+ GET GetNodeStatistics (default::body, query::node),
+ GET GetClusterStatistics (),
+ POST LaunchRepairOperation (body_field, query::node),
+ // Worker APIs
+ POST ListWorkers (body_field, query::node),
+ POST GetWorkerInfo (body_field, query::node),
+ POST GetWorkerVariable (body_field, query::node),
+ POST SetWorkerVariable (body_field, query::node),
+ // Block APIs
+ GET ListBlockErrors (default::body, query::node),
+ POST GetBlockInfo (body_field, query::node),
+ POST RetryBlockResync (body_field, query::node),
+ POST PurgeBlocks (body_field, query::node),
]);
if let Some(message) = query.nonempty_message() {
@@ -238,6 +255,7 @@ impl AdminApiRequest {
generateQueryParameters! {
keywords: [],
fields: [
+ "node" => node,
"domain" => domain,
"format" => format,
"id" => id,
diff --git a/src/api/admin/special.rs b/src/api/admin/special.rs
index da3764d9..0ecf82bc 100644
--- a/src/api/admin/special.rs
+++ b/src/api/admin/special.rs
@@ -1,26 +1,31 @@
use std::sync::Arc;
-use async_trait::async_trait;
-
use http::header::{
ACCESS_CONTROL_ALLOW_HEADERS, ACCESS_CONTROL_ALLOW_METHODS, ACCESS_CONTROL_ALLOW_ORIGIN, ALLOW,
};
use hyper::{Response, StatusCode};
+#[cfg(feature = "metrics")]
+use prometheus::{Encoder, TextEncoder};
+
use garage_model::garage::Garage;
use garage_rpc::system::ClusterHealthStatus;
-use crate::admin::api::{CheckDomainRequest, HealthRequest, OptionsRequest};
-use crate::admin::api_server::ResBody;
-use crate::admin::error::*;
-use crate::admin::EndpointHandler;
-use crate::helpers::*;
+use garage_api_common::helpers::*;
+
+use crate::api::{CheckDomainRequest, HealthRequest, MetricsRequest, OptionsRequest};
+use crate::api_server::ResBody;
+use crate::error::*;
+use crate::{Admin, RequestHandler};
-#[async_trait]
-impl EndpointHandler for OptionsRequest {
+impl RequestHandler for OptionsRequest {
type Response = Response<ResBody>;
- async fn handle(self, _garage: &Arc<Garage>) -> Result<Response<ResBody>, Error> {
+ async fn handle(
+ self,
+ _garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<Response<ResBody>, Error> {
Ok(Response::builder()
.status(StatusCode::OK)
.header(ALLOW, "OPTIONS,GET,POST")
@@ -31,11 +36,83 @@ impl EndpointHandler for OptionsRequest {
}
}
-#[async_trait]
-impl EndpointHandler for CheckDomainRequest {
+impl RequestHandler for MetricsRequest {
+ type Response = Response<ResBody>;
+
+ async fn handle(
+ self,
+ _garage: &Arc<Garage>,
+ admin: &Admin,
+ ) -> Result<Response<ResBody>, Error> {
+ #[cfg(feature = "metrics")]
+ {
+ use opentelemetry::trace::Tracer;
+
+ let mut buffer = vec![];
+ let encoder = TextEncoder::new();
+
+ let tracer = opentelemetry::global::tracer("garage");
+ let metric_families = tracer.in_span("admin/gather_metrics", |_| {
+ admin.exporter.registry().gather()
+ });
+
+ encoder
+ .encode(&metric_families, &mut buffer)
+ .ok_or_internal_error("Could not serialize metrics")?;
+
+ Ok(Response::builder()
+ .status(StatusCode::OK)
+ .header(http::header::CONTENT_TYPE, encoder.format_type())
+ .body(bytes_body(buffer.into()))?)
+ }
+ #[cfg(not(feature = "metrics"))]
+ Err(Error::bad_request(
+ "Garage was built without the metrics feature".to_string(),
+ ))
+ }
+}
+
+impl RequestHandler for HealthRequest {
type Response = Response<ResBody>;
- async fn handle(self, garage: &Arc<Garage>) -> Result<Response<ResBody>, Error> {
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<Response<ResBody>, Error> {
+ let health = garage.system.health();
+
+ let (status, status_str) = match health.status {
+ ClusterHealthStatus::Healthy => (StatusCode::OK, "Garage is fully operational"),
+ ClusterHealthStatus::Degraded => (
+ StatusCode::OK,
+ "Garage is operational but some storage nodes are unavailable",
+ ),
+ ClusterHealthStatus::Unavailable => (
+ StatusCode::SERVICE_UNAVAILABLE,
+ "Quorum is not available for some/all partitions, reads and writes will fail",
+ ),
+ };
+ let status_str = format!(
+ "{}\nConsult the full health check API endpoint at /v2/GetClusterHealth for more details\n",
+ status_str
+ );
+
+ Ok(Response::builder()
+ .status(status)
+ .header(http::header::CONTENT_TYPE, "text/plain")
+ .body(string_body(status_str))?)
+ }
+}
+
+impl RequestHandler for CheckDomainRequest {
+ type Response = Response<ResBody>;
+
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<Response<ResBody>, Error> {
if check_domain(garage, &self.domain).await? {
Ok(Response::builder()
.status(StatusCode::OK)
@@ -100,33 +177,3 @@ async fn check_domain(garage: &Arc<Garage>, domain: &str) -> Result<bool, Error>
None => Ok(false),
}
}
-
-#[async_trait]
-impl EndpointHandler for HealthRequest {
- type Response = Response<ResBody>;
-
- async fn handle(self, garage: &Arc<Garage>) -> Result<Response<ResBody>, Error> {
- let health = garage.system.health();
-
- let (status, status_str) = match health.status {
- ClusterHealthStatus::Healthy => (StatusCode::OK, "Garage is fully operational"),
- ClusterHealthStatus::Degraded => (
- StatusCode::OK,
- "Garage is operational but some storage nodes are unavailable",
- ),
- ClusterHealthStatus::Unavailable => (
- StatusCode::SERVICE_UNAVAILABLE,
- "Quorum is not available for some/all partitions, reads and writes will fail",
- ),
- };
- let status_str = format!(
- "{}\nConsult the full health check API endpoint at /v2/GetClusterHealth for more details\n",
- status_str
- );
-
- Ok(Response::builder()
- .status(status)
- .header(http::header::CONTENT_TYPE, "text/plain")
- .body(string_body(status_str))?)
- }
-}
diff --git a/src/api/admin/worker.rs b/src/api/admin/worker.rs
new file mode 100644
index 00000000..b3f4537b
--- /dev/null
+++ b/src/api/admin/worker.rs
@@ -0,0 +1,118 @@
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use garage_util::background::*;
+use garage_util::time::now_msec;
+
+use garage_model::garage::Garage;
+
+use crate::api::*;
+use crate::error::Error;
+use crate::{Admin, RequestHandler};
+
+impl RequestHandler for LocalListWorkersRequest {
+ type Response = LocalListWorkersResponse;
+
+ async fn handle(
+ self,
+ _garage: &Arc<Garage>,
+ admin: &Admin,
+ ) -> Result<LocalListWorkersResponse, Error> {
+ let workers = admin.background.get_worker_info();
+ let info = workers
+ .into_iter()
+ .filter(|(_, w)| {
+ (!self.busy_only
+ || matches!(w.state, WorkerState::Busy | WorkerState::Throttled(_)))
+ && (!self.error_only || w.errors > 0)
+ })
+ .map(|(id, w)| worker_info_to_api(id as u64, w))
+ .collect::<Vec<_>>();
+ Ok(LocalListWorkersResponse(info))
+ }
+}
+
+impl RequestHandler for LocalGetWorkerInfoRequest {
+ type Response = LocalGetWorkerInfoResponse;
+
+ async fn handle(
+ self,
+ _garage: &Arc<Garage>,
+ admin: &Admin,
+ ) -> Result<LocalGetWorkerInfoResponse, Error> {
+ let info = admin
+ .background
+ .get_worker_info()
+ .get(&(self.id as usize))
+ .ok_or(Error::NoSuchWorker(self.id))?
+ .clone();
+ Ok(LocalGetWorkerInfoResponse(worker_info_to_api(
+ self.id, info,
+ )))
+ }
+}
+
+impl RequestHandler for LocalGetWorkerVariableRequest {
+ type Response = LocalGetWorkerVariableResponse;
+
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<LocalGetWorkerVariableResponse, Error> {
+ let mut res = HashMap::new();
+ if let Some(k) = self.variable {
+ res.insert(k.clone(), garage.bg_vars.get(&k)?);
+ } else {
+ let vars = garage.bg_vars.get_all();
+ for (k, v) in vars.iter() {
+ res.insert(k.to_string(), v.to_string());
+ }
+ }
+ Ok(LocalGetWorkerVariableResponse(res))
+ }
+}
+
+impl RequestHandler for LocalSetWorkerVariableRequest {
+ type Response = LocalSetWorkerVariableResponse;
+
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<LocalSetWorkerVariableResponse, Error> {
+ garage.bg_vars.set(&self.variable, &self.value)?;
+
+ Ok(LocalSetWorkerVariableResponse {
+ variable: self.variable,
+ value: self.value,
+ })
+ }
+}
+
+// ---- helper functions ----
+
+fn worker_info_to_api(id: u64, info: WorkerInfo) -> WorkerInfoResp {
+ WorkerInfoResp {
+ id,
+ name: info.name,
+ state: match info.state {
+ WorkerState::Busy => WorkerStateResp::Busy,
+ WorkerState::Throttled(t) => WorkerStateResp::Throttled { duration_secs: t },
+ WorkerState::Idle => WorkerStateResp::Idle,
+ WorkerState::Done => WorkerStateResp::Done,
+ },
+ errors: info.errors as u64,
+ consecutive_errors: info.consecutive_errors as u64,
+ last_error: info.last_error.map(|(message, t)| WorkerLastError {
+ message,
+ secs_ago: now_msec().saturating_sub(t) / 1000,
+ }),
+
+ tranquility: info.status.tranquility,
+ progress: info.status.progress,
+ queue_length: info.status.queue_length,
+ persistent_errors: info.status.persistent_errors,
+ freeform: info.status.freeform,
+ }
+}
diff --git a/src/api/common/Cargo.toml b/src/api/common/Cargo.toml
new file mode 100644
index 00000000..5b9cf479
--- /dev/null
+++ b/src/api/common/Cargo.toml
@@ -0,0 +1,44 @@
+[package]
+name = "garage_api_common"
+version = "1.0.1"
+authors = ["Alex Auvolat <alex@adnab.me>"]
+edition = "2018"
+license = "AGPL-3.0"
+description = "Common functions for the API server crates for the Garage object store"
+repository = "https://git.deuxfleurs.fr/Deuxfleurs/garage"
+readme = "../../README.md"
+
+[lib]
+path = "lib.rs"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+garage_model.workspace = true
+garage_table.workspace = true
+garage_util.workspace = true
+
+bytes.workspace = true
+chrono.workspace = true
+crypto-common.workspace = true
+err-derive.workspace = true
+hex.workspace = true
+hmac.workspace = true
+idna.workspace = true
+tracing.workspace = true
+nom.workspace = true
+pin-project.workspace = true
+sha2.workspace = true
+
+futures.workspace = true
+tokio.workspace = true
+http.workspace = true
+http-body-util.workspace = true
+hyper = { workspace = true, default-features = false, features = ["server", "http1"] }
+hyper-util.workspace = true
+url.workspace = true
+
+serde.workspace = true
+serde_json.workspace = true
+
+opentelemetry.workspace = true
diff --git a/src/api/common_error.rs b/src/api/common/common_error.rs
index 0c8006dc..597a3511 100644
--- a/src/api/common_error.rs
+++ b/src/api/common/common_error.rs
@@ -57,6 +57,35 @@ pub enum CommonError {
InvalidBucketName(String),
}
+#[macro_export]
+macro_rules! commonErrorDerivative {
+ ( $error_struct: ident ) => {
+ impl From<garage_util::error::Error> for $error_struct {
+ fn from(err: garage_util::error::Error) -> Self {
+ Self::Common(CommonError::InternalError(err))
+ }
+ }
+ impl From<http::Error> for $error_struct {
+ fn from(err: http::Error) -> Self {
+ Self::Common(CommonError::Http(err))
+ }
+ }
+ impl From<hyper::Error> for $error_struct {
+ fn from(err: hyper::Error) -> Self {
+ Self::Common(CommonError::Hyper(err))
+ }
+ }
+ impl From<hyper::header::ToStrError> for $error_struct {
+ fn from(err: hyper::header::ToStrError) -> Self {
+ Self::Common(CommonError::InvalidHeader(err))
+ }
+ }
+ impl CommonErrorDerivative for $error_struct {}
+ };
+}
+
+pub use commonErrorDerivative;
+
impl CommonError {
pub fn http_status_code(&self) -> StatusCode {
match self {
@@ -118,14 +147,14 @@ impl TryFrom<HelperError> for CommonError {
/// This is used for helper functions that might return InvalidBucketName
/// or NoSuchBucket for instance, and we want to pass that error
/// up to our caller.
-pub(crate) fn pass_helper_error(err: HelperError) -> CommonError {
+pub fn pass_helper_error(err: HelperError) -> CommonError {
match CommonError::try_from(err) {
Ok(e) => e,
Err(e) => panic!("Helper error `{}` should hot have happenned here", e),
}
}
-pub(crate) fn helper_error_as_internal(err: HelperError) -> CommonError {
+pub fn helper_error_as_internal(err: HelperError) -> CommonError {
match err {
HelperError::Internal(e) => CommonError::InternalError(e),
e => CommonError::InternalError(GarageError::Message(e.to_string())),
diff --git a/src/api/common/cors.rs b/src/api/common/cors.rs
new file mode 100644
index 00000000..14369b56
--- /dev/null
+++ b/src/api/common/cors.rs
@@ -0,0 +1,170 @@
+use std::sync::Arc;
+
+use http::header::{
+ ACCESS_CONTROL_ALLOW_HEADERS, ACCESS_CONTROL_ALLOW_METHODS, ACCESS_CONTROL_ALLOW_ORIGIN,
+ ACCESS_CONTROL_EXPOSE_HEADERS, ACCESS_CONTROL_REQUEST_HEADERS, ACCESS_CONTROL_REQUEST_METHOD,
+};
+use hyper::{body::Body, body::Incoming as IncomingBody, Request, Response, StatusCode};
+
+use garage_model::bucket_table::{BucketParams, CorsRule as GarageCorsRule};
+use garage_model::garage::Garage;
+
+use crate::common_error::{
+ helper_error_as_internal, CommonError, OkOrBadRequest, OkOrInternalError,
+};
+use crate::helpers::*;
+
+pub fn find_matching_cors_rule<'a>(
+ bucket_params: &'a BucketParams,
+ req: &Request<impl Body>,
+) -> Result<Option<&'a GarageCorsRule>, CommonError> {
+ if let Some(cors_config) = bucket_params.cors_config.get() {
+ if let Some(origin) = req.headers().get("Origin") {
+ let origin = origin.to_str()?;
+ let request_headers = match req.headers().get(ACCESS_CONTROL_REQUEST_HEADERS) {
+ Some(h) => h.to_str()?.split(',').map(|h| h.trim()).collect::<Vec<_>>(),
+ None => vec![],
+ };
+ return Ok(cors_config.iter().find(|rule| {
+ cors_rule_matches(rule, origin, req.method().as_ref(), request_headers.iter())
+ }));
+ }
+ }
+ Ok(None)
+}
+
+pub fn cors_rule_matches<'a, HI, S>(
+ rule: &GarageCorsRule,
+ origin: &'a str,
+ method: &'a str,
+ mut request_headers: HI,
+) -> bool
+where
+ HI: Iterator<Item = S>,
+ S: AsRef<str>,
+{
+ rule.allow_origins.iter().any(|x| x == "*" || x == origin)
+ && rule.allow_methods.iter().any(|x| x == "*" || x == method)
+ && request_headers.all(|h| {
+ rule.allow_headers
+ .iter()
+ .any(|x| x == "*" || x == h.as_ref())
+ })
+}
+
+pub fn add_cors_headers(
+ resp: &mut Response<impl Body>,
+ rule: &GarageCorsRule,
+) -> Result<(), http::header::InvalidHeaderValue> {
+ let h = resp.headers_mut();
+ h.insert(
+ ACCESS_CONTROL_ALLOW_ORIGIN,
+ rule.allow_origins.join(", ").parse()?,
+ );
+ h.insert(
+ ACCESS_CONTROL_ALLOW_METHODS,
+ rule.allow_methods.join(", ").parse()?,
+ );
+ h.insert(
+ ACCESS_CONTROL_ALLOW_HEADERS,
+ rule.allow_headers.join(", ").parse()?,
+ );
+ h.insert(
+ ACCESS_CONTROL_EXPOSE_HEADERS,
+ rule.expose_headers.join(", ").parse()?,
+ );
+ Ok(())
+}
+
+pub async fn handle_options_api(
+ garage: Arc<Garage>,
+ req: &Request<IncomingBody>,
+ bucket_name: Option<String>,
+) -> Result<Response<EmptyBody>, CommonError> {
+ // FIXME: CORS rules of buckets with local aliases are
+ // not taken into account.
+
+ // If the bucket name is a global bucket name,
+ // we try to apply the CORS rules of that bucket.
+ // If a user has a local bucket name that has
+ // the same name, its CORS rules won't be applied
+ // and will be shadowed by the rules of the globally
+ // existing bucket (but this is inevitable because
+ // OPTIONS calls are not auhtenticated).
+ if let Some(bn) = bucket_name {
+ let helper = garage.bucket_helper();
+ let bucket_id = helper
+ .resolve_global_bucket_name(&bn)
+ .await
+ .map_err(helper_error_as_internal)?;
+ if let Some(id) = bucket_id {
+ let bucket = garage
+ .bucket_helper()
+ .get_existing_bucket(id)
+ .await
+ .map_err(helper_error_as_internal)?;
+ let bucket_params = bucket.state.into_option().unwrap();
+ handle_options_for_bucket(req, &bucket_params)
+ } else {
+ // If there is a bucket name in the request, but that name
+ // does not correspond to a global alias for a bucket,
+ // then it's either a non-existing bucket or a local bucket.
+ // We have no way of knowing, because the request is not
+ // authenticated and thus we can't resolve local aliases.
+ // We take the permissive approach of allowing everything,
+ // because we don't want to prevent web apps that use
+ // local bucket names from making API calls.
+ Ok(Response::builder()
+ .header(ACCESS_CONTROL_ALLOW_ORIGIN, "*")
+ .header(ACCESS_CONTROL_ALLOW_METHODS, "*")
+ .status(StatusCode::OK)
+ .body(EmptyBody::new())?)
+ }
+ } else {
+ // If there is no bucket name in the request,
+ // we are doing a ListBuckets call, which we want to allow
+ // for all origins.
+ Ok(Response::builder()
+ .header(ACCESS_CONTROL_ALLOW_ORIGIN, "*")
+ .header(ACCESS_CONTROL_ALLOW_METHODS, "GET")
+ .status(StatusCode::OK)
+ .body(EmptyBody::new())?)
+ }
+}
+
+pub fn handle_options_for_bucket(
+ req: &Request<IncomingBody>,
+ bucket_params: &BucketParams,
+) -> Result<Response<EmptyBody>, CommonError> {
+ let origin = req
+ .headers()
+ .get("Origin")
+ .ok_or_bad_request("Missing Origin header")?
+ .to_str()?;
+ let request_method = req
+ .headers()
+ .get(ACCESS_CONTROL_REQUEST_METHOD)
+ .ok_or_bad_request("Missing Access-Control-Request-Method header")?
+ .to_str()?;
+ let request_headers = match req.headers().get(ACCESS_CONTROL_REQUEST_HEADERS) {
+ Some(h) => h.to_str()?.split(',').map(|h| h.trim()).collect::<Vec<_>>(),
+ None => vec![],
+ };
+
+ if let Some(cors_config) = bucket_params.cors_config.get() {
+ let matching_rule = cors_config
+ .iter()
+ .find(|rule| cors_rule_matches(rule, origin, request_method, request_headers.iter()));
+ if let Some(rule) = matching_rule {
+ let mut resp = Response::builder()
+ .status(StatusCode::OK)
+ .body(EmptyBody::new())?;
+ add_cors_headers(&mut resp, rule).ok_or_internal_error("Invalid CORS configuration")?;
+ return Ok(resp);
+ }
+ }
+
+ Err(CommonError::Forbidden(
+ "This CORS request is not allowed.".into(),
+ ))
+}
diff --git a/src/api/encoding.rs b/src/api/common/encoding.rs
index e286a784..e286a784 100644
--- a/src/api/encoding.rs
+++ b/src/api/common/encoding.rs
diff --git a/src/api/generic_server.rs b/src/api/common/generic_server.rs
index 5a9b29eb..d7ee5692 100644
--- a/src/api/generic_server.rs
+++ b/src/api/common/generic_server.rs
@@ -5,8 +5,6 @@ use std::os::unix::fs::PermissionsExt;
use std::sync::Arc;
use std::time::Duration;
-use async_trait::async_trait;
-
use futures::future::Future;
use futures::stream::{futures_unordered::FuturesUnordered, StreamExt};
@@ -37,7 +35,7 @@ use garage_util::socket_address::UnixOrTCPSocketAddress;
use crate::helpers::{BoxBody, ErrorBody};
-pub(crate) trait ApiEndpoint: Send + Sync + 'static {
+pub trait ApiEndpoint: Send + Sync + 'static {
fn name(&self) -> Cow<'static, str>;
fn add_span_attributes(&self, span: SpanRef<'_>);
}
@@ -48,8 +46,7 @@ pub trait ApiError: std::error::Error + Send + Sync + 'static {
fn http_body(&self, garage_region: &str, path: &str) -> ErrorBody;
}
-#[async_trait]
-pub(crate) trait ApiHandler: Send + Sync + 'static {
+pub trait ApiHandler: Send + Sync + 'static {
const API_NAME: &'static str;
const API_NAME_DISPLAY: &'static str;
@@ -57,14 +54,14 @@ pub(crate) trait ApiHandler: Send + Sync + 'static {
type Error: ApiError;
fn parse_endpoint(&self, r: &Request<IncomingBody>) -> Result<Self::Endpoint, Self::Error>;
- async fn handle(
+ fn handle(
&self,
req: Request<IncomingBody>,
endpoint: Self::Endpoint,
- ) -> Result<Response<BoxBody<Self::Error>>, Self::Error>;
+ ) -> impl Future<Output = Result<Response<BoxBody<Self::Error>>, Self::Error>> + Send;
}
-pub(crate) struct ApiServer<A: ApiHandler> {
+pub struct ApiServer<A: ApiHandler> {
region: String,
api_handler: A,
@@ -249,13 +246,11 @@ impl<A: ApiHandler> ApiServer<A> {
// ==== helper functions ====
-#[async_trait]
pub trait Accept: Send + Sync + 'static {
type Stream: AsyncRead + AsyncWrite + Send + Sync + 'static;
- async fn accept(&self) -> std::io::Result<(Self::Stream, String)>;
+ fn accept(&self) -> impl Future<Output = std::io::Result<(Self::Stream, String)>> + Send;
}
-#[async_trait]
impl Accept for TcpListener {
type Stream = TcpStream;
async fn accept(&self) -> std::io::Result<(Self::Stream, String)> {
@@ -267,7 +262,6 @@ impl Accept for TcpListener {
pub struct UnixListenerOn(pub UnixListener, pub String);
-#[async_trait]
impl Accept for UnixListenerOn {
type Stream = UnixStream;
async fn accept(&self) -> std::io::Result<(Self::Stream, String)> {
diff --git a/src/api/helpers.rs b/src/api/common/helpers.rs
index cf60005d..c8586de4 100644
--- a/src/api/helpers.rs
+++ b/src/api/common/helpers.rs
@@ -363,9 +363,9 @@ mod tests {
}
#[derive(Serialize)]
-pub(crate) struct CustomApiErrorBody {
- pub(crate) code: String,
- pub(crate) message: String,
- pub(crate) region: String,
- pub(crate) path: String,
+pub struct CustomApiErrorBody {
+ pub code: String,
+ pub message: String,
+ pub region: String,
+ pub path: String,
}
diff --git a/src/api/common/lib.rs b/src/api/common/lib.rs
new file mode 100644
index 00000000..0e655a53
--- /dev/null
+++ b/src/api/common/lib.rs
@@ -0,0 +1,12 @@
+//! Crate for serving a S3 compatible API
+#[macro_use]
+extern crate tracing;
+
+pub mod common_error;
+
+pub mod cors;
+pub mod encoding;
+pub mod generic_server;
+pub mod helpers;
+pub mod router_macros;
+pub mod signature;
diff --git a/src/api/router_macros.rs b/src/api/common/router_macros.rs
index 142cdc11..f4a93c67 100644
--- a/src/api/router_macros.rs
+++ b/src/api/common/router_macros.rs
@@ -1,5 +1,6 @@
/// This macro is used to generate very repetitive match {} blocks in this module
/// It is _not_ made to be used anywhere else
+#[macro_export]
macro_rules! router_match {
(@match $enum:expr , [ $($endpoint:ident,)* ]) => {{
// usage: router_match {@match my_enum, [ VariantWithField1, VariantWithField2 ..] }
@@ -140,6 +141,9 @@ macro_rules! router_match {
}
}};
+ (@@parse_param $query:expr, default, $param:ident) => {{
+ Default::default()
+ }};
(@@parse_param $query:expr, query_opt, $param:ident) => {{
// extract optional query parameter
$query.$param.take().map(|param| param.into_owned())
@@ -210,6 +214,7 @@ macro_rules! router_match {
/// This macro is used to generate part of the code in this module. It must be called only one, and
/// is useless outside of this module.
+#[macro_export]
macro_rules! generateQueryParameters {
(
keywords: [ $($kw_param:expr => $kw_name: ident),* ],
@@ -298,5 +303,5 @@ macro_rules! generateQueryParameters {
}
}
-pub(crate) use generateQueryParameters;
-pub(crate) use router_match;
+pub use generateQueryParameters;
+pub use router_match;
diff --git a/src/api/signature/error.rs b/src/api/common/signature/error.rs
index 2d92a072..2d92a072 100644
--- a/src/api/signature/error.rs
+++ b/src/api/common/signature/error.rs
diff --git a/src/api/signature/mod.rs b/src/api/common/signature/mod.rs
index 6514da43..6514da43 100644
--- a/src/api/signature/mod.rs
+++ b/src/api/common/signature/mod.rs
diff --git a/src/api/signature/payload.rs b/src/api/common/signature/payload.rs
index 9e5a6043..81541e4a 100644
--- a/src/api/signature/payload.rs
+++ b/src/api/common/signature/payload.rs
@@ -518,7 +518,7 @@ impl Authorization {
})
}
- pub(crate) fn parse_form(params: &HeaderMap) -> Result<Self, Error> {
+ pub fn parse_form(params: &HeaderMap) -> Result<Self, Error> {
let algorithm = params
.get(X_AMZ_ALGORITHM)
.ok_or_bad_request("Missing X-Amz-Algorithm header")?
diff --git a/src/api/signature/streaming.rs b/src/api/common/signature/streaming.rs
index e223d1b1..e223d1b1 100644
--- a/src/api/signature/streaming.rs
+++ b/src/api/common/signature/streaming.rs
diff --git a/src/api/k2v/Cargo.toml b/src/api/k2v/Cargo.toml
new file mode 100644
index 00000000..e3ebedca
--- /dev/null
+++ b/src/api/k2v/Cargo.toml
@@ -0,0 +1,37 @@
+[package]
+name = "garage_api_k2v"
+version = "1.0.1"
+authors = ["Alex Auvolat <alex@adnab.me>"]
+edition = "2018"
+license = "AGPL-3.0"
+description = "K2V API server crate for the Garage object store"
+repository = "https://git.deuxfleurs.fr/Deuxfleurs/garage"
+readme = "../../README.md"
+
+[lib]
+path = "lib.rs"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+garage_model = { workspace = true, features = [ "k2v" ] }
+garage_table.workspace = true
+garage_util = { workspace = true, features = [ "k2v" ] }
+garage_api_common.workspace = true
+
+base64.workspace = true
+err-derive.workspace = true
+tracing.workspace = true
+
+futures.workspace = true
+tokio.workspace = true
+http.workspace = true
+http-body-util.workspace = true
+hyper = { workspace = true, default-features = false, features = ["server", "http1"] }
+percent-encoding.workspace = true
+url.workspace = true
+
+serde.workspace = true
+serde_json.workspace = true
+
+opentelemetry.workspace = true
diff --git a/src/api/k2v/api_server.rs b/src/api/k2v/api_server.rs
index 863452e6..015fd687 100644
--- a/src/api/k2v/api_server.rs
+++ b/src/api/k2v/api_server.rs
@@ -1,8 +1,6 @@
use std::borrow::Cow;
use std::sync::Arc;
-use async_trait::async_trait;
-
use hyper::{body::Incoming as IncomingBody, Method, Request, Response};
use tokio::sync::watch;
@@ -13,26 +11,25 @@ use garage_util::socket_address::UnixOrTCPSocketAddress;
use garage_model::garage::Garage;
-use crate::generic_server::*;
-use crate::k2v::error::*;
-
-use crate::signature::verify_request;
+use garage_api_common::cors::*;
+use garage_api_common::generic_server::*;
+use garage_api_common::helpers::*;
+use garage_api_common::signature::verify_request;
-use crate::helpers::*;
-use crate::k2v::batch::*;
-use crate::k2v::index::*;
-use crate::k2v::item::*;
-use crate::k2v::router::Endpoint;
-use crate::s3::cors::*;
+use crate::batch::*;
+use crate::error::*;
+use crate::index::*;
+use crate::item::*;
+use crate::router::Endpoint;
-pub use crate::signature::streaming::ReqBody;
+pub use garage_api_common::signature::streaming::ReqBody;
pub type ResBody = BoxBody<Error>;
pub struct K2VApiServer {
garage: Arc<Garage>,
}
-pub(crate) struct K2VApiEndpoint {
+pub struct K2VApiEndpoint {
bucket_name: String,
endpoint: Endpoint,
}
@@ -50,7 +47,6 @@ impl K2VApiServer {
}
}
-#[async_trait]
impl ApiHandler for K2VApiServer {
const API_NAME: &'static str = "k2v";
const API_NAME_DISPLAY: &'static str = "K2V";
diff --git a/src/api/k2v/batch.rs b/src/api/k2v/batch.rs
index e4d0b0e5..c284dbd4 100644
--- a/src/api/k2v/batch.rs
+++ b/src/api/k2v/batch.rs
@@ -6,11 +6,12 @@ use garage_table::{EnumerationOrder, TableSchema};
use garage_model::k2v::item_table::*;
-use crate::helpers::*;
-use crate::k2v::api_server::{ReqBody, ResBody};
-use crate::k2v::error::*;
-use crate::k2v::item::parse_causality_token;
-use crate::k2v::range::read_range;
+use garage_api_common::helpers::*;
+
+use crate::api_server::{ReqBody, ResBody};
+use crate::error::*;
+use crate::item::parse_causality_token;
+use crate::range::read_range;
pub async fn handle_insert_batch(
ctx: ReqCtx,
diff --git a/src/api/k2v/error.rs b/src/api/k2v/error.rs
index dbe4be2c..3cd0e6f7 100644
--- a/src/api/k2v/error.rs
+++ b/src/api/k2v/error.rs
@@ -2,19 +2,21 @@ use err_derive::Error;
use hyper::header::HeaderValue;
use hyper::{HeaderMap, StatusCode};
-use crate::common_error::CommonError;
-pub(crate) use crate::common_error::{helper_error_as_internal, pass_helper_error};
-pub use crate::common_error::{CommonErrorDerivative, OkOrBadRequest, OkOrInternalError};
-use crate::generic_server::ApiError;
-use crate::helpers::*;
-use crate::signature::error::Error as SignatureError;
+use garage_api_common::common_error::{commonErrorDerivative, CommonError};
+pub(crate) use garage_api_common::common_error::{helper_error_as_internal, pass_helper_error};
+pub use garage_api_common::common_error::{
+ CommonErrorDerivative, OkOrBadRequest, OkOrInternalError,
+};
+use garage_api_common::generic_server::ApiError;
+use garage_api_common::helpers::*;
+use garage_api_common::signature::error::Error as SignatureError;
/// Errors of this crate
#[derive(Debug, Error)]
pub enum Error {
#[error(display = "{}", _0)]
/// Error from common error
- Common(CommonError),
+ Common(#[error(source)] CommonError),
// Category: cannot process
/// Authorization Header Malformed
@@ -42,16 +44,7 @@ pub enum Error {
InvalidUtf8Str(#[error(source)] std::str::Utf8Error),
}
-impl<T> From<T> for Error
-where
- CommonError: From<T>,
-{
- fn from(err: T) -> Self {
- Error::Common(CommonError::from(err))
- }
-}
-
-impl CommonErrorDerivative for Error {}
+commonErrorDerivative!(Error);
impl From<SignatureError> for Error {
fn from(err: SignatureError) -> Self {
diff --git a/src/api/k2v/index.rs b/src/api/k2v/index.rs
index e3397238..fbfaad98 100644
--- a/src/api/k2v/index.rs
+++ b/src/api/k2v/index.rs
@@ -5,10 +5,11 @@ use garage_table::util::*;
use garage_model::k2v::item_table::{BYTES, CONFLICTS, ENTRIES, VALUES};
-use crate::helpers::*;
-use crate::k2v::api_server::ResBody;
-use crate::k2v::error::*;
-use crate::k2v::range::read_range;
+use garage_api_common::helpers::*;
+
+use crate::api_server::ResBody;
+use crate::error::*;
+use crate::range::read_range;
pub async fn handle_read_index(
ctx: ReqCtx,
diff --git a/src/api/k2v/item.rs b/src/api/k2v/item.rs
index 87371727..4e28b499 100644
--- a/src/api/k2v/item.rs
+++ b/src/api/k2v/item.rs
@@ -6,9 +6,10 @@ use hyper::{Request, Response, StatusCode};
use garage_model::k2v::causality::*;
use garage_model::k2v::item_table::*;
-use crate::helpers::*;
-use crate::k2v::api_server::{ReqBody, ResBody};
-use crate::k2v::error::*;
+use garage_api_common::helpers::*;
+
+use crate::api_server::{ReqBody, ResBody};
+use crate::error::*;
pub const X_GARAGE_CAUSALITY_TOKEN: &str = "X-Garage-Causality-Token";
diff --git a/src/api/k2v/mod.rs b/src/api/k2v/lib.rs
index b6a8c5cf..334ae46b 100644
--- a/src/api/k2v/mod.rs
+++ b/src/api/k2v/lib.rs
@@ -1,3 +1,6 @@
+#[macro_use]
+extern crate tracing;
+
pub mod api_server;
mod error;
mod router;
diff --git a/src/api/k2v/range.rs b/src/api/k2v/range.rs
index bb9d3be5..eb4738db 100644
--- a/src/api/k2v/range.rs
+++ b/src/api/k2v/range.rs
@@ -7,8 +7,9 @@ use std::sync::Arc;
use garage_table::replication::TableShardedReplication;
use garage_table::*;
-use crate::helpers::key_after_prefix;
-use crate::k2v::error::*;
+use garage_api_common::helpers::key_after_prefix;
+
+use crate::error::*;
/// Read range in a Garage table.
/// Returns (entries, more?, nextStart)
diff --git a/src/api/k2v/router.rs b/src/api/k2v/router.rs
index 1cc58be5..a04b0f81 100644
--- a/src/api/k2v/router.rs
+++ b/src/api/k2v/router.rs
@@ -1,11 +1,11 @@
-use crate::k2v::error::*;
+use crate::error::*;
use std::borrow::Cow;
use hyper::{Method, Request};
-use crate::helpers::Authorization;
-use crate::router_macros::{generateQueryParameters, router_match};
+use garage_api_common::helpers::Authorization;
+use garage_api_common::router_macros::{generateQueryParameters, router_match};
router_match! {@func
diff --git a/src/api/lib.rs b/src/api/lib.rs
deleted file mode 100644
index 370dfd7a..00000000
--- a/src/api/lib.rs
+++ /dev/null
@@ -1,17 +0,0 @@
-//! Crate for serving a S3 compatible API
-#[macro_use]
-extern crate tracing;
-
-pub mod common_error;
-
-mod encoding;
-pub mod generic_server;
-pub mod helpers;
-mod router_macros;
-/// This mode is public only to help testing. Don't expect stability here
-pub mod signature;
-
-pub mod admin;
-#[cfg(feature = "k2v")]
-pub mod k2v;
-pub mod s3;
diff --git a/src/api/Cargo.toml b/src/api/s3/Cargo.toml
index 1becbcdf..387e45db 100644
--- a/src/api/Cargo.toml
+++ b/src/api/s3/Cargo.toml
@@ -1,5 +1,5 @@
[package]
-name = "garage_api"
+name = "garage_api_s3"
version = "1.0.1"
authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018"
@@ -20,31 +20,24 @@ garage_block.workspace = true
garage_net.workspace = true
garage_util.workspace = true
garage_rpc.workspace = true
+garage_api_common.workspace = true
aes-gcm.workspace = true
-argon2.workspace = true
async-compression.workspace = true
-async-trait.workspace = true
base64.workspace = true
bytes.workspace = true
chrono.workspace = true
crc32fast.workspace = true
crc32c.workspace = true
-crypto-common.workspace = true
err-derive.workspace = true
hex.workspace = true
-hmac.workspace = true
-idna.workspace = true
tracing.workspace = true
md-5.workspace = true
-nom.workspace = true
-paste.workspace = true
pin-project.workspace = true
sha1.workspace = true
sha2.workspace = true
futures.workspace = true
-futures-util.workspace = true
tokio.workspace = true
tokio-stream.workspace = true
tokio-util.workspace = true
@@ -55,21 +48,13 @@ httpdate.workspace = true
http-range.workspace = true
http-body-util.workspace = true
hyper = { workspace = true, default-features = false, features = ["server", "http1"] }
-hyper-util.workspace = true
multer.workspace = true
percent-encoding.workspace = true
roxmltree.workspace = true
url.workspace = true
serde.workspace = true
-serde_bytes.workspace = true
serde_json.workspace = true
quick-xml.workspace = true
opentelemetry.workspace = true
-opentelemetry-prometheus = { workspace = true, optional = true }
-prometheus = { workspace = true, optional = true }
-
-[features]
-k2v = [ "garage_util/k2v", "garage_model/k2v" ]
-metrics = [ "opentelemetry-prometheus", "prometheus" ]
diff --git a/src/api/s3/api_server.rs b/src/api/s3/api_server.rs
index 2b638b15..c8c28f3d 100644
--- a/src/api/s3/api_server.rs
+++ b/src/api/s3/api_server.rs
@@ -1,8 +1,6 @@
use std::borrow::Cow;
use std::sync::Arc;
-use async_trait::async_trait;
-
use hyper::header;
use hyper::{body::Incoming as IncomingBody, Request, Response};
use tokio::sync::watch;
@@ -15,33 +13,33 @@ use garage_util::socket_address::UnixOrTCPSocketAddress;
use garage_model::garage::Garage;
use garage_model::key_table::Key;
-use crate::generic_server::*;
-use crate::s3::error::*;
-
-use crate::signature::verify_request;
-
-use crate::helpers::*;
-use crate::s3::bucket::*;
-use crate::s3::copy::*;
-use crate::s3::cors::*;
-use crate::s3::delete::*;
-use crate::s3::get::*;
-use crate::s3::lifecycle::*;
-use crate::s3::list::*;
-use crate::s3::multipart::*;
-use crate::s3::post_object::handle_post_object;
-use crate::s3::put::*;
-use crate::s3::router::Endpoint;
-use crate::s3::website::*;
-
-pub use crate::signature::streaming::ReqBody;
+use garage_api_common::cors::*;
+use garage_api_common::generic_server::*;
+use garage_api_common::helpers::*;
+use garage_api_common::signature::verify_request;
+
+use crate::bucket::*;
+use crate::copy::*;
+use crate::cors::*;
+use crate::delete::*;
+use crate::error::*;
+use crate::get::*;
+use crate::lifecycle::*;
+use crate::list::*;
+use crate::multipart::*;
+use crate::post_object::handle_post_object;
+use crate::put::*;
+use crate::router::Endpoint;
+use crate::website::*;
+
+pub use garage_api_common::signature::streaming::ReqBody;
pub type ResBody = BoxBody<Error>;
pub struct S3ApiServer {
garage: Arc<Garage>,
}
-pub(crate) struct S3ApiEndpoint {
+pub struct S3ApiEndpoint {
bucket_name: Option<String>,
endpoint: Endpoint,
}
@@ -71,7 +69,6 @@ impl S3ApiServer {
}
}
-#[async_trait]
impl ApiHandler for S3ApiServer {
const API_NAME: &'static str = "s3";
const API_NAME_DISPLAY: &'static str = "S3";
diff --git a/src/api/s3/bucket.rs b/src/api/s3/bucket.rs
index 6a12aa9c..0a192ba6 100644
--- a/src/api/s3/bucket.rs
+++ b/src/api/s3/bucket.rs
@@ -13,12 +13,13 @@ use garage_util::crdt::*;
use garage_util::data::*;
use garage_util::time::*;
-use crate::common_error::CommonError;
-use crate::helpers::*;
-use crate::s3::api_server::{ReqBody, ResBody};
-use crate::s3::error::*;
-use crate::s3::xml as s3_xml;
-use crate::signature::verify_signed_content;
+use garage_api_common::common_error::CommonError;
+use garage_api_common::helpers::*;
+use garage_api_common::signature::verify_signed_content;
+
+use crate::api_server::{ReqBody, ResBody};
+use crate::error::*;
+use crate::xml as s3_xml;
pub fn handle_get_bucket_location(ctx: ReqCtx) -> Result<Response<ResBody>, Error> {
let ReqCtx { garage, .. } = ctx;
diff --git a/src/api/s3/checksum.rs b/src/api/s3/checksum.rs
index c7527163..02fb55ec 100644
--- a/src/api/s3/checksum.rs
+++ b/src/api/s3/checksum.rs
@@ -15,7 +15,7 @@ use garage_util::error::OkOrMessage;
use garage_model::s3::object_table::*;
-use crate::s3::error::*;
+use crate::error::*;
pub const X_AMZ_CHECKSUM_ALGORITHM: HeaderName =
HeaderName::from_static("x-amz-checksum-algorithm");
diff --git a/src/api/s3/copy.rs b/src/api/s3/copy.rs
index b67ace88..07d50ea5 100644
--- a/src/api/s3/copy.rs
+++ b/src/api/s3/copy.rs
@@ -20,15 +20,16 @@ use garage_model::s3::mpu_table::*;
use garage_model::s3::object_table::*;
use garage_model::s3::version_table::*;
-use crate::helpers::*;
-use crate::s3::api_server::{ReqBody, ResBody};
-use crate::s3::checksum::*;
-use crate::s3::encryption::EncryptionParams;
-use crate::s3::error::*;
-use crate::s3::get::full_object_byte_stream;
-use crate::s3::multipart;
-use crate::s3::put::{get_headers, save_stream, ChecksumMode, SaveStreamResult};
-use crate::s3::xml::{self as s3_xml, xmlns_tag};
+use garage_api_common::helpers::*;
+
+use crate::api_server::{ReqBody, ResBody};
+use crate::checksum::*;
+use crate::encryption::EncryptionParams;
+use crate::error::*;
+use crate::get::full_object_byte_stream;
+use crate::multipart;
+use crate::put::{get_headers, save_stream, ChecksumMode, SaveStreamResult};
+use crate::xml::{self as s3_xml, xmlns_tag};
// -------- CopyObject ---------
@@ -862,7 +863,7 @@ pub struct CopyPartResult {
#[cfg(test)]
mod tests {
use super::*;
- use crate::s3::xml::to_xml_with_header;
+ use crate::xml::to_xml_with_header;
#[test]
fn copy_object_result() -> Result<(), Error> {
diff --git a/src/api/s3/cors.rs b/src/api/s3/cors.rs
index 32dcc0d5..625b84db 100644
--- a/src/api/s3/cors.rs
+++ b/src/api/s3/cors.rs
@@ -1,31 +1,21 @@
-use std::sync::Arc;
-
use quick_xml::de::from_reader;
-use http::header::{
- ACCESS_CONTROL_ALLOW_HEADERS, ACCESS_CONTROL_ALLOW_METHODS, ACCESS_CONTROL_ALLOW_ORIGIN,
- ACCESS_CONTROL_EXPOSE_HEADERS, ACCESS_CONTROL_REQUEST_HEADERS, ACCESS_CONTROL_REQUEST_METHOD,
-};
-use hyper::{
- body::Body, body::Incoming as IncomingBody, header::HeaderName, Method, Request, Response,
- StatusCode,
-};
+use hyper::{header::HeaderName, Method, Request, Response, StatusCode};
use http_body_util::BodyExt;
use serde::{Deserialize, Serialize};
-use crate::common_error::{helper_error_as_internal, CommonError};
-use crate::helpers::*;
-use crate::s3::api_server::{ReqBody, ResBody};
-use crate::s3::error::*;
-use crate::s3::xml::{to_xml_with_header, xmlns_tag, IntValue, Value};
-use crate::signature::verify_signed_content;
-
-use garage_model::bucket_table::{Bucket, BucketParams, CorsRule as GarageCorsRule};
-use garage_model::garage::Garage;
+use garage_model::bucket_table::{Bucket, CorsRule as GarageCorsRule};
use garage_util::data::*;
+use garage_api_common::helpers::*;
+use garage_api_common::signature::verify_signed_content;
+
+use crate::api_server::{ReqBody, ResBody};
+use crate::error::*;
+use crate::xml::{to_xml_with_header, xmlns_tag, IntValue, Value};
+
pub async fn handle_get_cors(ctx: ReqCtx) -> Result<Response<ResBody>, Error> {
let ReqCtx { bucket_params, .. } = ctx;
if let Some(cors) = bucket_params.cors_config.get() {
@@ -100,161 +90,6 @@ pub async fn handle_put_cors(
.body(empty_body())?)
}
-pub async fn handle_options_api(
- garage: Arc<Garage>,
- req: &Request<IncomingBody>,
- bucket_name: Option<String>,
-) -> Result<Response<EmptyBody>, CommonError> {
- // FIXME: CORS rules of buckets with local aliases are
- // not taken into account.
-
- // If the bucket name is a global bucket name,
- // we try to apply the CORS rules of that bucket.
- // If a user has a local bucket name that has
- // the same name, its CORS rules won't be applied
- // and will be shadowed by the rules of the globally
- // existing bucket (but this is inevitable because
- // OPTIONS calls are not auhtenticated).
- if let Some(bn) = bucket_name {
- let helper = garage.bucket_helper();
- let bucket_id = helper
- .resolve_global_bucket_name(&bn)
- .await
- .map_err(helper_error_as_internal)?;
- if let Some(id) = bucket_id {
- let bucket = garage
- .bucket_helper()
- .get_existing_bucket(id)
- .await
- .map_err(helper_error_as_internal)?;
- let bucket_params = bucket.state.into_option().unwrap();
- handle_options_for_bucket(req, &bucket_params)
- } else {
- // If there is a bucket name in the request, but that name
- // does not correspond to a global alias for a bucket,
- // then it's either a non-existing bucket or a local bucket.
- // We have no way of knowing, because the request is not
- // authenticated and thus we can't resolve local aliases.
- // We take the permissive approach of allowing everything,
- // because we don't want to prevent web apps that use
- // local bucket names from making API calls.
- Ok(Response::builder()
- .header(ACCESS_CONTROL_ALLOW_ORIGIN, "*")
- .header(ACCESS_CONTROL_ALLOW_METHODS, "*")
- .status(StatusCode::OK)
- .body(EmptyBody::new())?)
- }
- } else {
- // If there is no bucket name in the request,
- // we are doing a ListBuckets call, which we want to allow
- // for all origins.
- Ok(Response::builder()
- .header(ACCESS_CONTROL_ALLOW_ORIGIN, "*")
- .header(ACCESS_CONTROL_ALLOW_METHODS, "GET")
- .status(StatusCode::OK)
- .body(EmptyBody::new())?)
- }
-}
-
-pub fn handle_options_for_bucket(
- req: &Request<IncomingBody>,
- bucket_params: &BucketParams,
-) -> Result<Response<EmptyBody>, CommonError> {
- let origin = req
- .headers()
- .get("Origin")
- .ok_or_bad_request("Missing Origin header")?
- .to_str()?;
- let request_method = req
- .headers()
- .get(ACCESS_CONTROL_REQUEST_METHOD)
- .ok_or_bad_request("Missing Access-Control-Request-Method header")?
- .to_str()?;
- let request_headers = match req.headers().get(ACCESS_CONTROL_REQUEST_HEADERS) {
- Some(h) => h.to_str()?.split(',').map(|h| h.trim()).collect::<Vec<_>>(),
- None => vec![],
- };
-
- if let Some(cors_config) = bucket_params.cors_config.get() {
- let matching_rule = cors_config
- .iter()
- .find(|rule| cors_rule_matches(rule, origin, request_method, request_headers.iter()));
- if let Some(rule) = matching_rule {
- let mut resp = Response::builder()
- .status(StatusCode::OK)
- .body(EmptyBody::new())?;
- add_cors_headers(&mut resp, rule).ok_or_internal_error("Invalid CORS configuration")?;
- return Ok(resp);
- }
- }
-
- Err(CommonError::Forbidden(
- "This CORS request is not allowed.".into(),
- ))
-}
-
-pub fn find_matching_cors_rule<'a>(
- bucket_params: &'a BucketParams,
- req: &Request<impl Body>,
-) -> Result<Option<&'a GarageCorsRule>, Error> {
- if let Some(cors_config) = bucket_params.cors_config.get() {
- if let Some(origin) = req.headers().get("Origin") {
- let origin = origin.to_str()?;
- let request_headers = match req.headers().get(ACCESS_CONTROL_REQUEST_HEADERS) {
- Some(h) => h.to_str()?.split(',').map(|h| h.trim()).collect::<Vec<_>>(),
- None => vec![],
- };
- return Ok(cors_config.iter().find(|rule| {
- cors_rule_matches(rule, origin, req.method().as_ref(), request_headers.iter())
- }));
- }
- }
- Ok(None)
-}
-
-fn cors_rule_matches<'a, HI, S>(
- rule: &GarageCorsRule,
- origin: &'a str,
- method: &'a str,
- mut request_headers: HI,
-) -> bool
-where
- HI: Iterator<Item = S>,
- S: AsRef<str>,
-{
- rule.allow_origins.iter().any(|x| x == "*" || x == origin)
- && rule.allow_methods.iter().any(|x| x == "*" || x == method)
- && request_headers.all(|h| {
- rule.allow_headers
- .iter()
- .any(|x| x == "*" || x == h.as_ref())
- })
-}
-
-pub fn add_cors_headers(
- resp: &mut Response<impl Body>,
- rule: &GarageCorsRule,
-) -> Result<(), http::header::InvalidHeaderValue> {
- let h = resp.headers_mut();
- h.insert(
- ACCESS_CONTROL_ALLOW_ORIGIN,
- rule.allow_origins.join(", ").parse()?,
- );
- h.insert(
- ACCESS_CONTROL_ALLOW_METHODS,
- rule.allow_methods.join(", ").parse()?,
- );
- h.insert(
- ACCESS_CONTROL_ALLOW_HEADERS,
- rule.allow_headers.join(", ").parse()?,
- );
- h.insert(
- ACCESS_CONTROL_EXPOSE_HEADERS,
- rule.expose_headers.join(", ").parse()?,
- );
- Ok(())
-}
-
// ---- SERIALIZATION AND DESERIALIZATION TO/FROM S3 XML ----
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
diff --git a/src/api/s3/delete.rs b/src/api/s3/delete.rs
index 57f6f948..b799e67a 100644
--- a/src/api/s3/delete.rs
+++ b/src/api/s3/delete.rs
@@ -5,12 +5,13 @@ use garage_util::data::*;
use garage_model::s3::object_table::*;
-use crate::helpers::*;
-use crate::s3::api_server::{ReqBody, ResBody};
-use crate::s3::error::*;
-use crate::s3::put::next_timestamp;
-use crate::s3::xml as s3_xml;
-use crate::signature::verify_signed_content;
+use garage_api_common::helpers::*;
+use garage_api_common::signature::verify_signed_content;
+
+use crate::api_server::{ReqBody, ResBody};
+use crate::error::*;
+use crate::put::next_timestamp;
+use crate::xml as s3_xml;
async fn handle_delete_internal(ctx: &ReqCtx, key: &str) -> Result<(Uuid, Uuid), Error> {
let ReqCtx {
diff --git a/src/api/s3/encryption.rs b/src/api/s3/encryption.rs
index 2e6ed65c..b38d7792 100644
--- a/src/api/s3/encryption.rs
+++ b/src/api/s3/encryption.rs
@@ -28,9 +28,10 @@ use garage_util::migrate::Migrate;
use garage_model::garage::Garage;
use garage_model::s3::object_table::{ObjectVersionEncryption, ObjectVersionMetaInner};
-use crate::common_error::*;
-use crate::s3::checksum::Md5Checksum;
-use crate::s3::error::Error;
+use garage_api_common::common_error::*;
+
+use crate::checksum::Md5Checksum;
+use crate::error::Error;
const X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM: HeaderName =
HeaderName::from_static("x-amz-server-side-encryption-customer-algorithm");
diff --git a/src/api/s3/error.rs b/src/api/s3/error.rs
index 22d2fe14..1bb8909c 100644
--- a/src/api/s3/error.rs
+++ b/src/api/s3/error.rs
@@ -6,20 +6,28 @@ use hyper::{HeaderMap, StatusCode};
use garage_model::helper::error::Error as HelperError;
-pub(crate) use crate::common_error::pass_helper_error;
-use crate::common_error::{helper_error_as_internal, CommonError};
-pub use crate::common_error::{CommonErrorDerivative, OkOrBadRequest, OkOrInternalError};
-use crate::generic_server::ApiError;
-use crate::helpers::*;
-use crate::s3::xml as s3_xml;
-use crate::signature::error::Error as SignatureError;
+pub(crate) use garage_api_common::common_error::pass_helper_error;
+
+use garage_api_common::common_error::{
+ commonErrorDerivative, helper_error_as_internal, CommonError,
+};
+
+pub use garage_api_common::common_error::{
+ CommonErrorDerivative, OkOrBadRequest, OkOrInternalError,
+};
+
+use garage_api_common::generic_server::ApiError;
+use garage_api_common::helpers::*;
+use garage_api_common::signature::error::Error as SignatureError;
+
+use crate::xml as s3_xml;
/// Errors of this crate
#[derive(Debug, Error)]
pub enum Error {
#[error(display = "{}", _0)]
/// Error from common error
- Common(CommonError),
+ Common(#[error(source)] CommonError),
// Category: cannot process
/// Authorization Header Malformed
@@ -81,14 +89,7 @@ pub enum Error {
NotImplemented(String),
}
-impl<T> From<T> for Error
-where
- CommonError: From<T>,
-{
- fn from(err: T) -> Self {
- Error::Common(CommonError::from(err))
- }
-}
+commonErrorDerivative!(Error);
// Helper errors are always passed as internal errors by default.
// To pass the specific error code back to the client, use `pass_helper_error`.
@@ -98,8 +99,6 @@ impl From<HelperError> for Error {
}
}
-impl CommonErrorDerivative for Error {}
-
impl From<roxmltree::Error> for Error {
fn from(err: roxmltree::Error) -> Self {
Self::InvalidXml(format!("{}", err))
diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs
index f61aae11..c2393a51 100644
--- a/src/api/s3/get.rs
+++ b/src/api/s3/get.rs
@@ -25,11 +25,12 @@ use garage_model::garage::Garage;
use garage_model::s3::object_table::*;
use garage_model::s3::version_table::*;
-use crate::helpers::*;
-use crate::s3::api_server::ResBody;
-use crate::s3::checksum::{add_checksum_response_headers, X_AMZ_CHECKSUM_MODE};
-use crate::s3::encryption::EncryptionParams;
-use crate::s3::error::*;
+use garage_api_common::helpers::*;
+
+use crate::api_server::ResBody;
+use crate::checksum::{add_checksum_response_headers, X_AMZ_CHECKSUM_MODE};
+use crate::encryption::EncryptionParams;
+use crate::error::*;
const X_AMZ_MP_PARTS_COUNT: &str = "x-amz-mp-parts-count";
diff --git a/src/api/s3/mod.rs b/src/api/s3/lib.rs
index b9bb1a6f..fd99b443 100644
--- a/src/api/s3/mod.rs
+++ b/src/api/s3/lib.rs
@@ -1,3 +1,6 @@
+#[macro_use]
+extern crate tracing;
+
pub mod api_server;
pub mod error;
diff --git a/src/api/s3/lifecycle.rs b/src/api/s3/lifecycle.rs
index 7eb1c2cb..c35047ed 100644
--- a/src/api/s3/lifecycle.rs
+++ b/src/api/s3/lifecycle.rs
@@ -5,11 +5,12 @@ use hyper::{Request, Response, StatusCode};
use serde::{Deserialize, Serialize};
-use crate::helpers::*;
-use crate::s3::api_server::{ReqBody, ResBody};
-use crate::s3::error::*;
-use crate::s3::xml::{to_xml_with_header, xmlns_tag, IntValue, Value};
-use crate::signature::verify_signed_content;
+use garage_api_common::helpers::*;
+use garage_api_common::signature::verify_signed_content;
+
+use crate::api_server::{ReqBody, ResBody};
+use crate::error::*;
+use crate::xml::{to_xml_with_header, xmlns_tag, IntValue, Value};
use garage_model::bucket_table::{
parse_lifecycle_date, Bucket, LifecycleExpiration as GarageLifecycleExpiration,
diff --git a/src/api/s3/list.rs b/src/api/s3/list.rs
index 68d6cbe6..a5cc03b0 100644
--- a/src/api/s3/list.rs
+++ b/src/api/s3/list.rs
@@ -13,13 +13,14 @@ use garage_model::s3::object_table::*;
use garage_table::EnumerationOrder;
-use crate::encoding::*;
-use crate::helpers::*;
-use crate::s3::api_server::{ReqBody, ResBody};
-use crate::s3::encryption::EncryptionParams;
-use crate::s3::error::*;
-use crate::s3::multipart as s3_multipart;
-use crate::s3::xml as s3_xml;
+use garage_api_common::encoding::*;
+use garage_api_common::helpers::*;
+
+use crate::api_server::{ReqBody, ResBody};
+use crate::encryption::EncryptionParams;
+use crate::error::*;
+use crate::multipart as s3_multipart;
+use crate::xml as s3_xml;
const DUMMY_NAME: &str = "Dummy Key";
const DUMMY_KEY: &str = "GKDummyKey";
diff --git a/src/api/s3/multipart.rs b/src/api/s3/multipart.rs
index 3db3e8aa..fe39fc93 100644
--- a/src/api/s3/multipart.rs
+++ b/src/api/s3/multipart.rs
@@ -15,14 +15,15 @@ use garage_model::s3::mpu_table::*;
use garage_model::s3::object_table::*;
use garage_model::s3::version_table::*;
-use crate::helpers::*;
-use crate::s3::api_server::{ReqBody, ResBody};
-use crate::s3::checksum::*;
-use crate::s3::encryption::EncryptionParams;
-use crate::s3::error::*;
-use crate::s3::put::*;
-use crate::s3::xml as s3_xml;
-use crate::signature::verify_signed_content;
+use garage_api_common::helpers::*;
+use garage_api_common::signature::verify_signed_content;
+
+use crate::api_server::{ReqBody, ResBody};
+use crate::checksum::*;
+use crate::encryption::EncryptionParams;
+use crate::error::*;
+use crate::put::*;
+use crate::xml as s3_xml;
// ----
diff --git a/src/api/s3/post_object.rs b/src/api/s3/post_object.rs
index 5279ec6a..6c0e73d4 100644
--- a/src/api/s3/post_object.rs
+++ b/src/api/s3/post_object.rs
@@ -16,15 +16,16 @@ use serde::Deserialize;
use garage_model::garage::Garage;
use garage_model::s3::object_table::*;
-use crate::helpers::*;
-use crate::s3::api_server::ResBody;
-use crate::s3::checksum::*;
-use crate::s3::cors::*;
-use crate::s3::encryption::EncryptionParams;
-use crate::s3::error::*;
-use crate::s3::put::{get_headers, save_stream, ChecksumMode};
-use crate::s3::xml as s3_xml;
-use crate::signature::payload::{verify_v4, Authorization};
+use garage_api_common::cors::*;
+use garage_api_common::helpers::*;
+use garage_api_common::signature::payload::{verify_v4, Authorization};
+
+use crate::api_server::ResBody;
+use crate::checksum::*;
+use crate::encryption::EncryptionParams;
+use crate::error::*;
+use crate::put::{get_headers, save_stream, ChecksumMode};
+use crate::xml as s3_xml;
pub async fn handle_post_object(
garage: Arc<Garage>,
diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs
index bfb0dc9b..530b4e7b 100644
--- a/src/api/s3/put.rs
+++ b/src/api/s3/put.rs
@@ -30,11 +30,12 @@ use garage_model::s3::block_ref_table::*;
use garage_model::s3::object_table::*;
use garage_model::s3::version_table::*;
-use crate::helpers::*;
-use crate::s3::api_server::{ReqBody, ResBody};
-use crate::s3::checksum::*;
-use crate::s3::encryption::EncryptionParams;
-use crate::s3::error::*;
+use garage_api_common::helpers::*;
+
+use crate::api_server::{ReqBody, ResBody};
+use crate::checksum::*;
+use crate::encryption::EncryptionParams;
+use crate::error::*;
const PUT_BLOCKS_MAX_PARALLEL: usize = 3;
diff --git a/src/api/s3/router.rs b/src/api/s3/router.rs
index e7ac1d77..9de84b2b 100644
--- a/src/api/s3/router.rs
+++ b/src/api/s3/router.rs
@@ -3,9 +3,10 @@ use std::borrow::Cow;
use hyper::header::HeaderValue;
use hyper::{HeaderMap, Method, Request};
-use crate::helpers::Authorization;
-use crate::router_macros::{generateQueryParameters, router_match};
-use crate::s3::error::*;
+use garage_api_common::helpers::Authorization;
+use garage_api_common::router_macros::{generateQueryParameters, router_match};
+
+use crate::error::*;
router_match! {@func
diff --git a/src/api/s3/website.rs b/src/api/s3/website.rs
index fa36bc32..b55bb345 100644
--- a/src/api/s3/website.rs
+++ b/src/api/s3/website.rs
@@ -4,15 +4,16 @@ use http_body_util::BodyExt;
use hyper::{Request, Response, StatusCode};
use serde::{Deserialize, Serialize};
-use crate::helpers::*;
-use crate::s3::api_server::{ReqBody, ResBody};
-use crate::s3::error::*;
-use crate::s3::xml::{to_xml_with_header, xmlns_tag, IntValue, Value};
-use crate::signature::verify_signed_content;
-
use garage_model::bucket_table::*;
use garage_util::data::*;
+use garage_api_common::helpers::*;
+use garage_api_common::signature::verify_signed_content;
+
+use crate::api_server::{ReqBody, ResBody};
+use crate::error::*;
+use crate::xml::{to_xml_with_header, xmlns_tag, IntValue, Value};
+
pub async fn handle_get_website(ctx: ReqCtx) -> Result<Response<ResBody>, Error> {
let ReqCtx { bucket_params, .. } = ctx;
if let Some(website) = bucket_params.website_config.get() {
diff --git a/src/api/s3/xml.rs b/src/api/s3/xml.rs
index 1e569ade..e8af3ec0 100644
--- a/src/api/s3/xml.rs
+++ b/src/api/s3/xml.rs
@@ -1,7 +1,7 @@
use quick_xml::se::to_string;
use serde::{Deserialize, Serialize, Serializer};
-use crate::s3::error::Error as ApiError;
+use crate::error::Error as ApiError;
pub fn to_xml_with_header<T: Serialize>(x: &T) -> Result<String, ApiError> {
let mut xml = r#"<?xml version="1.0" encoding="UTF-8"?>"#.to_string();
diff --git a/src/block/Cargo.toml b/src/block/Cargo.toml
index 1af4d7f5..3358a3e7 100644
--- a/src/block/Cargo.toml
+++ b/src/block/Cargo.toml
@@ -34,10 +34,8 @@ async-compression.workspace = true
zstd.workspace = true
serde.workspace = true
-serde_bytes.workspace = true
futures.workspace = true
-futures-util.workspace = true
tokio.workspace = true
tokio-util.workspace = true
diff --git a/src/block/manager.rs b/src/block/manager.rs
index 40b177a2..537e1fc1 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -4,7 +4,6 @@ use std::sync::Arc;
use std::time::Duration;
use arc_swap::{ArcSwap, ArcSwapOption};
-use async_trait::async_trait;
use bytes::Bytes;
use rand::prelude::*;
use serde::{Deserialize, Serialize};
@@ -688,7 +687,6 @@ impl BlockManager {
}
}
-#[async_trait]
impl StreamingEndpointHandler<BlockRpc> for BlockManager {
async fn handle(self: &Arc<Self>, mut message: Req<BlockRpc>, _from: NodeID) -> Resp<BlockRpc> {
match message.msg() {
diff --git a/src/db/Cargo.toml b/src/db/Cargo.toml
index 0a278bc0..3ef51fae 100644
--- a/src/db/Cargo.toml
+++ b/src/db/Cargo.toml
@@ -13,7 +13,6 @@ path = "lib.rs"
[dependencies]
err-derive.workspace = true
-hexdump.workspace = true
tracing.workspace = true
heed = { workspace = true, optional = true }
diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml
index 483e33c0..c566c3e0 100644
--- a/src/garage/Cargo.toml
+++ b/src/garage/Cargo.toml
@@ -23,7 +23,10 @@ path = "tests/lib.rs"
[dependencies]
format_table.workspace = true
garage_db.workspace = true
-garage_api.workspace = true
+garage_api_admin.workspace = true
+garage_api_s3.workspace = true
+garage_api_k2v = { workspace = true, optional = true }
+garage_api_common.workspace = true
garage_block.workspace = true
garage_model.workspace = true
garage_net.workspace = true
@@ -40,29 +43,23 @@ parse_duration.workspace = true
hex.workspace = true
tracing.workspace = true
tracing-subscriber.workspace = true
-rand.workspace = true
async-trait.workspace = true
sha1.workspace = true
sodiumoxide.workspace = true
structopt.workspace = true
git-version.workspace = true
-serde.workspace = true
-serde_bytes.workspace = true
-toml.workspace = true
-
futures.workspace = true
-futures-util.workspace = true
tokio.workspace = true
opentelemetry.workspace = true
opentelemetry-prometheus = { workspace = true, optional = true }
opentelemetry-otlp = { workspace = true, optional = true }
-prometheus = { workspace = true, optional = true }
syslog-tracing = { workspace = true, optional = true }
[dev-dependencies]
-aws-config.workspace = true
+garage_api_common.workspace = true
+
aws-sdk-s3.workspace = true
chrono.workspace = true
http.workspace = true
@@ -84,7 +81,7 @@ k2v-client.workspace = true
[features]
default = [ "bundled-libs", "metrics", "lmdb", "sqlite", "k2v" ]
-k2v = [ "garage_util/k2v", "garage_api/k2v" ]
+k2v = [ "garage_util/k2v", "garage_api_k2v" ]
# Database engines
lmdb = [ "garage_model/lmdb" ]
@@ -95,7 +92,7 @@ consul-discovery = [ "garage_rpc/consul-discovery" ]
# Automatic registration and discovery via Kubernetes API
kubernetes-discovery = [ "garage_rpc/kubernetes-discovery" ]
# Prometheus exporter (/metrics endpoint).
-metrics = [ "garage_api/metrics", "opentelemetry-prometheus", "prometheus" ]
+metrics = [ "garage_api_admin/metrics", "opentelemetry-prometheus" ]
# Exporter for the OpenTelemetry Collector.
telemetry-otlp = [ "opentelemetry-otlp" ]
# Logging to syslog
diff --git a/src/garage/admin/block.rs b/src/garage/admin/block.rs
deleted file mode 100644
index edeb88c0..00000000
--- a/src/garage/admin/block.rs
+++ /dev/null
@@ -1,235 +0,0 @@
-use garage_util::data::*;
-
-use garage_table::*;
-
-use garage_model::helper::error::{Error, OkOrBadRequest};
-use garage_model::s3::object_table::*;
-use garage_model::s3::version_table::*;
-
-use crate::cli::*;
-
-use super::*;
-
-impl AdminRpcHandler {
- pub(super) async fn handle_block_cmd(&self, cmd: &BlockOperation) -> Result<AdminRpc, Error> {
- match cmd {
- BlockOperation::ListErrors => Ok(AdminRpc::BlockErrorList(
- self.garage.block_manager.list_resync_errors()?,
- )),
- BlockOperation::Info { hash } => self.handle_block_info(hash).await,
- BlockOperation::RetryNow { all, blocks } => {
- self.handle_block_retry_now(*all, blocks).await
- }
- BlockOperation::Purge { yes, blocks } => self.handle_block_purge(*yes, blocks).await,
- }
- }
-
- async fn handle_block_info(&self, hash: &String) -> Result<AdminRpc, Error> {
- let hash = self.find_block_hash_by_prefix(hash)?;
- let refcount = self.garage.block_manager.get_block_rc(&hash)?;
- let block_refs = self
- .garage
- .block_ref_table
- .get_range(&hash, None, None, 10000, Default::default())
- .await?;
- let mut versions = vec![];
- let mut uploads = vec![];
- for br in block_refs {
- if let Some(v) = self
- .garage
- .version_table
- .get(&br.version, &EmptyKey)
- .await?
- {
- if let VersionBacklink::MultipartUpload { upload_id } = &v.backlink {
- if let Some(u) = self.garage.mpu_table.get(upload_id, &EmptyKey).await? {
- uploads.push(u);
- }
- }
- versions.push(Ok(v));
- } else {
- versions.push(Err(br.version));
- }
- }
- Ok(AdminRpc::BlockInfo {
- hash,
- refcount,
- versions,
- uploads,
- })
- }
-
- async fn handle_block_retry_now(
- &self,
- all: bool,
- blocks: &[String],
- ) -> Result<AdminRpc, Error> {
- if all {
- if !blocks.is_empty() {
- return Err(Error::BadRequest(
- "--all was specified, cannot also specify blocks".into(),
- ));
- }
- let blocks = self.garage.block_manager.list_resync_errors()?;
- for b in blocks.iter() {
- self.garage.block_manager.resync.clear_backoff(&b.hash)?;
- }
- Ok(AdminRpc::Ok(format!(
- "{} blocks returned in queue for a retry now (check logs to see results)",
- blocks.len()
- )))
- } else {
- for hash in blocks {
- let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?;
- let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?;
- self.garage.block_manager.resync.clear_backoff(&hash)?;
- }
- Ok(AdminRpc::Ok(format!(
- "{} blocks returned in queue for a retry now (check logs to see results)",
- blocks.len()
- )))
- }
- }
-
- async fn handle_block_purge(&self, yes: bool, blocks: &[String]) -> Result<AdminRpc, Error> {
- if !yes {
- return Err(Error::BadRequest(
- "Pass the --yes flag to confirm block purge operation.".into(),
- ));
- }
-
- let mut obj_dels = 0;
- let mut mpu_dels = 0;
- let mut ver_dels = 0;
-
- for hash in blocks {
- let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?;
- let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?;
- let block_refs = self
- .garage
- .block_ref_table
- .get_range(&hash, None, None, 10000, Default::default())
- .await?;
-
- for br in block_refs {
- if let Some(version) = self
- .garage
- .version_table
- .get(&br.version, &EmptyKey)
- .await?
- {
- self.handle_block_purge_version_backlink(
- &version,
- &mut obj_dels,
- &mut mpu_dels,
- )
- .await?;
-
- if !version.deleted.get() {
- let deleted_version = Version::new(version.uuid, version.backlink, true);
- self.garage.version_table.insert(&deleted_version).await?;
- ver_dels += 1;
- }
- }
- }
- }
-
- Ok(AdminRpc::Ok(format!(
- "Purged {} blocks, {} versions, {} objects, {} multipart uploads",
- blocks.len(),
- ver_dels,
- obj_dels,
- mpu_dels,
- )))
- }
-
- async fn handle_block_purge_version_backlink(
- &self,
- version: &Version,
- obj_dels: &mut usize,
- mpu_dels: &mut usize,
- ) -> Result<(), Error> {
- let (bucket_id, key, ov_id) = match &version.backlink {
- VersionBacklink::Object { bucket_id, key } => (*bucket_id, key.clone(), version.uuid),
- VersionBacklink::MultipartUpload { upload_id } => {
- if let Some(mut mpu) = self.garage.mpu_table.get(upload_id, &EmptyKey).await? {
- if !mpu.deleted.get() {
- mpu.parts.clear();
- mpu.deleted.set();
- self.garage.mpu_table.insert(&mpu).await?;
- *mpu_dels += 1;
- }
- (mpu.bucket_id, mpu.key.clone(), *upload_id)
- } else {
- return Ok(());
- }
- }
- };
-
- if let Some(object) = self.garage.object_table.get(&bucket_id, &key).await? {
- let ov = object.versions().iter().rev().find(|v| v.is_complete());
- if let Some(ov) = ov {
- if ov.uuid == ov_id {
- let del_uuid = gen_uuid();
- let deleted_object = Object::new(
- bucket_id,
- key,
- vec![ObjectVersion {
- uuid: del_uuid,
- timestamp: ov.timestamp + 1,
- state: ObjectVersionState::Complete(ObjectVersionData::DeleteMarker),
- }],
- );
- self.garage.object_table.insert(&deleted_object).await?;
- *obj_dels += 1;
- }
- }
- }
-
- Ok(())
- }
-
- // ---- helper function ----
- fn find_block_hash_by_prefix(&self, prefix: &str) -> Result<Hash, Error> {
- if prefix.len() < 4 {
- return Err(Error::BadRequest(
- "Please specify at least 4 characters of the block hash".into(),
- ));
- }
-
- let prefix_bin =
- hex::decode(&prefix[..prefix.len() & !1]).ok_or_bad_request("invalid hash")?;
-
- let iter = self
- .garage
- .block_ref_table
- .data
- .store
- .range(&prefix_bin[..]..)
- .map_err(GarageError::from)?;
- let mut found = None;
- for item in iter {
- let (k, _v) = item.map_err(GarageError::from)?;
- let hash = Hash::try_from(&k[..32]).unwrap();
- if &hash.as_slice()[..prefix_bin.len()] != prefix_bin {
- break;
- }
- if hex::encode(hash.as_slice()).starts_with(prefix) {
- match &found {
- Some(x) if *x == hash => (),
- Some(_) => {
- return Err(Error::BadRequest(format!(
- "Several blocks match prefix `{}`",
- prefix
- )));
- }
- None => {
- found = Some(hash);
- }
- }
- }
- }
-
- found.ok_or_else(|| Error::BadRequest("No matching block found".into()))
- }
-}
diff --git a/src/garage/admin/bucket.rs b/src/garage/admin/bucket.rs
deleted file mode 100644
index 26d54084..00000000
--- a/src/garage/admin/bucket.rs
+++ /dev/null
@@ -1,53 +0,0 @@
-use std::fmt::Write;
-
-use garage_model::helper::error::{Error, OkOrBadRequest};
-
-use crate::cli::*;
-
-use super::*;
-
-impl AdminRpcHandler {
- pub(super) async fn handle_bucket_cmd(&self, cmd: &BucketOperation) -> Result<AdminRpc, Error> {
- match cmd {
- BucketOperation::CleanupIncompleteUploads(query) => {
- self.handle_bucket_cleanup_incomplete_uploads(query).await
- }
- _ => unreachable!(),
- }
- }
-
- async fn handle_bucket_cleanup_incomplete_uploads(
- &self,
- query: &CleanupIncompleteUploadsOpt,
- ) -> Result<AdminRpc, Error> {
- let mut bucket_ids = vec![];
- for b in query.buckets.iter() {
- bucket_ids.push(
- self.garage
- .bucket_helper()
- .admin_get_existing_matching_bucket(b)
- .await?,
- );
- }
-
- let duration = parse_duration::parse::parse(&query.older_than)
- .ok_or_bad_request("Invalid duration passed for --older-than parameter")?;
-
- let mut ret = String::new();
- for bucket in bucket_ids {
- let count = self
- .garage
- .bucket_helper()
- .cleanup_incomplete_uploads(&bucket, duration)
- .await?;
- writeln!(
- &mut ret,
- "Bucket {:?}: {} incomplete uploads aborted",
- bucket, count
- )
- .unwrap();
- }
-
- Ok(AdminRpc::Ok(ret))
- }
-}
diff --git a/src/garage/admin/mod.rs b/src/garage/admin/mod.rs
deleted file mode 100644
index 1888a208..00000000
--- a/src/garage/admin/mod.rs
+++ /dev/null
@@ -1,545 +0,0 @@
-mod block;
-mod bucket;
-
-use std::collections::HashMap;
-use std::fmt::Write;
-use std::sync::Arc;
-
-use async_trait::async_trait;
-use serde::{Deserialize, Serialize};
-
-use format_table::format_table_to_string;
-
-use garage_util::background::BackgroundRunner;
-use garage_util::data::*;
-use garage_util::error::Error as GarageError;
-
-use garage_table::replication::*;
-use garage_table::*;
-
-use garage_rpc::layout::PARTITION_BITS;
-use garage_rpc::*;
-
-use garage_block::manager::BlockResyncErrorInfo;
-
-use garage_model::garage::Garage;
-use garage_model::helper::error::{Error, OkOrBadRequest};
-use garage_model::s3::mpu_table::MultipartUpload;
-use garage_model::s3::version_table::Version;
-
-use garage_api::admin::api::{AdminApiRequest, TaggedAdminApiResponse};
-use garage_api::admin::EndpointHandler as AdminApiEndpoint;
-use garage_api::generic_server::ApiError;
-
-use crate::cli::*;
-use crate::repair::online::launch_online_repair;
-
-pub const ADMIN_RPC_PATH: &str = "garage/admin_rpc.rs/Rpc";
-
-#[derive(Debug, Serialize, Deserialize)]
-#[allow(clippy::large_enum_variant)]
-pub enum AdminRpc {
- BucketOperation(BucketOperation),
- LaunchRepair(RepairOpt),
- Stats(StatsOpt),
- Worker(WorkerOperation),
- BlockOperation(BlockOperation),
- MetaOperation(MetaOperation),
-
- // Replies
- Ok(String),
- WorkerList(
- HashMap<usize, garage_util::background::WorkerInfo>,
- WorkerListOpt,
- ),
- WorkerVars(Vec<(Uuid, String, String)>),
- WorkerInfo(usize, garage_util::background::WorkerInfo),
- BlockErrorList(Vec<BlockResyncErrorInfo>),
- BlockInfo {
- hash: Hash,
- refcount: u64,
- versions: Vec<Result<Version, Uuid>>,
- uploads: Vec<MultipartUpload>,
- },
-
- // Proxying HTTP Admin API endpoints
- ApiRequest(AdminApiRequest),
- ApiOkResponse(TaggedAdminApiResponse),
- ApiErrorResponse {
- http_code: u16,
- error_code: String,
- message: String,
- },
-}
-
-impl Rpc for AdminRpc {
- type Response = Result<AdminRpc, Error>;
-}
-
-pub struct AdminRpcHandler {
- garage: Arc<Garage>,
- background: Arc<BackgroundRunner>,
- endpoint: Arc<Endpoint<AdminRpc, Self>>,
-}
-
-impl AdminRpcHandler {
- pub fn new(garage: Arc<Garage>, background: Arc<BackgroundRunner>) -> Arc<Self> {
- let endpoint = garage.system.netapp.endpoint(ADMIN_RPC_PATH.into());
- let admin = Arc::new(Self {
- garage,
- background,
- endpoint,
- });
- admin.endpoint.set_handler(admin.clone());
- admin
- }
-
- // ================ REPAIR COMMANDS ====================
-
- async fn handle_launch_repair(self: &Arc<Self>, opt: RepairOpt) -> Result<AdminRpc, Error> {
- if !opt.yes {
- return Err(Error::BadRequest(
- "Please provide the --yes flag to initiate repair operations.".to_string(),
- ));
- }
- if opt.all_nodes {
- let mut opt_to_send = opt.clone();
- opt_to_send.all_nodes = false;
-
- let mut failures = vec![];
- let all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec();
- for node in all_nodes.iter() {
- let node = (*node).into();
- let resp = self
- .endpoint
- .call(
- &node,
- AdminRpc::LaunchRepair(opt_to_send.clone()),
- PRIO_NORMAL,
- )
- .await;
- if !matches!(resp, Ok(Ok(_))) {
- failures.push(node);
- }
- }
- if failures.is_empty() {
- Ok(AdminRpc::Ok("Repair launched on all nodes".to_string()))
- } else {
- Err(Error::BadRequest(format!(
- "Could not launch repair on nodes: {:?} (launched successfully on other nodes)",
- failures
- )))
- }
- } else {
- launch_online_repair(&self.garage, &self.background, opt).await?;
- Ok(AdminRpc::Ok(format!(
- "Repair launched on {:?}",
- self.garage.system.id
- )))
- }
- }
-
- // ================ STATS COMMANDS ====================
-
- async fn handle_stats(&self, opt: StatsOpt) -> Result<AdminRpc, Error> {
- if opt.all_nodes {
- let mut ret = String::new();
- let all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec();
-
- for node in all_nodes.iter() {
- let mut opt = opt.clone();
- opt.all_nodes = false;
- opt.skip_global = true;
-
- writeln!(&mut ret, "\n======================").unwrap();
- writeln!(&mut ret, "Stats for node {:?}:", node).unwrap();
-
- let node_id = (*node).into();
- match self
- .endpoint
- .call(&node_id, AdminRpc::Stats(opt), PRIO_NORMAL)
- .await
- {
- Ok(Ok(AdminRpc::Ok(s))) => writeln!(&mut ret, "{}", s).unwrap(),
- Ok(Ok(x)) => writeln!(&mut ret, "Bad answer: {:?}", x).unwrap(),
- Ok(Err(e)) => writeln!(&mut ret, "Remote error: {}", e).unwrap(),
- Err(e) => writeln!(&mut ret, "Network error: {}", e).unwrap(),
- }
- }
-
- writeln!(&mut ret, "\n======================").unwrap();
- write!(
- &mut ret,
- "Cluster statistics:\n\n{}",
- self.gather_cluster_stats()
- )
- .unwrap();
-
- Ok(AdminRpc::Ok(ret))
- } else {
- Ok(AdminRpc::Ok(self.gather_stats_local(opt)?))
- }
- }
-
- fn gather_stats_local(&self, opt: StatsOpt) -> Result<String, Error> {
- let mut ret = String::new();
- writeln!(
- &mut ret,
- "\nGarage version: {} [features: {}]\nRust compiler version: {}",
- garage_util::version::garage_version(),
- garage_util::version::garage_features()
- .map(|list| list.join(", "))
- .unwrap_or_else(|| "(unknown)".into()),
- garage_util::version::rust_version(),
- )
- .unwrap();
-
- writeln!(&mut ret, "\nDatabase engine: {}", self.garage.db.engine()).unwrap();
-
- // Gather table statistics
- let mut table = vec![" Table\tItems\tMklItems\tMklTodo\tGcTodo".into()];
- table.push(self.gather_table_stats(&self.garage.bucket_table)?);
- table.push(self.gather_table_stats(&self.garage.key_table)?);
- table.push(self.gather_table_stats(&self.garage.object_table)?);
- table.push(self.gather_table_stats(&self.garage.version_table)?);
- table.push(self.gather_table_stats(&self.garage.block_ref_table)?);
- write!(
- &mut ret,
- "\nTable stats:\n{}",
- format_table_to_string(table)
- )
- .unwrap();
-
- // Gather block manager statistics
- writeln!(&mut ret, "\nBlock manager stats:").unwrap();
- let rc_len = self.garage.block_manager.rc_len()?.to_string();
-
- writeln!(
- &mut ret,
- " number of RC entries (~= number of blocks): {}",
- rc_len
- )
- .unwrap();
- writeln!(
- &mut ret,
- " resync queue length: {}",
- self.garage.block_manager.resync.queue_len()?
- )
- .unwrap();
- writeln!(
- &mut ret,
- " blocks with resync errors: {}",
- self.garage.block_manager.resync.errors_len()?
- )
- .unwrap();
-
- if !opt.skip_global {
- write!(&mut ret, "\n{}", self.gather_cluster_stats()).unwrap();
- }
-
- Ok(ret)
- }
-
- fn gather_cluster_stats(&self) -> String {
- let mut ret = String::new();
-
- // Gather storage node and free space statistics for current nodes
- let layout = &self.garage.system.cluster_layout();
- let mut node_partition_count = HashMap::<Uuid, u64>::new();
- for short_id in layout.current().ring_assignment_data.iter() {
- let id = layout.current().node_id_vec[*short_id as usize];
- *node_partition_count.entry(id).or_default() += 1;
- }
- let node_info = self
- .garage
- .system
- .get_known_nodes()
- .into_iter()
- .map(|n| (n.id, n))
- .collect::<HashMap<_, _>>();
-
- let mut table = vec![" ID\tHostname\tZone\tCapacity\tPart.\tDataAvail\tMetaAvail".into()];
- for (id, parts) in node_partition_count.iter() {
- let info = node_info.get(id);
- let status = info.map(|x| &x.status);
- let role = layout.current().roles.get(id).and_then(|x| x.0.as_ref());
- let hostname = status.and_then(|x| x.hostname.as_deref()).unwrap_or("?");
- let zone = role.map(|x| x.zone.as_str()).unwrap_or("?");
- let capacity = role
- .map(|x| x.capacity_string())
- .unwrap_or_else(|| "?".into());
- let avail_str = |x| match x {
- Some((avail, total)) => {
- let pct = (avail as f64) / (total as f64) * 100.;
- let avail = bytesize::ByteSize::b(avail);
- let total = bytesize::ByteSize::b(total);
- format!("{}/{} ({:.1}%)", avail, total, pct)
- }
- None => "?".into(),
- };
- let data_avail = avail_str(status.and_then(|x| x.data_disk_avail));
- let meta_avail = avail_str(status.and_then(|x| x.meta_disk_avail));
- table.push(format!(
- " {:?}\t{}\t{}\t{}\t{}\t{}\t{}",
- id, hostname, zone, capacity, parts, data_avail, meta_avail
- ));
- }
- write!(
- &mut ret,
- "Storage nodes:\n{}",
- format_table_to_string(table)
- )
- .unwrap();
-
- let meta_part_avail = node_partition_count
- .iter()
- .filter_map(|(id, parts)| {
- node_info
- .get(id)
- .and_then(|x| x.status.meta_disk_avail)
- .map(|c| c.0 / *parts)
- })
- .collect::<Vec<_>>();
- let data_part_avail = node_partition_count
- .iter()
- .filter_map(|(id, parts)| {
- node_info
- .get(id)
- .and_then(|x| x.status.data_disk_avail)
- .map(|c| c.0 / *parts)
- })
- .collect::<Vec<_>>();
- if !meta_part_avail.is_empty() && !data_part_avail.is_empty() {
- let meta_avail =
- bytesize::ByteSize(meta_part_avail.iter().min().unwrap() * (1 << PARTITION_BITS));
- let data_avail =
- bytesize::ByteSize(data_part_avail.iter().min().unwrap() * (1 << PARTITION_BITS));
- writeln!(
- &mut ret,
- "\nEstimated available storage space cluster-wide (might be lower in practice):"
- )
- .unwrap();
- if meta_part_avail.len() < node_partition_count.len()
- || data_part_avail.len() < node_partition_count.len()
- {
- writeln!(&mut ret, " data: < {}", data_avail).unwrap();
- writeln!(&mut ret, " metadata: < {}", meta_avail).unwrap();
- writeln!(&mut ret, "A precise estimate could not be given as information is missing for some storage nodes.").unwrap();
- } else {
- writeln!(&mut ret, " data: {}", data_avail).unwrap();
- writeln!(&mut ret, " metadata: {}", meta_avail).unwrap();
- }
- }
-
- ret
- }
-
- fn gather_table_stats<F, R>(&self, t: &Arc<Table<F, R>>) -> Result<String, Error>
- where
- F: TableSchema + 'static,
- R: TableReplication + 'static,
- {
- let data_len = t.data.store.len().map_err(GarageError::from)?.to_string();
- let mkl_len = t.merkle_updater.merkle_tree_len()?.to_string();
-
- Ok(format!(
- " {}\t{}\t{}\t{}\t{}",
- F::TABLE_NAME,
- data_len,
- mkl_len,
- t.merkle_updater.todo_len()?,
- t.data.gc_todo_len()?
- ))
- }
-
- // ================ WORKER COMMANDS ====================
-
- async fn handle_worker_cmd(&self, cmd: &WorkerOperation) -> Result<AdminRpc, Error> {
- match cmd {
- WorkerOperation::List { opt } => {
- let workers = self.background.get_worker_info();
- Ok(AdminRpc::WorkerList(workers, *opt))
- }
- WorkerOperation::Info { tid } => {
- let info = self
- .background
- .get_worker_info()
- .get(tid)
- .ok_or_bad_request(format!("No worker with TID {}", tid))?
- .clone();
- Ok(AdminRpc::WorkerInfo(*tid, info))
- }
- WorkerOperation::Get {
- all_nodes,
- variable,
- } => self.handle_get_var(*all_nodes, variable).await,
- WorkerOperation::Set {
- all_nodes,
- variable,
- value,
- } => self.handle_set_var(*all_nodes, variable, value).await,
- }
- }
-
- async fn handle_get_var(
- &self,
- all_nodes: bool,
- variable: &Option<String>,
- ) -> Result<AdminRpc, Error> {
- if all_nodes {
- let mut ret = vec![];
- let all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec();
- for node in all_nodes.iter() {
- let node = (*node).into();
- match self
- .endpoint
- .call(
- &node,
- AdminRpc::Worker(WorkerOperation::Get {
- all_nodes: false,
- variable: variable.clone(),
- }),
- PRIO_NORMAL,
- )
- .await??
- {
- AdminRpc::WorkerVars(v) => ret.extend(v),
- m => return Err(GarageError::unexpected_rpc_message(m).into()),
- }
- }
- Ok(AdminRpc::WorkerVars(ret))
- } else {
- #[allow(clippy::collapsible_else_if)]
- if let Some(v) = variable {
- Ok(AdminRpc::WorkerVars(vec![(
- self.garage.system.id,
- v.clone(),
- self.garage.bg_vars.get(v)?,
- )]))
- } else {
- let mut vars = self.garage.bg_vars.get_all();
- vars.sort();
- Ok(AdminRpc::WorkerVars(
- vars.into_iter()
- .map(|(k, v)| (self.garage.system.id, k.to_string(), v))
- .collect(),
- ))
- }
- }
- }
-
- async fn handle_set_var(
- &self,
- all_nodes: bool,
- variable: &str,
- value: &str,
- ) -> Result<AdminRpc, Error> {
- if all_nodes {
- let mut ret = vec![];
- let all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec();
- for node in all_nodes.iter() {
- let node = (*node).into();
- match self
- .endpoint
- .call(
- &node,
- AdminRpc::Worker(WorkerOperation::Set {
- all_nodes: false,
- variable: variable.to_string(),
- value: value.to_string(),
- }),
- PRIO_NORMAL,
- )
- .await??
- {
- AdminRpc::WorkerVars(v) => ret.extend(v),
- m => return Err(GarageError::unexpected_rpc_message(m).into()),
- }
- }
- Ok(AdminRpc::WorkerVars(ret))
- } else {
- self.garage.bg_vars.set(variable, value)?;
- Ok(AdminRpc::WorkerVars(vec![(
- self.garage.system.id,
- variable.to_string(),
- value.to_string(),
- )]))
- }
- }
-
- // ================ META DB COMMANDS ====================
-
- async fn handle_meta_cmd(self: &Arc<Self>, mo: &MetaOperation) -> Result<AdminRpc, Error> {
- match mo {
- MetaOperation::Snapshot { all: true } => {
- let to = self.garage.system.cluster_layout().all_nodes().to_vec();
-
- let resps = futures::future::join_all(to.iter().map(|to| async move {
- let to = (*to).into();
- self.endpoint
- .call(
- &to,
- AdminRpc::MetaOperation(MetaOperation::Snapshot { all: false }),
- PRIO_NORMAL,
- )
- .await
- }))
- .await;
-
- let mut ret = vec![];
- for (to, resp) in to.iter().zip(resps.iter()) {
- let res_str = match resp {
- Ok(_) => "ok".to_string(),
- Err(e) => format!("error: {}", e),
- };
- ret.push(format!("{:?}\t{}", to, res_str));
- }
-
- Ok(AdminRpc::Ok(format_table_to_string(ret)))
- }
- MetaOperation::Snapshot { all: false } => {
- garage_model::snapshot::async_snapshot_metadata(&self.garage).await?;
- Ok(AdminRpc::Ok("Snapshot has been saved.".into()))
- }
- }
- }
-
- // ================== PROXYING ADMIN API REQUESTS ===================
-
- async fn handle_api_request(
- self: &Arc<Self>,
- req: &AdminApiRequest,
- ) -> Result<AdminRpc, Error> {
- let req = req.clone();
- info!("Proxied admin API request: {}", req.name());
- let res = req.handle(&self.garage).await;
- match res {
- Ok(res) => Ok(AdminRpc::ApiOkResponse(res.tagged())),
- Err(e) => Ok(AdminRpc::ApiErrorResponse {
- http_code: e.http_status_code().as_u16(),
- error_code: e.code().to_string(),
- message: e.to_string(),
- }),
- }
- }
-}
-
-#[async_trait]
-impl EndpointHandler<AdminRpc> for AdminRpcHandler {
- async fn handle(
- self: &Arc<Self>,
- message: &AdminRpc,
- _from: NodeID,
- ) -> Result<AdminRpc, Error> {
- match message {
- AdminRpc::BucketOperation(bo) => self.handle_bucket_cmd(bo).await,
- AdminRpc::LaunchRepair(opt) => self.handle_launch_repair(opt.clone()).await,
- AdminRpc::Stats(opt) => self.handle_stats(opt.clone()).await,
- AdminRpc::Worker(wo) => self.handle_worker_cmd(wo).await,
- AdminRpc::BlockOperation(bo) => self.handle_block_cmd(bo).await,
- AdminRpc::MetaOperation(mo) => self.handle_meta_cmd(mo).await,
- AdminRpc::ApiRequest(r) => self.handle_api_request(r).await,
- m => Err(GarageError::unexpected_rpc_message(m).into()),
- }
- }
-}
diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs
deleted file mode 100644
index a6540c65..00000000
--- a/src/garage/cli/cmd.rs
+++ /dev/null
@@ -1,60 +0,0 @@
-use garage_util::error::*;
-
-use garage_rpc::system::*;
-use garage_rpc::*;
-
-use garage_model::helper::error::Error as HelperError;
-
-use crate::admin::*;
-use crate::cli::*;
-
-pub async fn cmd_admin(
- rpc_cli: &Endpoint<AdminRpc, ()>,
- rpc_host: NodeID,
- args: AdminRpc,
-) -> Result<(), HelperError> {
- match rpc_cli.call(&rpc_host, args, PRIO_NORMAL).await?? {
- AdminRpc::Ok(msg) => {
- println!("{}", msg);
- }
- AdminRpc::WorkerList(wi, wlo) => {
- print_worker_list(wi, wlo);
- }
- AdminRpc::WorkerVars(wv) => {
- print_worker_vars(wv);
- }
- AdminRpc::WorkerInfo(tid, wi) => {
- print_worker_info(tid, wi);
- }
- AdminRpc::BlockErrorList(el) => {
- print_block_error_list(el);
- }
- AdminRpc::BlockInfo {
- hash,
- refcount,
- versions,
- uploads,
- } => {
- print_block_info(hash, refcount, versions, uploads);
- }
- r => {
- error!("Unexpected response: {:?}", r);
- }
- }
- Ok(())
-}
-
-// ---- utility ----
-
-pub async fn fetch_status(
- rpc_cli: &Endpoint<SystemRpc, ()>,
- rpc_host: NodeID,
-) -> Result<Vec<KnownNodeInfo>, Error> {
- match rpc_cli
- .call(&rpc_host, SystemRpc::GetKnownNodes, PRIO_NORMAL)
- .await??
- {
- SystemRpc::ReturnKnownNodes(nodes) => Ok(nodes),
- resp => Err(Error::unexpected_rpc_message(resp)),
- }
-}
diff --git a/src/garage/cli/layout.rs b/src/garage/cli/layout.rs
index bb81d144..bb77cc2a 100644
--- a/src/garage/cli/layout.rs
+++ b/src/garage/cli/layout.rs
@@ -7,7 +7,7 @@ use garage_rpc::layout::*;
use garage_rpc::system::*;
use garage_rpc::*;
-use crate::cli::*;
+use crate::cli::structs::*;
pub async fn cmd_show_layout(
rpc_cli: &Endpoint<SystemRpc, ()>,
@@ -260,6 +260,19 @@ pub async fn cmd_layout_skip_dead_nodes(
// --- utility ---
+pub async fn fetch_status(
+ rpc_cli: &Endpoint<SystemRpc, ()>,
+ rpc_host: NodeID,
+) -> Result<Vec<KnownNodeInfo>, Error> {
+ match rpc_cli
+ .call(&rpc_host, SystemRpc::GetKnownNodes, PRIO_NORMAL)
+ .await??
+ {
+ SystemRpc::ReturnKnownNodes(nodes) => Ok(nodes),
+ resp => Err(Error::unexpected_rpc_message(resp)),
+ }
+}
+
pub async fn fetch_layout(
rpc_cli: &Endpoint<SystemRpc, ()>,
rpc_host: NodeID,
diff --git a/src/garage/cli/mod.rs b/src/garage/cli/mod.rs
index 30f566e2..e007808b 100644
--- a/src/garage/cli/mod.rs
+++ b/src/garage/cli/mod.rs
@@ -1,12 +1,7 @@
-pub(crate) mod cmd;
-pub(crate) mod init;
-pub(crate) mod layout;
pub(crate) mod structs;
-pub(crate) mod util;
pub(crate) mod convert_db;
+pub(crate) mod init;
+pub(crate) mod repair;
-pub(crate) use cmd::*;
-pub(crate) use init::*;
-pub(crate) use structs::*;
-pub(crate) use util::*;
+pub(crate) mod layout;
diff --git a/src/garage/repair/offline.rs b/src/garage/cli/repair.rs
index 45024e71..45024e71 100644
--- a/src/garage/repair/offline.rs
+++ b/src/garage/cli/repair.rs
diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs
index 4ec35e68..c6471515 100644
--- a/src/garage/cli/structs.rs
+++ b/src/garage/cli/structs.rs
@@ -1,4 +1,3 @@
-use serde::{Deserialize, Serialize};
use structopt::StructOpt;
use garage_util::version::garage_version;
@@ -190,7 +189,7 @@ pub struct SkipDeadNodesOpt {
pub(crate) allow_missing_data: bool,
}
-#[derive(Serialize, Deserialize, StructOpt, Debug)]
+#[derive(StructOpt, Debug)]
pub enum BucketOperation {
/// List buckets
#[structopt(name = "list", version = garage_version())]
@@ -237,7 +236,7 @@ pub enum BucketOperation {
CleanupIncompleteUploads(CleanupIncompleteUploadsOpt),
}
-#[derive(Serialize, Deserialize, StructOpt, Debug)]
+#[derive(StructOpt, Debug)]
pub struct WebsiteOpt {
/// Create
#[structopt(long = "allow")]
@@ -259,13 +258,13 @@ pub struct WebsiteOpt {
pub error_document: Option<String>,
}
-#[derive(Serialize, Deserialize, StructOpt, Debug)]
+#[derive(StructOpt, Debug)]
pub struct BucketOpt {
/// Bucket name
pub name: String,
}
-#[derive(Serialize, Deserialize, StructOpt, Debug)]
+#[derive(StructOpt, Debug)]
pub struct DeleteBucketOpt {
/// Bucket name
pub name: String,
@@ -275,7 +274,7 @@ pub struct DeleteBucketOpt {
pub yes: bool,
}
-#[derive(Serialize, Deserialize, StructOpt, Debug)]
+#[derive(StructOpt, Debug)]
pub struct AliasBucketOpt {
/// Existing bucket name (its alias in global namespace or its full hex uuid)
pub existing_bucket: String,
@@ -288,7 +287,7 @@ pub struct AliasBucketOpt {
pub local: Option<String>,
}
-#[derive(Serialize, Deserialize, StructOpt, Debug)]
+#[derive(StructOpt, Debug)]
pub struct UnaliasBucketOpt {
/// Bucket name
pub name: String,
@@ -298,7 +297,7 @@ pub struct UnaliasBucketOpt {
pub local: Option<String>,
}
-#[derive(Serialize, Deserialize, StructOpt, Debug)]
+#[derive(StructOpt, Debug)]
pub struct PermBucketOpt {
/// Access key name or ID
#[structopt(long = "key")]
@@ -321,7 +320,7 @@ pub struct PermBucketOpt {
pub bucket: String,
}
-#[derive(Serialize, Deserialize, StructOpt, Debug)]
+#[derive(StructOpt, Debug)]
pub struct SetQuotasOpt {
/// Bucket name
pub bucket: String,
@@ -336,7 +335,7 @@ pub struct SetQuotasOpt {
pub max_objects: Option<String>,
}
-#[derive(Serialize, Deserialize, StructOpt, Debug)]
+#[derive(StructOpt, Debug)]
pub struct CleanupIncompleteUploadsOpt {
/// Abort multipart uploads older than this value
#[structopt(long = "older-than", default_value = "1d")]
@@ -347,7 +346,7 @@ pub struct CleanupIncompleteUploadsOpt {
pub buckets: Vec<String>,
}
-#[derive(Serialize, Deserialize, StructOpt, Debug)]
+#[derive(StructOpt, Debug)]
pub enum KeyOperation {
/// List keys
#[structopt(name = "list", version = garage_version())]
@@ -382,7 +381,7 @@ pub enum KeyOperation {
Import(KeyImportOpt),
}
-#[derive(Serialize, Deserialize, StructOpt, Debug)]
+#[derive(StructOpt, Debug)]
pub struct KeyInfoOpt {
/// ID or name of the key
pub key_pattern: String,
@@ -391,14 +390,14 @@ pub struct KeyInfoOpt {
pub show_secret: bool,
}
-#[derive(Serialize, Deserialize, StructOpt, Debug)]
+#[derive(StructOpt, Debug)]
pub struct KeyNewOpt {
/// Name of the key
#[structopt(default_value = "Unnamed key")]
pub name: String,
}
-#[derive(Serialize, Deserialize, StructOpt, Debug)]
+#[derive(StructOpt, Debug)]
pub struct KeyRenameOpt {
/// ID or name of the key
pub key_pattern: String,
@@ -407,7 +406,7 @@ pub struct KeyRenameOpt {
pub new_name: String,
}
-#[derive(Serialize, Deserialize, StructOpt, Debug)]
+#[derive(StructOpt, Debug)]
pub struct KeyDeleteOpt {
/// ID or name of the key
pub key_pattern: String,
@@ -417,7 +416,7 @@ pub struct KeyDeleteOpt {
pub yes: bool,
}
-#[derive(Serialize, Deserialize, StructOpt, Debug)]
+#[derive(StructOpt, Debug)]
pub struct KeyPermOpt {
/// ID or name of the key
pub key_pattern: String,
@@ -427,7 +426,7 @@ pub struct KeyPermOpt {
pub create_bucket: bool,
}
-#[derive(Serialize, Deserialize, StructOpt, Debug)]
+#[derive(StructOpt, Debug)]
pub struct KeyImportOpt {
/// Access key ID
pub key_id: String,
@@ -444,7 +443,7 @@ pub struct KeyImportOpt {
pub yes: bool,
}
-#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
+#[derive(StructOpt, Debug, Clone)]
pub struct RepairOpt {
/// Launch repair operation on all nodes
#[structopt(short = "a", long = "all-nodes")]
@@ -458,7 +457,7 @@ pub struct RepairOpt {
pub what: RepairWhat,
}
-#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
+#[derive(StructOpt, Debug, Eq, PartialEq, Clone)]
pub enum RepairWhat {
/// Do a full sync of metadata tables
#[structopt(name = "tables", version = garage_version())]
@@ -489,7 +488,7 @@ pub enum RepairWhat {
Rebalance,
}
-#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
+#[derive(StructOpt, Debug, Eq, PartialEq, Clone)]
pub enum ScrubCmd {
/// Start scrub
#[structopt(name = "start", version = garage_version())]
@@ -503,15 +502,9 @@ pub enum ScrubCmd {
/// Cancel scrub in progress
#[structopt(name = "cancel", version = garage_version())]
Cancel,
- /// Set tranquility level for in-progress and future scrubs
- #[structopt(name = "set-tranquility", version = garage_version())]
- SetTranquility {
- #[structopt()]
- tranquility: u32,
- },
}
-#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
+#[derive(StructOpt, Debug, Clone)]
pub struct OfflineRepairOpt {
/// Confirm the launch of the repair operation
#[structopt(long = "yes")]
@@ -521,7 +514,7 @@ pub struct OfflineRepairOpt {
pub what: OfflineRepairWhat,
}
-#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
+#[derive(StructOpt, Debug, Eq, PartialEq, Clone)]
pub enum OfflineRepairWhat {
/// Repair K2V item counters
#[cfg(feature = "k2v")]
@@ -532,19 +525,14 @@ pub enum OfflineRepairWhat {
ObjectCounters,
}
-#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
+#[derive(StructOpt, Debug, Clone)]
pub struct StatsOpt {
/// Gather statistics from all nodes
#[structopt(short = "a", long = "all-nodes")]
pub all_nodes: bool,
-
- /// Don't show global cluster stats (internal use in RPC)
- #[structopt(skip)]
- #[serde(default)]
- pub skip_global: bool,
}
-#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
+#[derive(StructOpt, Debug, Eq, PartialEq, Clone)]
pub enum WorkerOperation {
/// List all workers on Garage node
#[structopt(name = "list", version = garage_version())]
@@ -577,7 +565,7 @@ pub enum WorkerOperation {
},
}
-#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone, Copy)]
+#[derive(StructOpt, Debug, Eq, PartialEq, Clone, Copy)]
pub struct WorkerListOpt {
/// Show only busy workers
#[structopt(short = "b", long = "busy")]
@@ -587,7 +575,7 @@ pub struct WorkerListOpt {
pub errors: bool,
}
-#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
+#[derive(StructOpt, Debug, Eq, PartialEq, Clone)]
pub enum BlockOperation {
/// List all blocks that currently have a resync error
#[structopt(name = "list-errors", version = garage_version())]
@@ -619,7 +607,7 @@ pub enum BlockOperation {
},
}
-#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone, Copy)]
+#[derive(StructOpt, Debug, Eq, PartialEq, Clone, Copy)]
pub enum MetaOperation {
/// Save a snapshot of the metadata db file
#[structopt(name = "snapshot", version = garage_version())]
diff --git a/src/garage/cli/util.rs b/src/garage/cli/util.rs
deleted file mode 100644
index a3a1480e..00000000
--- a/src/garage/cli/util.rs
+++ /dev/null
@@ -1,216 +0,0 @@
-use std::collections::HashMap;
-use std::time::Duration;
-
-use format_table::format_table;
-use garage_util::background::*;
-use garage_util::data::*;
-use garage_util::time::*;
-
-use garage_block::manager::BlockResyncErrorInfo;
-
-use garage_model::s3::mpu_table::MultipartUpload;
-use garage_model::s3::version_table::*;
-
-use crate::cli::structs::WorkerListOpt;
-
-pub fn print_worker_list(wi: HashMap<usize, WorkerInfo>, wlo: WorkerListOpt) {
- let mut wi = wi.into_iter().collect::<Vec<_>>();
- wi.sort_by_key(|(tid, info)| {
- (
- match info.state {
- WorkerState::Busy | WorkerState::Throttled(_) => 0,
- WorkerState::Idle => 1,
- WorkerState::Done => 2,
- },
- *tid,
- )
- });
-
- let mut table = vec!["TID\tState\tName\tTranq\tDone\tQueue\tErrors\tConsec\tLast".to_string()];
- for (tid, info) in wi.iter() {
- if wlo.busy && !matches!(info.state, WorkerState::Busy | WorkerState::Throttled(_)) {
- continue;
- }
- if wlo.errors && info.errors == 0 {
- continue;
- }
-
- let tf = timeago::Formatter::new();
- let err_ago = info
- .last_error
- .as_ref()
- .map(|(_, t)| tf.convert(Duration::from_millis(now_msec() - t)))
- .unwrap_or_default();
- let (total_err, consec_err) = if info.errors > 0 {
- (info.errors.to_string(), info.consecutive_errors.to_string())
- } else {
- ("-".into(), "-".into())
- };
-
- table.push(format!(
- "{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}",
- tid,
- info.state,
- info.name,
- info.status
- .tranquility
- .as_ref()
- .map(ToString::to_string)
- .unwrap_or_else(|| "-".into()),
- info.status.progress.as_deref().unwrap_or("-"),
- info.status
- .queue_length
- .as_ref()
- .map(ToString::to_string)
- .unwrap_or_else(|| "-".into()),
- total_err,
- consec_err,
- err_ago,
- ));
- }
- format_table(table);
-}
-
-pub fn print_worker_info(tid: usize, info: WorkerInfo) {
- let mut table = vec![];
- table.push(format!("Task id:\t{}", tid));
- table.push(format!("Worker name:\t{}", info.name));
- match info.state {
- WorkerState::Throttled(t) => {
- table.push(format!(
- "Worker state:\tBusy (throttled, paused for {:.3}s)",
- t
- ));
- }
- s => {
- table.push(format!("Worker state:\t{}", s));
- }
- };
- if let Some(tql) = info.status.tranquility {
- table.push(format!("Tranquility:\t{}", tql));
- }
-
- table.push("".into());
- table.push(format!("Total errors:\t{}", info.errors));
- table.push(format!("Consecutive errs:\t{}", info.consecutive_errors));
- if let Some((s, t)) = info.last_error {
- table.push(format!("Last error:\t{}", s));
- let tf = timeago::Formatter::new();
- table.push(format!(
- "Last error time:\t{}",
- tf.convert(Duration::from_millis(now_msec() - t))
- ));
- }
-
- table.push("".into());
- if let Some(p) = info.status.progress {
- table.push(format!("Progress:\t{}", p));
- }
- if let Some(ql) = info.status.queue_length {
- table.push(format!("Queue length:\t{}", ql));
- }
- if let Some(pe) = info.status.persistent_errors {
- table.push(format!("Persistent errors:\t{}", pe));
- }
-
- for (i, s) in info.status.freeform.iter().enumerate() {
- if i == 0 {
- if table.last() != Some(&"".into()) {
- table.push("".into());
- }
- table.push(format!("Message:\t{}", s));
- } else {
- table.push(format!("\t{}", s));
- }
- }
- format_table(table);
-}
-
-pub fn print_worker_vars(wv: Vec<(Uuid, String, String)>) {
- let table = wv
- .into_iter()
- .map(|(n, k, v)| format!("{:?}\t{}\t{}", n, k, v))
- .collect::<Vec<_>>();
- format_table(table);
-}
-
-pub fn print_block_error_list(el: Vec<BlockResyncErrorInfo>) {
- let now = now_msec();
- let tf = timeago::Formatter::new();
- let mut tf2 = timeago::Formatter::new();
- tf2.ago("");
-
- let mut table = vec!["Hash\tRC\tErrors\tLast error\tNext try".into()];
- for e in el {
- let next_try = if e.next_try > now {
- tf2.convert(Duration::from_millis(e.next_try - now))
- } else {
- "asap".to_string()
- };
- table.push(format!(
- "{}\t{}\t{}\t{}\tin {}",
- hex::encode(e.hash.as_slice()),
- e.refcount,
- e.error_count,
- tf.convert(Duration::from_millis(now - e.last_try)),
- next_try
- ));
- }
- format_table(table);
-}
-
-pub fn print_block_info(
- hash: Hash,
- refcount: u64,
- versions: Vec<Result<Version, Uuid>>,
- uploads: Vec<MultipartUpload>,
-) {
- println!("Block hash: {}", hex::encode(hash.as_slice()));
- println!("Refcount: {}", refcount);
- println!();
-
- let mut table = vec!["Version\tBucket\tKey\tMPU\tDeleted".into()];
- let mut nondeleted_count = 0;
- for v in versions.iter() {
- match v {
- Ok(ver) => {
- match &ver.backlink {
- VersionBacklink::Object { bucket_id, key } => {
- table.push(format!(
- "{:?}\t{:?}\t{}\t\t{:?}",
- ver.uuid,
- bucket_id,
- key,
- ver.deleted.get()
- ));
- }
- VersionBacklink::MultipartUpload { upload_id } => {
- let upload = uploads.iter().find(|x| x.upload_id == *upload_id);
- table.push(format!(
- "{:?}\t{:?}\t{}\t{:?}\t{:?}",
- ver.uuid,
- upload.map(|u| u.bucket_id).unwrap_or_default(),
- upload.map(|u| u.key.as_str()).unwrap_or_default(),
- upload_id,
- ver.deleted.get()
- ));
- }
- }
- if !ver.deleted.get() {
- nondeleted_count += 1;
- }
- }
- Err(vh) => {
- table.push(format!("{:?}\t\t\t\tyes", vh));
- }
- }
- }
- format_table(table);
-
- if refcount != nondeleted_count {
- println!();
- println!(
- "Warning: refcount does not match number of non-deleted versions, you should try `garage repair block-rc`."
- );
- }
-}
diff --git a/src/garage/cli_v2/block.rs b/src/garage/cli_v2/block.rs
new file mode 100644
index 00000000..bfc0db4a
--- /dev/null
+++ b/src/garage/cli_v2/block.rs
@@ -0,0 +1,145 @@
+//use bytesize::ByteSize;
+use format_table::format_table;
+
+use garage_util::error::*;
+
+use garage_api_admin::api::*;
+
+use crate::cli::structs::*;
+use crate::cli_v2::*;
+
+impl Cli {
+ pub async fn cmd_block(&self, cmd: BlockOperation) -> Result<(), Error> {
+ match cmd {
+ BlockOperation::ListErrors => self.cmd_list_block_errors().await,
+ BlockOperation::Info { hash } => self.cmd_get_block_info(hash).await,
+ BlockOperation::RetryNow { all, blocks } => self.cmd_block_retry_now(all, blocks).await,
+ BlockOperation::Purge { yes, blocks } => self.cmd_block_purge(yes, blocks).await,
+ }
+ }
+
+ pub async fn cmd_list_block_errors(&self) -> Result<(), Error> {
+ let errors = self.local_api_request(LocalListBlockErrorsRequest).await?.0;
+
+ let tf = timeago::Formatter::new();
+ let mut tf2 = timeago::Formatter::new();
+ tf2.ago("");
+
+ let mut table = vec!["Hash\tRC\tErrors\tLast error\tNext try".into()];
+ for e in errors {
+ let next_try = if e.next_try_in_secs > 0 {
+ tf2.convert(Duration::from_secs(e.next_try_in_secs))
+ } else {
+ "asap".to_string()
+ };
+ table.push(format!(
+ "{}\t{}\t{}\t{}\tin {}",
+ e.block_hash,
+ e.refcount,
+ e.error_count,
+ tf.convert(Duration::from_secs(e.last_try_secs_ago)),
+ next_try
+ ));
+ }
+ format_table(table);
+
+ Ok(())
+ }
+
+ pub async fn cmd_get_block_info(&self, hash: String) -> Result<(), Error> {
+ let info = self
+ .local_api_request(LocalGetBlockInfoRequest { block_hash: hash })
+ .await?;
+
+ println!("Block hash: {}", info.block_hash);
+ println!("Refcount: {}", info.refcount);
+ println!();
+
+ let mut table = vec!["Version\tBucket\tKey\tMPU\tDeleted".into()];
+ let mut nondeleted_count = 0;
+ for ver in info.versions.iter() {
+ match &ver.backlink {
+ Some(BlockVersionBacklink::Object { bucket_id, key }) => {
+ table.push(format!(
+ "{:.16}\t{:.16}\t{}\t\t{:?}",
+ ver.version_id, bucket_id, key, ver.deleted
+ ));
+ }
+ Some(BlockVersionBacklink::Upload {
+ upload_id,
+ upload_deleted: _,
+ upload_garbage_collected: _,
+ bucket_id,
+ key,
+ }) => {
+ table.push(format!(
+ "{:.16}\t{:.16}\t{}\t{:.16}\t{:.16}",
+ ver.version_id,
+ bucket_id.as_deref().unwrap_or(""),
+ key.as_deref().unwrap_or(""),
+ upload_id,
+ ver.deleted
+ ));
+ }
+ None => {
+ table.push(format!("{:.16}\t\t\tyes", ver.version_id));
+ }
+ }
+ if !ver.deleted {
+ nondeleted_count += 1;
+ }
+ }
+ format_table(table);
+
+ if info.refcount != nondeleted_count {
+ println!();
+ println!(
+ "Warning: refcount does not match number of non-deleted versions, you should try `garage repair block-rc`."
+ );
+ }
+
+ Ok(())
+ }
+
+ pub async fn cmd_block_retry_now(&self, all: bool, blocks: Vec<String>) -> Result<(), Error> {
+ let req = match (all, blocks.len()) {
+ (true, 0) => LocalRetryBlockResyncRequest::All { all: true },
+ (false, n) if n > 0 => LocalRetryBlockResyncRequest::Blocks {
+ block_hashes: blocks,
+ },
+ _ => {
+ return Err(Error::Message(
+ "Please specify block hashes or --all (not both)".into(),
+ ))
+ }
+ };
+
+ let res = self.local_api_request(req).await?;
+
+ println!(
+ "{} blocks returned in queue for a retry now (check logs to see results)",
+ res.count
+ );
+
+ Ok(())
+ }
+
+ pub async fn cmd_block_purge(&self, yes: bool, blocks: Vec<String>) -> Result<(), Error> {
+ if !yes {
+ return Err(Error::Message(
+ "Pass the --yes flag to confirm block purge operation.".into(),
+ ));
+ }
+
+ let res = self
+ .local_api_request(LocalPurgeBlocksRequest(blocks))
+ .await?;
+
+ println!(
+ "Purged {} blocks: deleted {} versions, {} objects, {} multipart uploads",
+ res.blocks_purged, res.versions_deleted, res.objects_deleted, res.uploads_deleted,
+ );
+
+ Ok(())
+ }
+}
diff --git a/src/garage/cli_v2/bucket.rs b/src/garage/cli_v2/bucket.rs
index 837ce783..c25c2c3e 100644
--- a/src/garage/cli_v2/bucket.rs
+++ b/src/garage/cli_v2/bucket.rs
@@ -3,9 +3,8 @@ use format_table::format_table;
use garage_util::error::*;
-use garage_api::admin::api::*;
+use garage_api_admin::api::*;
-use crate::cli as cli_v1;
use crate::cli::structs::*;
use crate::cli_v2::*;
@@ -22,15 +21,9 @@ impl Cli {
BucketOperation::Deny(query) => self.cmd_bucket_deny(query).await,
BucketOperation::Website(query) => self.cmd_bucket_website(query).await,
BucketOperation::SetQuotas(query) => self.cmd_bucket_set_quotas(query).await,
-
- // TODO
- x => cli_v1::cmd_admin(
- &self.admin_rpc_endpoint,
- self.rpc_host,
- AdminRpc::BucketOperation(x),
- )
- .await
- .ok_or_message("old error"),
+ BucketOperation::CleanupIncompleteUploads(query) => {
+ self.cmd_cleanup_incomplete_uploads(query).await
+ }
}
}
@@ -520,4 +513,37 @@ impl Cli {
Ok(())
}
+
+ pub async fn cmd_cleanup_incomplete_uploads(
+ &self,
+ opt: CleanupIncompleteUploadsOpt,
+ ) -> Result<(), Error> {
+ let older_than = parse_duration::parse::parse(&opt.older_than)
+ .ok_or_message("Invalid duration passed for --older-than parameter")?;
+
+ for b in opt.buckets.iter() {
+ let bucket = self
+ .api_request(GetBucketInfoRequest {
+ id: None,
+ global_alias: None,
+ search: Some(b.clone()),
+ })
+ .await?;
+
+ let res = self
+ .api_request(CleanupIncompleteUploadsRequest {
+ bucket_id: bucket.id.clone(),
+ older_than_secs: older_than.as_secs(),
+ })
+ .await?;
+
+ if res.uploads_deleted > 0 {
+ println!("{:.16}: {} uploads deleted", bucket.id, res.uploads_deleted);
+ } else {
+ println!("{:.16}: no uploads deleted", bucket.id);
+ }
+ }
+
+ Ok(())
+ }
}
diff --git a/src/garage/cli_v2/cluster.rs b/src/garage/cli_v2/cluster.rs
index 34a28674..6eb65d12 100644
--- a/src/garage/cli_v2/cluster.rs
+++ b/src/garage/cli_v2/cluster.rs
@@ -2,7 +2,7 @@ use format_table::format_table;
use garage_util::error::*;
-use garage_api::admin::api::*;
+use garage_api_admin::api::*;
use crate::cli::structs::*;
use crate::cli_v2::layout::*;
diff --git a/src/garage/cli_v2/key.rs b/src/garage/cli_v2/key.rs
index ff403a9a..b956906d 100644
--- a/src/garage/cli_v2/key.rs
+++ b/src/garage/cli_v2/key.rs
@@ -2,7 +2,7 @@ use format_table::format_table;
use garage_util::error::*;
-use garage_api::admin::api::*;
+use garage_api_admin::api::*;
use crate::cli::structs::*;
use crate::cli_v2::*;
diff --git a/src/garage/cli_v2/layout.rs b/src/garage/cli_v2/layout.rs
index d44771c7..2f14b332 100644
--- a/src/garage/cli_v2/layout.rs
+++ b/src/garage/cli_v2/layout.rs
@@ -3,7 +3,7 @@ use format_table::format_table;
use garage_util::error::*;
-use garage_api::admin::api::*;
+use garage_api_admin::api::*;
use crate::cli::layout as cli_v1;
use crate::cli::structs::*;
diff --git a/src/garage/cli_v2/mod.rs b/src/garage/cli_v2/mod.rs
index 692b7c1c..28c7c824 100644
--- a/src/garage/cli_v2/mod.rs
+++ b/src/garage/cli_v2/mod.rs
@@ -3,6 +3,10 @@ pub mod cluster;
pub mod key;
pub mod layout;
+pub mod block;
+pub mod node;
+pub mod worker;
+
use std::convert::TryFrom;
use std::sync::Arc;
use std::time::Duration;
@@ -12,17 +16,15 @@ use garage_util::error::*;
use garage_rpc::system::*;
use garage_rpc::*;
-use garage_api::admin::api::*;
-use garage_api::admin::EndpointHandler as AdminApiEndpoint;
+use garage_api_admin::api::*;
+use garage_api_admin::api_server::{AdminRpc as ProxyRpc, AdminRpcResponse as ProxyRpcResponse};
+use garage_api_admin::RequestHandler;
-use crate::admin::*;
-use crate::cli as cli_v1;
use crate::cli::structs::*;
-use crate::cli::Command;
pub struct Cli {
pub system_rpc_endpoint: Arc<Endpoint<SystemRpc, ()>>,
- pub admin_rpc_endpoint: Arc<Endpoint<AdminRpc, ()>>,
+ pub proxy_rpc_endpoint: Arc<Endpoint<ProxyRpc, ()>>,
pub rpc_host: NodeID,
}
@@ -36,63 +38,35 @@ impl Cli {
Command::Layout(layout_opt) => self.layout_command_dispatch(layout_opt).await,
Command::Bucket(bo) => self.cmd_bucket(bo).await,
Command::Key(ko) => self.cmd_key(ko).await,
-
- // TODO
- Command::Repair(ro) => cli_v1::cmd_admin(
- &self.admin_rpc_endpoint,
- self.rpc_host,
- AdminRpc::LaunchRepair(ro),
- )
- .await
- .ok_or_message("cli_v1"),
- Command::Stats(so) => {
- cli_v1::cmd_admin(&self.admin_rpc_endpoint, self.rpc_host, AdminRpc::Stats(so))
- .await
- .ok_or_message("cli_v1")
- }
- Command::Worker(wo) => cli_v1::cmd_admin(
- &self.admin_rpc_endpoint,
- self.rpc_host,
- AdminRpc::Worker(wo),
- )
- .await
- .ok_or_message("cli_v1"),
- Command::Block(bo) => cli_v1::cmd_admin(
- &self.admin_rpc_endpoint,
- self.rpc_host,
- AdminRpc::BlockOperation(bo),
- )
- .await
- .ok_or_message("cli_v1"),
- Command::Meta(mo) => cli_v1::cmd_admin(
- &self.admin_rpc_endpoint,
- self.rpc_host,
- AdminRpc::MetaOperation(mo),
- )
- .await
- .ok_or_message("cli_v1"),
+ Command::Worker(wo) => self.cmd_worker(wo).await,
+ Command::Block(bo) => self.cmd_block(bo).await,
+ Command::Meta(mo) => self.cmd_meta(mo).await,
+ Command::Stats(so) => self.cmd_stats(so).await,
+ Command::Repair(ro) => self.cmd_repair(ro).await,
_ => unreachable!(),
}
}
- pub async fn api_request<T>(&self, req: T) -> Result<<T as AdminApiEndpoint>::Response, Error>
+ pub async fn api_request<T>(&self, req: T) -> Result<<T as RequestHandler>::Response, Error>
where
- T: AdminApiEndpoint,
+ T: RequestHandler,
AdminApiRequest: From<T>,
- <T as AdminApiEndpoint>::Response: TryFrom<TaggedAdminApiResponse>,
+ <T as RequestHandler>::Response: TryFrom<TaggedAdminApiResponse>,
{
let req = AdminApiRequest::from(req);
let req_name = req.name();
match self
- .admin_rpc_endpoint
- .call(&self.rpc_host, AdminRpc::ApiRequest(req), PRIO_NORMAL)
- .await?
- .ok_or_message("rpc")?
+ .proxy_rpc_endpoint
+ .call(&self.rpc_host, ProxyRpc::Proxy(req), PRIO_NORMAL)
+ .await??
{
- AdminRpc::ApiOkResponse(resp) => <T as AdminApiEndpoint>::Response::try_from(resp)
- .map_err(|_| Error::Message(format!("{} returned unexpected response", req_name))),
- AdminRpc::ApiErrorResponse {
+ ProxyRpcResponse::ProxyApiOkResponse(resp) => {
+ <T as RequestHandler>::Response::try_from(resp).map_err(|_| {
+ Error::Message(format!("{} returned unexpected response", req_name))
+ })
+ }
+ ProxyRpcResponse::ApiErrorResponse {
http_code,
error_code,
message,
@@ -103,4 +77,32 @@ impl Cli {
m => Err(Error::unexpected_rpc_message(m)),
}
}
+
+ pub async fn local_api_request<T>(
+ &self,
+ req: T,
+ ) -> Result<<T as RequestHandler>::Response, Error>
+ where
+ T: RequestHandler,
+ MultiRequest<T>: RequestHandler<Response = MultiResponse<<T as RequestHandler>::Response>>,
+ AdminApiRequest: From<MultiRequest<T>>,
+ <MultiRequest<T> as RequestHandler>::Response: TryFrom<TaggedAdminApiResponse>,
+ {
+ let req = MultiRequest {
+ node: hex::encode(self.rpc_host),
+ body: req,
+ };
+ let resp = self.api_request(req).await?;
+
+ if let Some((_, e)) = resp.error.into_iter().next() {
+ return Err(Error::Message(e));
+ }
+ if resp.success.len() != 1 {
+ return Err(Error::Message(format!(
+ "{} responses returned, expected 1",
+ resp.success.len()
+ )));
+ }
+ Ok(resp.success.into_iter().next().unwrap().1)
+ }
}
diff --git a/src/garage/cli_v2/node.rs b/src/garage/cli_v2/node.rs
new file mode 100644
index 00000000..c5d0cdea
--- /dev/null
+++ b/src/garage/cli_v2/node.rs
@@ -0,0 +1,113 @@
+use format_table::format_table;
+
+use garage_util::error::*;
+
+use garage_api_admin::api::*;
+
+use crate::cli::structs::*;
+use crate::cli_v2::*;
+
+impl Cli {
+ pub async fn cmd_meta(&self, cmd: MetaOperation) -> Result<(), Error> {
+ let MetaOperation::Snapshot { all } = cmd;
+
+ let res = self
+ .api_request(CreateMetadataSnapshotRequest {
+ node: if all {
+ "*".to_string()
+ } else {
+ hex::encode(self.rpc_host)
+ },
+ body: LocalCreateMetadataSnapshotRequest,
+ })
+ .await?;
+
+ let mut table = vec![];
+ for (node, err) in res.error.iter() {
+ table.push(format!("{:.16}\tError: {}", node, err));
+ }
+ for (node, _) in res.success.iter() {
+ table.push(format!("{:.16}\tSnapshot created", node));
+ }
+ format_table(table);
+
+ Ok(())
+ }
+
+ pub async fn cmd_stats(&self, cmd: StatsOpt) -> Result<(), Error> {
+ let res = self
+ .api_request(GetNodeStatisticsRequest {
+ node: if cmd.all_nodes {
+ "*".to_string()
+ } else {
+ hex::encode(self.rpc_host)
+ },
+ body: LocalGetNodeStatisticsRequest,
+ })
+ .await?;
+
+ for (node, res) in res.success.iter() {
+ println!("======================");
+ println!("Stats for node {:.16}:\n", node);
+ println!("{}\n", res.freeform);
+ }
+
+ for (node, err) in res.error.iter() {
+ println!("======================");
+ println!("Node {:.16}: error: {}\n", node, err);
+ }
+
+ let res = self.api_request(GetClusterStatisticsRequest).await?;
+ println!("======================");
+ println!("Cluster statistics:\n");
+ println!("{}\n", res.freeform);
+
+ Ok(())
+ }
+
+ pub async fn cmd_repair(&self, cmd: RepairOpt) -> Result<(), Error> {
+ if !cmd.yes {
+ return Err(Error::Message(
+ "Please add --yes to start the repair operation".into(),
+ ));
+ }
+
+ let repair_type = match cmd.what {
+ RepairWhat::Tables => RepairType::Tables,
+ RepairWhat::Blocks => RepairType::Blocks,
+ RepairWhat::Versions => RepairType::Versions,
+ RepairWhat::MultipartUploads => RepairType::MultipartUploads,
+ RepairWhat::BlockRefs => RepairType::BlockRefs,
+ RepairWhat::BlockRc => RepairType::BlockRc,
+ RepairWhat::Rebalance => RepairType::Rebalance,
+ RepairWhat::Scrub { cmd } => RepairType::Scrub(match cmd {
+ ScrubCmd::Start => ScrubCommand::Start,
+ ScrubCmd::Cancel => ScrubCommand::Cancel,
+ ScrubCmd::Pause => ScrubCommand::Pause,
+ ScrubCmd::Resume => ScrubCommand::Resume,
+ }),
+ };
+
+ let res = self
+ .api_request(LaunchRepairOperationRequest {
+ node: if cmd.all_nodes {
+ "*".to_string()
+ } else {
+ hex::encode(self.rpc_host)
+ },
+ body: LocalLaunchRepairOperationRequest { repair_type },
+ })
+ .await?;
+
+ let mut table = vec![];
+ for (node, err) in res.error.iter() {
+ table.push(format!("{:.16}\tError: {}", node, err));
+ }
+ for (node, _) in res.success.iter() {
+ table.push(format!("{:.16}\tRepair launched", node));
+ }
+ format_table(table);
+
+ Ok(())
+ }
+}
diff --git a/src/garage/cli_v2/worker.rs b/src/garage/cli_v2/worker.rs
new file mode 100644
index 00000000..9c248a39
--- /dev/null
+++ b/src/garage/cli_v2/worker.rs
@@ -0,0 +1,213 @@
+use format_table::format_table;
+
+use garage_util::error::*;
+
+use garage_api_admin::api::*;
+
+use crate::cli::structs::*;
+use crate::cli_v2::*;
+
+impl Cli {
+ pub async fn cmd_worker(&self, cmd: WorkerOperation) -> Result<(), Error> {
+ match cmd {
+ WorkerOperation::List { opt } => self.cmd_list_workers(opt).await,
+ WorkerOperation::Info { tid } => self.cmd_worker_info(tid).await,
+ WorkerOperation::Get {
+ all_nodes,
+ variable,
+ } => self.cmd_get_var(all_nodes, variable).await,
+ WorkerOperation::Set {
+ all_nodes,
+ variable,
+ value,
+ } => self.cmd_set_var(all_nodes, variable, value).await,
+ }
+ }
+
+ pub async fn cmd_list_workers(&self, opt: WorkerListOpt) -> Result<(), Error> {
+ let mut list = self
+ .local_api_request(LocalListWorkersRequest {
+ busy_only: opt.busy,
+ error_only: opt.errors,
+ })
+ .await?
+ .0;
+
+ list.sort_by_key(|info| {
+ (
+ match info.state {
+ WorkerStateResp::Busy | WorkerStateResp::Throttled { .. } => 0,
+ WorkerStateResp::Idle => 1,
+ WorkerStateResp::Done => 2,
+ },
+ info.id,
+ )
+ });
+
+ let mut table =
+ vec!["TID\tState\tName\tTranq\tDone\tQueue\tErrors\tConsec\tLast".to_string()];
+ let tf = timeago::Formatter::new();
+ for info in list.iter() {
+ let err_ago = info
+ .last_error
+ .as_ref()
+ .map(|x| tf.convert(Duration::from_secs(x.secs_ago)))
+ .unwrap_or_default();
+ let (total_err, consec_err) = if info.errors > 0 {
+ (info.errors.to_string(), info.consecutive_errors.to_string())
+ } else {
+ ("-".into(), "-".into())
+ };
+
+ table.push(format!(
+ "{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}",
+ info.id,
+ format_worker_state(&info.state),
+ info.name,
+ info.tranquility
+ .as_ref()
+ .map(ToString::to_string)
+ .unwrap_or_else(|| "-".into()),
+ info.progress.as_deref().unwrap_or("-"),
+ info.queue_length
+ .as_ref()
+ .map(ToString::to_string)
+ .unwrap_or_else(|| "-".into()),
+ total_err,
+ consec_err,
+ err_ago,
+ ));
+ }
+ format_table(table);
+
+ Ok(())
+ }
+
+ pub async fn cmd_worker_info(&self, tid: usize) -> Result<(), Error> {
+ let info = self
+ .local_api_request(LocalGetWorkerInfoRequest { id: tid as u64 })
+ .await?
+ .0;
+
+ let mut table = vec![];
+ table.push(format!("Task id:\t{}", info.id));
+ table.push(format!("Worker name:\t{}", info.name));
+ match &info.state {
+ WorkerStateResp::Throttled { duration_secs } => {
+ table.push(format!(
+ "Worker state:\tBusy (throttled, paused for {:.3}s)",
+ duration_secs
+ ));
+ }
+ s => {
+ table.push(format!("Worker state:\t{}", format_worker_state(s)));
+ }
+ };
+ if let Some(tql) = info.tranquility {
+ table.push(format!("Tranquility:\t{}", tql));
+ }
+
+ table.push("".into());
+ table.push(format!("Total errors:\t{}", info.errors));
+ table.push(format!("Consecutive errs:\t{}", info.consecutive_errors));
+ if let Some(err) = info.last_error {
+ table.push(format!("Last error:\t{}", err.message));
+ let tf = timeago::Formatter::new();
+ table.push(format!(
+ "Last error time:\t{}",
+ tf.convert(Duration::from_secs(err.secs_ago))
+ ));
+ }
+
+ table.push("".into());
+ if let Some(p) = info.progress {
+ table.push(format!("Progress:\t{}", p));
+ }
+ if let Some(ql) = info.queue_length {
+ table.push(format!("Queue length:\t{}", ql));
+ }
+ if let Some(pe) = info.persistent_errors {
+ table.push(format!("Persistent errors:\t{}", pe));
+ }
+
+ for (i, s) in info.freeform.iter().enumerate() {
+ if i == 0 {
+ if table.last() != Some(&"".into()) {
+ table.push("".into());
+ }
+ table.push(format!("Message:\t{}", s));
+ } else {
+ table.push(format!("\t{}", s));
+ }
+ }
+ format_table(table);
+
+ Ok(())
+ }
+
+ pub async fn cmd_get_var(&self, all: bool, var: Option<String>) -> Result<(), Error> {
+ let res = self
+ .api_request(GetWorkerVariableRequest {
+ node: if all {
+ "*".to_string()
+ } else {
+ hex::encode(self.rpc_host)
+ },
+ body: LocalGetWorkerVariableRequest { variable: var },
+ })
+ .await?;
+
+ let mut table = vec![];
+ for (node, vars) in res.success.iter() {
+ for (key, val) in vars.0.iter() {
+ table.push(format!("{:.16}\t{}\t{}", node, key, val));
+ }
+ }
+ format_table(table);
+
+ for (node, err) in res.error.iter() {
+ eprintln!("{:.16}: error: {}", node, err);
+ }
+
+ Ok(())
+ }
+
+ pub async fn cmd_set_var(
+ &self,
+ all: bool,
+ variable: String,
+ value: String,
+ ) -> Result<(), Error> {
+ let res = self
+ .api_request(SetWorkerVariableRequest {
+ node: if all {
+ "*".to_string()
+ } else {
+ hex::encode(self.rpc_host)
+ },
+ body: LocalSetWorkerVariableRequest { variable, value },
+ })
+ .await?;
+
+ let mut table = vec![];
+ for (node, kv) in res.success.iter() {
+ table.push(format!("{:.16}\t{}\t{}", node, kv.variable, kv.value));
+ }
+ format_table(table);
+
+ for (node, err) in res.error.iter() {
+ eprintln!("{:.16}: error: {}", node, err);
+ }
+
+ Ok(())
+ }
+}
+
+fn format_worker_state(s: &WorkerStateResp) -> &'static str {
+ match s {
+ WorkerStateResp::Busy => "Busy",
+ WorkerStateResp::Throttled { .. } => "Busy*",
+ WorkerStateResp::Idle => "Idle",
+ WorkerStateResp::Done => "Done",
+ }
+}
diff --git a/src/garage/main.rs b/src/garage/main.rs
index 08c7cee7..2a88d760 100644
--- a/src/garage/main.rs
+++ b/src/garage/main.rs
@@ -4,10 +4,8 @@
#[macro_use]
extern crate tracing;
-mod admin;
mod cli;
mod cli_v2;
-mod repair;
mod secrets;
mod server;
#[cfg(feature = "telemetry-otlp")]
@@ -35,8 +33,9 @@ use garage_util::error::*;
use garage_rpc::system::*;
use garage_rpc::*;
-use admin::*;
-use cli::*;
+use garage_api_admin::api_server::{AdminRpc as ProxyRpc, ADMIN_RPC_PATH as PROXY_RPC_PATH};
+
+use cli::structs::*;
use secrets::Secrets;
#[derive(StructOpt, Debug)]
@@ -144,13 +143,13 @@ async fn main() {
let res = match opt.cmd {
Command::Server => server::run_server(opt.config_file, opt.secrets).await,
Command::OfflineRepair(repair_opt) => {
- repair::offline::offline_repair(opt.config_file, opt.secrets, repair_opt).await
+ cli::repair::offline_repair(opt.config_file, opt.secrets, repair_opt).await
}
Command::ConvertDb(conv_opt) => {
cli::convert_db::do_conversion(conv_opt).map_err(From::from)
}
Command::Node(NodeOperation::NodeId(node_id_opt)) => {
- node_id_command(opt.config_file, node_id_opt.quiet)
+ cli::init::node_id_command(opt.config_file, node_id_opt.quiet)
}
_ => cli_command(opt).await,
};
@@ -251,7 +250,7 @@ async fn cli_command(opt: Opt) -> Result<(), Error> {
(id, addrs[0], false)
} else {
let node_id = garage_rpc::system::read_node_id(&config.as_ref().unwrap().metadata_dir)
- .err_context(READ_KEY_ERROR)?;
+ .err_context(cli::init::READ_KEY_ERROR)?;
if let Some(a) = config.as_ref().and_then(|c| c.rpc_public_addr.as_ref()) {
use std::net::ToSocketAddrs;
let a = a
@@ -281,11 +280,11 @@ async fn cli_command(opt: Opt) -> Result<(), Error> {
}
let system_rpc_endpoint = netapp.endpoint::<SystemRpc, ()>(SYSTEM_RPC_PATH.into());
- let admin_rpc_endpoint = netapp.endpoint::<AdminRpc, ()>(ADMIN_RPC_PATH.into());
+ let proxy_rpc_endpoint = netapp.endpoint::<ProxyRpc, ()>(PROXY_RPC_PATH.into());
let cli = cli_v2::Cli {
system_rpc_endpoint,
- admin_rpc_endpoint,
+ proxy_rpc_endpoint,
rpc_host: id,
};
diff --git a/src/garage/repair/mod.rs b/src/garage/repair/mod.rs
deleted file mode 100644
index 4699ace5..00000000
--- a/src/garage/repair/mod.rs
+++ /dev/null
@@ -1,2 +0,0 @@
-pub mod offline;
-pub mod online;
diff --git a/src/garage/server.rs b/src/garage/server.rs
index 65bf34db..131cc8aa 100644
--- a/src/garage/server.rs
+++ b/src/garage/server.rs
@@ -6,15 +6,14 @@ use garage_util::background::*;
use garage_util::config::*;
use garage_util::error::Error;
-use garage_api::admin::api_server::AdminApiServer;
-use garage_api::s3::api_server::S3ApiServer;
+use garage_api_admin::api_server::AdminApiServer;
+use garage_api_s3::api_server::S3ApiServer;
use garage_model::garage::Garage;
use garage_web::WebServer;
#[cfg(feature = "k2v")]
-use garage_api::k2v::api_server::K2VApiServer;
+use garage_api_k2v::api_server::K2VApiServer;
-use crate::admin::*;
use crate::secrets::{fill_secrets, Secrets};
#[cfg(feature = "telemetry-otlp")]
use crate::tracing_setup::*;
@@ -66,6 +65,7 @@ pub async fn run_server(config_file: PathBuf, secrets: Secrets) -> Result<(), Er
info!("Initialize Admin API server and metrics collector...");
let admin_server = AdminApiServer::new(
garage.clone(),
+ background.clone(),
#[cfg(feature = "metrics")]
metrics_exporter,
);
@@ -73,9 +73,6 @@ pub async fn run_server(config_file: PathBuf, secrets: Secrets) -> Result<(), Er
info!("Launching internal Garage cluster communications...");
let run_system = tokio::spawn(garage.system.clone().run(watch_cancel.clone()));
- info!("Create admin RPC handler...");
- AdminRpcHandler::new(garage.clone(), background.clone());
-
// ---- Launch public-facing API servers ----
let mut servers = vec![];
diff --git a/src/garage/tests/common/custom_requester.rs b/src/garage/tests/common/custom_requester.rs
index 42368976..2db72e9f 100644
--- a/src/garage/tests/common/custom_requester.rs
+++ b/src/garage/tests/common/custom_requester.rs
@@ -15,7 +15,7 @@ use hyper_util::client::legacy::{connect::HttpConnector, Client};
use hyper_util::rt::TokioExecutor;
use super::garage::{Instance, Key};
-use garage_api::signature;
+use garage_api_common::signature;
pub type Body = FullBody<hyper::body::Bytes>;
diff --git a/src/k2v-client/Cargo.toml b/src/k2v-client/Cargo.toml
index 694be1f8..bbd09b19 100644
--- a/src/k2v-client/Cargo.toml
+++ b/src/k2v-client/Cargo.toml
@@ -29,12 +29,11 @@ tokio.workspace = true
# cli deps
clap = { workspace = true, optional = true }
format_table = { workspace = true, optional = true }
-tracing = { workspace = true, optional = true }
tracing-subscriber = { workspace = true, optional = true }
[features]
-cli = ["clap", "tokio/fs", "tokio/io-std", "tracing", "tracing-subscriber", "format_table"]
+cli = ["clap", "tokio/fs", "tokio/io-std", "tracing-subscriber", "format_table"]
[lib]
path = "lib.rs"
diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml
index 12931a4c..b58ad43b 100644
--- a/src/model/Cargo.toml
+++ b/src/model/Cargo.toml
@@ -22,7 +22,6 @@ garage_util.workspace = true
garage_net.workspace = true
async-trait.workspace = true
-arc-swap.workspace = true
blake2.workspace = true
chrono.workspace = true
err-derive.workspace = true
@@ -38,9 +37,7 @@ serde.workspace = true
serde_bytes.workspace = true
futures.workspace = true
-futures-util.workspace = true
tokio.workspace = true
-opentelemetry.workspace = true
[features]
default = [ "lmdb", "sqlite" ]
diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs
index a1bf6ee0..821f4549 100644
--- a/src/model/k2v/rpc.rs
+++ b/src/model/k2v/rpc.rs
@@ -10,7 +10,6 @@ use std::convert::TryInto;
use std::sync::{Arc, Mutex, MutexGuard};
use std::time::{Duration, Instant};
-use async_trait::async_trait;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use serde::{Deserialize, Serialize};
@@ -537,7 +536,6 @@ impl K2VRpcHandler {
}
}
-#[async_trait]
impl EndpointHandler<K2VRpc> for K2VRpcHandler {
async fn handle(self: &Arc<Self>, message: &K2VRpc, _from: NodeID) -> Result<K2VRpc, Error> {
match message {
diff --git a/src/net/Cargo.toml b/src/net/Cargo.toml
index 686aaaea..c0b47a6e 100644
--- a/src/net/Cargo.toml
+++ b/src/net/Cargo.toml
@@ -30,7 +30,6 @@ rand.workspace = true
log.workspace = true
arc-swap.workspace = true
-async-trait.workspace = true
err-derive.workspace = true
bytes.workspace = true
cfg-if.workspace = true
diff --git a/src/net/client.rs b/src/net/client.rs
index 607dd173..20e1dacd 100644
--- a/src/net/client.rs
+++ b/src/net/client.rs
@@ -6,7 +6,6 @@ use std::sync::{Arc, Mutex};
use std::task::Poll;
use arc_swap::ArcSwapOption;
-use async_trait::async_trait;
use bytes::Bytes;
use log::{debug, error, trace};
@@ -220,7 +219,6 @@ impl ClientConn {
impl SendLoop for ClientConn {}
-#[async_trait]
impl RecvLoop for ClientConn {
fn recv_handler(self: &Arc<Self>, id: RequestID, stream: ByteStream) {
trace!("ClientConn recv_handler {}", id);
diff --git a/src/net/endpoint.rs b/src/net/endpoint.rs
index 3cafafeb..d46acc42 100644
--- a/src/net/endpoint.rs
+++ b/src/net/endpoint.rs
@@ -1,8 +1,9 @@
+use std::future::Future;
use std::marker::PhantomData;
use std::sync::Arc;
use arc_swap::ArcSwapOption;
-use async_trait::async_trait;
+use futures::future::{BoxFuture, FutureExt};
use crate::error::Error;
use crate::message::*;
@@ -14,19 +15,17 @@ use crate::netapp::*;
/// attached to the response..
///
/// The handler object should be in an Arc, see `Endpoint::set_handler`
-#[async_trait]
pub trait StreamingEndpointHandler<M>: Send + Sync
where
M: Message,
{
- async fn handle(self: &Arc<Self>, m: Req<M>, from: NodeID) -> Resp<M>;
+ fn handle(self: &Arc<Self>, m: Req<M>, from: NodeID) -> impl Future<Output = Resp<M>> + Send;
}
/// If one simply wants to use an endpoint in a client fashion,
/// without locally serving requests to that endpoint,
/// use the unit type `()` as the handler type:
/// it will panic if it is ever made to handle request.
-#[async_trait]
impl<M: Message> EndpointHandler<M> for () {
async fn handle(self: &Arc<()>, _m: &M, _from: NodeID) -> M::Response {
panic!("This endpoint should not have a local handler.");
@@ -38,15 +37,13 @@ impl<M: Message> EndpointHandler<M> for () {
/// This trait should be implemented by an object of your application
/// that can handle a message of type `M`, in the cases where it doesn't
/// care about attached stream in the request nor in the response.
-#[async_trait]
pub trait EndpointHandler<M>: Send + Sync
where
M: Message,
{
- async fn handle(self: &Arc<Self>, m: &M, from: NodeID) -> M::Response;
+ fn handle(self: &Arc<Self>, m: &M, from: NodeID) -> impl Future<Output = M::Response> + Send;
}
-#[async_trait]
impl<T, M> StreamingEndpointHandler<M> for T
where
T: EndpointHandler<M>,
@@ -161,9 +158,8 @@ where
pub(crate) type DynEndpoint = Box<dyn GenericEndpoint + Send + Sync>;
-#[async_trait]
pub(crate) trait GenericEndpoint {
- async fn handle(&self, req_enc: ReqEnc, from: NodeID) -> Result<RespEnc, Error>;
+ fn handle(&self, req_enc: ReqEnc, from: NodeID) -> BoxFuture<Result<RespEnc, Error>>;
fn drop_handler(&self);
fn clone_endpoint(&self) -> DynEndpoint;
}
@@ -174,21 +170,23 @@ where
M: Message,
H: StreamingEndpointHandler<M>;
-#[async_trait]
impl<M, H> GenericEndpoint for EndpointArc<M, H>
where
M: Message,
H: StreamingEndpointHandler<M> + 'static,
{
- async fn handle(&self, req_enc: ReqEnc, from: NodeID) -> Result<RespEnc, Error> {
- match self.0.handler.load_full() {
- None => Err(Error::NoHandler),
- Some(h) => {
- let req = Req::from_enc(req_enc)?;
- let res = h.handle(req, from).await;
- Ok(res.into_enc()?)
+ fn handle(&self, req_enc: ReqEnc, from: NodeID) -> BoxFuture<Result<RespEnc, Error>> {
+ async move {
+ match self.0.handler.load_full() {
+ None => Err(Error::NoHandler),
+ Some(h) => {
+ let req = Req::from_enc(req_enc)?;
+ let res = h.handle(req, from).await;
+ Ok(res.into_enc()?)
+ }
}
}
+ .boxed()
}
fn drop_handler(&self) {
diff --git a/src/net/netapp.rs b/src/net/netapp.rs
index 77e55774..36c6fc88 100644
--- a/src/net/netapp.rs
+++ b/src/net/netapp.rs
@@ -5,7 +5,6 @@ use std::sync::{Arc, RwLock};
use log::{debug, error, info, trace, warn};
use arc_swap::ArcSwapOption;
-use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use sodiumoxide::crypto::auth;
@@ -457,7 +456,6 @@ impl NetApp {
}
}
-#[async_trait]
impl EndpointHandler<HelloMessage> for NetApp {
async fn handle(self: &Arc<Self>, msg: &HelloMessage, from: NodeID) {
debug!("Hello from {:?}: {:?}", hex::encode(&from[..8]), msg);
diff --git a/src/net/peering.rs b/src/net/peering.rs
index a8d271ec..08378a08 100644
--- a/src/net/peering.rs
+++ b/src/net/peering.rs
@@ -5,7 +5,6 @@ use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
use arc_swap::ArcSwap;
-use async_trait::async_trait;
use log::{debug, info, trace, warn};
use serde::{Deserialize, Serialize};
@@ -592,7 +591,6 @@ impl PeeringManager {
}
}
-#[async_trait]
impl EndpointHandler<PingMessage> for PeeringManager {
async fn handle(self: &Arc<Self>, ping: &PingMessage, from: NodeID) -> PingMessage {
let ping_resp = PingMessage {
@@ -604,7 +602,6 @@ impl EndpointHandler<PingMessage> for PeeringManager {
}
}
-#[async_trait]
impl EndpointHandler<PeerListMessage> for PeeringManager {
async fn handle(
self: &Arc<Self>,
diff --git a/src/net/recv.rs b/src/net/recv.rs
index 0de7bef2..35a6d71a 100644
--- a/src/net/recv.rs
+++ b/src/net/recv.rs
@@ -1,7 +1,6 @@
use std::collections::HashMap;
use std::sync::Arc;
-use async_trait::async_trait;
use bytes::Bytes;
use log::*;
@@ -50,7 +49,6 @@ impl Drop for Sender {
/// according to the protocol defined above: chunks of message in progress of being
/// received are stored in a buffer, and when the last chunk of a message is received,
/// the full message is passed to the receive handler.
-#[async_trait]
pub(crate) trait RecvLoop: Sync + 'static {
fn recv_handler(self: &Arc<Self>, id: RequestID, stream: ByteStream);
fn cancel_handler(self: &Arc<Self>, _id: RequestID) {}
diff --git a/src/net/send.rs b/src/net/send.rs
index 1454eeb7..6f1ac02c 100644
--- a/src/net/send.rs
+++ b/src/net/send.rs
@@ -3,7 +3,6 @@ use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
-use async_trait::async_trait;
use bytes::{BufMut, Bytes, BytesMut};
use log::*;
@@ -273,7 +272,6 @@ impl DataFrame {
///
/// The `.send_loop()` exits when the sending end of the channel is closed,
/// or if there is an error at any time writing to the async writer.
-#[async_trait]
pub(crate) trait SendLoop: Sync {
async fn send_loop<W>(
self: Arc<Self>,
diff --git a/src/net/server.rs b/src/net/server.rs
index 36dccb2f..fb6c6366 100644
--- a/src/net/server.rs
+++ b/src/net/server.rs
@@ -3,7 +3,6 @@ use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use arc_swap::ArcSwapOption;
-use async_trait::async_trait;
use log::*;
use futures::io::{AsyncReadExt, AsyncWriteExt};
@@ -174,7 +173,6 @@ impl ServerConn {
impl SendLoop for ServerConn {}
-#[async_trait]
impl RecvLoop for ServerConn {
fn recv_handler(self: &Arc<Self>, id: RequestID, stream: ByteStream) {
let resp_send = match self.resp_send.load_full() {
diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml
index acde0911..fcc1c304 100644
--- a/src/rpc/Cargo.toml
+++ b/src/rpc/Cargo.toml
@@ -15,12 +15,10 @@ path = "lib.rs"
[dependencies]
format_table.workspace = true
-garage_db.workspace = true
garage_util.workspace = true
garage_net.workspace = true
arc-swap.workspace = true
-bytes.workspace = true
bytesize.workspace = true
gethostname.workspace = true
hex.workspace = true
@@ -46,9 +44,7 @@ reqwest = { workspace = true, optional = true }
pnet_datalink.workspace = true
futures.workspace = true
-futures-util.workspace = true
tokio.workspace = true
-tokio-stream.workspace = true
opentelemetry.workspace = true
[features]
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index 0fa68218..2a52ae5d 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -7,7 +7,6 @@ use std::sync::{Arc, RwLock, RwLockReadGuard};
use std::time::{Duration, Instant};
use arc_swap::ArcSwapOption;
-use async_trait::async_trait;
use futures::join;
use serde::{Deserialize, Serialize};
use sodiumoxide::crypto::sign::ed25519;
@@ -749,7 +748,6 @@ impl System {
}
}
-#[async_trait]
impl EndpointHandler<SystemRpc> for System {
async fn handle(self: &Arc<Self>, msg: &SystemRpc, from: NodeID) -> Result<SystemRpc, Error> {
match msg {
diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml
index e704cd3c..fad6ea08 100644
--- a/src/table/Cargo.toml
+++ b/src/table/Cargo.toml
@@ -22,7 +22,6 @@ opentelemetry.workspace = true
async-trait.workspace = true
arc-swap.workspace = true
-bytes.workspace = true
hex.workspace = true
hexdump.workspace = true
tracing.workspace = true
diff --git a/src/table/gc.rs b/src/table/gc.rs
index 9e060390..28ea119d 100644
--- a/src/table/gc.rs
+++ b/src/table/gc.rs
@@ -4,6 +4,7 @@ use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
+
use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
@@ -272,7 +273,6 @@ impl<F: TableSchema, R: TableReplication> TableGc<F, R> {
}
}
-#[async_trait]
impl<F: TableSchema, R: TableReplication> EndpointHandler<GcRpc> for TableGc<F, R> {
async fn handle(self: &Arc<Self>, message: &GcRpc, _from: NodeID) -> Result<GcRpc, Error> {
match message {
diff --git a/src/table/sync.rs b/src/table/sync.rs
index 234ee8ea..2d43b9fc 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -444,7 +444,6 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
// ======= SYNCHRONIZATION PROCEDURE -- RECEIVER SIDE ======
-#[async_trait]
impl<F: TableSchema, R: TableReplication> EndpointHandler<SyncRpc> for TableSyncer<F, R> {
async fn handle(self: &Arc<Self>, message: &SyncRpc, from: NodeID) -> Result<SyncRpc, Error> {
match message {
diff --git a/src/table/table.rs b/src/table/table.rs
index ea8471d0..c96f4731 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -2,7 +2,6 @@ use std::borrow::Borrow;
use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::sync::Arc;
-use async_trait::async_trait;
use futures::stream::*;
use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
@@ -204,6 +203,10 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
entries_vec.push((write_sets, e_enc));
}
+ if entries_vec.is_empty() {
+ return Ok(());
+ }
+
// Compute a deduplicated list of all of the write sets,
// and compute an index from each node to the position of the sets in which
// it takes part, to optimize the detection of a quorum.
@@ -496,7 +499,6 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
}
}
-#[async_trait]
impl<F: TableSchema, R: TableReplication> EndpointHandler<TableRpc<F>> for Table<F, R> {
async fn handle(
self: &Arc<Self>,
diff --git a/src/util/Cargo.toml b/src/util/Cargo.toml
index da3e39b8..fec5b1ed 100644
--- a/src/util/Cargo.toml
+++ b/src/util/Cargo.toml
@@ -20,9 +20,7 @@ garage_net.workspace = true
arc-swap.workspace = true
async-trait.workspace = true
blake2.workspace = true
-bytes.workspace = true
bytesize.workspace = true
-digest.workspace = true
err-derive.workspace = true
hexdump.workspace = true
xxhash-rust.workspace = true
diff --git a/src/util/background/mod.rs b/src/util/background/mod.rs
index 607cd7a3..cae3a462 100644
--- a/src/util/background/mod.rs
+++ b/src/util/background/mod.rs
@@ -6,7 +6,6 @@ pub mod worker;
use std::collections::HashMap;
use std::sync::Arc;
-use serde::{Deserialize, Serialize};
use tokio::sync::{mpsc, watch};
use worker::WorkerProcessor;
@@ -18,7 +17,7 @@ pub struct BackgroundRunner {
worker_info: Arc<std::sync::Mutex<HashMap<usize, WorkerInfo>>>,
}
-#[derive(Clone, Serialize, Deserialize, Debug)]
+#[derive(Clone, Debug)]
pub struct WorkerInfo {
pub name: String,
pub status: WorkerStatus,
@@ -30,7 +29,7 @@ pub struct WorkerInfo {
/// WorkerStatus is a struct returned by the worker with a bunch of canonical
/// fields to indicate their status to CLI users. All fields are optional.
-#[derive(Clone, Serialize, Deserialize, Debug, Default)]
+#[derive(Clone, Debug, Default)]
pub struct WorkerStatus {
pub tranquility: Option<u32>,
pub progress: Option<String>,
diff --git a/src/util/background/worker.rs b/src/util/background/worker.rs
index 76fb14e8..9028a052 100644
--- a/src/util/background/worker.rs
+++ b/src/util/background/worker.rs
@@ -6,7 +6,6 @@ use async_trait::async_trait;
use futures::future::*;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
-use serde::{Deserialize, Serialize};
use tokio::select;
use tokio::sync::{mpsc, watch};
@@ -18,7 +17,7 @@ use crate::time::now_msec;
// will be interrupted in the middle of whatever they are doing.
const EXIT_DEADLINE: Duration = Duration::from_secs(8);
-#[derive(PartialEq, Copy, Clone, Serialize, Deserialize, Debug)]
+#[derive(PartialEq, Copy, Clone, Debug)]
pub enum WorkerState {
Busy,
Throttled(f32),
@@ -26,17 +25,6 @@ pub enum WorkerState {
Done,
}
-impl std::fmt::Display for WorkerState {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- match self {
- WorkerState::Busy => write!(f, "Busy"),
- WorkerState::Throttled(_) => write!(f, "Busy*"),
- WorkerState::Idle => write!(f, "Idle"),
- WorkerState::Done => write!(f, "Done"),
- }
- }
-}
-
#[async_trait]
pub trait Worker: Send {
fn name(&self) -> String;
diff --git a/src/web/Cargo.toml b/src/web/Cargo.toml
index d810d6f9..a0a3e566 100644
--- a/src/web/Cargo.toml
+++ b/src/web/Cargo.toml
@@ -14,7 +14,8 @@ path = "lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
-garage_api.workspace = true
+garage_api_common.workspace = true
+garage_api_s3.workspace = true
garage_model.workspace = true
garage_util.workspace = true
garage_table.workspace = true
@@ -23,12 +24,9 @@ err-derive.workspace = true
tracing.workspace = true
percent-encoding.workspace = true
-futures.workspace = true
-
http.workspace = true
http-body-util.workspace = true
hyper.workspace = true
-hyper-util.workspace = true
tokio.workspace = true
diff --git a/src/web/error.rs b/src/web/error.rs
index bd8f17b5..7e6d4542 100644
--- a/src/web/error.rs
+++ b/src/web/error.rs
@@ -2,14 +2,14 @@ use err_derive::Error;
use hyper::header::HeaderValue;
use hyper::{HeaderMap, StatusCode};
-use garage_api::generic_server::ApiError;
+use garage_api_common::generic_server::ApiError;
/// Errors of this crate
#[derive(Debug, Error)]
pub enum Error {
/// An error received from the API crate
#[error(display = "API error: {}", _0)]
- ApiError(garage_api::s3::error::Error),
+ ApiError(garage_api_s3::error::Error),
/// The file does not exist
#[error(display = "Not found")]
@@ -22,10 +22,10 @@ pub enum Error {
impl<T> From<T> for Error
where
- garage_api::s3::error::Error: From<T>,
+ garage_api_s3::error::Error: From<T>,
{
fn from(err: T) -> Self {
- Error::ApiError(garage_api::s3::error::Error::from(err))
+ Error::ApiError(garage_api_s3::error::Error::from(err))
}
}
diff --git a/src/web/web_server.rs b/src/web/web_server.rs
index 69939f65..48dcb5b1 100644
--- a/src/web/web_server.rs
+++ b/src/web/web_server.rs
@@ -20,13 +20,15 @@ use opentelemetry::{
use crate::error::*;
-use garage_api::generic_server::{server_loop, UnixListenerOn};
-use garage_api::helpers::*;
-use garage_api::s3::cors::{add_cors_headers, find_matching_cors_rule, handle_options_for_bucket};
-use garage_api::s3::error::{
+use garage_api_common::cors::{
+ add_cors_headers, find_matching_cors_rule, handle_options_for_bucket,
+};
+use garage_api_common::generic_server::{server_loop, UnixListenerOn};
+use garage_api_common::helpers::*;
+use garage_api_s3::error::{
CommonErrorDerivative, Error as ApiError, OkOrBadRequest, OkOrInternalError,
};
-use garage_api::s3::get::{handle_get_without_ctx, handle_head_without_ctx};
+use garage_api_s3::get::{handle_get_without_ctx, handle_head_without_ctx};
use garage_model::garage::Garage;