aboutsummaryrefslogtreecommitdiff
path: root/src/rpc
diff options
context:
space:
mode:
Diffstat (limited to 'src/rpc')
-rw-r--r--src/rpc/Cargo.toml21
-rw-r--r--src/rpc/membership.rs49
-rw-r--r--src/rpc/ring.rs40
-rw-r--r--src/rpc/rpc_client.rs8
-rw-r--r--src/rpc/rpc_server.rs14
5 files changed, 72 insertions, 60 deletions
diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml
index 48f05755..fbe826a8 100644
--- a/src/rpc/Cargo.toml
+++ b/src/rpc/Cargo.toml
@@ -15,27 +15,26 @@ path = "lib.rs"
[dependencies]
garage_util = { version = "0.1.1", path = "../util" }
-bytes = "0.4"
-rand = "0.7"
-hex = "0.3"
-sha2 = "0.8"
-arc-swap = "0.4"
+bytes = "1.0"
+hex = "0.4"
+arc-swap = "1.0"
gethostname = "0.2"
log = "0.4"
-rmp-serde = "0.14.3"
+rmp-serde = "0.15"
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
serde_json = "1.0"
futures = "0.3"
futures-util = "0.3"
-tokio = { version = "0.2", default-features = false, features = ["rt-core", "rt-threaded", "io-driver", "net", "tcp", "time", "macros", "sync", "signal", "fs"] }
+tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
+tokio-stream = { version = "0.1", features = ["net"] }
http = "0.2"
-hyper = "0.13"
-rustls = "0.17"
-tokio-rustls = "0.13"
-hyper-rustls = { version = "0.20", default-features = false }
+hyper = { version = "0.14", features = ["full"] }
+rustls = "0.19"
+tokio-rustls = "0.22"
+hyper-rustls = { version = "0.22", default-features = false }
webpki = "0.21"
diff --git a/src/rpc/membership.rs b/src/rpc/membership.rs
index 44d7122a..4e9822fa 100644
--- a/src/rpc/membership.rs
+++ b/src/rpc/membership.rs
@@ -11,13 +11,14 @@ use futures::future::join_all;
use futures::select;
use futures_util::future::*;
use serde::{Deserialize, Serialize};
-use tokio::prelude::*;
+use tokio::io::AsyncWriteExt;
use tokio::sync::watch;
use tokio::sync::Mutex;
use garage_util::background::BackgroundRunner;
use garage_util::data::*;
use garage_util::error::Error;
+use garage_util::time::*;
use crate::consul::get_consul_nodes;
use crate::ring::*;
@@ -315,23 +316,17 @@ impl System {
self.clone().ping_nodes(bootstrap_peers).await;
let self2 = self.clone();
- self.clone()
- .background
+ self.background
.spawn_worker(format!("ping loop"), |stop_signal| {
- self2.ping_loop(stop_signal).map(Ok)
- })
- .await;
+ self2.ping_loop(stop_signal)
+ });
if let (Some(consul_host), Some(consul_service_name)) = (consul_host, consul_service_name) {
let self2 = self.clone();
- self.clone()
- .background
+ self.background
.spawn_worker(format!("Consul loop"), |stop_signal| {
- self2
- .consul_loop(stop_signal, consul_host, consul_service_name)
- .map(Ok)
- })
- .await;
+ self2.consul_loop(stop_signal, consul_host, consul_service_name)
+ });
}
}
@@ -399,7 +394,7 @@ impl System {
if has_changes {
status.recalculate_hash();
}
- if let Err(e) = update_locked.0.broadcast(Arc::new(status)) {
+ if let Err(e) = update_locked.0.send(Arc::new(status)) {
error!("In ping_nodes: could not save status update ({})", e);
}
drop(update_locked);
@@ -425,7 +420,7 @@ impl System {
let status_hash = status.hash;
let config_version = self.ring.borrow().config.version;
- update_locked.0.broadcast(Arc::new(status))?;
+ update_locked.0.send(Arc::new(status))?;
drop(update_locked);
if is_new || status_hash != ping.status_hash {
@@ -507,7 +502,7 @@ impl System {
if has_changed {
status.recalculate_hash();
}
- update_lock.0.broadcast(Arc::new(status))?;
+ update_lock.0.send(Arc::new(status))?;
drop(update_lock);
if to_ping.len() > 0 {
@@ -527,7 +522,7 @@ impl System {
if adv.version > ring.config.version {
let ring = Ring::new(adv.clone());
- update_lock.1.broadcast(Arc::new(ring))?;
+ update_lock.1.send(Arc::new(ring))?;
drop(update_lock);
self.background.spawn_cancellable(
@@ -543,7 +538,7 @@ impl System {
async fn ping_loop(self: Arc<Self>, mut stop_signal: watch::Receiver<bool>) {
loop {
- let restart_at = tokio::time::delay_for(PING_INTERVAL);
+ let restart_at = tokio::time::sleep(PING_INTERVAL);
let status = self.status.borrow().clone();
let ping_addrs = status
@@ -557,10 +552,9 @@ impl System {
select! {
_ = restart_at.fuse() => (),
- must_exit = stop_signal.recv().fuse() => {
- match must_exit {
- None | Some(true) => return,
- _ => (),
+ _ = stop_signal.changed().fuse() => {
+ if *stop_signal.borrow() {
+ return;
}
}
}
@@ -573,8 +567,8 @@ impl System {
consul_host: String,
consul_service_name: String,
) {
- loop {
- let restart_at = tokio::time::delay_for(CONSUL_INTERVAL);
+ while !*stop_signal.borrow() {
+ let restart_at = tokio::time::sleep(CONSUL_INTERVAL);
match get_consul_nodes(&consul_host, &consul_service_name).await {
Ok(mut node_list) => {
@@ -588,12 +582,7 @@ impl System {
select! {
_ = restart_at.fuse() => (),
- must_exit = stop_signal.recv().fuse() => {
- match must_exit {
- None | Some(true) => return,
- _ => (),
- }
- }
+ _ = stop_signal.changed().fuse() => (),
}
}
}
diff --git a/src/rpc/ring.rs b/src/rpc/ring.rs
index 85caafeb..2e997523 100644
--- a/src/rpc/ring.rs
+++ b/src/rpc/ring.rs
@@ -5,6 +5,11 @@ use serde::{Deserialize, Serialize};
use garage_util::data::*;
+// A partition number is encoded on 16 bits,
+// i.e. we have up to 2**16 partitions.
+// (in practice we have exactly 2**PARTITION_BITS partitions)
+pub type Partition = u16;
+
// TODO: make this constant parametrizable in the config file
// For deployments with many nodes it might make sense to bump
// it up to 10.
@@ -161,29 +166,48 @@ impl Ring {
})
.collect::<Vec<_>>();
- eprintln!("RING: --");
- for e in ring.iter() {
- eprintln!("{:?}", e);
- }
- eprintln!("END --");
+ // eprintln!("RING: --");
+ // for e in ring.iter() {
+ // eprintln!("{:?}", e);
+ // }
+ // eprintln!("END --");
Self { config, ring }
}
+ pub fn partition_of(&self, from: &Hash) -> Partition {
+ let top = u16::from_be_bytes(from.as_slice()[0..2].try_into().unwrap());
+ top >> (16 - PARTITION_BITS)
+ }
+
+ pub fn partitions(&self) -> Vec<(Partition, Hash)> {
+ let mut ret = vec![];
+
+ for (i, entry) in self.ring.iter().enumerate() {
+ ret.push((i as u16, entry.location));
+ }
+ if ret.len() > 0 {
+ assert_eq!(ret[0].1, [0u8; 32].into());
+ }
+
+ ret
+ }
+
pub fn walk_ring(&self, from: &Hash, n: usize) -> Vec<UUID> {
if self.ring.len() != 1 << PARTITION_BITS {
- warn!("Ring not yet ready, read/writes will be lost");
+ warn!("Ring not yet ready, read/writes will be lost!");
return vec![];
}
let top = u16::from_be_bytes(from.as_slice()[0..2].try_into().unwrap());
-
let partition_idx = (top >> (16 - PARTITION_BITS)) as usize;
+ assert_eq!(partition_idx, self.partition_of(from) as usize);
+
let partition = &self.ring[partition_idx];
let partition_top =
u16::from_be_bytes(partition.location.as_slice()[0..2].try_into().unwrap());
- assert!(partition_top & PARTITION_MASK_U16 == top & PARTITION_MASK_U16);
+ assert_eq!(partition_top & PARTITION_MASK_U16, top & PARTITION_MASK_U16);
assert!(n <= partition.nodes.len());
partition.nodes[..n].iter().cloned().collect::<Vec<_>>()
diff --git a/src/rpc/rpc_client.rs b/src/rpc/rpc_client.rs
index 70384391..eb4f6620 100644
--- a/src/rpc/rpc_client.rs
+++ b/src/rpc/rpc_client.rs
@@ -7,7 +7,6 @@ use std::sync::Arc;
use std::time::Duration;
use arc_swap::ArcSwapOption;
-use bytes::IntoBuf;
use futures::future::Future;
use futures::stream::futures_unordered::FuturesUnordered;
use futures::stream::StreamExt;
@@ -197,11 +196,8 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
if !strategy.rs_interrupt_after_quorum {
let wait_finished_fut = tokio::spawn(async move {
resp_stream.collect::<Vec<_>>().await;
- Ok(())
});
- self.background.spawn(wait_finished_fut.map(|x| {
- x.unwrap_or_else(|e| Err(Error::Message(format!("Await failed: {}", e))))
- }));
+ self.background.spawn(wait_finished_fut.map(|_| Ok(())));
}
Ok(results)
@@ -336,7 +332,7 @@ impl RpcHttpClient {
let body = hyper::body::to_bytes(resp.into_body()).await?;
drop(slot);
- match rmp_serde::decode::from_read::<_, Result<M, String>>(body.into_buf())? {
+ match rmp_serde::decode::from_read::<_, Result<M, String>>(&body[..])? {
Err(e) => Ok(Err(Error::RemoteError(e, status))),
Ok(x) => Ok(Ok(x)),
}
diff --git a/src/rpc/rpc_server.rs b/src/rpc/rpc_server.rs
index 1c6bc8d2..0d82d796 100644
--- a/src/rpc/rpc_server.rs
+++ b/src/rpc/rpc_server.rs
@@ -4,7 +4,6 @@ use std::pin::Pin;
use std::sync::Arc;
use std::time::Instant;
-use bytes::IntoBuf;
use futures::future::Future;
use futures_util::future::*;
use futures_util::stream::*;
@@ -15,6 +14,7 @@ use serde::{Deserialize, Serialize};
use tokio::net::{TcpListener, TcpStream};
use tokio_rustls::server::TlsStream;
use tokio_rustls::TlsAcceptor;
+use tokio_stream::wrappers::TcpListenerStream;
use garage_util::config::TlsConfig;
use garage_util::data::*;
@@ -47,11 +47,15 @@ where
{
let begin_time = Instant::now();
let whole_body = hyper::body::to_bytes(req.into_body()).await?;
- let msg = rmp_serde::decode::from_read::<_, M>(whole_body.into_buf())?;
+ let msg = rmp_serde::decode::from_read::<_, M>(&whole_body[..])?;
trace!(
"Request message: {}",
- serde_json::to_string(&msg).unwrap_or("<json error>".into())
+ serde_json::to_string(&msg)
+ .unwrap_or("<json error>".into())
+ .chars()
+ .take(100)
+ .collect::<String>()
);
match handler(msg, sockaddr).await {
@@ -171,8 +175,8 @@ impl RpcServer {
config.set_single_cert([&node_certs[..], &ca_certs[..]].concat(), node_key)?;
let tls_acceptor = Arc::new(TlsAcceptor::from(Arc::new(config)));
- let mut listener = TcpListener::bind(&self.bind_addr).await?;
- let incoming = listener.incoming().filter_map(|socket| async {
+ let listener = TcpListener::bind(&self.bind_addr).await?;
+ let incoming = TcpListenerStream::new(listener).filter_map(|socket| async {
match socket {
Ok(stream) => match tls_acceptor.clone().accept(stream).await {
Ok(x) => Some(Ok::<_, hyper::Error>(x)),