aboutsummaryrefslogtreecommitdiff
path: root/src/membership.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2020-04-06 22:27:51 +0200
committerAlex Auvolat <alex@adnab.me>2020-04-06 22:27:51 +0200
commit87f2b4d2fc8835f8adda69adb6b51fd73ffb20a5 (patch)
treeb28f8e2c585489f51fc083d343179e25d8f8c76a /src/membership.rs
parent3c36b449a3786bb62fa023bde37bac24635b5717 (diff)
downloadgarage-87f2b4d2fc8835f8adda69adb6b51fd73ffb20a5.tar.gz
garage-87f2b4d2fc8835f8adda69adb6b51fd73ffb20a5.zip
Ununderstandable error
Diffstat (limited to 'src/membership.rs')
-rw-r--r--src/membership.rs197
1 files changed, 162 insertions, 35 deletions
diff --git a/src/membership.rs b/src/membership.rs
index 2c9cd1ca..b025c2bb 100644
--- a/src/membership.rs
+++ b/src/membership.rs
@@ -29,33 +29,41 @@ pub struct System {
}
pub struct Members {
- pub present: Vec<UUID>,
pub status: HashMap<UUID, NodeStatus>,
pub status_hash: Hash,
- pub config: HashMap<UUID, NodeConfig>,
- pub config_version: u64,
+ pub config: NetworkConfig,
}
impl Members {
- fn handle_ping(&mut self, ip: IpAddr, info: &PingMessage) {
- match self.present.binary_search(&info.id) {
- Ok(pos) => {}
- Err(pos) => self.present.insert(pos, info.id.clone()),
- }
+ fn handle_ping(&mut self, ip: IpAddr, info: &PingMessage) -> bool {
self.status.insert(info.id.clone(),
NodeStatus{
addr: SocketAddr::new(ip, info.rpc_port),
remaining_ping_attempts: MAX_FAILED_PINGS,
- });
+ }).is_none()
+ }
+
+ fn handle_advertise_node(&mut self, id: &UUID, addr: &SocketAddr) -> bool {
+ if !self.status.contains_key(id) {
+ self.status.insert(id.clone(),
+ NodeStatus{
+ addr: addr.clone(),
+ remaining_ping_attempts: MAX_FAILED_PINGS,
+ });
+ true
+ } else {
+ false
+ }
}
fn recalculate_status_hash(&mut self) {
+ let mut nodes = self.status.iter().collect::<Vec<_>>();
+ nodes.sort_by_key(|(id, _status)| *id);
+
let mut hasher = Sha256::new();
- for node in self.present.iter() {
- if let Some(status) = self.status.get(node) {
- hasher.input(format!("{} {}\n", hex::encode(node), status.addr));
- }
+ for (id, status) in nodes {
+ hasher.input(format!("{} {}\n", hex::encode(id), status.addr));
}
self.status_hash.copy_from_slice(&hasher.result()[..]);
}
@@ -66,9 +74,6 @@ pub struct NodeStatus {
pub remaining_ping_attempts: usize,
}
-pub struct NodeConfig {
- pub n_tokens: u32,
-}
impl System {
pub fn new(config: Config, id: UUID) -> Self {
@@ -77,26 +82,31 @@ impl System {
id,
rpc_client: Client::new(),
members: RwLock::new(Members{
- present: Vec::new(),
status: HashMap::new(),
status_hash: [0u8; 32],
- config: HashMap::new(),
- config_version: 0,
+ config: NetworkConfig{
+ members: HashMap::new(),
+ version: 0,
+ },
}),
}
}
pub async fn make_ping(&self) -> Message {
+ let members = self.members.read().await;
Message::Ping(PingMessage{
id: self.id,
rpc_port: self.config.rpc_port,
- present_hash: self.members.read().await.status_hash.clone(),
- config_version: 0,
+ status_hash: members.status_hash.clone(),
+ config_version: members.config.version,
})
}
- pub async fn broadcast(&self) -> Vec<UUID> {
- self.members.read().await.present.clone()
+ pub async fn broadcast(self: Arc<Self>, msg: Message, timeout: Duration) {
+ let members = self.members.read().await;
+ let to = members.status.keys().cloned().collect::<Vec<_>>();
+ drop(members);
+ rpc_call_many(self.clone(), &to[..], &msg, None, timeout).await;
}
pub async fn bootstrap(self: Arc<Self>) {
@@ -115,19 +125,12 @@ impl System {
for (addr, ping_resp) in ping_resps {
if let Ok(Message::Ping(info)) = ping_resp {
members.handle_ping(addr.ip(), &info);
-
}
}
members.recalculate_status_hash();
drop(members);
- let resps = rpc_call_many_addr(self.clone(),
- &self.config.bootstrap_peers,
- &ping_msg,
- None,
- PING_TIMEOUT).await;
-
- unimplemented!() //TODO
+ tokio::spawn(self.ping_loop());
}
pub async fn handle_ping(self: Arc<Self>,
@@ -136,17 +139,141 @@ impl System {
-> Result<Message, Error>
{
let mut members = self.members.write().await;
- members.handle_ping(from.ip(), ping);
+ let is_new = members.handle_ping(from.ip(), ping);
members.recalculate_status_hash();
+ let status_hash = members.status_hash.clone();
+ let config_version = members.config.version;
drop(members);
+ if is_new || status_hash != ping.status_hash {
+ tokio::spawn(self.clone().pull_status(ping.id.clone()));
+ }
+ if is_new || config_version < ping.config_version {
+ tokio::spawn(self.clone().pull_config(ping.id.clone()));
+ }
+
Ok(self.make_ping().await)
}
- pub async fn handle_advertise_node(self: Arc<Self>,
- ping: &AdvertiseNodeMessage)
+ pub async fn handle_pull_status(&self) -> Result<Message, Error> {
+ let members = self.members.read().await;
+ let mut mem = vec![];
+ for (node, status) in members.status.iter() {
+ mem.push(AdvertisedNode{
+ id: node.clone(),
+ addr: status.addr.clone(),
+ });
+ }
+ Ok(Message::AdvertiseNodesUp(mem))
+ }
+
+ pub async fn handle_pull_config(&self) -> Result<Message, Error> {
+ let members = self.members.read().await;
+ Ok(Message::AdvertiseConfig(members.config.clone()))
+ }
+
+ pub async fn handle_advertise_nodes_up(self: Arc<Self>,
+ adv: &[AdvertisedNode])
-> Result<Message, Error>
{
- unimplemented!() //TODO
+ let mut propagate = vec![];
+
+ let mut members = self.members.write().await;
+ for node in adv.iter() {
+ let is_new = members.handle_advertise_node(&node.id, &node.addr);
+ if is_new {
+ tokio::spawn(self.clone().pull_status(node.id.clone()));
+ tokio::spawn(self.clone().pull_config(node.id.clone()));
+ propagate.push(node.clone());
+ }
+ }
+
+ if propagate.len() > 0 {
+ tokio::spawn(self.clone().broadcast(Message::AdvertiseNodesUp(propagate), PING_TIMEOUT));
+ }
+
+ Ok(Message::Ok)
+ }
+
+ pub async fn handle_advertise_config(self: Arc<Self>,
+ adv: &NetworkConfig)
+ -> Result<Message, Error>
+ {
+ let mut members = self.members.write().await;
+ if adv.version > members.config.version {
+ members.config = adv.clone();
+ tokio::spawn(self.clone().broadcast(Message::AdvertiseConfig(adv.clone()), PING_TIMEOUT));
+ }
+
+ Ok(Message::Ok)
+ }
+
+ pub async fn ping_loop(self: Arc<Self>) {
+ loop {
+ let restart_at = tokio::time::delay_for(PING_INTERVAL);
+
+ let members = self.members.read().await;
+ let ping_addrs = members.status.iter()
+ .map(|(id, status)| (id.clone(), status.addr.clone()))
+ .collect::<Vec<_>>();
+ drop(members);
+
+ let ping_msg = self.make_ping().await;
+ let ping_resps = join_all(
+ ping_addrs.iter()
+ .map(|(id, addr)| {
+ let sys = self.clone();
+ let ping_msg_ref = &ping_msg;
+ async move {
+ (id, addr.clone(), rpc_call_addr(sys, &addr, ping_msg_ref, PING_TIMEOUT).await)
+ }
+ })).await;
+
+ let mut members = self.members.write().await;
+ for (id, addr, ping_resp) in ping_resps {
+ if let Ok(Message::Ping(ping)) = ping_resp {
+ let is_new = members.handle_ping(addr.ip(), &ping);
+ if is_new || members.status_hash != ping.status_hash {
+ tokio::spawn(self.clone().pull_status(ping.id.clone()));
+ }
+ if is_new || members.config.version < ping.config_version {
+ tokio::spawn(self.clone().pull_config(ping.id.clone()));
+ }
+ } else {
+ let remaining_attempts = members.status.get(id).map(|x| x.remaining_ping_attempts).unwrap_or(0);
+ if remaining_attempts == 0 {
+ eprintln!("Removing node {} after too many failed pings", hex::encode(id));
+ members.status.remove(id);
+ } else {
+ if let Some(st) = members.status.get_mut(id) {
+ st.remaining_ping_attempts = remaining_attempts - 1;
+ }
+ }
+ }
+ }
+ drop(members);
+
+ restart_at.await
+ }
+ }
+
+ pub async fn pull_status(self: Arc<Self>, peer: UUID) {
+ let resp = rpc_call(self.clone(),
+ &peer,
+ &Message::PullStatus,
+ PING_TIMEOUT).await;
+ if let Ok(Message::AdvertiseNodesUp(nodes)) = resp {
+ self.handle_advertise_nodes_up(&nodes).await;
+ }
+ }
+
+ pub async fn pull_config(self: Arc<Self>, peer: UUID) {
+ let resp = rpc_call(self.clone(),
+ &peer,
+ &Message::PullConfig,
+ PING_TIMEOUT).await;
+ if let Ok(Message::AdvertiseConfig(config)) = resp {
+ self.handle_advertise_config(&config).await;
+ }
}
}