diff options
author | Alex Auvolat <alex@adnab.me> | 2022-05-05 13:40:31 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2022-05-10 13:25:10 +0200 |
commit | 7a19daafbd08b1372aa3c41eab9b3871c26bca80 (patch) | |
tree | 76d49a330c703658055373482660935019631922 | |
parent | 99fcfa3844a346463e5739fec19ac2a6b560adfc (diff) | |
download | garage-7a19daafbd08b1372aa3c41eab9b3871c26bca80.tar.gz garage-7a19daafbd08b1372aa3c41eab9b3871c26bca80.zip |
Implement /status Admin endpoint
-rw-r--r-- | src/api/admin/api_server.rs | 14 | ||||
-rw-r--r-- | src/api/admin/cluster.rs | 69 | ||||
-rw-r--r-- | src/api/admin/mod.rs | 2 | ||||
-rw-r--r-- | src/api/admin/router.rs | 2 | ||||
-rw-r--r-- | src/rpc/system.rs | 114 |
5 files changed, 152 insertions, 49 deletions
diff --git a/src/api/admin/api_server.rs b/src/api/admin/api_server.rs index 836b5158..57842548 100644 --- a/src/api/admin/api_server.rs +++ b/src/api/admin/api_server.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use async_trait::async_trait; use futures::future::Future; -use http::header::CONTENT_TYPE; +use http::header::{CONTENT_TYPE, ALLOW, ACCESS_CONTROL_ALLOW_METHODS, ACCESS_CONTROL_ALLOW_ORIGIN}; use hyper::{Body, Request, Response}; use opentelemetry::trace::{SpanRef, Tracer}; @@ -17,6 +17,7 @@ use crate::error::*; use crate::generic_server::*; use crate::admin::router::{Authorization, Endpoint}; +use crate::admin::cluster::*; pub struct AdminApiServer { garage: Arc<Garage>, @@ -56,6 +57,15 @@ impl AdminApiServer { } } + fn handle_options(&self, _req: &Request<Body>) -> Result<Response<Body>, Error> { + Ok(Response::builder() + .status(204) + .header(ALLOW, "OPTIONS, GET, POST") + .header(ACCESS_CONTROL_ALLOW_METHODS, "OPTIONS, GET, POST") + .header(ACCESS_CONTROL_ALLOW_ORIGIN, "*") + .body(Body::empty())?) + } + fn handle_metrics(&self) -> Result<Response<Body>, Error> { let mut buffer = vec![]; let encoder = TextEncoder::new(); @@ -110,7 +120,9 @@ impl ApiHandler for AdminApiServer { } match endpoint { + Endpoint::Options => self.handle_options(&req), Endpoint::Metrics => self.handle_metrics(), + Endpoint::GetClusterStatus => handle_get_cluster_status(&self.garage).await, _ => Err(Error::NotImplemented(format!( "Admin endpoint {} not implemented yet", endpoint.name() diff --git a/src/api/admin/cluster.rs b/src/api/admin/cluster.rs new file mode 100644 index 00000000..9ed41944 --- /dev/null +++ b/src/api/admin/cluster.rs @@ -0,0 +1,69 @@ +use std::sync::Arc; +use std::collections::HashMap; +use std::net::{IpAddr, SocketAddr}; + +use serde::{Serialize}; + +use hyper::{Body, Request, Response, StatusCode}; + +use garage_util::data::*; +use garage_util::error::Error as GarageError; +use garage_rpc::layout::*; +use garage_rpc::system::*; + +use garage_model::garage::Garage; + + +use crate::error::*; + + +pub async fn handle_get_cluster_status( + garage: &Arc<Garage>, +) -> Result<Response<Body>, Error> { + let layout = garage.system.get_cluster_layout(); + + let res = GetClusterStatusResponse { + known_nodes: garage.system.get_known_nodes() + .into_iter() + .map(|i| (hex::encode(i.id), KnownNodeResp { + addr: i.addr, + is_up: i.is_up, + last_seen_secs_ago: i.last_seen_secs_ago, + hostname: i.status.hostname, + })) + .collect(), + roles: layout.roles.items() + .iter() + .filter(|(_, _, v)| v.0.is_some()) + .map(|(k, _, v)| (hex::encode(k), v.0.clone())) + .collect(), + staged_role_changes: layout.staging.items() + .iter() + .filter(|(k, _, v)| layout.roles.get(k) != Some(v)) + .map(|(k, _, v)| (hex::encode(k), v.0.clone())) + .collect(), + }; + + let resp_json = serde_json::to_string_pretty(&res).map_err(GarageError::from)?; + Ok(Response::builder() + .status(StatusCode::OK) + .body(Body::from(resp_json))?) +} + + +#[derive(Serialize)] +struct GetClusterStatusResponse { + #[serde(rename = "knownNodes")] + known_nodes: HashMap<String, KnownNodeResp>, + roles: HashMap<String, Option<NodeRole>>, + #[serde(rename = "stagedRoleChanges")] + staged_role_changes: HashMap<String, Option<NodeRole>>, +} + +#[derive(Serialize)] +struct KnownNodeResp { + addr: SocketAddr, + is_up: bool, + last_seen_secs_ago: Option<u64>, + hostname: String, +} diff --git a/src/api/admin/mod.rs b/src/api/admin/mod.rs index ff2cf4b1..7e8d0635 100644 --- a/src/api/admin/mod.rs +++ b/src/api/admin/mod.rs @@ -1,2 +1,4 @@ pub mod api_server; mod router; + +mod cluster; diff --git a/src/api/admin/router.rs b/src/api/admin/router.rs index d0b30fc1..714af1e8 100644 --- a/src/api/admin/router.rs +++ b/src/api/admin/router.rs @@ -14,8 +14,8 @@ router_match! {@func /// List of all Admin API endpoints. #[derive(Debug, Clone, PartialEq, Eq)] pub enum Endpoint { - Metrics, Options, + Metrics, GetClusterStatus, GetClusterLayout, UpdateClusterLayout, diff --git a/src/rpc/system.rs b/src/rpc/system.rs index 68d94ea5..a5b8d4f4 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -312,6 +312,67 @@ impl System { ); } + // ---- Administrative operations (directly available and + // also available through RPC) ---- + + pub fn get_known_nodes(&self) -> Vec<KnownNodeInfo> { + let node_status = self.node_status.read().unwrap(); + let known_nodes = self + .fullmesh + .get_peer_list() + .iter() + .map(|n| KnownNodeInfo { + id: n.id.into(), + addr: n.addr, + is_up: n.is_up(), + last_seen_secs_ago: n.last_seen.map(|t| (Instant::now() - t).as_secs()), + status: node_status + .get(&n.id.into()) + .cloned() + .map(|(_, st)| st) + .unwrap_or(NodeStatus { + hostname: "?".to_string(), + replication_factor: 0, + cluster_layout_version: 0, + cluster_layout_staging_hash: Hash::from([0u8; 32]), + }), + }) + .collect::<Vec<_>>(); + known_nodes + } + + pub fn get_cluster_layout(&self) -> ClusterLayout { + self.ring.borrow().layout.clone() + } + + pub async fn connect(&self, node: &str) -> Result<(), Error> { + let (pubkey, addrs) = parse_and_resolve_peer_addr(node).ok_or_else(|| { + Error::Message(format!( + "Unable to parse or resolve node specification: {}", + node + )) + })?; + let mut errors = vec![]; + for ip in addrs.iter() { + match self + .netapp + .clone() + .try_connect(*ip, pubkey) + .await + .err_context(CONNECT_ERROR_MESSAGE) + { + Ok(()) => return Ok(()), + Err(e) => { + errors.push((*ip, e)); + } + } + } + return Err(Error::Message(format!( + "Could not connect to specified peers. Errors: {:?}", + errors + ))); + } + // ---- INTERNALS ---- async fn advertise_to_consul(self: Arc<Self>) -> Result<(), Error> { @@ -384,32 +445,11 @@ impl System { self.local_status.swap(Arc::new(new_si)); } + // --- RPC HANDLERS --- + async fn handle_connect(&self, node: &str) -> Result<SystemRpc, Error> { - let (pubkey, addrs) = parse_and_resolve_peer_addr(node).ok_or_else(|| { - Error::Message(format!( - "Unable to parse or resolve node specification: {}", - node - )) - })?; - let mut errors = vec![]; - for ip in addrs.iter() { - match self - .netapp - .clone() - .try_connect(*ip, pubkey) - .await - .err_context(CONNECT_ERROR_MESSAGE) - { - Ok(()) => return Ok(SystemRpc::Ok), - Err(e) => { - errors.push((*ip, e)); - } - } - } - return Err(Error::Message(format!( - "Could not connect to specified peers. Errors: {:?}", - errors - ))); + self.connect(node).await?; + Ok(SystemRpc::Ok) } fn handle_pull_cluster_layout(&self) -> SystemRpc { @@ -418,31 +458,11 @@ impl System { } fn handle_get_known_nodes(&self) -> SystemRpc { - let node_status = self.node_status.read().unwrap(); - let known_nodes = self - .fullmesh - .get_peer_list() - .iter() - .map(|n| KnownNodeInfo { - id: n.id.into(), - addr: n.addr, - is_up: n.is_up(), - last_seen_secs_ago: n.last_seen.map(|t| (Instant::now() - t).as_secs()), - status: node_status - .get(&n.id.into()) - .cloned() - .map(|(_, st)| st) - .unwrap_or(NodeStatus { - hostname: "?".to_string(), - replication_factor: 0, - cluster_layout_version: 0, - cluster_layout_staging_hash: Hash::from([0u8; 32]), - }), - }) - .collect::<Vec<_>>(); + let known_nodes = self.get_known_nodes(); SystemRpc::ReturnKnownNodes(known_nodes) } + async fn handle_advertise_status( self: &Arc<Self>, from: Uuid, |