diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/rpc/consul.rs | 110 | ||||
-rw-r--r-- | src/util/config.rs | 15 |
2 files changed, 50 insertions, 75 deletions
diff --git a/src/rpc/consul.rs b/src/rpc/consul.rs index 08fb0418..ab8d1112 100644 --- a/src/rpc/consul.rs +++ b/src/rpc/consul.rs @@ -19,10 +19,15 @@ struct ConsulQueryEntry { address: String, #[serde(rename = "ServicePort")] service_port: u16, - #[serde(rename = "NodeMeta")] - node_meta: HashMap<String, String>, #[serde(rename = "ServiceMeta")] - service_meta: HashMap<String, String>, + meta: HashMap<String, String>, +} + +#[derive(Serialize, Clone, Debug)] +#[serde(untagged)] +enum PublishRequest { + Catalog(ConsulPublishEntry), + Service(ConsulPublishService), } #[derive(Serialize, Clone, Debug)] @@ -31,8 +36,6 @@ struct ConsulPublishEntry { node: String, #[serde(rename = "Address")] address: IpAddr, - #[serde(rename = "NodeMeta")] - node_meta: HashMap<String, String>, #[serde(rename = "Service")] service: ConsulPublishCatalogService, } @@ -46,7 +49,7 @@ struct ConsulPublishCatalogService { #[serde(rename = "Tags")] tags: Vec<String>, #[serde(rename = "Meta")] - service_meta: HashMap<String, String>, + meta: HashMap<String, String>, #[serde(rename = "Address")] address: IpAddr, #[serde(rename = "Port")] @@ -77,42 +80,36 @@ pub struct ConsulDiscovery { impl ConsulDiscovery { pub fn new(config: ConsulDiscoveryConfig) -> Result<Self, ConsulError> { - let mut builder: reqwest::ClientBuilder = reqwest::Client::builder(); + let mut builder: reqwest::ClientBuilder = reqwest::Client::builder().use_rustls_tls(); if config.tls_skip_verify { builder = builder.danger_accept_invalid_certs(true); } else if let Some(ca_cert) = &config.ca_cert { let mut ca_cert_buf = vec![]; File::open(ca_cert)?.read_to_end(&mut ca_cert_buf)?; - builder = builder.use_rustls_tls(); builder = builder.add_root_certificate(reqwest::Certificate::from_pem(&ca_cert_buf[..])?); } - let client: reqwest::Client = match &config.consul_http_api { - ConsulDiscoveryAPI::Catalog => { - match (&config.client_cert, &config.client_key) { - (Some(client_cert), Some(client_key)) => { - let mut client_cert_buf = vec![]; - File::open(client_cert)?.read_to_end(&mut client_cert_buf)?; - - let mut client_key_buf = vec![]; - File::open(client_key)?.read_to_end(&mut client_key_buf)?; - - let identity = reqwest::Identity::from_pem( - &[&client_cert_buf[..], &client_key_buf[..]].concat()[..], - )?; - - builder = builder.use_rustls_tls(); - builder = builder.identity(identity); - } - (None, None) => {} - _ => return Err(ConsulError::InvalidTLSConfig), - } + match &config.api { + ConsulDiscoveryAPI::Catalog => match (&config.client_cert, &config.client_key) { + (Some(client_cert), Some(client_key)) => { + let mut client_cert_buf = vec![]; + File::open(client_cert)?.read_to_end(&mut client_cert_buf)?; - builder.build()? - } + let mut client_key_buf = vec![]; + File::open(client_key)?.read_to_end(&mut client_key_buf)?; + + let identity = reqwest::Identity::from_pem( + &[&client_cert_buf[..], &client_key_buf[..]].concat()[..], + )?; + + builder = builder.identity(identity); + } + (None, None) => {} + _ => return Err(ConsulError::InvalidTLSConfig), + }, ConsulDiscoveryAPI::Agent => { - if let Some(token) = &config.consul_http_token { + if let Some(token) = &config.token { let mut headers = reqwest::header::HeaderMap::new(); headers.insert( "x-consul-token", @@ -120,11 +117,11 @@ impl ConsulDiscovery { ); builder = builder.default_headers(headers); } - - builder.build()? } }; + let client: reqwest::Client = builder.build()?; + Ok(Self { client, config }) } @@ -142,14 +139,11 @@ impl ConsulDiscovery { let mut ret = vec![]; for ent in entries { let ip = ent.address.parse::<IpAddr>().ok(); - let pubkey = match &self.config.consul_http_api { - ConsulDiscoveryAPI::Catalog => ent.node_meta.get("pubkey"), - ConsulDiscoveryAPI::Agent => { - ent.service_meta.get(&format!("{}-pubkey", META_PREFIX)) - } - } - .and_then(|k| hex::decode(k).ok()) - .and_then(|k| NodeID::from_slice(&k[..])); + let pubkey = ent + .meta + .get(&format!("{}-pubkey", META_PREFIX)) + .and_then(|k| hex::decode(k).ok()) + .and_then(|k| NodeID::from_slice(&k[..])); if let (Some(ip), Some(pubkey)) = (ip, pubkey) { ret.push((pubkey, SocketAddr::new(ip, ent.service_port))); } else { @@ -179,47 +173,34 @@ impl ConsulDiscovery { ] .concat(); - let meta_prefix: String = match &self.config.consul_http_api { - ConsulDiscoveryAPI::Catalog => "".to_string(), - ConsulDiscoveryAPI::Agent => format!("{}-", META_PREFIX), - }; - - let mut meta = HashMap::from([ - (format!("{}pubkey", meta_prefix), hex::encode(node_id)), - (format!("{}hostname", meta_prefix), hostname.to_string()), - ]); - - if let Some(global_meta) = &self.config.meta { - for (key, value) in global_meta.into_iter() { - meta.insert(key.clone(), value.clone()); - } - } + let mut meta = self.config.meta.clone().unwrap_or_default(); + meta.insert(format!("{}-pubkey", META_PREFIX), hex::encode(node_id)); + meta.insert(format!("{}-hostname", META_PREFIX), hostname.to_string()); let url = format!( "{}/v1/{}", self.config.consul_http_addr, - (match &self.config.consul_http_api { + (match &self.config.api { ConsulDiscoveryAPI::Catalog => "catalog/register", ConsulDiscoveryAPI::Agent => "agent/service/register?replace-existing-checks", }) ); let req = self.client.put(&url); - let http = (match &self.config.consul_http_api { - ConsulDiscoveryAPI::Catalog => req.json(&ConsulPublishEntry { + let advertisement: PublishRequest = match &self.config.api { + ConsulDiscoveryAPI::Catalog => PublishRequest::Catalog(ConsulPublishEntry { node: node.clone(), address: rpc_public_addr.ip(), - node_meta: meta.clone(), service: ConsulPublishCatalogService { service_id: node.clone(), service_name: self.config.service_name.clone(), tags, - service_meta: meta.clone(), + meta: meta.clone(), address: rpc_public_addr.ip(), port: rpc_public_addr.port(), }, }), - ConsulDiscoveryAPI::Agent => req.json(&ConsulPublishService { + ConsulDiscoveryAPI::Agent => PublishRequest::Service(ConsulPublishService { service_id: node.clone(), service_name: self.config.service_name.clone(), tags, @@ -227,9 +208,8 @@ impl ConsulDiscovery { address: rpc_public_addr.ip(), port: rpc_public_addr.port(), }), - }) - .send() - .await?; + }; + let http = req.json(&advertisement).send().await?; http.error_for_status()?; Ok(()) diff --git a/src/util/config.rs b/src/util/config.rs index 8b723e47..647c2659 100644 --- a/src/util/config.rs +++ b/src/util/config.rs @@ -135,23 +135,18 @@ pub struct AdminConfig { pub trace_sink: Option<String>, } -#[derive(Deserialize, Debug, Clone)] +#[derive(Deserialize, Debug, Clone, Default)] +#[serde(rename_all = "lowercase")] pub enum ConsulDiscoveryAPI { - #[serde(rename_all = "lowercase")] + #[default] Catalog, Agent, } -impl ConsulDiscoveryAPI { - fn default() -> Self { - ConsulDiscoveryAPI::Catalog - } -} #[derive(Deserialize, Debug, Clone)] pub struct ConsulDiscoveryConfig { /// The consul api to use when registering: either `catalog` (the default) or `agent` - #[serde(default = "ConsulDiscoveryAPI::default")] - pub consul_http_api: ConsulDiscoveryAPI, + pub api: ConsulDiscoveryAPI, /// Consul http or https address to connect to to discover more peers pub consul_http_addr: String, /// Consul service name to use @@ -163,7 +158,7 @@ pub struct ConsulDiscoveryConfig { /// Client TLS key to use when connecting to Consul pub client_key: Option<String>, /// /// Token to use for connecting to consul - pub consul_http_token: Option<String>, + pub token: Option<String>, /// Skip TLS hostname verification #[serde(default)] pub tls_skip_verify: bool, |