diff options
Diffstat (limited to 'src/api')
-rw-r--r-- | src/api/Cargo.toml | 2 | ||||
-rw-r--r-- | src/api/admin/api_server.rs | 2 | ||||
-rw-r--r-- | src/api/admin/bucket.rs | 4 | ||||
-rw-r--r-- | src/api/admin/cluster.rs | 176 | ||||
-rw-r--r-- | src/api/common_error.rs | 8 | ||||
-rw-r--r-- | src/api/k2v/index.rs | 11 | ||||
-rw-r--r-- | src/api/s3/put.rs | 2 |
7 files changed, 151 insertions, 54 deletions
diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml index 011a64d8..9fb562a3 100644 --- a/src/api/Cargo.toml +++ b/src/api/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "garage_api" -version = "0.9.1" +version = "0.10.0" authors = ["Alex Auvolat <alex@adnab.me>"] edition = "2018" license = "AGPL-3.0" diff --git a/src/api/admin/api_server.rs b/src/api/admin/api_server.rs index 50813d11..2b9be24e 100644 --- a/src/api/admin/api_server.rs +++ b/src/api/admin/api_server.rs @@ -284,7 +284,7 @@ impl ApiHandler for AdminApiServer { Endpoint::GetClusterLayout => handle_get_cluster_layout(&self.garage).await, Endpoint::UpdateClusterLayout => handle_update_cluster_layout(&self.garage, req).await, Endpoint::ApplyClusterLayout => handle_apply_cluster_layout(&self.garage, req).await, - Endpoint::RevertClusterLayout => handle_revert_cluster_layout(&self.garage, req).await, + Endpoint::RevertClusterLayout => handle_revert_cluster_layout(&self.garage).await, // Keys Endpoint::ListKeys => handle_list_keys(&self.garage).await, Endpoint::GetKeyInfo { diff --git a/src/api/admin/bucket.rs b/src/api/admin/bucket.rs index 1b22dd03..a8718a9f 100644 --- a/src/api/admin/bucket.rs +++ b/src/api/admin/bucket.rs @@ -123,7 +123,7 @@ async fn bucket_info_results( .table .get(&bucket_id, &EmptyKey) .await? - .map(|x| x.filtered_values(&garage.system.ring.borrow())) + .map(|x| x.filtered_values(&garage.system.cluster_layout())) .unwrap_or_default(); let mpu_counters = garage @@ -131,7 +131,7 @@ async fn bucket_info_results( .table .get(&bucket_id, &EmptyKey) .await? - .map(|x| x.filtered_values(&garage.system.ring.borrow())) + .map(|x| x.filtered_values(&garage.system.cluster_layout())) .unwrap_or_default(); let mut relevant_keys = HashMap::new(); diff --git a/src/api/admin/cluster.rs b/src/api/admin/cluster.rs index 3876c608..8ce6c5ed 100644 --- a/src/api/admin/cluster.rs +++ b/src/api/admin/cluster.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use std::net::SocketAddr; use std::sync::Arc; @@ -16,25 +17,95 @@ use crate::admin::error::*; use crate::helpers::{json_ok_response, parse_json_body}; pub async fn handle_get_cluster_status(garage: &Arc<Garage>) -> Result<Response<ResBody>, Error> { + let layout = garage.system.cluster_layout(); + let mut nodes = garage + .system + .get_known_nodes() + .into_iter() + .map(|i| { + ( + i.id, + NodeResp { + id: hex::encode(i.id), + addr: Some(i.addr), + hostname: i.status.hostname, + is_up: i.is_up, + last_seen_secs_ago: i.last_seen_secs_ago, + data_partition: i + .status + .data_disk_avail + .map(|(avail, total)| FreeSpaceResp { + available: avail, + total, + }), + metadata_partition: i.status.meta_disk_avail.map(|(avail, total)| { + FreeSpaceResp { + available: avail, + total, + } + }), + ..Default::default() + }, + ) + }) + .collect::<HashMap<_, _>>(); + + for (id, _, role) in layout.current().roles.items().iter() { + if let layout::NodeRoleV(Some(r)) = role { + let role = NodeRoleResp { + id: hex::encode(id), + zone: r.zone.to_string(), + capacity: r.capacity, + tags: r.tags.clone(), + }; + match nodes.get_mut(id) { + None => { + nodes.insert( + *id, + NodeResp { + id: hex::encode(id), + role: Some(role), + ..Default::default() + }, + ); + } + Some(n) => { + if n.role.is_none() { + n.role = Some(role); + } + } + } + } + } + + for ver in layout.versions.iter().rev().skip(1) { + for (id, _, role) in ver.roles.items().iter() { + if let layout::NodeRoleV(Some(r)) = role { + if !nodes.contains_key(id) && r.capacity.is_some() { + nodes.insert( + *id, + NodeResp { + id: hex::encode(id), + draining: true, + ..Default::default() + }, + ); + } + } + } + } + + let mut nodes = nodes.into_values().collect::<Vec<_>>(); + nodes.sort_by(|x, y| x.id.cmp(&y.id)); + let res = GetClusterStatusResponse { node: hex::encode(garage.system.id), garage_version: garage_util::version::garage_version(), garage_features: garage_util::version::garage_features(), rust_version: garage_util::version::rust_version(), db_engine: garage.db.engine(), - known_nodes: garage - .system - .get_known_nodes() - .into_iter() - .map(|i| KnownNodeResp { - id: hex::encode(i.id), - addr: i.addr, - is_up: i.is_up, - last_seen_secs_ago: i.last_seen_secs_ago, - hostname: i.status.hostname, - }) - .collect(), - layout: format_cluster_layout(&garage.system.get_cluster_layout()), + layout_version: layout.current().version, + nodes, }; Ok(json_ok_response(&res)?) @@ -85,13 +156,14 @@ pub async fn handle_connect_cluster_nodes( } pub async fn handle_get_cluster_layout(garage: &Arc<Garage>) -> Result<Response<ResBody>, Error> { - let res = format_cluster_layout(&garage.system.get_cluster_layout()); + let res = format_cluster_layout(&garage.system.cluster_layout()); Ok(json_ok_response(&res)?) } -fn format_cluster_layout(layout: &layout::ClusterLayout) -> GetClusterLayoutResponse { +fn format_cluster_layout(layout: &layout::LayoutHistory) -> GetClusterLayoutResponse { let roles = layout + .current() .roles .items() .iter() @@ -105,10 +177,12 @@ fn format_cluster_layout(layout: &layout::ClusterLayout) -> GetClusterLayoutResp .collect::<Vec<_>>(); let staged_role_changes = layout - .staging_roles + .staging + .get() + .roles .items() .iter() - .filter(|(k, _, v)| layout.roles.get(k) != Some(v)) + .filter(|(k, _, v)| layout.current().roles.get(k) != Some(v)) .map(|(k, _, v)| match &v.0 { None => NodeRoleChange { id: hex::encode(k), @@ -126,7 +200,7 @@ fn format_cluster_layout(layout: &layout::ClusterLayout) -> GetClusterLayoutResp .collect::<Vec<_>>(); GetClusterLayoutResponse { - version: layout.version, + version: layout.current().version, roles, staged_role_changes, } @@ -155,8 +229,8 @@ struct GetClusterStatusResponse { garage_features: Option<&'static [&'static str]>, rust_version: &'static str, db_engine: String, - known_nodes: Vec<KnownNodeResp>, - layout: GetClusterLayoutResponse, + layout_version: u64, + nodes: Vec<NodeResp>, } #[derive(Serialize)] @@ -190,14 +264,27 @@ struct NodeRoleResp { tags: Vec<String>, } -#[derive(Serialize)] +#[derive(Serialize, Default)] +#[serde(rename_all = "camelCase")] +struct FreeSpaceResp { + available: u64, + total: u64, +} + +#[derive(Serialize, Default)] #[serde(rename_all = "camelCase")] -struct KnownNodeResp { +struct NodeResp { id: String, - addr: SocketAddr, + role: Option<NodeRoleResp>, + addr: Option<SocketAddr>, + hostname: Option<String>, is_up: bool, last_seen_secs_ago: Option<u64>, - hostname: String, + draining: bool, + #[serde(skip_serializing_if = "Option::is_none")] + data_partition: Option<FreeSpaceResp>, + #[serde(skip_serializing_if = "Option::is_none")] + metadata_partition: Option<FreeSpaceResp>, } // ---- update functions ---- @@ -208,10 +295,10 @@ pub async fn handle_update_cluster_layout( ) -> Result<Response<ResBody>, Error> { let updates = parse_json_body::<UpdateClusterLayoutRequest, _, Error>(req).await?; - let mut layout = garage.system.get_cluster_layout(); + let mut layout = garage.system.cluster_layout().clone(); - let mut roles = layout.roles.clone(); - roles.merge(&layout.staging_roles); + let mut roles = layout.current().roles.clone(); + roles.merge(&layout.staging.get().roles); for change in updates { let node = hex::decode(&change.id).ok_or_bad_request("Invalid node identifier")?; @@ -232,11 +319,17 @@ pub async fn handle_update_cluster_layout( }; layout - .staging_roles + .staging + .get_mut() + .roles .merge(&roles.update_mutator(node, layout::NodeRoleV(new_role))); } - garage.system.update_cluster_layout(&layout).await?; + garage + .system + .layout_manager + .update_cluster_layout(&layout) + .await?; let res = format_cluster_layout(&layout); Ok(json_ok_response(&res)?) @@ -246,12 +339,16 @@ pub async fn handle_apply_cluster_layout( garage: &Arc<Garage>, req: Request<IncomingBody>, ) -> Result<Response<ResBody>, Error> { - let param = parse_json_body::<ApplyRevertLayoutRequest, _, Error>(req).await?; + let param = parse_json_body::<ApplyLayoutRequest, _, Error>(req).await?; - let layout = garage.system.get_cluster_layout(); + let layout = garage.system.cluster_layout().clone(); let (layout, msg) = layout.apply_staged_changes(Some(param.version))?; - garage.system.update_cluster_layout(&layout).await?; + garage + .system + .layout_manager + .update_cluster_layout(&layout) + .await?; let res = ApplyClusterLayoutResponse { message: msg, @@ -262,13 +359,14 @@ pub async fn handle_apply_cluster_layout( pub async fn handle_revert_cluster_layout( garage: &Arc<Garage>, - req: Request<IncomingBody>, ) -> Result<Response<ResBody>, Error> { - let param = parse_json_body::<ApplyRevertLayoutRequest, _, Error>(req).await?; - - let layout = garage.system.get_cluster_layout(); - let layout = layout.revert_staged_changes(Some(param.version))?; - garage.system.update_cluster_layout(&layout).await?; + let layout = garage.system.cluster_layout().clone(); + let layout = layout.revert_staged_changes()?; + garage + .system + .layout_manager + .update_cluster_layout(&layout) + .await?; let res = format_cluster_layout(&layout); Ok(json_ok_response(&res)?) @@ -280,7 +378,7 @@ type UpdateClusterLayoutRequest = Vec<NodeRoleChange>; #[derive(Deserialize)] #[serde(rename_all = "camelCase")] -struct ApplyRevertLayoutRequest { +struct ApplyLayoutRequest { version: u64, } diff --git a/src/api/common_error.rs b/src/api/common_error.rs index 4381f227..c47555d4 100644 --- a/src/api/common_error.rs +++ b/src/api/common_error.rs @@ -59,9 +59,7 @@ impl CommonError { pub fn http_status_code(&self) -> StatusCode { match self { CommonError::InternalError( - GarageError::Timeout - | GarageError::RemoteError(_) - | GarageError::Quorum(_, _, _, _), + GarageError::Timeout | GarageError::RemoteError(_) | GarageError::Quorum(..), ) => StatusCode::SERVICE_UNAVAILABLE, CommonError::InternalError(_) | CommonError::Hyper(_) | CommonError::Http(_) => { StatusCode::INTERNAL_SERVER_ERROR @@ -80,9 +78,7 @@ impl CommonError { match self { CommonError::Forbidden(_) => "AccessDenied", CommonError::InternalError( - GarageError::Timeout - | GarageError::RemoteError(_) - | GarageError::Quorum(_, _, _, _), + GarageError::Timeout | GarageError::RemoteError(_) | GarageError::Quorum(..), ) => "ServiceUnavailable", CommonError::InternalError(_) | CommonError::Hyper(_) | CommonError::Http(_) => { "InternalError" diff --git a/src/api/k2v/index.rs b/src/api/k2v/index.rs index 1baec1db..291464ab 100644 --- a/src/api/k2v/index.rs +++ b/src/api/k2v/index.rs @@ -5,7 +5,6 @@ use serde::Serialize; use garage_util::data::*; -use garage_rpc::ring::Ring; use garage_table::util::*; use garage_model::garage::Garage; @@ -27,7 +26,11 @@ pub async fn handle_read_index( ) -> Result<Response<ResBody>, Error> { let reverse = reverse.unwrap_or(false); - let ring: Arc<Ring> = garage.system.ring.borrow().clone(); + let node_id_vec = garage + .system + .cluster_layout() + .all_nongateway_nodes() + .to_vec(); let (partition_keys, more, next_start) = read_range( &garage.k2v.counter_table.table, @@ -36,7 +39,7 @@ pub async fn handle_read_index( &start, &end, limit, - Some((DeletedFilter::NotDeleted, ring.layout.node_id_vec.clone())), + Some((DeletedFilter::NotDeleted, node_id_vec)), EnumerationOrder::from_reverse(reverse), ) .await?; @@ -55,7 +58,7 @@ pub async fn handle_read_index( partition_keys: partition_keys .into_iter() .map(|part| { - let vals = part.filtered_values(&ring); + let vals = part.filtered_values(&garage.system.cluster_layout()); ReadIndexResponseEntry { pk: part.sk, entries: *vals.get(&s_entries).unwrap_or(&0), diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs index 5ac5fb6b..8902b14c 100644 --- a/src/api/s3/put.rs +++ b/src/api/s3/put.rs @@ -255,7 +255,7 @@ pub(crate) async fn check_quotas( .await?; let counters = counters - .map(|x| x.filtered_values(&garage.system.ring.borrow())) + .map(|x| x.filtered_values(&garage.system.cluster_layout())) .unwrap_or_default(); let (prev_cnt_obj, prev_cnt_size) = match prev_object { |