aboutsummaryrefslogtreecommitdiff
path: root/src/membership.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2020-04-11 23:53:32 +0200
committerAlex Auvolat <alex@adnab.me>2020-04-11 23:53:32 +0200
commit9c931f5edacbaaab746ecf180fac2dd7062d0336 (patch)
treef29cfd82f573ac871408256a33e11f9153bae1da /src/membership.rs
parent5dd59e437d5af84dfa2cf5dcc2c15807b971002d (diff)
downloadgarage-9c931f5edacbaaab746ecf180fac2dd7062d0336.tar.gz
garage-9c931f5edacbaaab746ecf180fac2dd7062d0336.zip
Keep network status & ring in a tokio::sync::watch
advantages - reads don't prevent preparing writes - can be followed from other parts of the system by cloning the receiver
Diffstat (limited to 'src/membership.rs')
-rw-r--r--src/membership.rs185
1 files changed, 108 insertions, 77 deletions
diff --git a/src/membership.rs b/src/membership.rs
index cb8dba99..22c13f64 100644
--- a/src/membership.rs
+++ b/src/membership.rs
@@ -13,7 +13,7 @@ use futures_util::future::*;
use sha2::{Digest, Sha256};
use tokio::prelude::*;
use tokio::sync::watch;
-use tokio::sync::RwLock;
+use tokio::sync::Mutex;
use crate::background::BackgroundRunner;
use crate::data::*;
@@ -32,36 +32,44 @@ pub struct System {
pub rpc_client: RpcClient,
- pub members: RwLock<Members>,
+ pub status: watch::Receiver<Arc<Status>>,
+ pub ring: watch::Receiver<Arc<Ring>>,
+
+ update_lock: Mutex<(watch::Sender<Arc<Status>>, watch::Sender<Arc<Ring>>)>,
pub background: Arc<BackgroundRunner>,
}
-pub struct Members {
- pub status: HashMap<UUID, NodeStatus>,
- pub status_hash: Hash,
-
- pub config: NetworkConfig,
- pub ring: Vec<RingEntry>,
- pub n_datacenters: usize,
+#[derive(Debug, Clone)]
+pub struct Status {
+ pub nodes: HashMap<UUID, NodeStatus>,
+ pub hash: Hash,
}
+#[derive(Debug, Clone)]
pub struct NodeStatus {
pub addr: SocketAddr,
pub remaining_ping_attempts: usize,
}
-#[derive(Debug)]
+#[derive(Clone)]
+pub struct Ring {
+ pub config: NetworkConfig,
+ pub ring: Vec<RingEntry>,
+ pub n_datacenters: usize,
+}
+
+#[derive(Clone, Debug)]
pub struct RingEntry {
pub location: Hash,
pub node: UUID,
pub datacenter: u64,
}
-impl Members {
+impl Status {
fn handle_ping(&mut self, ip: IpAddr, info: &PingMessage) -> bool {
let addr = SocketAddr::new(ip, info.rpc_port);
- let old_status = self.status.insert(
+ let old_status = self.nodes.insert(
info.id.clone(),
NodeStatus {
addr: addr.clone(),
@@ -77,8 +85,8 @@ impl Members {
}
}
- fn recalculate_status_hash(&mut self) {
- let mut nodes = self.status.iter().collect::<Vec<_>>();
+ fn recalculate_hash(&mut self) {
+ let mut nodes = self.nodes.iter().collect::<Vec<_>>();
nodes.sort_unstable_by_key(|(id, _status)| *id);
let mut hasher = Sha256::new();
@@ -88,11 +96,13 @@ impl Members {
hasher.input(format!("{} {}\n", hex::encode(&id), status.addr));
}
eprintln!("END --");
- self.status_hash
+ self.hash
.as_slice_mut()
.copy_from_slice(&hasher.result()[..]);
}
+}
+impl Ring {
fn rebuild_ring(&mut self) {
let mut new_ring = vec![];
let mut datacenters = vec![];
@@ -201,20 +211,28 @@ impl System {
}
}
};
- let mut members = Members {
- status: HashMap::new(),
- status_hash: Hash::default(),
+ let mut status = Status {
+ nodes: HashMap::new(),
+ hash: Hash::default(),
+ };
+ status.recalculate_hash();
+ let (update_status, status) = watch::channel(Arc::new(status));
+
+ let mut ring = Ring {
config: net_config,
ring: Vec::new(),
n_datacenters: 0,
};
- members.recalculate_status_hash();
- members.rebuild_ring();
+ ring.rebuild_ring();
+ let (update_ring, ring) = watch::channel(Arc::new(ring));
+
System {
config,
id,
rpc_client: RpcClient::new(),
- members: RwLock::new(members),
+ status,
+ ring,
+ update_lock: Mutex::new((update_status, update_ring)),
background,
}
}
@@ -223,37 +241,33 @@ impl System {
let mut path = self.config.metadata_dir.clone();
path.push("network_config");
- let members = self.members.read().await;
- let data =
- rmp_to_vec_all_named(&members.config)?;
- drop(members);
+ let ring = self.ring.borrow().clone();
+ let data = rmp_to_vec_all_named(&ring.config)?;
- let mut f = tokio::fs::File::create(path.as_path())
- .await?;
- f.write_all(&data[..])
- .await?;
+ let mut f = tokio::fs::File::create(path.as_path()).await?;
+ f.write_all(&data[..]).await?;
Ok(())
}
- pub async fn make_ping(&self) -> Message {
- let members = self.members.read().await;
+ pub fn make_ping(&self) -> Message {
+ let status = self.status.borrow().clone();
+ let ring = self.ring.borrow().clone();
Message::Ping(PingMessage {
id: self.id.clone(),
rpc_port: self.config.rpc_port,
- status_hash: members.status_hash.clone(),
- config_version: members.config.version,
+ status_hash: status.hash.clone(),
+ config_version: ring.config.version,
})
}
pub async fn broadcast(self: Arc<Self>, msg: Message, timeout: Duration) {
- let members = self.members.read().await;
- let to = members
- .status
+ let status = self.status.borrow().clone();
+ let to = status
+ .nodes
.keys()
.filter(|x| **x != self.id)
.cloned()
.collect::<Vec<_>>();
- drop(members);
rpc_call_many(self.clone(), &to[..], &msg, timeout).await;
}
@@ -273,7 +287,7 @@ impl System {
}
pub async fn ping_nodes(self: Arc<Self>, peers: Vec<(SocketAddr, Option<UUID>)>) {
- let ping_msg = self.make_ping().await;
+ let ping_msg = self.make_ping();
let ping_resps = join_all(peers.iter().map(|(addr, id_option)| {
let sys = self.clone();
let ping_msg_ref = &ping_msg;
@@ -287,14 +301,16 @@ impl System {
}))
.await;
- let mut members = self.members.write().await;
+ let update_locked = self.update_lock.lock().await;
+ let mut status: Status = self.status.borrow().as_ref().clone();
+ let ring = self.ring.borrow().clone();
let mut has_changes = false;
let mut to_advertise = vec![];
for (id_option, addr, ping_resp) in ping_resps {
if let Ok(Message::Ping(info)) = ping_resp {
- let is_new = members.handle_ping(addr.ip(), &info);
+ let is_new = status.handle_ping(addr.ip(), &info);
if is_new {
has_changes = true;
to_advertise.push(AdvertisedNode {
@@ -302,17 +318,17 @@ impl System {
addr: addr.clone(),
});
}
- if is_new || members.status_hash != info.status_hash {
+ if is_new || status.hash != info.status_hash {
self.background
.spawn_cancellable(self.clone().pull_status(info.id.clone()).map(Ok));
}
- if is_new || members.config.version < info.config_version {
+ if is_new || ring.config.version < info.config_version {
self.background
.spawn_cancellable(self.clone().pull_config(info.id.clone()).map(Ok));
}
} else if let Some(id) = id_option {
- let remaining_attempts = members
- .status
+ let remaining_attempts = status
+ .nodes
.get(id)
.map(|x| x.remaining_ping_attempts)
.unwrap_or(0);
@@ -321,19 +337,22 @@ impl System {
"Removing node {} after too many failed pings",
hex::encode(&id)
);
- members.status.remove(&id);
+ status.nodes.remove(&id);
has_changes = true;
} else {
- if let Some(st) = members.status.get_mut(id) {
+ if let Some(st) = status.nodes.get_mut(id) {
st.remaining_ping_attempts = remaining_attempts - 1;
}
}
}
}
if has_changes {
- members.recalculate_status_hash();
+ status.recalculate_hash();
}
- drop(members);
+ if let Err(e) = update_locked.0.broadcast(Arc::new(status)) {
+ eprintln!("In ping_nodes: could not save status update ({})", e);
+ }
+ drop(update_locked);
if to_advertise.len() > 0 {
self.broadcast(Message::AdvertiseNodesUp(to_advertise), PING_TIMEOUT)
@@ -346,29 +365,35 @@ impl System {
from: &SocketAddr,
ping: &PingMessage,
) -> Result<Message, Error> {
- let mut members = self.members.write().await;
- let is_new = members.handle_ping(from.ip(), ping);
+ let update_locked = self.update_lock.lock().await;
+ let mut status: Status = self.status.borrow().as_ref().clone();
+
+ let is_new = status.handle_ping(from.ip(), ping);
if is_new {
- members.recalculate_status_hash();
+ status.recalculate_hash();
}
- let status_hash = members.status_hash.clone();
- let config_version = members.config.version;
- drop(members);
+ let status_hash = status.hash.clone();
+ let config_version = self.ring.borrow().config.version;
+
+ update_locked.0.broadcast(Arc::new(status))?;
+ drop(update_locked);
if is_new || status_hash != ping.status_hash {
- self.background.spawn_cancellable(self.clone().pull_status(ping.id.clone()).map(Ok));
+ self.background
+ .spawn_cancellable(self.clone().pull_status(ping.id.clone()).map(Ok));
}
if is_new || config_version < ping.config_version {
- self.background.spawn_cancellable(self.clone().pull_config(ping.id.clone()).map(Ok));
+ self.background
+ .spawn_cancellable(self.clone().pull_config(ping.id.clone()).map(Ok));
}
- Ok(self.make_ping().await)
+ Ok(self.make_ping())
}
- pub async fn handle_pull_status(&self) -> Result<Message, Error> {
- let members = self.members.read().await;
+ pub fn handle_pull_status(&self) -> Result<Message, Error> {
+ let status = self.status.borrow().clone();
let mut mem = vec![];
- for (node, status) in members.status.iter() {
+ for (node, status) in status.nodes.iter() {
mem.push(AdvertisedNode {
id: node.clone(),
addr: status.addr.clone(),
@@ -377,9 +402,9 @@ impl System {
Ok(Message::AdvertiseNodesUp(mem))
}
- pub async fn handle_pull_config(&self) -> Result<Message, Error> {
- let members = self.members.read().await;
- Ok(Message::AdvertiseConfig(members.config.clone()))
+ pub fn handle_pull_config(&self) -> Result<Message, Error> {
+ let ring = self.ring.borrow().clone();
+ Ok(Message::AdvertiseConfig(ring.config.clone()))
}
pub async fn handle_advertise_nodes_up(
@@ -388,14 +413,15 @@ impl System {
) -> Result<Message, Error> {
let mut to_ping = vec![];
- let mut members = self.members.write().await;
+ let update_lock = self.update_lock.lock().await;
+ let mut status: Status = self.status.borrow().as_ref().clone();
let mut has_changed = false;
for node in adv.iter() {
if node.id == self.id {
// learn our own ip address
let self_addr = SocketAddr::new(node.addr.ip(), self.config.rpc_port);
- let old_self = members.status.insert(
+ let old_self = status.nodes.insert(
node.id.clone(),
NodeStatus {
addr: self_addr,
@@ -406,17 +432,19 @@ impl System {
None => true,
Some(x) => x.addr != self_addr,
};
- } else if !members.status.contains_key(&node.id) {
+ } else if !status.nodes.contains_key(&node.id) {
to_ping.push((node.addr.clone(), Some(node.id.clone())));
}
}
if has_changed {
- members.recalculate_status_hash();
+ status.recalculate_hash();
}
- drop(members);
+ update_lock.0.broadcast(Arc::new(status))?;
+ drop(update_lock);
if to_ping.len() > 0 {
- self.background.spawn_cancellable(self.clone().ping_nodes(to_ping).map(Ok));
+ self.background
+ .spawn_cancellable(self.clone().ping_nodes(to_ping).map(Ok));
}
Ok(Message::Ok)
@@ -426,10 +454,14 @@ impl System {
self: Arc<Self>,
adv: &NetworkConfig,
) -> Result<Message, Error> {
- let mut members = self.members.write().await;
- if adv.version > members.config.version {
- members.config = adv.clone();
- members.rebuild_ring();
+ let mut ring: Ring = self.ring.borrow().as_ref().clone();
+ let update_lock = self.update_lock.lock().await;
+
+ if adv.version > ring.config.version {
+ ring.config = adv.clone();
+ ring.rebuild_ring();
+ update_lock.1.broadcast(Arc::new(ring))?;
+ drop(update_lock);
self.background.spawn_cancellable(
self.clone()
@@ -446,14 +478,13 @@ impl System {
loop {
let restart_at = tokio::time::delay_for(PING_INTERVAL);
- let members = self.members.read().await;
- let ping_addrs = members
- .status
+ let status = self.status.borrow().clone();
+ let ping_addrs = status
+ .nodes
.iter()
.filter(|(id, _)| **id != self.id)
.map(|(id, status)| (status.addr.clone(), Some(id.clone())))
.collect::<Vec<_>>();
- drop(members);
self.clone().ping_nodes(ping_addrs).await;