aboutsummaryrefslogblamecommitdiff
path: root/src/membership.rs
blob: 08dd5f2f6d0cc3fc364a15461bf9ce01b4d17df1 (plain) (tree)
1
2
3
4
5
6
7
8
9
                              

                               

                                   
                       
                   
                        
 
                              

                            
                                    

                           
                       
                       
 
                                        
                   
                        
                         
                         
                          




                                                        











                                              


























                                                       



                           

                                                
 



                                                                                   

                                              

 



                                             

 
                       




                                           







                                  
                      


                            

 
             
                                                                           
                                                              
                                                   

                                        
                                                   
                                                                          

                          

                                  
                                                                                            



                                                  

         

                                                                      
                                                                
 
                                               
                                                               
                                           

                                                                                        
                 
                                    
                         

                                                               
         
 
 
           


                                             
 



                                                                                             
 


                                                               
 

                                                                                                      
 






                                                                  
 


                                                                              
 




                                              
















                                                                                        

                                                 
         
 
                                                                           



                                                                                       

                                             
 

                                     
                                                                  
                                   
 
                                                                            

                                                                          

                                                                                                                

                         
 
                   
         

 
                                                                                

                                            



                                                  
 
                                          
                                                 
 
                                                                                  


                      

             





                                                  







                                                                                                               


                                                                

                         







                                                                               



                                           


                                                                         








                                                                                                            
 
                                           

                               
                                        
                                   


                                                                              
                                   




























                                                                                                                      

         
                                                                            

                                                                
 

                                                               
 

                                                                           
                      
         
 


                                                          
                                           
                                            
                                                       

                                                            


                  
                                                                                  


                                                          



                                                   
                                                                       
         
 
                                                 


                                          




                                                               

                                   

                                                                                        

         
                                                                                      
                                                






                                                                                



                                                                                        




                                 


                                                                               




                                                                
                                                                    
                                                                                  

                                                           
                                                                          



                                                                    
                                                                              

                                                                                                                      
                                 
                                                                                        

                                                                                                                      

                                                            

                                                               


                                                                           
                                                            



                                                                                               
                                                                 

                                                           
                                                                                    


                                                                                                    

                         
                                
                                                  
                 



                                                                                         
 
                                           

                                                                                             
                 

         




                                     



                                                                               
                           
                                                  
                 




                                                                       
 
                                                              

                                                                                                      

                                                                   

                                                                                                      

                 
                                    

         

                                                                    
                                     
                                                           
                                                 






                                                          


                                                                    

         



                                               
                                         
 

                                                                               

                                            
                                        


                                                                                                      
                                                                   

                                                        
                                                                
                                                                                          

                                          



                                                                       
                                                                        
                                                                                         

                         
                                
                                                  
                 

                                                           

                                      

                                                                                             




                               



                                             







                                                                         
 
                                                          
                                            

                                                                                                       
                          
                                                                                  




                               
                                                                                         

                                                                               
 


                                                                  



                                                                                            
 
                                                                  
 








                                                                            


                 



                                                                         
                            



                                                                               


                                                                                                   



                                                               



                                                                       
                                                                    
                                                                                          
                 
         
 
use std::collections::HashMap;
use std::hash::Hash as StdHash;
use std::hash::Hasher;
use std::io::Read;
use std::net::{IpAddr, SocketAddr};
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;

use futures::future::join_all;
use futures::select;
use futures_util::future::*;
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use tokio::prelude::*;
use tokio::sync::watch;
use tokio::sync::Mutex;

use crate::background::BackgroundRunner;
use crate::data::*;
use crate::error::Error;
use crate::rpc_client::*;
use crate::rpc_server::*;
use crate::server::Config;

const PING_INTERVAL: Duration = Duration::from_secs(10);
const PING_TIMEOUT: Duration = Duration::from_secs(2);
const MAX_FAILED_PINGS: usize = 3;

#[derive(Debug, Serialize, Deserialize)]
pub enum Message {
	Ok,
	Ping(PingMessage),
	PullStatus,
	PullConfig,
	AdvertiseNodesUp(Vec<AdvertisedNode>),
	AdvertiseConfig(NetworkConfig),
}

impl RpcMessage for Message {}

#[derive(Debug, Serialize, Deserialize)]
pub struct PingMessage {
	pub id: UUID,
	pub rpc_port: u16,

	pub status_hash: Hash,
	pub config_version: u64,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct AdvertisedNode {
	pub id: UUID,
	pub addr: SocketAddr,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct NetworkConfig {
	pub members: HashMap<UUID, NetworkConfigEntry>,
	pub version: u64,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct NetworkConfigEntry {
	pub datacenter: String,
	pub n_tokens: u32,
}

pub struct System {
	pub config: Config,
	pub id: UUID,

	pub rpc_http_client: Arc<RpcHttpClient>,
	rpc_client: Arc<RpcClient<Message>>,

	pub status: watch::Receiver<Arc<Status>>,
	pub ring: watch::Receiver<Arc<Ring>>,

	update_lock: Mutex<(watch::Sender<Arc<Status>>, watch::Sender<Arc<Ring>>)>,

	pub background: Arc<BackgroundRunner>,
}

#[derive(Debug, Clone)]
pub struct Status {
	pub nodes: HashMap<UUID, NodeStatus>,
	pub hash: Hash,
}

#[derive(Debug, Clone)]
pub struct NodeStatus {
	pub addr: SocketAddr,
	pub remaining_ping_attempts: usize,
}

#[derive(Clone)]
pub struct Ring {
	pub config: NetworkConfig,
	pub ring: Vec<RingEntry>,
	pub n_datacenters: usize,
}

#[derive(Clone, Debug)]
pub struct RingEntry {
	pub location: Hash,
	pub node: UUID,
	pub datacenter: u64,
}

impl Status {
	fn handle_ping(&mut self, ip: IpAddr, info: &PingMessage) -> bool {
		let addr = SocketAddr::new(ip, info.rpc_port);
		let old_status = self.nodes.insert(
			info.id.clone(),
			NodeStatus {
				addr: addr.clone(),
				remaining_ping_attempts: MAX_FAILED_PINGS,
			},
		);
		match old_status {
			None => {
				eprintln!("Newly pingable node: {}", hex::encode(&info.id));
				true
			}
			Some(x) => x.addr != addr,
		}
	}

	fn recalculate_hash(&mut self) {
		let mut nodes = self.nodes.iter().collect::<Vec<_>>();
		nodes.sort_unstable_by_key(|(id, _status)| *id);

		let mut hasher = Sha256::new();
		eprintln!("Current set of pingable nodes: --");
		for (id, status) in nodes {
			eprintln!("{} {}", hex::encode(&id), status.addr);
			hasher.input(format!("{} {}\n", hex::encode(&id), status.addr));
		}
		eprintln!("END --");
		self.hash
			.as_slice_mut()
			.copy_from_slice(&hasher.result()[..]);
	}
}

impl Ring {
	fn rebuild_ring(&mut self) {
		let mut new_ring = vec![];
		let mut datacenters = vec![];

		for (id, config) in self.config.members.iter() {
			let mut dc_hasher = std::collections::hash_map::DefaultHasher::new();
			config.datacenter.hash(&mut dc_hasher);
			let datacenter = dc_hasher.finish();

			if !datacenters.contains(&datacenter) {
				datacenters.push(datacenter);
			}

			for i in 0..config.n_tokens {
				let location = hash(format!("{} {}", hex::encode(&id), i).as_bytes());

				new_ring.push(RingEntry {
					location: location.into(),
					node: id.clone(),
					datacenter,
				})
			}
		}

		new_ring.sort_unstable_by(|x, y| x.location.cmp(&y.location));
		self.ring = new_ring;
		self.n_datacenters = datacenters.len();

		// eprintln!("RING: --");
		// for e in self.ring.iter() {
		// 	eprintln!("{:?}", e);
		// }
		// eprintln!("END --");
	}

	pub fn walk_ring(&self, from: &Hash, n: usize) -> Vec<UUID> {
		if n >= self.config.members.len() {
			return self.config.members.keys().cloned().collect::<Vec<_>>();
		}

		let start = match self.ring.binary_search_by(|x| x.location.cmp(from)) {
			Ok(i) => i,
			Err(i) => {
				if i == 0 {
					self.ring.len() - 1
				} else {
					i - 1
				}
			}
		};

		self.walk_ring_from_pos(start, n)
	}

	fn walk_ring_from_pos(&self, start: usize, n: usize) -> Vec<UUID> {
		if n >= self.config.members.len() {
			return self.config.members.keys().cloned().collect::<Vec<_>>();
		}

		let mut ret = vec![];
		let mut datacenters = vec![];

		let mut delta = 0;
		while ret.len() < n {
			let i = (start + delta) % self.ring.len();
			delta += 1;

			if !datacenters.contains(&self.ring[i].datacenter) {
				ret.push(self.ring[i].node.clone());
				datacenters.push(self.ring[i].datacenter);
			} else if datacenters.len() == self.n_datacenters && !ret.contains(&self.ring[i].node) {
				ret.push(self.ring[i].node.clone());
			}
		}

		ret
	}
}

fn read_network_config(metadata_dir: &PathBuf) -> Result<NetworkConfig, Error> {
	let mut path = metadata_dir.clone();
	path.push("network_config");

	let mut file = std::fs::OpenOptions::new()
		.read(true)
		.open(path.as_path())?;

	let mut net_config_bytes = vec![];
	file.read_to_end(&mut net_config_bytes)?;

	let net_config = rmp_serde::decode::from_read_ref(&net_config_bytes[..])?;

	Ok(net_config)
}

impl System {
	pub fn new(
		config: Config,
		id: UUID,
		background: Arc<BackgroundRunner>,
		rpc_server: &mut RpcServer,
	) -> Arc<Self> {
		let net_config = match read_network_config(&config.metadata_dir) {
			Ok(x) => x,
			Err(e) => {
				println!(
					"No valid previous network configuration stored ({}), starting fresh.",
					e
				);
				NetworkConfig {
					members: HashMap::new(),
					version: 0,
				}
			}
		};
		let mut status = Status {
			nodes: HashMap::new(),
			hash: Hash::default(),
		};
		status.recalculate_hash();
		let (update_status, status) = watch::channel(Arc::new(status));

		let mut ring = Ring {
			config: net_config,
			ring: Vec::new(),
			n_datacenters: 0,
		};
		ring.rebuild_ring();
		let (update_ring, ring) = watch::channel(Arc::new(ring));

		let rpc_http_client =
			Arc::new(RpcHttpClient::new(&config.rpc_tls).expect("Could not create RPC client"));

		let rpc_path = "_membership";
		let rpc_client = RpcClient::new(
			RpcAddrClient::<Message>::new(rpc_http_client.clone(), rpc_path.into()),
			background.clone(),
			status.clone(),
		);

		let sys = Arc::new(System {
			config,
			id,
			rpc_http_client,
			rpc_client,
			status,
			ring,
			update_lock: Mutex::new((update_status, update_ring)),
			background,
		});
		sys.clone().register_handler(rpc_server, rpc_path.into());
		sys
	}

	fn register_handler(self: Arc<Self>, rpc_server: &mut RpcServer, path: String) {
		rpc_server.add_handler::<Message, _, _>(path, move |msg, addr| {
			let self2 = self.clone();
			async move {
				match msg {
					Message::Ping(ping) => self2.handle_ping(&addr, &ping).await,

					Message::PullStatus => self2.handle_pull_status(),
					Message::PullConfig => self2.handle_pull_config(),
					Message::AdvertiseNodesUp(adv) => self2.handle_advertise_nodes_up(&adv).await,
					Message::AdvertiseConfig(adv) => self2.handle_advertise_config(&adv).await,

					_ => Err(Error::Message(format!("Unexpected RPC message"))),
				}
			}
		});
	}

	pub fn rpc_client<M: RpcMessage + 'static>(self: &Arc<Self>, path: &str) -> Arc<RpcClient<M>> {
		RpcClient::new(
			RpcAddrClient::new(self.rpc_http_client.clone(), path.to_string()),
			self.background.clone(),
			self.status.clone(),
		)
	}

	async fn save_network_config(self: Arc<Self>) -> Result<(), Error> {
		let mut path = self.config.metadata_dir.clone();
		path.push("network_config");

		let ring = self.ring.borrow().clone();
		let data = rmp_to_vec_all_named(&ring.config)?;

		let mut f = tokio::fs::File::create(path.as_path()).await?;
		f.write_all(&data[..]).await?;
		Ok(())
	}

	pub fn make_ping(&self) -> Message {
		let status = self.status.borrow().clone();
		let ring = self.ring.borrow().clone();
		Message::Ping(PingMessage {
			id: self.id.clone(),
			rpc_port: self.config.rpc_port,
			status_hash: status.hash.clone(),
			config_version: ring.config.version,
		})
	}

	pub async fn broadcast(self: Arc<Self>, msg: Message, timeout: Duration) {
		let status = self.status.borrow().clone();
		let to = status
			.nodes
			.keys()
			.filter(|x| **x != self.id)
			.cloned()
			.collect::<Vec<_>>();
		self.rpc_client.call_many(&to[..], msg, timeout).await;
	}

	pub async fn bootstrap(self: Arc<Self>) {
		let bootstrap_peers = self
			.config
			.bootstrap_peers
			.iter()
			.map(|ip| (ip.clone(), None))
			.collect::<Vec<_>>();
		self.clone().ping_nodes(bootstrap_peers).await;

		self.clone()
			.background
			.spawn_worker(|stop_signal| self.ping_loop(stop_signal).map(Ok))
			.await;
	}

	async fn ping_nodes(self: Arc<Self>, peers: Vec<(SocketAddr, Option<UUID>)>) {
		let ping_msg = self.make_ping();
		let ping_resps = join_all(peers.iter().map(|(addr, id_option)| {
			let sys = self.clone();
			let ping_msg_ref = &ping_msg;
			async move {
				(
					id_option,
					addr.clone(),
					sys.rpc_client
						.by_addr()
						.call(&addr, ping_msg_ref, PING_TIMEOUT)
						.await,
				)
			}
		}))
		.await;

		let update_locked = self.update_lock.lock().await;
		let mut status: Status = self.status.borrow().as_ref().clone();
		let ring = self.ring.borrow().clone();

		let mut has_changes = false;
		let mut to_advertise = vec![];

		for (id_option, addr, ping_resp) in ping_resps {
			if let Ok(Message::Ping(info)) = ping_resp {
				let is_new = status.handle_ping(addr.ip(), &info);
				if is_new {
					has_changes = true;
					to_advertise.push(AdvertisedNode {
						id: info.id.clone(),
						addr: addr.clone(),
					});
				}
				if is_new || status.hash != info.status_hash {
					self.background
						.spawn_cancellable(self.clone().pull_status(info.id.clone()).map(Ok));
				}
				if is_new || ring.config.version < info.config_version {
					self.background
						.spawn_cancellable(self.clone().pull_config(info.id.clone()).map(Ok));
				}
			} else if let Some(id) = id_option {
				let remaining_attempts = status
					.nodes
					.get(id)
					.map(|x| x.remaining_ping_attempts)
					.unwrap_or(0);
				if remaining_attempts == 0 {
					eprintln!(
						"Removing node {} after too many failed pings",
						hex::encode(&id)
					);
					status.nodes.remove(&id);
					has_changes = true;
				} else {
					if let Some(st) = status.nodes.get_mut(id) {
						st.remaining_ping_attempts = remaining_attempts - 1;
					}
				}
			}
		}
		if has_changes {
			status.recalculate_hash();
		}
		if let Err(e) = update_locked.0.broadcast(Arc::new(status)) {
			eprintln!("In ping_nodes: could not save status update ({})", e);
		}
		drop(update_locked);

		if to_advertise.len() > 0 {
			self.broadcast(Message::AdvertiseNodesUp(to_advertise), PING_TIMEOUT)
				.await;
		}
	}

	pub async fn handle_ping(
		self: Arc<Self>,
		from: &SocketAddr,
		ping: &PingMessage,
	) -> Result<Message, Error> {
		let update_locked = self.update_lock.lock().await;
		let mut status: Status = self.status.borrow().as_ref().clone();

		let is_new = status.handle_ping(from.ip(), ping);
		if is_new {
			status.recalculate_hash();
		}
		let status_hash = status.hash.clone();
		let config_version = self.ring.borrow().config.version;

		update_locked.0.broadcast(Arc::new(status))?;
		drop(update_locked);

		if is_new || status_hash != ping.status_hash {
			self.background
				.spawn_cancellable(self.clone().pull_status(ping.id.clone()).map(Ok));
		}
		if is_new || config_version < ping.config_version {
			self.background
				.spawn_cancellable(self.clone().pull_config(ping.id.clone()).map(Ok));
		}

		Ok(self.make_ping())
	}

	pub fn handle_pull_status(&self) -> Result<Message, Error> {
		let status = self.status.borrow().clone();
		let mut mem = vec![];
		for (node, status) in status.nodes.iter() {
			mem.push(AdvertisedNode {
				id: node.clone(),
				addr: status.addr.clone(),
			});
		}
		Ok(Message::AdvertiseNodesUp(mem))
	}

	pub fn handle_pull_config(&self) -> Result<Message, Error> {
		let ring = self.ring.borrow().clone();
		Ok(Message::AdvertiseConfig(ring.config.clone()))
	}

	pub async fn handle_advertise_nodes_up(
		self: Arc<Self>,
		adv: &[AdvertisedNode],
	) -> Result<Message, Error> {
		let mut to_ping = vec![];

		let update_lock = self.update_lock.lock().await;
		let mut status: Status = self.status.borrow().as_ref().clone();
		let mut has_changed = false;

		for node in adv.iter() {
			if node.id == self.id {
				// learn our own ip address
				let self_addr = SocketAddr::new(node.addr.ip(), self.config.rpc_port);
				let old_self = status.nodes.insert(
					node.id.clone(),
					NodeStatus {
						addr: self_addr,
						remaining_ping_attempts: MAX_FAILED_PINGS,
					},
				);
				has_changed = match old_self {
					None => true,
					Some(x) => x.addr != self_addr,
				};
			} else if !status.nodes.contains_key(&node.id) {
				to_ping.push((node.addr.clone(), Some(node.id.clone())));
			}
		}
		if has_changed {
			status.recalculate_hash();
		}
		update_lock.0.broadcast(Arc::new(status))?;
		drop(update_lock);

		if to_ping.len() > 0 {
			self.background
				.spawn_cancellable(self.clone().ping_nodes(to_ping).map(Ok));
		}

		Ok(Message::Ok)
	}

	pub async fn handle_advertise_config(
		self: Arc<Self>,
		adv: &NetworkConfig,
	) -> Result<Message, Error> {
		let mut ring: Ring = self.ring.borrow().as_ref().clone();
		let update_lock = self.update_lock.lock().await;

		if adv.version > ring.config.version {
			ring.config = adv.clone();
			ring.rebuild_ring();
			update_lock.1.broadcast(Arc::new(ring))?;
			drop(update_lock);

			self.background.spawn_cancellable(
				self.clone()
					.broadcast(Message::AdvertiseConfig(adv.clone()), PING_TIMEOUT)
					.map(Ok),
			);
			self.background.spawn(self.clone().save_network_config());
		}

		Ok(Message::Ok)
	}

	pub async fn ping_loop(self: Arc<Self>, mut stop_signal: watch::Receiver<bool>) {
		loop {
			let restart_at = tokio::time::delay_for(PING_INTERVAL);

			let status = self.status.borrow().clone();
			let ping_addrs = status
				.nodes
				.iter()
				.filter(|(id, _)| **id != self.id)
				.map(|(id, status)| (status.addr.clone(), Some(id.clone())))
				.collect::<Vec<_>>();

			self.clone().ping_nodes(ping_addrs).await;

			select! {
				_ = restart_at.fuse() => (),
				must_exit = stop_signal.recv().fuse() => {
					match must_exit {
						None | Some(true) => return,
						_ => (),
					}
				}
			}
		}
	}

	pub fn pull_status(
		self: Arc<Self>,
		peer: UUID,
	) -> impl futures::future::Future<Output = ()> + Send + 'static {
		async move {
			let resp = self
				.rpc_client
				.call(&peer, Message::PullStatus, PING_TIMEOUT)
				.await;
			if let Ok(Message::AdvertiseNodesUp(nodes)) = resp {
				let _: Result<_, _> = self.handle_advertise_nodes_up(&nodes).await;
			}
		}
	}

	pub async fn pull_config(self: Arc<Self>, peer: UUID) {
		let resp = self
			.rpc_client
			.call(&peer, Message::PullConfig, PING_TIMEOUT)
			.await;
		if let Ok(Message::AdvertiseConfig(config)) = resp {
			let _: Result<_, _> = self.handle_advertise_config(&config).await;
		}
	}
}