diff options
Diffstat (limited to 'src/rpc/consul.rs')
-rw-r--r-- | src/rpc/consul.rs | 193 |
1 files changed, 136 insertions, 57 deletions
diff --git a/src/rpc/consul.rs b/src/rpc/consul.rs index f85f789c..cd29893f 100644 --- a/src/rpc/consul.rs +++ b/src/rpc/consul.rs @@ -9,6 +9,9 @@ use serde::{Deserialize, Serialize}; use netapp::NodeID; use garage_util::config::ConsulDiscoveryConfig; +use garage_util::config::ConsulDiscoveryMode; + +const META_PREFIX: &str = "fr-deuxfleurs-garagehq"; #[derive(Deserialize, Clone, Debug)] struct ConsulQueryEntry { @@ -18,6 +21,8 @@ struct ConsulQueryEntry { service_port: u16, #[serde(rename = "NodeMeta")] node_meta: HashMap<String, String>, + #[serde(rename = "ServiceMeta")] + service_meta: HashMap<String, String>, } #[derive(Serialize, Clone, Debug)] @@ -29,25 +34,42 @@ struct ConsulPublishEntry { #[serde(rename = "NodeMeta")] node_meta: HashMap<String, String>, #[serde(rename = "Service")] - service: ConsulPublishService, + service: ConsulPublishCatalogService, } #[derive(Serialize, Clone, Debug)] -struct ConsulPublishService { +struct ConsulPublishCatalogService { #[serde(rename = "ID")] service_id: String, #[serde(rename = "Service")] service_name: String, #[serde(rename = "Tags")] tags: Vec<String>, + #[serde(rename = "Meta")] + service_meta: HashMap<String, String>, #[serde(rename = "Address")] address: IpAddr, #[serde(rename = "Port")] port: u16, } -// ---- +#[derive(Serialize, Clone, Debug)] +struct ConsulPublishService { + #[serde(rename = "ID")] + service_id: String, + #[serde(rename = "Name")] + service_name: String, + #[serde(rename = "Tags")] + tags: Vec<String>, + #[serde(rename = "Address")] + address: IpAddr, + #[serde(rename = "Port")] + port: u16, + #[serde(rename = "Meta")] + meta: HashMap<String, String>, +} +// ---- pub struct ConsulDiscovery { config: ConsulDiscoveryConfig, client: reqwest::Client, @@ -55,42 +77,60 @@ pub struct ConsulDiscovery { impl ConsulDiscovery { pub fn new(config: ConsulDiscoveryConfig) -> Result<Self, ConsulError> { - let client = match (&config.client_cert, &config.client_key) { - (Some(client_cert), Some(client_key)) => { - let mut client_cert_buf = vec![]; - File::open(client_cert)?.read_to_end(&mut client_cert_buf)?; - - let mut client_key_buf = vec![]; - File::open(client_key)?.read_to_end(&mut client_key_buf)?; - - let identity = reqwest::Identity::from_pem( - &[&client_cert_buf[..], &client_key_buf[..]].concat()[..], - )?; - - if config.tls_skip_verify { - reqwest::Client::builder() - .use_rustls_tls() - .danger_accept_invalid_certs(true) - .identity(identity) - .build()? - } else if let Some(ca_cert) = &config.ca_cert { + let mut builder: reqwest::ClientBuilder = reqwest::Client::builder(); + builder = builder.danger_accept_invalid_certs(config.tls_skip_verify); + + let client: reqwest::Client = match &config.mode { + ConsulDiscoveryMode::Node => { + if let Some(ca_cert) = &config.ca_cert { + let mut ca_cert_buf = vec![]; + File::open(ca_cert)?.read_to_end(&mut ca_cert_buf)?; + builder = builder.use_rustls_tls(); + builder = builder + .add_root_certificate(reqwest::Certificate::from_pem(&ca_cert_buf[..])?); + } + + match (&config.client_cert, &config.client_key) { + (Some(client_cert), Some(client_key)) => { + let mut client_cert_buf = vec![]; + File::open(client_cert)?.read_to_end(&mut client_cert_buf)?; + + let mut client_key_buf = vec![]; + File::open(client_key)?.read_to_end(&mut client_key_buf)?; + + let identity = reqwest::Identity::from_pem( + &[&client_cert_buf[..], &client_key_buf[..]].concat()[..], + )?; + + builder = builder.use_rustls_tls(); + builder = builder.identity(identity); + } + (None, None) => {} + _ => return Err(ConsulError::InvalidTLSConfig), + } + + builder.build()? + } + ConsulDiscoveryMode::Service => { + if let Some(ca_cert) = &config.ca_cert { let mut ca_cert_buf = vec![]; File::open(ca_cert)?.read_to_end(&mut ca_cert_buf)?; + builder = builder + .add_root_certificate(reqwest::Certificate::from_pem(&ca_cert_buf[..])?); + builder = builder.use_rustls_tls(); + } - reqwest::Client::builder() - .use_rustls_tls() - .add_root_certificate(reqwest::Certificate::from_pem(&ca_cert_buf[..])?) - .identity(identity) - .build()? - } else { - reqwest::Client::builder() - .use_rustls_tls() - .identity(identity) - .build()? + if let Some(token) = &config.consul_http_token { + let mut headers = reqwest::header::HeaderMap::new(); + headers.insert( + "x-consul-token", + reqwest::header::HeaderValue::from_str(&token)?, + ); + builder = builder.default_headers(headers); } + + builder.build()? } - (None, None) => reqwest::Client::new(), - _ => return Err(ConsulError::InvalidTLSConfig), }; Ok(Self { client, config }) @@ -110,11 +150,14 @@ impl ConsulDiscovery { let mut ret = vec![]; for ent in entries { let ip = ent.address.parse::<IpAddr>().ok(); - let pubkey = ent - .node_meta - .get("pubkey") - .and_then(|k| hex::decode(k).ok()) - .and_then(|k| NodeID::from_slice(&k[..])); + let pubkey = match &self.config.mode { + ConsulDiscoveryMode::Node => ent.node_meta.get("pubkey"), + ConsulDiscoveryMode::Service => { + ent.service_meta.get(&format!("{}-pubkey", META_PREFIX)) + } + } + .and_then(|k| hex::decode(k).ok()) + .and_then(|k| NodeID::from_slice(&k[..])); if let (Some(ip), Some(pubkey)) = (ip, pubkey) { ret.push((pubkey, SocketAddr::new(ip, ent.service_port))); } else { @@ -138,29 +181,63 @@ impl ConsulDiscovery { rpc_public_addr: SocketAddr, ) -> Result<(), ConsulError> { let node = format!("garage:{}", hex::encode(&node_id[..8])); + let tags = [ + vec!["advertised-by-garage".into(), hostname.into()], + self.config.tags.clone(), + ] + .concat(); + + let meta_prefix: String = match &self.config.mode { + ConsulDiscoveryMode::Node => "".to_string(), + ConsulDiscoveryMode::Service => format!("{}-", META_PREFIX), + }; - let advertisement = ConsulPublishEntry { - node: node.clone(), - address: rpc_public_addr.ip(), - node_meta: [ - ("pubkey".to_string(), hex::encode(node_id)), - ("hostname".to_string(), hostname.to_string()), - ] - .iter() - .cloned() - .collect(), - service: ConsulPublishService { + let mut meta = HashMap::from([ + (format!("{}pubkey", meta_prefix), hex::encode(node_id)), + (format!("{}hostname", meta_prefix), hostname.to_string()), + ]); + + if let Some(global_meta) = &self.config.meta { + for (key, value) in global_meta.into_iter() { + meta.insert(key.clone(), value.clone()); + } + } + + let url = format!( + "{}/v1/{}", + self.config.consul_http_addr, + (match &self.config.mode { + ConsulDiscoveryMode::Node => "catalog/register", + ConsulDiscoveryMode::Service => "agent/service/register?replace-existing-checks", + }) + ); + + let req = self.client.put(&url); + let http = (match &self.config.mode { + ConsulDiscoveryMode::Node => req.json(&ConsulPublishEntry { + node: node.clone(), + address: rpc_public_addr.ip(), + node_meta: meta.clone(), + service: ConsulPublishCatalogService { + service_id: node.clone(), + service_name: self.config.service_name.clone(), + tags, + service_meta: meta.clone(), + address: rpc_public_addr.ip(), + port: rpc_public_addr.port(), + }, + }), + ConsulDiscoveryMode::Service => req.json(&ConsulPublishService { service_id: node.clone(), service_name: self.config.service_name.clone(), - tags: vec!["advertised-by-garage".into(), hostname.into()], + tags, + meta, address: rpc_public_addr.ip(), port: rpc_public_addr.port(), - }, - }; - - let url = format!("{}/v1/catalog/register", self.config.consul_http_addr); - - let http = self.client.put(&url).json(&advertisement).send().await?; + }), + }) + .send() + .await?; http.error_for_status()?; Ok(()) @@ -176,4 +253,6 @@ pub enum ConsulError { Reqwest(#[error(source)] reqwest::Error), #[error(display = "Invalid Consul TLS configuration")] InvalidTLSConfig, + #[error(display = "Token error: {}", _0)] + Token(#[error(source)] reqwest::header::InvalidHeaderValue), } |