author    Alex Auvolat <alex@adnab.me>    2020-04-10 22:01:48 +0200
committer Alex Auvolat <alex@adnab.me>    2020-04-10 22:01:48 +0200
commit    3477864142ed09c36abea1111937b829fb41c8a4 (patch)
tree      d95221e66b9c014af7f4dba61ae4ff113c0e409a /src/membership.rs
parent    d66c0d6833ddbeb61e34ee222dde92a5363bda1f (diff)
Fix the Sync issue. Details:
Hyper's HTTP client future is not Sync, so the stream that reads blocks was not Sync either. However, Hyper's default Body type requires the stream passed to wrap_stream to be Sync. Solution: reimplement a custom HTTP body type.
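For context, a minimal sketch of the kind of custom body type this refers to, assuming the hyper 0.13-era http-body trait; the name StreamBody and the std::io::Error error type are illustrative, not the actual code from this commit. The idea is to wrap a stream of Bytes chunks that is Send but not Sync and forward it chunk by chunk, sidestepping the Sync bound that Body::wrap_stream puts on its stream argument.

use std::pin::Pin;
use std::task::{Context, Poll};

use bytes::Bytes;
use futures::Stream;
use http::HeaderMap;
use hyper::body::HttpBody;

// The wrapped stream only needs to be Send, not Sync.
type ChunkStream = Pin<Box<dyn Stream<Item = Result<Bytes, std::io::Error>> + Send>>;

pub struct StreamBody {
    stream: ChunkStream,
}

impl StreamBody {
    pub fn new(stream: ChunkStream) -> Self {
        Self { stream }
    }
}

impl HttpBody for StreamBody {
    type Data = Bytes;
    type Error = std::io::Error;

    // Forward each chunk produced by the inner stream.
    fn poll_data(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
        // StreamBody is Unpin (its only field is a Pin<Box<_>>), so get_mut is fine here.
        self.get_mut().stream.as_mut().poll_next(cx)
    }

    // This body never carries trailers.
    fn poll_trailers(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
        Poll::Ready(Ok(None))
    }
}

A body like this can then be used in a Response<StreamBody> instead of going through Body::wrap_stream, so the block-reading stream only has to be Send.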
Diffstat (limited to 'src/membership.rs')
-rw-r--r--  src/membership.rs  340
1 file changed, 186 insertions(+), 154 deletions(-)
diff --git a/src/membership.rs b/src/membership.rs
index 3f7a84c4..b713c7a4 100644
--- a/src/membership.rs
+++ b/src/membership.rs
@@ -1,22 +1,22 @@
-use std::sync::Arc;
+use std::collections::HashMap;
use std::hash::Hash as StdHash;
use std::hash::Hasher;
+use std::io::Read;
+use std::net::{IpAddr, SocketAddr};
use std::path::PathBuf;
-use std::io::{Read};
-use std::collections::HashMap;
+use std::sync::Arc;
use std::time::Duration;
-use std::net::{IpAddr, SocketAddr};
-use sha2::{Sha256, Digest};
-use tokio::prelude::*;
use futures::future::join_all;
+use sha2::{Digest, Sha256};
+use tokio::prelude::*;
use tokio::sync::RwLock;
-use crate::server::Config;
-use crate::error::Error;
use crate::data::*;
+use crate::error::Error;
use crate::proto::*;
use crate::rpc_client::*;
+use crate::server::Config;
const PING_INTERVAL: Duration = Duration::from_secs(10);
const PING_TIMEOUT: Duration = Duration::from_secs(2);
@@ -36,8 +36,8 @@ pub struct Members {
pub status_hash: Hash,
pub config: NetworkConfig,
- pub ring: Vec<RingEntry>,
- pub n_datacenters: usize,
+ pub ring: Vec<RingEntry>,
+ pub n_datacenters: usize,
}
pub struct NodeStatus {
@@ -47,19 +47,21 @@ pub struct NodeStatus {
#[derive(Debug)]
pub struct RingEntry {
- pub location: Hash,
- pub node: UUID,
- pub datacenter: u64,
+ pub location: Hash,
+ pub node: UUID,
+ pub datacenter: u64,
}
impl Members {
fn handle_ping(&mut self, ip: IpAddr, info: &PingMessage) -> bool {
let addr = SocketAddr::new(ip, info.rpc_port);
- let old_status = self.status.insert(info.id.clone(),
- NodeStatus{
+ let old_status = self.status.insert(
+ info.id.clone(),
+ NodeStatus {
addr: addr.clone(),
remaining_ping_attempts: MAX_FAILED_PINGS,
- });
+ },
+ );
match old_status {
None => {
eprintln!("Newly pingable node: {}", hex::encode(&info.id));
@@ -80,122 +82,129 @@ impl Members {
hasher.input(format!("{} {}\n", hex::encode(&id), status.addr));
}
eprintln!("END --");
- self.status_hash.as_slice_mut().copy_from_slice(&hasher.result()[..]);
+ self.status_hash
+ .as_slice_mut()
+ .copy_from_slice(&hasher.result()[..]);
}
- fn rebuild_ring(&mut self) {
- let mut new_ring = vec![];
- let mut datacenters = vec![];
+ fn rebuild_ring(&mut self) {
+ let mut new_ring = vec![];
+ let mut datacenters = vec![];
- for (id, config) in self.config.members.iter() {
- let mut dc_hasher = std::collections::hash_map::DefaultHasher::new();
- config.datacenter.hash(&mut dc_hasher);
- let datacenter = dc_hasher.finish();
+ for (id, config) in self.config.members.iter() {
+ let mut dc_hasher = std::collections::hash_map::DefaultHasher::new();
+ config.datacenter.hash(&mut dc_hasher);
+ let datacenter = dc_hasher.finish();
- if !datacenters.contains(&datacenter) {
- datacenters.push(datacenter);
- }
+ if !datacenters.contains(&datacenter) {
+ datacenters.push(datacenter);
+ }
- for i in 0..config.n_tokens {
- let location = hash(format!("{} {}", hex::encode(&id), i).as_bytes());
+ for i in 0..config.n_tokens {
+ let location = hash(format!("{} {}", hex::encode(&id), i).as_bytes());
- new_ring.push(RingEntry{
- location: location.into(),
- node: id.clone(),
- datacenter,
- })
- }
- }
+ new_ring.push(RingEntry {
+ location: location.into(),
+ node: id.clone(),
+ datacenter,
+ })
+ }
+ }
- new_ring.sort_unstable_by(|x, y| x.location.cmp(&y.location));
- self.ring = new_ring;
- self.n_datacenters = datacenters.len();
+ new_ring.sort_unstable_by(|x, y| x.location.cmp(&y.location));
+ self.ring = new_ring;
+ self.n_datacenters = datacenters.len();
eprintln!("RING: --");
for e in self.ring.iter() {
eprintln!("{:?}", e);
}
eprintln!("END --");
- }
-
- pub fn walk_ring(&self, from: &Hash, n: usize) -> Vec<UUID> {
- if n >= self.config.members.len() {
- return self.config.members.keys().cloned().collect::<Vec<_>>();
- }
-
- let start = match self.ring.binary_search_by(|x| x.location.cmp(from)) {
- Ok(i) => i,
- Err(i) => if i == 0 {
- self.ring.len() - 1
- } else {
- i - 1
- }
- };
+ }
+
+ pub fn walk_ring(&self, from: &Hash, n: usize) -> Vec<UUID> {
+ if n >= self.config.members.len() {
+ return self.config.members.keys().cloned().collect::<Vec<_>>();
+ }
+
+ let start = match self.ring.binary_search_by(|x| x.location.cmp(from)) {
+ Ok(i) => i,
+ Err(i) => {
+ if i == 0 {
+ self.ring.len() - 1
+ } else {
+ i - 1
+ }
+ }
+ };
self.walk_ring_from_pos(start, n)
- }
+ }
fn walk_ring_from_pos(&self, start: usize, n: usize) -> Vec<UUID> {
- let mut ret = vec![];
- let mut datacenters = vec![];
+ let mut ret = vec![];
+ let mut datacenters = vec![];
- for delta in 0..self.ring.len() {
- if ret.len() == n {
- break;
- }
+ for delta in 0..self.ring.len() {
+ if ret.len() == n {
+ break;
+ }
- let i = (start + delta) % self.ring.len();
+ let i = (start + delta) % self.ring.len();
- if datacenters.len() == self.n_datacenters && !ret.contains(&self.ring[i].node) {
- ret.push(self.ring[i].node.clone());
- } else if !datacenters.contains(&self.ring[i].datacenter) {
- ret.push(self.ring[i].node.clone());
- datacenters.push(self.ring[i].datacenter);
- }
- }
+ if datacenters.len() == self.n_datacenters && !ret.contains(&self.ring[i].node) {
+ ret.push(self.ring[i].node.clone());
+ } else if !datacenters.contains(&self.ring[i].datacenter) {
+ ret.push(self.ring[i].node.clone());
+ datacenters.push(self.ring[i].datacenter);
+ }
+ }
- ret
+ ret
}
}
fn read_network_config(metadata_dir: &PathBuf) -> Result<NetworkConfig, Error> {
- let mut path = metadata_dir.clone();
- path.push("network_config");
+ let mut path = metadata_dir.clone();
+ path.push("network_config");
let mut file = std::fs::OpenOptions::new()
.read(true)
.open(path.as_path())?;
-
+
let mut net_config_bytes = vec![];
file.read_to_end(&mut net_config_bytes)?;
- let net_config = rmp_serde::decode::from_read_ref(&net_config_bytes[..])?;
+ let net_config = rmp_serde::decode::from_read_ref(&net_config_bytes[..])?;
Ok(net_config)
}
impl System {
pub fn new(config: Config, id: UUID) -> Self {
- let net_config = match read_network_config(&config.metadata_dir) {
- Ok(x) => x,
- Err(e) => {
- println!("No valid previous network configuration stored ({}), starting fresh.", e);
- NetworkConfig{
+ let net_config = match read_network_config(&config.metadata_dir) {
+ Ok(x) => x,
+ Err(e) => {
+ println!(
+ "No valid previous network configuration stored ({}), starting fresh.",
+ e
+ );
+ NetworkConfig {
members: HashMap::new(),
version: 0,
}
- },
- };
- let mut members = Members{
- status: HashMap::new(),
- status_hash: Hash::default(),
- config: net_config,
- ring: Vec::new(),
- n_datacenters: 0,
- };
+ }
+ };
+ let mut members = Members {
+ status: HashMap::new(),
+ status_hash: Hash::default(),
+ config: net_config,
+ ring: Vec::new(),
+ n_datacenters: 0,
+ };
members.recalculate_status_hash();
- members.rebuild_ring();
- System{
+ members.rebuild_ring();
+ System {
config,
id,
rpc_client: RpcClient::new(),
@@ -203,24 +212,26 @@ impl System {
}
}
- async fn save_network_config(self: Arc<Self>) {
- let mut path = self.config.metadata_dir.clone();
- path.push("network_config");
+ async fn save_network_config(self: Arc<Self>) {
+ let mut path = self.config.metadata_dir.clone();
+ path.push("network_config");
- let members = self.members.read().await;
- let data = rmp_to_vec_all_named(&members.config)
- .expect("Error while encoding network config");
- drop(members);
+ let members = self.members.read().await;
+ let data =
+ rmp_to_vec_all_named(&members.config).expect("Error while encoding network config");
+ drop(members);
- let mut f = tokio::fs::File::create(path.as_path()).await
- .expect("Could not create network_config");
- f.write_all(&data[..]).await
- .expect("Could not write network_config");
- }
+ let mut f = tokio::fs::File::create(path.as_path())
+ .await
+ .expect("Could not create network_config");
+ f.write_all(&data[..])
+ .await
+ .expect("Could not write network_config");
+ }
pub async fn make_ping(&self) -> Message {
let members = self.members.read().await;
- Message::Ping(PingMessage{
+ Message::Ping(PingMessage {
id: self.id.clone(),
rpc_port: self.config.rpc_port,
status_hash: members.status_hash.clone(),
@@ -230,13 +241,20 @@ impl System {
pub async fn broadcast(self: Arc<Self>, msg: Message, timeout: Duration) {
let members = self.members.read().await;
- let to = members.status.keys().filter(|x| **x != self.id).cloned().collect::<Vec<_>>();
+ let to = members
+ .status
+ .keys()
+ .filter(|x| **x != self.id)
+ .cloned()
+ .collect::<Vec<_>>();
drop(members);
rpc_call_many(self.clone(), &to[..], &msg, timeout).await;
}
pub async fn bootstrap(self: Arc<Self>) {
- let bootstrap_peers = self.config.bootstrap_peers
+ let bootstrap_peers = self
+ .config
+ .bootstrap_peers
.iter()
.map(|ip| (ip.clone(), None))
.collect::<Vec<_>>();
@@ -247,16 +265,19 @@ impl System {
pub async fn ping_nodes(self: Arc<Self>, peers: Vec<(SocketAddr, Option<UUID>)>) {
let ping_msg = self.make_ping().await;
- let ping_resps = join_all(
- peers.iter()
- .map(|(addr, id_option)| {
- let sys = self.clone();
- let ping_msg_ref = &ping_msg;
- async move {
- (id_option, addr.clone(), sys.rpc_client.call(&addr, ping_msg_ref, PING_TIMEOUT).await)
- }
- })).await;
-
+ let ping_resps = join_all(peers.iter().map(|(addr, id_option)| {
+ let sys = self.clone();
+ let ping_msg_ref = &ping_msg;
+ async move {
+ (
+ id_option,
+ addr.clone(),
+ sys.rpc_client.call(&addr, ping_msg_ref, PING_TIMEOUT).await,
+ )
+ }
+ }))
+ .await;
+
let mut members = self.members.write().await;
let mut has_changes = false;
@@ -267,7 +288,7 @@ impl System {
let is_new = members.handle_ping(addr.ip(), &info);
if is_new {
has_changes = true;
- to_advertise.push(AdvertisedNode{
+ to_advertise.push(AdvertisedNode {
id: info.id.clone(),
addr: addr.clone(),
});
@@ -279,9 +300,16 @@ impl System {
tokio::spawn(self.clone().pull_config(info.id.clone()));
}
} else if let Some(id) = id_option {
- let remaining_attempts = members.status.get(id).map(|x| x.remaining_ping_attempts).unwrap_or(0);
+ let remaining_attempts = members
+ .status
+ .get(id)
+ .map(|x| x.remaining_ping_attempts)
+ .unwrap_or(0);
if remaining_attempts == 0 {
- eprintln!("Removing node {} after too many failed pings", hex::encode(&id));
+ eprintln!(
+ "Removing node {} after too many failed pings",
+ hex::encode(&id)
+ );
members.status.remove(&id);
has_changes = true;
} else {
@@ -297,15 +325,16 @@ impl System {
drop(members);
if to_advertise.len() > 0 {
- self.broadcast(Message::AdvertiseNodesUp(to_advertise), PING_TIMEOUT).await;
+ self.broadcast(Message::AdvertiseNodesUp(to_advertise), PING_TIMEOUT)
+ .await;
}
}
- pub async fn handle_ping(self: Arc<Self>,
- from: &SocketAddr,
- ping: &PingMessage)
- -> Result<Message, Error>
- {
+ pub async fn handle_ping(
+ self: Arc<Self>,
+ from: &SocketAddr,
+ ping: &PingMessage,
+ ) -> Result<Message, Error> {
let mut members = self.members.write().await;
let is_new = members.handle_ping(from.ip(), ping);
if is_new {
@@ -329,7 +358,7 @@ impl System {
let members = self.members.read().await;
let mut mem = vec![];
for (node, status) in members.status.iter() {
- mem.push(AdvertisedNode{
+ mem.push(AdvertisedNode {
id: node.clone(),
addr: status.addr.clone(),
});
@@ -342,10 +371,10 @@ impl System {
Ok(Message::AdvertiseConfig(members.config.clone()))
}
- pub async fn handle_advertise_nodes_up(self: Arc<Self>,
- adv: &[AdvertisedNode])
- -> Result<Message, Error>
- {
+ pub async fn handle_advertise_nodes_up(
+ self: Arc<Self>,
+ adv: &[AdvertisedNode],
+ ) -> Result<Message, Error> {
let mut to_ping = vec![];
let mut members = self.members.write().await;
@@ -355,11 +384,13 @@ impl System {
if node.id == self.id {
// learn our own ip address
let self_addr = SocketAddr::new(node.addr.ip(), self.config.rpc_port);
- let old_self = members.status.insert(node.id.clone(),
- NodeStatus{
+ let old_self = members.status.insert(
+ node.id.clone(),
+ NodeStatus {
addr: self_addr,
remaining_ping_attempts: MAX_FAILED_PINGS,
- });
+ },
+ );
has_changed = match old_self {
None => true,
Some(x) => x.addr != self_addr,
@@ -380,18 +411,20 @@ impl System {
Ok(Message::Ok)
}
- pub async fn handle_advertise_config(self: Arc<Self>,
- adv: &NetworkConfig)
- -> Result<Message, Error>
- {
+ pub async fn handle_advertise_config(
+ self: Arc<Self>,
+ adv: &NetworkConfig,
+ ) -> Result<Message, Error> {
let mut members = self.members.write().await;
if adv.version > members.config.version {
-
members.config = adv.clone();
- members.rebuild_ring();
+ members.rebuild_ring();
- tokio::spawn(self.clone().broadcast(Message::AdvertiseConfig(adv.clone()), PING_TIMEOUT));
- tokio::spawn(self.clone().save_network_config());
+ tokio::spawn(
+ self.clone()
+ .broadcast(Message::AdvertiseConfig(adv.clone()), PING_TIMEOUT),
+ );
+ tokio::spawn(self.clone().save_network_config());
}
Ok(Message::Ok)
@@ -400,12 +433,14 @@ impl System {
pub async fn ping_loop(self: Arc<Self>) {
loop {
let restart_at = tokio::time::delay_for(PING_INTERVAL);
-
+
let members = self.members.read().await;
- let ping_addrs = members.status.iter()
- .filter(|(id, _)| **id != self.id)
- .map(|(id, status)| (status.addr.clone(), Some(id.clone())))
- .collect::<Vec<_>>();
+ let ping_addrs = members
+ .status
+ .iter()
+ .filter(|(id, _)| **id != self.id)
+ .map(|(id, status)| (status.addr.clone(), Some(id.clone())))
+ .collect::<Vec<_>>();
drop(members);
self.clone().ping_nodes(ping_addrs).await;
@@ -414,12 +449,12 @@ impl System {
}
}
- pub fn pull_status(self: Arc<Self>, peer: UUID) -> impl futures::future::Future<Output=()> + Send + 'static {
+ pub fn pull_status(
+ self: Arc<Self>,
+ peer: UUID,
+ ) -> impl futures::future::Future<Output = ()> + Send + 'static {
async move {
- let resp = rpc_call(self.clone(),
- &peer,
- &Message::PullStatus,
- PING_TIMEOUT).await;
+ let resp = rpc_call(self.clone(), &peer, &Message::PullStatus, PING_TIMEOUT).await;
if let Ok(Message::AdvertiseNodesUp(nodes)) = resp {
let _: Result<_, _> = self.handle_advertise_nodes_up(&nodes).await;
}
@@ -427,10 +462,7 @@ impl System {
}
pub async fn pull_config(self: Arc<Self>, peer: UUID) {
- let resp = rpc_call(self.clone(),
- &peer,
- &Message::PullConfig,
- PING_TIMEOUT).await;
+ let resp = rpc_call(self.clone(), &peer, &Message::PullConfig, PING_TIMEOUT).await;
if let Ok(Message::AdvertiseConfig(config)) = resp {
let _: Result<_, _> = self.handle_advertise_config(&config).await;
}