aboutsummaryrefslogtreecommitdiff
path: root/src/api
diff options
context:
space:
mode:
Diffstat (limited to 'src/api')
-rw-r--r--src/api/admin/api.rs71
-rw-r--r--src/api/admin/block.rs149
-rw-r--r--src/api/admin/error.rs9
-rw-r--r--src/api/admin/lib.rs1
-rw-r--r--src/api/admin/router_v2.rs3
-rw-r--r--src/api/admin/worker.rs4
6 files changed, 234 insertions, 3 deletions
diff --git a/src/api/admin/api.rs b/src/api/admin/api.rs
index cf136d28..42872ad0 100644
--- a/src/api/admin/api.rs
+++ b/src/api/admin/api.rs
@@ -82,6 +82,10 @@ admin_endpoints![
GetWorkerInfo,
GetWorkerVariable,
SetWorkerVariable,
+
+ // Block operations
+ ListBlockErrors,
+ GetBlockInfo,
];
local_admin_endpoints![
@@ -90,6 +94,9 @@ local_admin_endpoints![
GetWorkerInfo,
GetWorkerVariable,
SetWorkerVariable,
+ // Block operations
+ ListBlockErrors,
+ GetBlockInfo,
];
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -619,6 +626,7 @@ pub struct RemoveBucketAliasResponse(pub GetBucketInfoResponse);
// ---- GetWorkerList ----
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
+#[serde(rename_all = "camelCase")]
pub struct LocalListWorkersRequest {
#[serde(default)]
pub busy_only: bool,
@@ -694,3 +702,66 @@ pub struct LocalSetWorkerVariableResponse {
pub variable: String,
pub value: String,
}
+
+// **********************************************
+// Block operations
+// **********************************************
+
+// ---- ListBlockErrors ----
+
+#[derive(Debug, Clone, Serialize, Deserialize, Default)]
+pub struct LocalListBlockErrorsRequest;
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct LocalListBlockErrorsResponse(pub Vec<BlockError>);
+
+#[derive(Serialize, Deserialize, Clone, Debug)]
+#[serde(rename_all = "camelCase")]
+pub struct BlockError {
+ pub block_hash: String,
+ pub refcount: u64,
+ pub error_count: u64,
+ pub last_try_secs_ago: u64,
+ pub next_try_in_secs: u64,
+}
+
+// ---- GetBlockInfo ----
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct LocalGetBlockInfoRequest {
+ pub block_hash: String,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct LocalGetBlockInfoResponse {
+ pub block_hash: String,
+ pub refcount: u64,
+ pub versions: Vec<BlockVersion>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct BlockVersion {
+ pub version_id: String,
+ pub deleted: bool,
+ pub garbage_collected: bool,
+ pub backlink: Option<BlockVersionBacklink>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub enum BlockVersionBacklink {
+ Object {
+ bucket_id: String,
+ key: String,
+ },
+ Upload {
+ upload_id: String,
+ upload_deleted: bool,
+ upload_garbage_collected: bool,
+ bucket_id: Option<String>,
+ key: Option<String>,
+ },
+}
diff --git a/src/api/admin/block.rs b/src/api/admin/block.rs
new file mode 100644
index 00000000..157db5b5
--- /dev/null
+++ b/src/api/admin/block.rs
@@ -0,0 +1,149 @@
+use std::sync::Arc;
+
+use async_trait::async_trait;
+
+use garage_util::data::*;
+use garage_util::error::Error as GarageError;
+use garage_util::time::now_msec;
+
+use garage_table::EmptyKey;
+
+use garage_model::garage::Garage;
+use garage_model::s3::version_table::*;
+
+use crate::admin::api::*;
+use crate::admin::error::*;
+use crate::admin::{Admin, RequestHandler};
+use crate::common_error::CommonErrorDerivative;
+
+#[async_trait]
+impl RequestHandler for LocalListBlockErrorsRequest {
+ type Response = LocalListBlockErrorsResponse;
+
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<LocalListBlockErrorsResponse, Error> {
+ let errors = garage.block_manager.list_resync_errors()?;
+ let now = now_msec();
+ let errors = errors
+ .into_iter()
+ .map(|e| BlockError {
+ block_hash: hex::encode(&e.hash),
+ refcount: e.refcount,
+ error_count: e.error_count,
+ last_try_secs_ago: now.saturating_sub(e.last_try) / 1000,
+ next_try_in_secs: e.next_try.saturating_sub(now) / 1000,
+ })
+ .collect();
+ Ok(LocalListBlockErrorsResponse(errors))
+ }
+}
+
+#[async_trait]
+impl RequestHandler for LocalGetBlockInfoRequest {
+ type Response = LocalGetBlockInfoResponse;
+
+ async fn handle(
+ self,
+ garage: &Arc<Garage>,
+ _admin: &Admin,
+ ) -> Result<LocalGetBlockInfoResponse, Error> {
+ let hash = find_block_hash_by_prefix(garage, &self.block_hash)?;
+ let refcount = garage.block_manager.get_block_rc(&hash)?;
+ let block_refs = garage
+ .block_ref_table
+ .get_range(&hash, None, None, 10000, Default::default())
+ .await?;
+ let mut versions = vec![];
+ for br in block_refs {
+ if let Some(v) = garage.version_table.get(&br.version, &EmptyKey).await? {
+ let bl = match &v.backlink {
+ VersionBacklink::MultipartUpload { upload_id } => {
+ if let Some(u) = garage.mpu_table.get(upload_id, &EmptyKey).await? {
+ BlockVersionBacklink::Upload {
+ upload_id: hex::encode(&upload_id),
+ upload_deleted: u.deleted.get(),
+ upload_garbage_collected: false,
+ bucket_id: Some(hex::encode(&u.bucket_id)),
+ key: Some(u.key.to_string()),
+ }
+ } else {
+ BlockVersionBacklink::Upload {
+ upload_id: hex::encode(&upload_id),
+ upload_deleted: true,
+ upload_garbage_collected: true,
+ bucket_id: None,
+ key: None,
+ }
+ }
+ }
+ VersionBacklink::Object { bucket_id, key } => BlockVersionBacklink::Object {
+ bucket_id: hex::encode(&bucket_id),
+ key: key.to_string(),
+ },
+ };
+ versions.push(BlockVersion {
+ version_id: hex::encode(&br.version),
+ deleted: v.deleted.get(),
+ garbage_collected: false,
+ backlink: Some(bl),
+ });
+ } else {
+ versions.push(BlockVersion {
+ version_id: hex::encode(&br.version),
+ deleted: true,
+ garbage_collected: true,
+ backlink: None,
+ });
+ }
+ }
+ Ok(LocalGetBlockInfoResponse {
+ block_hash: hex::encode(&hash),
+ refcount,
+ versions,
+ })
+ }
+}
+
+fn find_block_hash_by_prefix(garage: &Arc<Garage>, prefix: &str) -> Result<Hash, Error> {
+ if prefix.len() < 4 {
+ return Err(Error::bad_request(
+ "Please specify at least 4 characters of the block hash",
+ ));
+ }
+
+ let prefix_bin = hex::decode(&prefix[..prefix.len() & !1]).ok_or_bad_request("invalid hash")?;
+
+ let iter = garage
+ .block_ref_table
+ .data
+ .store
+ .range(&prefix_bin[..]..)
+ .map_err(GarageError::from)?;
+ let mut found = None;
+ for item in iter {
+ let (k, _v) = item.map_err(GarageError::from)?;
+ let hash = Hash::try_from(&k[..32]).unwrap();
+ if &hash.as_slice()[..prefix_bin.len()] != prefix_bin {
+ break;
+ }
+ if hex::encode(hash.as_slice()).starts_with(prefix) {
+ match &found {
+ Some(x) if *x == hash => (),
+ Some(_) => {
+ return Err(Error::bad_request(format!(
+ "Several blocks match prefix `{}`",
+ prefix
+ )));
+ }
+ None => {
+ found = Some(hash);
+ }
+ }
+ }
+ }
+
+ found.ok_or_else(|| Error::NoSuchBlock(prefix.to_string()))
+}
diff --git a/src/api/admin/error.rs b/src/api/admin/error.rs
index 354a3bab..d7ea7dc9 100644
--- a/src/api/admin/error.rs
+++ b/src/api/admin/error.rs
@@ -25,6 +25,10 @@ pub enum Error {
#[error(display = "Access key not found: {}", _0)]
NoSuchAccessKey(String),
+ /// The requested block does not exist
+ #[error(display = "Block not found: {}", _0)]
+ NoSuchBlock(String),
+
/// The requested worker does not exist
#[error(display = "Worker not found: {}", _0)]
NoSuchWorker(u64),
@@ -58,6 +62,7 @@ impl Error {
Error::Common(c) => c.aws_code(),
Error::NoSuchAccessKey(_) => "NoSuchAccessKey",
Error::NoSuchWorker(_) => "NoSuchWorker",
+ Error::NoSuchBlock(_) => "NoSuchBlock",
Error::KeyAlreadyExists(_) => "KeyAlreadyExists",
}
}
@@ -68,7 +73,9 @@ impl ApiError for Error {
fn http_status_code(&self) -> StatusCode {
match self {
Error::Common(c) => c.http_status_code(),
- Error::NoSuchAccessKey(_) | Error::NoSuchWorker(_) => StatusCode::NOT_FOUND,
+ Error::NoSuchAccessKey(_) | Error::NoSuchWorker(_) | Error::NoSuchBlock(_) => {
+ StatusCode::NOT_FOUND
+ }
Error::KeyAlreadyExists(_) => StatusCode::CONFLICT,
}
}
diff --git a/src/api/admin/lib.rs b/src/api/admin/lib.rs
index 4ad10532..e7ee37af 100644
--- a/src/api/admin/lib.rs
+++ b/src/api/admin/lib.rs
@@ -15,6 +15,7 @@ mod cluster;
mod key;
mod special;
+mod block;
mod worker;
use std::sync::Arc;
diff --git a/src/api/admin/router_v2.rs b/src/api/admin/router_v2.rs
index 6334b3b1..5c6cb29c 100644
--- a/src/api/admin/router_v2.rs
+++ b/src/api/admin/router_v2.rs
@@ -64,6 +64,9 @@ impl AdminApiRequest {
POST GetWorkerInfo (body_field, query::node),
POST GetWorkerVariable (body_field, query::node),
POST SetWorkerVariable (body_field, query::node),
+ // Block APIs
+ GET ListBlockErrors (default::body, query::node),
+ POST GetBlockInfo (body_field, query::node),
]);
if let Some(message) = query.nonempty_message() {
diff --git a/src/api/admin/worker.rs b/src/api/admin/worker.rs
index c7c75700..d143e5be 100644
--- a/src/api/admin/worker.rs
+++ b/src/api/admin/worker.rs
@@ -100,7 +100,7 @@ impl RequestHandler for LocalSetWorkerVariableRequest {
fn worker_info_to_api(id: u64, info: WorkerInfo) -> WorkerInfoResp {
WorkerInfoResp {
- id: id,
+ id,
name: info.name,
state: match info.state {
WorkerState::Busy => WorkerStateResp::Busy,
@@ -112,7 +112,7 @@ fn worker_info_to_api(id: u64, info: WorkerInfo) -> WorkerInfoResp {
consecutive_errors: info.consecutive_errors as u64,
last_error: info.last_error.map(|(message, t)| WorkerLastError {
message,
- secs_ago: (std::cmp::max(t, now_msec()) - t) / 1000,
+ secs_ago: now_msec().saturating_sub(t) / 1000,
}),
tranquility: info.status.tranquility,