diff options
author | Alex Auvolat <alex@adnab.me> | 2024-02-13 12:55:41 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2024-02-15 12:15:07 +0100 |
commit | 5ea24254a91c3794ceb69e68c940b13f5447f40c (patch) | |
tree | dc30460edeb757699fc7057a7fed640c69dfb79d /src/net/util.rs | |
parent | a2ab275da80159ea2f0606d129790d79d43b4e24 (diff) | |
download | garage-5ea24254a91c3794ceb69e68c940b13f5447f40c.tar.gz garage-5ea24254a91c3794ceb69e68c940b13f5447f40c.zip |
[import-netapp] import Netapp code into Garage codebase
Diffstat (limited to 'src/net/util.rs')
-rw-r--r-- | src/net/util.rs | 96 |
1 files changed, 96 insertions, 0 deletions
diff --git a/src/net/util.rs b/src/net/util.rs new file mode 100644 index 00000000..56230b73 --- /dev/null +++ b/src/net/util.rs @@ -0,0 +1,96 @@ +use std::net::SocketAddr; + +use log::info; +use serde::Serialize; + +use tokio::sync::watch; + +use crate::netapp::*; + +/// Utility function: encodes any serializable value in MessagePack binary format +/// using the RMP library. +/// +/// Field names and variant names are included in the serialization. +/// This is used internally by the netapp communication protocol. +pub fn rmp_to_vec_all_named<T>(val: &T) -> Result<Vec<u8>, rmp_serde::encode::Error> +where + T: Serialize + ?Sized, +{ + let mut wr = Vec::with_capacity(128); + let mut se = rmp_serde::Serializer::new(&mut wr).with_struct_map(); + val.serialize(&mut se)?; + Ok(wr) +} + +/// This async function returns only when a true signal was received +/// from a watcher that tells us when to exit. +/// +/// Usefull in a select statement to interrupt another +/// future: +/// ```ignore +/// select!( +/// _ = a_long_task() => Success, +/// _ = await_exit(must_exit) => Interrupted, +/// ) +/// ``` +pub async fn await_exit(mut must_exit: watch::Receiver<bool>) { + while !*must_exit.borrow_and_update() { + if must_exit.changed().await.is_err() { + break; + } + } +} + +/// Creates a watch that contains `false`, and that changes +/// to `true` when a Ctrl+C signal is received. +pub fn watch_ctrl_c() -> watch::Receiver<bool> { + let (send_cancel, watch_cancel) = watch::channel(false); + tokio::spawn(async move { + tokio::signal::ctrl_c() + .await + .expect("failed to install CTRL+C signal handler"); + info!("Received CTRL+C, shutting down."); + send_cancel.send(true).unwrap(); + }); + watch_cancel +} + +/// Parse a peer's address including public key, written in the format: +/// `<public key hex>@<ip>:<port>` +pub fn parse_peer_addr(peer: &str) -> Option<(NodeID, SocketAddr)> { + let delim = peer.find('@')?; + let (key, ip) = peer.split_at(delim); + let pubkey = NodeID::from_slice(&hex::decode(key).ok()?)?; + let ip = ip[1..].parse::<SocketAddr>().ok()?; + Some((pubkey, ip)) +} + +/// Parse and resolve a peer's address including public key, written in the format: +/// `<public key hex>@<ip or hostname>:<port>` +pub fn parse_and_resolve_peer_addr(peer: &str) -> Option<(NodeID, Vec<SocketAddr>)> { + use std::net::ToSocketAddrs; + + let delim = peer.find('@')?; + let (key, host) = peer.split_at(delim); + let pubkey = NodeID::from_slice(&hex::decode(key).ok()?)?; + let hosts = host[1..].to_socket_addrs().ok()?.collect::<Vec<_>>(); + if hosts.is_empty() { + return None; + } + Some((pubkey, hosts)) +} + +/// async version of parse_and_resolve_peer_addr +pub async fn parse_and_resolve_peer_addr_async(peer: &str) -> Option<(NodeID, Vec<SocketAddr>)> { + let delim = peer.find('@')?; + let (key, host) = peer.split_at(delim); + let pubkey = NodeID::from_slice(&hex::decode(key).ok()?)?; + let hosts = tokio::net::lookup_host(&host[1..]) + .await + .ok()? + .collect::<Vec<_>>(); + if hosts.is_empty() { + return None; + } + Some((pubkey, hosts)) +} |