aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-05-05 13:40:31 +0200
committerAlex Auvolat <alex@adnab.me>2022-05-10 13:25:10 +0200
commit7a19daafbd08b1372aa3c41eab9b3871c26bca80 (patch)
tree76d49a330c703658055373482660935019631922 /src
parent99fcfa3844a346463e5739fec19ac2a6b560adfc (diff)
downloadgarage-7a19daafbd08b1372aa3c41eab9b3871c26bca80.tar.gz
garage-7a19daafbd08b1372aa3c41eab9b3871c26bca80.zip
Implement /status Admin endpoint
Diffstat (limited to 'src')
-rw-r--r--src/api/admin/api_server.rs14
-rw-r--r--src/api/admin/cluster.rs69
-rw-r--r--src/api/admin/mod.rs2
-rw-r--r--src/api/admin/router.rs2
-rw-r--r--src/rpc/system.rs114
5 files changed, 152 insertions, 49 deletions
diff --git a/src/api/admin/api_server.rs b/src/api/admin/api_server.rs
index 836b5158..57842548 100644
--- a/src/api/admin/api_server.rs
+++ b/src/api/admin/api_server.rs
@@ -3,7 +3,7 @@ use std::sync::Arc;
use async_trait::async_trait;
use futures::future::Future;
-use http::header::CONTENT_TYPE;
+use http::header::{CONTENT_TYPE, ALLOW, ACCESS_CONTROL_ALLOW_METHODS, ACCESS_CONTROL_ALLOW_ORIGIN};
use hyper::{Body, Request, Response};
use opentelemetry::trace::{SpanRef, Tracer};
@@ -17,6 +17,7 @@ use crate::error::*;
use crate::generic_server::*;
use crate::admin::router::{Authorization, Endpoint};
+use crate::admin::cluster::*;
pub struct AdminApiServer {
garage: Arc<Garage>,
@@ -56,6 +57,15 @@ impl AdminApiServer {
}
}
+ fn handle_options(&self, _req: &Request<Body>) -> Result<Response<Body>, Error> {
+ Ok(Response::builder()
+ .status(204)
+ .header(ALLOW, "OPTIONS, GET, POST")
+ .header(ACCESS_CONTROL_ALLOW_METHODS, "OPTIONS, GET, POST")
+ .header(ACCESS_CONTROL_ALLOW_ORIGIN, "*")
+ .body(Body::empty())?)
+ }
+
fn handle_metrics(&self) -> Result<Response<Body>, Error> {
let mut buffer = vec![];
let encoder = TextEncoder::new();
@@ -110,7 +120,9 @@ impl ApiHandler for AdminApiServer {
}
match endpoint {
+ Endpoint::Options => self.handle_options(&req),
Endpoint::Metrics => self.handle_metrics(),
+ Endpoint::GetClusterStatus => handle_get_cluster_status(&self.garage).await,
_ => Err(Error::NotImplemented(format!(
"Admin endpoint {} not implemented yet",
endpoint.name()
diff --git a/src/api/admin/cluster.rs b/src/api/admin/cluster.rs
new file mode 100644
index 00000000..9ed41944
--- /dev/null
+++ b/src/api/admin/cluster.rs
@@ -0,0 +1,69 @@
+use std::sync::Arc;
+use std::collections::HashMap;
+use std::net::{IpAddr, SocketAddr};
+
+use serde::{Serialize};
+
+use hyper::{Body, Request, Response, StatusCode};
+
+use garage_util::data::*;
+use garage_util::error::Error as GarageError;
+use garage_rpc::layout::*;
+use garage_rpc::system::*;
+
+use garage_model::garage::Garage;
+
+
+use crate::error::*;
+
+
+pub async fn handle_get_cluster_status(
+ garage: &Arc<Garage>,
+) -> Result<Response<Body>, Error> {
+ let layout = garage.system.get_cluster_layout();
+
+ let res = GetClusterStatusResponse {
+ known_nodes: garage.system.get_known_nodes()
+ .into_iter()
+ .map(|i| (hex::encode(i.id), KnownNodeResp {
+ addr: i.addr,
+ is_up: i.is_up,
+ last_seen_secs_ago: i.last_seen_secs_ago,
+ hostname: i.status.hostname,
+ }))
+ .collect(),
+ roles: layout.roles.items()
+ .iter()
+ .filter(|(_, _, v)| v.0.is_some())
+ .map(|(k, _, v)| (hex::encode(k), v.0.clone()))
+ .collect(),
+ staged_role_changes: layout.staging.items()
+ .iter()
+ .filter(|(k, _, v)| layout.roles.get(k) != Some(v))
+ .map(|(k, _, v)| (hex::encode(k), v.0.clone()))
+ .collect(),
+ };
+
+ let resp_json = serde_json::to_string_pretty(&res).map_err(GarageError::from)?;
+ Ok(Response::builder()
+ .status(StatusCode::OK)
+ .body(Body::from(resp_json))?)
+}
+
+
+#[derive(Serialize)]
+struct GetClusterStatusResponse {
+ #[serde(rename = "knownNodes")]
+ known_nodes: HashMap<String, KnownNodeResp>,
+ roles: HashMap<String, Option<NodeRole>>,
+ #[serde(rename = "stagedRoleChanges")]
+ staged_role_changes: HashMap<String, Option<NodeRole>>,
+}
+
+#[derive(Serialize)]
+struct KnownNodeResp {
+ addr: SocketAddr,
+ is_up: bool,
+ last_seen_secs_ago: Option<u64>,
+ hostname: String,
+}
diff --git a/src/api/admin/mod.rs b/src/api/admin/mod.rs
index ff2cf4b1..7e8d0635 100644
--- a/src/api/admin/mod.rs
+++ b/src/api/admin/mod.rs
@@ -1,2 +1,4 @@
pub mod api_server;
mod router;
+
+mod cluster;
diff --git a/src/api/admin/router.rs b/src/api/admin/router.rs
index d0b30fc1..714af1e8 100644
--- a/src/api/admin/router.rs
+++ b/src/api/admin/router.rs
@@ -14,8 +14,8 @@ router_match! {@func
/// List of all Admin API endpoints.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Endpoint {
- Metrics,
Options,
+ Metrics,
GetClusterStatus,
GetClusterLayout,
UpdateClusterLayout,
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index 68d94ea5..a5b8d4f4 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -312,6 +312,67 @@ impl System {
);
}
+ // ---- Administrative operations (directly available and
+ // also available through RPC) ----
+
+ pub fn get_known_nodes(&self) -> Vec<KnownNodeInfo> {
+ let node_status = self.node_status.read().unwrap();
+ let known_nodes = self
+ .fullmesh
+ .get_peer_list()
+ .iter()
+ .map(|n| KnownNodeInfo {
+ id: n.id.into(),
+ addr: n.addr,
+ is_up: n.is_up(),
+ last_seen_secs_ago: n.last_seen.map(|t| (Instant::now() - t).as_secs()),
+ status: node_status
+ .get(&n.id.into())
+ .cloned()
+ .map(|(_, st)| st)
+ .unwrap_or(NodeStatus {
+ hostname: "?".to_string(),
+ replication_factor: 0,
+ cluster_layout_version: 0,
+ cluster_layout_staging_hash: Hash::from([0u8; 32]),
+ }),
+ })
+ .collect::<Vec<_>>();
+ known_nodes
+ }
+
+ pub fn get_cluster_layout(&self) -> ClusterLayout {
+ self.ring.borrow().layout.clone()
+ }
+
+ pub async fn connect(&self, node: &str) -> Result<(), Error> {
+ let (pubkey, addrs) = parse_and_resolve_peer_addr(node).ok_or_else(|| {
+ Error::Message(format!(
+ "Unable to parse or resolve node specification: {}",
+ node
+ ))
+ })?;
+ let mut errors = vec![];
+ for ip in addrs.iter() {
+ match self
+ .netapp
+ .clone()
+ .try_connect(*ip, pubkey)
+ .await
+ .err_context(CONNECT_ERROR_MESSAGE)
+ {
+ Ok(()) => return Ok(()),
+ Err(e) => {
+ errors.push((*ip, e));
+ }
+ }
+ }
+ return Err(Error::Message(format!(
+ "Could not connect to specified peers. Errors: {:?}",
+ errors
+ )));
+ }
+
// ---- INTERNALS ----
async fn advertise_to_consul(self: Arc<Self>) -> Result<(), Error> {
@@ -384,32 +445,11 @@ impl System {
self.local_status.swap(Arc::new(new_si));
}
+ // --- RPC HANDLERS ---
+
async fn handle_connect(&self, node: &str) -> Result<SystemRpc, Error> {
- let (pubkey, addrs) = parse_and_resolve_peer_addr(node).ok_or_else(|| {
- Error::Message(format!(
- "Unable to parse or resolve node specification: {}",
- node
- ))
- })?;
- let mut errors = vec![];
- for ip in addrs.iter() {
- match self
- .netapp
- .clone()
- .try_connect(*ip, pubkey)
- .await
- .err_context(CONNECT_ERROR_MESSAGE)
- {
- Ok(()) => return Ok(SystemRpc::Ok),
- Err(e) => {
- errors.push((*ip, e));
- }
- }
- }
- return Err(Error::Message(format!(
- "Could not connect to specified peers. Errors: {:?}",
- errors
- )));
+ self.connect(node).await?;
+ Ok(SystemRpc::Ok)
}
fn handle_pull_cluster_layout(&self) -> SystemRpc {
@@ -418,31 +458,11 @@ impl System {
}
fn handle_get_known_nodes(&self) -> SystemRpc {
- let node_status = self.node_status.read().unwrap();
- let known_nodes = self
- .fullmesh
- .get_peer_list()
- .iter()
- .map(|n| KnownNodeInfo {
- id: n.id.into(),
- addr: n.addr,
- is_up: n.is_up(),
- last_seen_secs_ago: n.last_seen.map(|t| (Instant::now() - t).as_secs()),
- status: node_status
- .get(&n.id.into())
- .cloned()
- .map(|(_, st)| st)
- .unwrap_or(NodeStatus {
- hostname: "?".to_string(),
- replication_factor: 0,
- cluster_layout_version: 0,
- cluster_layout_staging_hash: Hash::from([0u8; 32]),
- }),
- })
- .collect::<Vec<_>>();
+ let known_nodes = self.get_known_nodes();
SystemRpc::ReturnKnownNodes(known_nodes)
}
+
async fn handle_advertise_status(
self: &Arc<Self>,
from: Uuid,