diff options
Diffstat (limited to 'src/api')
-rw-r--r-- | src/api/admin/api.rs | 71 | ||||
-rw-r--r-- | src/api/admin/block.rs | 149 | ||||
-rw-r--r-- | src/api/admin/error.rs | 9 | ||||
-rw-r--r-- | src/api/admin/lib.rs | 1 | ||||
-rw-r--r-- | src/api/admin/router_v2.rs | 3 | ||||
-rw-r--r-- | src/api/admin/worker.rs | 4 |
6 files changed, 234 insertions, 3 deletions
diff --git a/src/api/admin/api.rs b/src/api/admin/api.rs index cf136d28..42872ad0 100644 --- a/src/api/admin/api.rs +++ b/src/api/admin/api.rs @@ -82,6 +82,10 @@ admin_endpoints![ GetWorkerInfo, GetWorkerVariable, SetWorkerVariable, + + // Block operations + ListBlockErrors, + GetBlockInfo, ]; local_admin_endpoints![ @@ -90,6 +94,9 @@ local_admin_endpoints![ GetWorkerInfo, GetWorkerVariable, SetWorkerVariable, + // Block operations + ListBlockErrors, + GetBlockInfo, ]; #[derive(Debug, Clone, Serialize, Deserialize)] @@ -619,6 +626,7 @@ pub struct RemoveBucketAliasResponse(pub GetBucketInfoResponse); // ---- GetWorkerList ---- #[derive(Debug, Clone, Serialize, Deserialize, Default)] +#[serde(rename_all = "camelCase")] pub struct LocalListWorkersRequest { #[serde(default)] pub busy_only: bool, @@ -694,3 +702,66 @@ pub struct LocalSetWorkerVariableResponse { pub variable: String, pub value: String, } + +// ********************************************** +// Block operations +// ********************************************** + +// ---- ListBlockErrors ---- + +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub struct LocalListBlockErrorsRequest; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LocalListBlockErrorsResponse(pub Vec<BlockError>); + +#[derive(Serialize, Deserialize, Clone, Debug)] +#[serde(rename_all = "camelCase")] +pub struct BlockError { + pub block_hash: String, + pub refcount: u64, + pub error_count: u64, + pub last_try_secs_ago: u64, + pub next_try_in_secs: u64, +} + +// ---- GetBlockInfo ---- + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct LocalGetBlockInfoRequest { + pub block_hash: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct LocalGetBlockInfoResponse { + pub block_hash: String, + pub refcount: u64, + pub versions: Vec<BlockVersion>, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct BlockVersion { + pub version_id: String, + pub deleted: bool, + pub garbage_collected: bool, + pub backlink: Option<BlockVersionBacklink>, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum BlockVersionBacklink { + Object { + bucket_id: String, + key: String, + }, + Upload { + upload_id: String, + upload_deleted: bool, + upload_garbage_collected: bool, + bucket_id: Option<String>, + key: Option<String>, + }, +} diff --git a/src/api/admin/block.rs b/src/api/admin/block.rs new file mode 100644 index 00000000..157db5b5 --- /dev/null +++ b/src/api/admin/block.rs @@ -0,0 +1,149 @@ +use std::sync::Arc; + +use async_trait::async_trait; + +use garage_util::data::*; +use garage_util::error::Error as GarageError; +use garage_util::time::now_msec; + +use garage_table::EmptyKey; + +use garage_model::garage::Garage; +use garage_model::s3::version_table::*; + +use crate::admin::api::*; +use crate::admin::error::*; +use crate::admin::{Admin, RequestHandler}; +use crate::common_error::CommonErrorDerivative; + +#[async_trait] +impl RequestHandler for LocalListBlockErrorsRequest { + type Response = LocalListBlockErrorsResponse; + + async fn handle( + self, + garage: &Arc<Garage>, + _admin: &Admin, + ) -> Result<LocalListBlockErrorsResponse, Error> { + let errors = garage.block_manager.list_resync_errors()?; + let now = now_msec(); + let errors = errors + .into_iter() + .map(|e| BlockError { + block_hash: hex::encode(&e.hash), + refcount: e.refcount, + error_count: e.error_count, + last_try_secs_ago: now.saturating_sub(e.last_try) / 1000, + next_try_in_secs: e.next_try.saturating_sub(now) / 1000, + }) + .collect(); + Ok(LocalListBlockErrorsResponse(errors)) + } +} + +#[async_trait] +impl RequestHandler for LocalGetBlockInfoRequest { + type Response = LocalGetBlockInfoResponse; + + async fn handle( + self, + garage: &Arc<Garage>, + _admin: &Admin, + ) -> Result<LocalGetBlockInfoResponse, Error> { + let hash = find_block_hash_by_prefix(garage, &self.block_hash)?; + let refcount = garage.block_manager.get_block_rc(&hash)?; + let block_refs = garage + .block_ref_table + .get_range(&hash, None, None, 10000, Default::default()) + .await?; + let mut versions = vec![]; + for br in block_refs { + if let Some(v) = garage.version_table.get(&br.version, &EmptyKey).await? { + let bl = match &v.backlink { + VersionBacklink::MultipartUpload { upload_id } => { + if let Some(u) = garage.mpu_table.get(upload_id, &EmptyKey).await? { + BlockVersionBacklink::Upload { + upload_id: hex::encode(&upload_id), + upload_deleted: u.deleted.get(), + upload_garbage_collected: false, + bucket_id: Some(hex::encode(&u.bucket_id)), + key: Some(u.key.to_string()), + } + } else { + BlockVersionBacklink::Upload { + upload_id: hex::encode(&upload_id), + upload_deleted: true, + upload_garbage_collected: true, + bucket_id: None, + key: None, + } + } + } + VersionBacklink::Object { bucket_id, key } => BlockVersionBacklink::Object { + bucket_id: hex::encode(&bucket_id), + key: key.to_string(), + }, + }; + versions.push(BlockVersion { + version_id: hex::encode(&br.version), + deleted: v.deleted.get(), + garbage_collected: false, + backlink: Some(bl), + }); + } else { + versions.push(BlockVersion { + version_id: hex::encode(&br.version), + deleted: true, + garbage_collected: true, + backlink: None, + }); + } + } + Ok(LocalGetBlockInfoResponse { + block_hash: hex::encode(&hash), + refcount, + versions, + }) + } +} + +fn find_block_hash_by_prefix(garage: &Arc<Garage>, prefix: &str) -> Result<Hash, Error> { + if prefix.len() < 4 { + return Err(Error::bad_request( + "Please specify at least 4 characters of the block hash", + )); + } + + let prefix_bin = hex::decode(&prefix[..prefix.len() & !1]).ok_or_bad_request("invalid hash")?; + + let iter = garage + .block_ref_table + .data + .store + .range(&prefix_bin[..]..) + .map_err(GarageError::from)?; + let mut found = None; + for item in iter { + let (k, _v) = item.map_err(GarageError::from)?; + let hash = Hash::try_from(&k[..32]).unwrap(); + if &hash.as_slice()[..prefix_bin.len()] != prefix_bin { + break; + } + if hex::encode(hash.as_slice()).starts_with(prefix) { + match &found { + Some(x) if *x == hash => (), + Some(_) => { + return Err(Error::bad_request(format!( + "Several blocks match prefix `{}`", + prefix + ))); + } + None => { + found = Some(hash); + } + } + } + } + + found.ok_or_else(|| Error::NoSuchBlock(prefix.to_string())) +} diff --git a/src/api/admin/error.rs b/src/api/admin/error.rs index 354a3bab..d7ea7dc9 100644 --- a/src/api/admin/error.rs +++ b/src/api/admin/error.rs @@ -25,6 +25,10 @@ pub enum Error { #[error(display = "Access key not found: {}", _0)] NoSuchAccessKey(String), + /// The requested block does not exist + #[error(display = "Block not found: {}", _0)] + NoSuchBlock(String), + /// The requested worker does not exist #[error(display = "Worker not found: {}", _0)] NoSuchWorker(u64), @@ -58,6 +62,7 @@ impl Error { Error::Common(c) => c.aws_code(), Error::NoSuchAccessKey(_) => "NoSuchAccessKey", Error::NoSuchWorker(_) => "NoSuchWorker", + Error::NoSuchBlock(_) => "NoSuchBlock", Error::KeyAlreadyExists(_) => "KeyAlreadyExists", } } @@ -68,7 +73,9 @@ impl ApiError for Error { fn http_status_code(&self) -> StatusCode { match self { Error::Common(c) => c.http_status_code(), - Error::NoSuchAccessKey(_) | Error::NoSuchWorker(_) => StatusCode::NOT_FOUND, + Error::NoSuchAccessKey(_) | Error::NoSuchWorker(_) | Error::NoSuchBlock(_) => { + StatusCode::NOT_FOUND + } Error::KeyAlreadyExists(_) => StatusCode::CONFLICT, } } diff --git a/src/api/admin/lib.rs b/src/api/admin/lib.rs index 4ad10532..e7ee37af 100644 --- a/src/api/admin/lib.rs +++ b/src/api/admin/lib.rs @@ -15,6 +15,7 @@ mod cluster; mod key; mod special; +mod block; mod worker; use std::sync::Arc; diff --git a/src/api/admin/router_v2.rs b/src/api/admin/router_v2.rs index 6334b3b1..5c6cb29c 100644 --- a/src/api/admin/router_v2.rs +++ b/src/api/admin/router_v2.rs @@ -64,6 +64,9 @@ impl AdminApiRequest { POST GetWorkerInfo (body_field, query::node), POST GetWorkerVariable (body_field, query::node), POST SetWorkerVariable (body_field, query::node), + // Block APIs + GET ListBlockErrors (default::body, query::node), + POST GetBlockInfo (body_field, query::node), ]); if let Some(message) = query.nonempty_message() { diff --git a/src/api/admin/worker.rs b/src/api/admin/worker.rs index c7c75700..d143e5be 100644 --- a/src/api/admin/worker.rs +++ b/src/api/admin/worker.rs @@ -100,7 +100,7 @@ impl RequestHandler for LocalSetWorkerVariableRequest { fn worker_info_to_api(id: u64, info: WorkerInfo) -> WorkerInfoResp { WorkerInfoResp { - id: id, + id, name: info.name, state: match info.state { WorkerState::Busy => WorkerStateResp::Busy, @@ -112,7 +112,7 @@ fn worker_info_to_api(id: u64, info: WorkerInfo) -> WorkerInfoResp { consecutive_errors: info.consecutive_errors as u64, last_error: info.last_error.map(|(message, t)| WorkerLastError { message, - secs_ago: (std::cmp::max(t, now_msec()) - t) / 1000, + secs_ago: now_msec().saturating_sub(t) / 1000, }), tranquility: info.status.tranquility, |