diff options
Diffstat (limited to 'src/rpc')
-rw-r--r-- | src/rpc/Cargo.toml | 4 | ||||
-rw-r--r-- | src/rpc/consul.rs | 125 | ||||
-rw-r--r-- | src/rpc/ring.rs | 36 | ||||
-rw-r--r-- | src/rpc/rpc_helper.rs | 53 | ||||
-rw-r--r-- | src/rpc/system.rs | 277 |
5 files changed, 348 insertions, 147 deletions
diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml index ef03898a..f0ac6570 100644 --- a/src/rpc/Cargo.toml +++ b/src/rpc/Cargo.toml @@ -15,8 +15,6 @@ path = "lib.rs" [dependencies] garage_util = { version = "0.4.0", path = "../util" } -garage_rpc_021 = { package = "garage_rpc", version = "0.2.1" } - arc-swap = "1.0" bytes = "1.0" gethostname = "0.2" @@ -36,5 +34,5 @@ tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi tokio-stream = { version = "0.1", features = ["net"] } netapp = { version = "0.3.0", git = "https://git.deuxfleurs.fr/lx/netapp" } -hyper = "0.14" +hyper = { version = "0.14", features = ["client", "http1", "runtime", "tcp"] } diff --git a/src/rpc/consul.rs b/src/rpc/consul.rs index 63051a6b..82bf99ba 100644 --- a/src/rpc/consul.rs +++ b/src/rpc/consul.rs @@ -1,24 +1,31 @@ +use std::collections::HashMap; use std::net::{IpAddr, SocketAddr}; use hyper::client::Client; use hyper::StatusCode; use hyper::{Body, Method, Request}; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; + +use netapp::NodeID; use garage_util::error::Error; -#[derive(Deserialize, Clone)] -struct ConsulEntry { - #[serde(alias = "Address")] +// ---- READING FROM CONSUL CATALOG ---- + +#[derive(Deserialize, Clone, Debug)] +struct ConsulQueryEntry { + #[serde(rename = "Address")] address: String, - #[serde(alias = "ServicePort")] + #[serde(rename = "ServicePort")] service_port: u16, + #[serde(rename = "NodeMeta")] + node_meta: HashMap<String, String>, } pub async fn get_consul_nodes( consul_host: &str, consul_service_name: &str, -) -> Result<Vec<SocketAddr>, Error> { +) -> Result<Vec<(NodeID, SocketAddr)>, Error> { let url = format!( "http://{}/v1/catalog/service/{}", consul_host, consul_service_name @@ -36,17 +43,111 @@ pub async fn get_consul_nodes( } let body = hyper::body::to_bytes(resp.into_body()).await?; - let entries = serde_json::from_slice::<Vec<ConsulEntry>>(body.as_ref())?; + let entries = serde_json::from_slice::<Vec<ConsulQueryEntry>>(body.as_ref())?; let mut ret = vec![]; for ent in entries { - let ip = ent - .address - .parse::<IpAddr>() - .map_err(|e| Error::Message(format!("Could not parse IP address: {}", e)))?; - ret.push(SocketAddr::new(ip, ent.service_port)); + let ip = ent.address.parse::<IpAddr>().ok(); + let pubkey = ent + .node_meta + .get("pubkey") + .map(|k| hex::decode(&k).ok()) + .flatten() + .map(|k| NodeID::from_slice(&k[..])) + .flatten(); + if let (Some(ip), Some(pubkey)) = (ip, pubkey) { + ret.push((pubkey, SocketAddr::new(ip, ent.service_port))); + } else { + warn!( + "Could not process node spec from Consul: {:?} (invalid IP or public key)", + ent + ); + } } debug!("Got nodes from Consul: {:?}", ret); Ok(ret) } + +// ---- PUBLISHING TO CONSUL CATALOG ---- + +#[derive(Serialize, Clone, Debug)] +struct ConsulPublishEntry { + #[serde(rename = "Node")] + node: String, + #[serde(rename = "Address")] + address: IpAddr, + #[serde(rename = "NodeMeta")] + node_meta: HashMap<String, String>, + #[serde(rename = "Service")] + service: ConsulPublishService, +} + +#[derive(Serialize, Clone, Debug)] +struct ConsulPublishService { + #[serde(rename = "ID")] + service_id: String, + #[serde(rename = "Service")] + service_name: String, + #[serde(rename = "Tags")] + tags: Vec<String>, + #[serde(rename = "Address")] + address: IpAddr, + #[serde(rename = "Port")] + port: u16, +} + +pub async fn publish_consul_service( + consul_host: &str, + consul_service_name: &str, + node_id: NodeID, + hostname: &str, + rpc_public_addr: SocketAddr, +) -> Result<(), Error> { + let node = format!("garage:{}", hex::encode(&node_id[..8])); + + let advertisment = ConsulPublishEntry { + node: node.clone(), + address: rpc_public_addr.ip(), + node_meta: [ + ("pubkey".to_string(), hex::encode(node_id)), + ("hostname".to_string(), hostname.to_string()), + ] + .iter() + .cloned() + .collect(), + service: ConsulPublishService { + service_id: node.clone(), + service_name: consul_service_name.to_string(), + tags: vec!["advertised-by-garage".into(), hostname.into()], + address: rpc_public_addr.ip(), + port: rpc_public_addr.port(), + }, + }; + + let url = format!("http://{}/v1/catalog/register", consul_host); + let req_body = serde_json::to_string(&advertisment)?; + debug!("Request body for consul adv: {}", req_body); + + let req = Request::builder() + .uri(url) + .method(Method::PUT) + .body(Body::from(req_body))?; + + let client = Client::new(); + + let resp = client.request(req).await?; + debug!("Response of advertising to Consul: {:?}", resp); + let resp_code = resp.status(); + debug!( + "{}", + std::str::from_utf8(&hyper::body::to_bytes(resp.into_body()).await?) + .unwrap_or("<invalid utf8>") + ); + + if resp_code != StatusCode::OK { + return Err(Error::Message(format!("HTTP error {}", resp_code))); + } + + Ok(()) +} diff --git a/src/rpc/ring.rs b/src/rpc/ring.rs index 7cbab762..3cb0d233 100644 --- a/src/rpc/ring.rs +++ b/src/rpc/ring.rs @@ -3,8 +3,6 @@ use std::collections::{HashMap, HashSet}; use std::convert::TryInto; -use netapp::NodeID; - use serde::{Deserialize, Serialize}; use garage_util::data::*; @@ -40,31 +38,6 @@ impl NetworkConfig { version: 0, } } - - pub(crate) fn migrate_from_021(old: garage_rpc_021::ring::NetworkConfig) -> Self { - let members = old - .members - .into_iter() - .map(|(id, conf)| { - ( - Hash::try_from(id.as_slice()).unwrap(), - NetworkConfigEntry { - zone: conf.datacenter, - capacity: if conf.capacity == 0 { - None - } else { - Some(conf.capacity) - }, - tag: conf.tag, - }, - ) - }) - .collect(); - Self { - members, - version: old.version, - } - } } /// The overall configuration of one (possibly remote) node @@ -100,7 +73,7 @@ pub struct Ring { pub config: NetworkConfig, // Internal order of nodes used to make a more compact representation of the ring - nodes: Vec<NodeID>, + nodes: Vec<Uuid>, // The list of entries in the ring ring: Vec<RingEntry>, @@ -262,11 +235,6 @@ impl Ring { }) .collect::<Vec<_>>(); - let nodes = nodes - .iter() - .map(|id| NodeID::from_slice(id.as_slice()).unwrap()) - .collect::<Vec<_>>(); - Self { replication_factor, config, @@ -298,7 +266,7 @@ impl Ring { } /// Walk the ring to find the n servers in which data should be replicated - pub fn get_nodes(&self, position: &Hash, n: usize) -> Vec<NodeID> { + pub fn get_nodes(&self, position: &Hash, n: usize) -> Vec<Uuid> { if self.ring.len() != 1 << PARTITION_BITS { warn!("Ring not yet ready, read/writes will be lost!"); return vec![]; diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index c9458ee6..9f735ab4 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -8,13 +8,14 @@ use futures::stream::StreamExt; use futures_util::future::FutureExt; use tokio::select; -pub use netapp::endpoint::{Endpoint, EndpointHandler, Message}; +pub use netapp::endpoint::{Endpoint, EndpointHandler, Message as Rpc}; use netapp::peering::fullmesh::FullMeshPeeringStrategy; pub use netapp::proto::*; pub use netapp::{NetApp, NodeID}; use garage_util::background::BackgroundRunner; -use garage_util::error::{Error, RpcError}; +use garage_util::error::Error; +use garage_util::data::Uuid; const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10); @@ -66,46 +67,47 @@ pub struct RpcHelper { } impl RpcHelper { - pub async fn call<M, H>( + pub async fn call<M, H, S>( &self, endpoint: &Endpoint<M, H>, - to: NodeID, + to: Uuid, msg: M, strat: RequestStrategy, - ) -> Result<M::Response, Error> + ) -> Result<S, Error> where - M: Message, + M: Rpc<Response = Result<S, Error>>, H: EndpointHandler<M>, { self.call_arc(endpoint, to, Arc::new(msg), strat).await } - pub async fn call_arc<M, H>( + pub async fn call_arc<M, H, S>( &self, endpoint: &Endpoint<M, H>, - to: NodeID, + to: Uuid, msg: Arc<M>, strat: RequestStrategy, - ) -> Result<M::Response, Error> + ) -> Result<S, Error> where - M: Message, + M: Rpc<Response = Result<S, Error>>, H: EndpointHandler<M>, { + let node_id = to.into(); select! { - res = endpoint.call(&to, &msg, strat.rs_priority) => Ok(res?), - _ = tokio::time::sleep(strat.rs_timeout) => Err(Error::Rpc(RpcError::Timeout)), + res = endpoint.call(&node_id, &msg, strat.rs_priority) => Ok(res??), + _ = tokio::time::sleep(strat.rs_timeout) => Err(Error::Timeout), } } - pub async fn call_many<M, H>( + pub async fn call_many<M, H, S>( &self, endpoint: &Endpoint<M, H>, - to: &[NodeID], + to: &[Uuid], msg: M, strat: RequestStrategy, - ) -> Vec<(NodeID, Result<M::Response, Error>)> + ) -> Vec<(Uuid, Result<S, Error>)> where - M: Message, + M: Rpc<Response = Result<S, Error>>, H: EndpointHandler<M>, { let msg = Arc::new(msg); @@ -120,37 +122,38 @@ impl RpcHelper { .collect::<Vec<_>>() } - pub async fn broadcast<M, H>( + pub async fn broadcast<M, H, S>( &self, endpoint: &Endpoint<M, H>, msg: M, strat: RequestStrategy, - ) -> Vec<(NodeID, Result<M::Response, Error>)> + ) -> Vec<(Uuid, Result<S, Error>)> where - M: Message, + M: Rpc<Response = Result<S, Error>>, H: EndpointHandler<M>, { let to = self .fullmesh .get_peer_list() .iter() - .map(|p| p.id) + .map(|p| p.id.into()) .collect::<Vec<_>>(); self.call_many(endpoint, &to[..], msg, strat).await } /// Make a RPC call to multiple servers, returning either a Vec of responses, or an error if /// strategy could not be respected due to too many errors - pub async fn try_call_many<M, H>( + pub async fn try_call_many<M, H, S>( &self, endpoint: &Arc<Endpoint<M, H>>, - to: &[NodeID], + to: &[Uuid], msg: M, strategy: RequestStrategy, - ) -> Result<Vec<M::Response>, Error> + ) -> Result<Vec<S>, Error> where - M: Message + 'static, + M: Rpc<Response = Result<S, Error>> + 'static, H: EndpointHandler<M> + 'static, + S: Send, { let msg = Arc::new(msg); let mut resp_stream = to @@ -200,7 +203,7 @@ impl RpcHelper { Ok(results) } else { let errors = errors.iter().map(|e| format!("{}", e)).collect::<Vec<_>>(); - Err(Error::from(RpcError::TooManyErrors(errors))) + Err(Error::TooManyErrors(errors)) } } } diff --git a/src/rpc/system.rs b/src/rpc/system.rs index 7ccec945..b95cff58 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -1,8 +1,9 @@ //! Module containing structs related to membership management +use std::collections::HashMap; use std::io::{Read, Write}; use std::net::SocketAddr; use std::path::{Path, PathBuf}; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; use std::time::Duration; use arc_swap::ArcSwap; @@ -14,21 +15,24 @@ use sodiumoxide::crypto::sign::ed25519; use tokio::sync::watch; use tokio::sync::Mutex; -use netapp::endpoint::{Endpoint, EndpointHandler, Message}; +use netapp::endpoint::{Endpoint, EndpointHandler}; use netapp::peering::fullmesh::FullMeshPeeringStrategy; use netapp::proto::*; use netapp::{NetApp, NetworkKey, NodeID, NodeKey}; +use netapp::util::parse_and_resolve_peer_addr; use garage_util::background::BackgroundRunner; +use garage_util::data::Uuid; use garage_util::error::Error; use garage_util::persister::Persister; -//use garage_util::time::*; +use garage_util::time::*; -//use crate::consul::get_consul_nodes; +use crate::consul::*; use crate::ring::*; -use crate::rpc_helper::{RequestStrategy, RpcHelper}; +use crate::rpc_helper::*; const DISCOVERY_INTERVAL: Duration = Duration::from_secs(60); +const STATUS_EXCHANGE_INTERVAL: Duration = Duration::from_secs(10); const PING_TIMEOUT: Duration = Duration::from_secs(2); /// RPC endpoint used for calls related to membership @@ -39,33 +43,35 @@ pub const SYSTEM_RPC_PATH: &str = "garage_rpc/membership.rs/SystemRpc"; pub enum SystemRpc { /// Response to successfull advertisements Ok, - /// Error response - Error(String), + /// Request to connect to a specific node (in <pubkey>@<host>:<port> format) + Connect(String), /// Ask other node its config. Answered with AdvertiseConfig PullConfig, /// Advertise Garage status. Answered with another AdvertiseStatus. /// Exchanged with every node on a regular basis. - AdvertiseStatus(StateInfo), + AdvertiseStatus(NodeStatus), /// Advertisement of nodes config. Sent spontanously or in response to PullConfig AdvertiseConfig(NetworkConfig), /// Get known nodes states GetKnownNodes, /// Return known nodes - ReturnKnownNodes(Vec<(NodeID, SocketAddr, bool)>), + ReturnKnownNodes(Vec<KnownNodeInfo>), } -impl Message for SystemRpc { - type Response = SystemRpc; +impl Rpc for SystemRpc { + type Response = Result<SystemRpc, Error>; } /// This node's membership manager pub struct System { /// The id of this node - pub id: NodeID, + pub id: Uuid, persist_config: Persister<NetworkConfig>, + persist_peer_list: Persister<Vec<(Uuid, SocketAddr)>>, - state_info: ArcSwap<StateInfo>, + local_status: ArcSwap<NodeStatus>, + node_status: RwLock<HashMap<Uuid, (u64, NodeStatus)>>, pub netapp: Arc<NetApp>, fullmesh: Arc<FullMeshPeeringStrategy>, @@ -74,6 +80,7 @@ pub struct System { system_endpoint: Arc<Endpoint<SystemRpc, System>>, rpc_listen_addr: SocketAddr, + rpc_public_addr: Option<SocketAddr>, bootstrap_peers: Vec<(NodeID, SocketAddr)>, consul_host: Option<String>, consul_service_name: Option<String>, @@ -88,7 +95,7 @@ pub struct System { } #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct StateInfo { +pub struct NodeStatus { /// Hostname of the node pub hostname: String, /// Replication factor configured on the node @@ -97,26 +104,34 @@ pub struct StateInfo { pub config_version: u64, } +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct KnownNodeInfo { + pub id: Uuid, + pub addr: SocketAddr, + pub is_up: bool, + pub status: NodeStatus, +} + fn gen_node_key(metadata_dir: &Path) -> Result<NodeKey, Error> { - let mut id_file = metadata_dir.to_path_buf(); - id_file.push("node_id"); - if id_file.as_path().exists() { - let mut f = std::fs::File::open(id_file.as_path())?; + let mut key_file = metadata_dir.to_path_buf(); + key_file.push("node_key"); + if key_file.as_path().exists() { + let mut f = std::fs::File::open(key_file.as_path())?; let mut d = vec![]; f.read_to_end(&mut d)?; if d.len() != 64 { - return Err(Error::Message("Corrupt node_id file".to_string())); + return Err(Error::Message("Corrupt node_key file".to_string())); } let mut key = [0u8; 64]; key.copy_from_slice(&d[..]); Ok(NodeKey::from_slice(&key[..]).unwrap()) } else { - let (key, _) = ed25519::gen_keypair(); + let (_, key) = ed25519::gen_keypair(); - let mut f = std::fs::File::create(id_file.as_path())?; + let mut f = std::fs::File::create(key_file.as_path())?; f.write_all(&key[..])?; - Ok(NodeKey::from_slice(&key[..]).unwrap()) + Ok(key) } } @@ -128,6 +143,7 @@ impl System { background: Arc<BackgroundRunner>, replication_factor: usize, rpc_listen_addr: SocketAddr, + rpc_public_address: Option<SocketAddr>, bootstrap_peers: Vec<(NodeID, SocketAddr)>, consul_host: Option<String>, consul_service_name: Option<String>, @@ -136,29 +152,20 @@ impl System { info!("Node public key: {}", hex::encode(&node_key.public_key())); let persist_config = Persister::new(&metadata_dir, "network_config"); + let persist_peer_list = Persister::new(&metadata_dir, "peer_list"); let net_config = match persist_config.load() { Ok(x) => x, Err(e) => { - match Persister::<garage_rpc_021::ring::NetworkConfig>::new( - &metadata_dir, - "network_config", - ) - .load() - { - Ok(old_config) => NetworkConfig::migrate_from_021(old_config), - Err(e2) => { - info!( - "No valid previous network configuration stored ({}, {}), starting fresh.", - e, e2 - ); - NetworkConfig::new() - } - } + info!( + "No valid previous network configuration stored ({}), starting fresh.", + e + ); + NetworkConfig::new() } }; - let state_info = StateInfo { + let local_status = NodeStatus { hostname: gethostname::gethostname() .into_string() .unwrap_or_else(|_| "<invalid utf-8>".to_string()), @@ -169,15 +176,27 @@ impl System { let ring = Ring::new(net_config, replication_factor); let (update_ring, ring) = watch::channel(Arc::new(ring)); + if let Some(addr) = rpc_public_address { + println!("{}@{}", hex::encode(&node_key.public_key()), addr); + } else { + println!("{}", hex::encode(&node_key.public_key())); + } + let netapp = NetApp::new(network_key, node_key); - let fullmesh = FullMeshPeeringStrategy::new(netapp.clone(), bootstrap_peers.clone()); + let fullmesh = FullMeshPeeringStrategy::new( + netapp.clone(), + bootstrap_peers.clone(), + rpc_public_address, + ); let system_endpoint = netapp.endpoint(SYSTEM_RPC_PATH.into()); let sys = Arc::new(System { - id: netapp.id.clone(), + id: netapp.id.into(), persist_config, - state_info: ArcSwap::new(Arc::new(state_info)), + persist_peer_list, + local_status: ArcSwap::new(Arc::new(local_status)), + node_status: RwLock::new(HashMap::new()), netapp: netapp.clone(), fullmesh: fullmesh.clone(), rpc: RpcHelper { @@ -187,6 +206,7 @@ impl System { system_endpoint, replication_factor, rpc_listen_addr, + rpc_public_addr: rpc_public_address, bootstrap_peers, consul_host, consul_service_name, @@ -206,11 +226,38 @@ impl System { .listen(self.rpc_listen_addr, None, must_exit.clone()), self.fullmesh.clone().run(must_exit.clone()), self.discovery_loop(must_exit.clone()), + self.status_exchange_loop(must_exit.clone()), ); } // ---- INTERNALS ---- + async fn advertise_to_consul(self: Arc<Self>) -> Result<(), Error> { + let (consul_host, consul_service_name) = + match (&self.consul_host, &self.consul_service_name) { + (Some(ch), Some(csn)) => (ch, csn), + _ => return Ok(()), + }; + + let rpc_public_addr = match self.rpc_public_addr { + Some(addr) => addr, + None => { + warn!("Not advertising to Consul because rpc_public_addr is not defined in config file."); + return Ok(()); + } + }; + + publish_consul_service( + consul_host, + consul_service_name, + self.netapp.id, + &self.local_status.load_full().hostname, + rpc_public_addr, + ) + .await + .map_err(|e| Error::Message(format!("Error while publishing Consul service: {}", e))) + } + /// Save network configuration to disc async fn save_network_config(self: Arc<Self>) -> Result<(), Error> { let ring: Arc<Ring> = self.ring.borrow().clone(); @@ -221,12 +268,27 @@ impl System { Ok(()) } - fn update_state_info(&self) { - let mut new_si: StateInfo = self.state_info.load().as_ref().clone(); + fn update_local_status(&self) { + let mut new_si: NodeStatus = self.local_status.load().as_ref().clone(); let ring = self.ring.borrow(); new_si.config_version = ring.config.version; - self.state_info.swap(Arc::new(new_si)); + self.local_status.swap(Arc::new(new_si)); + } + + async fn handle_connect(&self, node: &str) -> Result<SystemRpc, Error> { + let (pubkey, addrs) = parse_and_resolve_peer_addr(node) + .ok_or_else(|| Error::Message(format!("Unable to parse or resolve node specification: {}", node)))?; + let mut errors = vec![]; + for ip in addrs.iter() { + match self.netapp.clone().try_connect(*ip, pubkey).await { + Ok(()) => return Ok(SystemRpc::Ok), + Err(e) => { + errors.push((*ip, e)); + } + } + } + return Err(Error::Message(format!("Could not connect to specified peers. Errors: {:?}", errors))); } fn handle_pull_config(&self) -> SystemRpc { @@ -234,6 +296,58 @@ impl System { SystemRpc::AdvertiseConfig(ring.config.clone()) } + fn handle_get_known_nodes(&self) -> SystemRpc { + let node_status = self.node_status.read().unwrap(); + let known_nodes = + self.fullmesh + .get_peer_list() + .iter() + .map(|n| KnownNodeInfo { + id: n.id.into(), + addr: n.addr, + is_up: n.is_up(), + status: node_status.get(&n.id.into()).cloned().map(|(_, st)| st).unwrap_or( + NodeStatus { + hostname: "?".to_string(), + replication_factor: 0, + config_version: 0, + }, + ), + }) + .collect::<Vec<_>>(); + SystemRpc::ReturnKnownNodes(known_nodes) + } + + async fn handle_advertise_status( + self: &Arc<Self>, + from: Uuid, + info: &NodeStatus, + ) -> Result<SystemRpc, Error> { + let local_info = self.local_status.load(); + + if local_info.replication_factor < info.replication_factor { + error!("Some node have a higher replication factor ({}) than this one ({}). This is not supported and might lead to bugs", + info.replication_factor, + local_info.replication_factor); + std::process::exit(1); + } + + if info.config_version > local_info.config_version { + let self2 = self.clone(); + self.background.spawn_cancellable(async move { + self2.pull_config(from).await; + Ok(()) + }); + } + + self.node_status + .write() + .unwrap() + .insert(from, (now_msec(), info.clone())); + + Ok(SystemRpc::Ok) + } + async fn handle_advertise_config( self: Arc<Self>, adv: &NetworkConfig, @@ -265,13 +379,32 @@ impl System { Ok(SystemRpc::Ok) } - async fn discovery_loop(&self, mut stop_signal: watch::Receiver<bool>) { - /* TODO + async fn status_exchange_loop(&self, mut stop_signal: watch::Receiver<bool>) { + while !*stop_signal.borrow() { + let restart_at = tokio::time::sleep(STATUS_EXCHANGE_INTERVAL); + + self.update_local_status(); + let local_status: NodeStatus = self.local_status.load().as_ref().clone(); + self.rpc + .broadcast( + &self.system_endpoint, + SystemRpc::AdvertiseStatus(local_status), + RequestStrategy::with_priority(PRIO_HIGH).with_timeout(PING_TIMEOUT), + ) + .await; + + select! { + _ = restart_at.fuse() => {}, + _ = stop_signal.changed().fuse() => {}, + } + } + } + + async fn discovery_loop(self: &Arc<Self>, mut stop_signal: watch::Receiver<bool>) { let consul_config = match (&self.consul_host, &self.consul_service_name) { (Some(ch), Some(csn)) => Some((ch.clone(), csn.clone())), _ => None, }; - */ while !*stop_signal.borrow() { let not_configured = self.ring.borrow().config.members.is_empty(); @@ -286,34 +419,42 @@ impl System { if not_configured || no_peers || bad_peers { info!("Doing a bootstrap/discovery step (not_configured: {}, no_peers: {}, bad_peers: {})", not_configured, no_peers, bad_peers); - let ping_list = self.bootstrap_peers.clone(); + let mut ping_list = self.bootstrap_peers.clone(); - /* - *TODO bring this back: persisted list of peers - if let Ok(peers) = self.persist_status.load_async().await { - ping_list.extend(peers.iter().map(|x| (x.addr, Some(x.id)))); + // Add peer list from list stored on disk + if let Ok(peers) = self.persist_peer_list.load_async().await { + ping_list.extend(peers.iter().map(|(id, addr)| ((*id).into(), *addr))) } - */ - /* - * TODO bring this back: get peers from consul + // Fetch peer list from Consul if let Some((consul_host, consul_service_name)) = &consul_config { match get_consul_nodes(consul_host, consul_service_name).await { Ok(node_list) => { - ping_list.extend(node_list.iter().map(|a| (*a, None))); + ping_list.extend(node_list); } Err(e) => { warn!("Could not retrieve node list from Consul: {}", e); } } } - */ for (node_id, node_addr) in ping_list { tokio::spawn(self.netapp.clone().try_connect(node_addr, node_id)); } } + let peer_list = self + .fullmesh + .get_peer_list() + .iter() + .map(|n| (n.id.into(), n.addr)) + .collect::<Vec<_>>(); + if let Err(e) = self.persist_peer_list.save_async(&peer_list).await { + warn!("Could not save peer list to file: {}", e); + } + + self.background.spawn(self.clone().advertise_to_consul()); + let restart_at = tokio::time::sleep(DISCOVERY_INTERVAL); select! { _ = restart_at.fuse() => {}, @@ -322,7 +463,7 @@ impl System { } } - async fn pull_config(self: Arc<Self>, peer: NodeID) { + async fn pull_config(self: Arc<Self>, peer: Uuid) { let resp = self .rpc .call( @@ -340,24 +481,14 @@ impl System { #[async_trait] impl EndpointHandler<SystemRpc> for System { - async fn handle(self: &Arc<Self>, msg: &SystemRpc, _from: NodeID) -> SystemRpc { - let resp = match msg { + async fn handle(self: &Arc<Self>, msg: &SystemRpc, from: NodeID) -> Result<SystemRpc, Error> { + match msg { + SystemRpc::Connect(node) => self.handle_connect(node).await, SystemRpc::PullConfig => Ok(self.handle_pull_config()), + SystemRpc::AdvertiseStatus(adv) => self.handle_advertise_status(from.into(), adv).await, SystemRpc::AdvertiseConfig(adv) => self.clone().handle_advertise_config(&adv).await, - SystemRpc::GetKnownNodes => { - let known_nodes = self - .fullmesh - .get_peer_list() - .iter() - .map(|n| (n.id, n.addr, n.is_up())) - .collect::<Vec<_>>(); - Ok(SystemRpc::ReturnKnownNodes(known_nodes)) - } + SystemRpc::GetKnownNodes => Ok(self.handle_get_known_nodes()), _ => Err(Error::BadRpc("Unexpected RPC message".to_string())), - }; - match resp { - Ok(r) => r, - Err(e) => SystemRpc::Error(format!("{}", e)), } } } |