aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/api.rs23
-rw-r--r--src/data.rs23
-rw-r--r--src/error.rs14
-rw-r--r--src/main.rs84
-rw-r--r--src/membership.rs86
-rw-r--r--src/proto.rs20
-rw-r--r--src/rpc.rs149
7 files changed, 336 insertions, 63 deletions
diff --git a/src/api.rs b/src/api.rs
index 9ee17047..cf70dbdf 100644
--- a/src/api.rs
+++ b/src/api.rs
@@ -1,14 +1,16 @@
+use std::sync::Arc;
+
use futures_util::TryStreamExt;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Request, Response, Server, StatusCode};
use futures::future::Future;
use crate::error::Error;
-use crate::System;
+use crate::membership::System;
/// This is our service handler. It receives a Request, routes on its
/// path, and returns a Future of a Response.
-async fn echo(req: Request<Body>) -> Result<Response<Body>, Error> {
+async fn handler(sys: Arc<System>, req: Request<Body>) -> Result<Response<Body>, Error> {
match (req.method(), req.uri().path()) {
// Serve some instructions at /
(&Method::GET, "/") => Ok(Response::new(Body::from(
@@ -51,15 +53,24 @@ async fn echo(req: Request<Body>) -> Result<Response<Body>, Error> {
}
}
-pub async fn run_api_server(sys: &System, api_port: u16, shutdown_signal: impl Future<Output=()>) -> Result<(), hyper::Error> {
- let addr = ([0, 0, 0, 0], api_port).into();
+pub async fn run_api_server(sys: Arc<System>, shutdown_signal: impl Future<Output=()>) -> Result<(), hyper::Error> {
+ let addr = ([0, 0, 0, 0], sys.config.api_port).into();
- let service = make_service_fn(|_| async { Ok::<_, Error>(service_fn(echo)) });
+ let service = make_service_fn(|_| {
+ let sys = sys.clone();
+ async move {
+ let sys = sys.clone();
+ Ok::<_, Error>(service_fn(move |req: Request<Body>| {
+ let sys = sys.clone();
+ handler(sys, req)
+ }))
+ }
+ });
let server = Server::bind(&addr).serve(service);
let graceful = server.with_graceful_shutdown(shutdown_signal);
- println!("Listening on http://{}", addr);
+ println!("API server listening on http://{}", addr);
graceful.await
}
diff --git a/src/data.rs b/src/data.rs
index 651a9d45..e26a38f0 100644
--- a/src/data.rs
+++ b/src/data.rs
@@ -1,31 +1,8 @@
-use std::net::SocketAddr;
-
use serde::{Serialize, Deserialize};
pub type UUID = [u8; 32];
pub type Hash = [u8; 32];
-// Membership management
-
-#[derive(Debug, Clone, Serialize, Deserialize)]
-pub struct NodeStatus {
- id: UUID,
- time: u64,
- addr: SocketAddr,
-}
-
-#[derive(Debug, Clone, Serialize, Deserialize)]
-pub struct NodeConfig {
- id: UUID,
- n_tokens: u32,
-}
-
-#[derive(Default, Debug, Clone, Serialize, Deserialize)]
-pub struct NetworkMembers {
- pings: Vec<NodeStatus>,
- desired_state: Vec<NodeConfig>,
- desired_state_version: u64,
-}
// Data management
diff --git a/src/error.rs b/src/error.rs
index 9929a896..1e611adb 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -9,11 +9,23 @@ pub enum Error {
#[error(display = "Hyper error")]
Hyper(#[error(source)] hyper::Error),
+ #[error(display = "HTTP error")]
+ HTTP(#[error(source)] http::Error),
+
#[error(display = "Messagepack encode error")]
RMPEncode(#[error(source)] rmp_serde::encode::Error),
#[error(display = "Messagepack decode error")]
RMPDecode(#[error(source)] rmp_serde::decode::Error),
+ #[error(display = "TOML decode error")]
+ TomlDecode(#[error(source)] toml::de::Error),
+
+ #[error(display = "Timeout")]
+ RPCTimeout(#[error(source)] tokio::time::Elapsed),
+
+ #[error(display = "RPC error")]
+ RPCError(String),
+
#[error(display = "")]
- Msg(String),
+ Message(String),
}
diff --git a/src/main.rs b/src/main.rs
index 711717d7..4448d535 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,33 +1,73 @@
mod error;
mod data;
mod proto;
+mod membership;
mod rpc;
mod api;
+use std::io::{Read, Write};
+use std::sync::Arc;
+use std::net::SocketAddr;
+use std::path::PathBuf;
use structopt::StructOpt;
use futures::channel::oneshot;
-use tokio::sync::Mutex;
-use hyper::client::Client;
-
-use data::*;
+use serde::Deserialize;
+use rand::Rng;
+use data::UUID;
+use error::Error;
+use membership::System;
#[derive(StructOpt, Debug)]
#[structopt(name = "garage")]
pub struct Opt {
- #[structopt(long = "api-port", default_value = "3900")]
- api_port: u16,
+ #[structopt(short = "c", long = "config", default_value = "./config.toml")]
+ config_file: PathBuf,
+}
- #[structopt(long = "rpc-port", default_value = "3901")]
+#[derive(Deserialize, Debug)]
+pub struct Config {
+ metadata_dir: PathBuf,
+ data_dir: PathBuf,
+
+ api_port: u16,
rpc_port: u16,
+
+ bootstrap_peers: Vec<SocketAddr>,
}
-pub struct System {
- pub opt: Opt,
+fn read_config(config_file: PathBuf) -> Result<Config, Error> {
+ let mut file = std::fs::OpenOptions::new()
+ .read(true)
+ .open(config_file.as_path())?;
+
+ let mut config = String::new();
+ file.read_to_string(&mut config)?;
- pub rpc_client: Client<hyper::client::HttpConnector, hyper::Body>,
+ Ok(toml::from_str(&config)?)
+}
- pub network_members: Mutex<NetworkMembers>,
+fn gen_node_id(metadata_dir: &PathBuf) -> Result<UUID, Error> {
+ let mut id_file = metadata_dir.clone();
+ id_file.push("node_id");
+ if id_file.as_path().exists() {
+ let mut f = std::fs::File::open(id_file.as_path())?;
+ let mut d = vec![];
+ f.read_to_end(&mut d)?;
+ if d.len() != 32 {
+ return Err(Error::Message(format!("Corrupt node_id file")))
+ }
+
+ let mut id = [0u8; 32];
+ id.copy_from_slice(&d[..]);
+ Ok(id)
+ } else {
+ let id = rand::thread_rng().gen::<UUID>();
+
+ let mut f = std::fs::File::create(id_file.as_path())?;
+ f.write_all(&id[..])?;
+ Ok(id)
+ }
}
async fn shutdown_signal(chans: Vec<oneshot::Sender<()>>) {
@@ -47,22 +87,23 @@ async fn wait_from(chan: oneshot::Receiver<()>) -> () {
#[tokio::main]
async fn main() {
let opt = Opt::from_args();
- let rpc_port = opt.rpc_port;
- let api_port = opt.api_port;
+ let config = read_config(opt.config_file)
+ .expect("Unable to read config file");
+
+ let id = gen_node_id(&config.metadata_dir)
+ .expect("Unable to read or generate node ID");
+ println!("Node ID: {}", hex::encode(id));
- let sys = System{
- opt,
- rpc_client: Client::new(),
- network_members: Mutex::new(NetworkMembers::default()),
- };
+ let sys = Arc::new(System::new(config, id));
let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
- tokio::spawn(shutdown_signal(vec![tx1, tx2]));
+ let rpc_server = rpc::run_rpc_server(sys.clone(), wait_from(rx1));
+ let api_server = api::run_api_server(sys.clone(), wait_from(rx2));
- let rpc_server = rpc::run_rpc_server(&sys, rpc_port, wait_from(rx1));
- let api_server = api::run_api_server(&sys, api_port, wait_from(rx2));
+ tokio::spawn(shutdown_signal(vec![tx1, tx2]));
+ tokio::spawn(membership::bootstrap(sys));
let (e1, e2) = futures::join![rpc_server, api_server];
@@ -74,3 +115,4 @@ async fn main() {
eprintln!("API server error: {}", e)
}
}
+
diff --git a/src/membership.rs b/src/membership.rs
new file mode 100644
index 00000000..c14d370b
--- /dev/null
+++ b/src/membership.rs
@@ -0,0 +1,86 @@
+use std::sync::Arc;
+use std::collections::HashMap;
+use std::time::Duration;
+use std::net::SocketAddr;
+
+use hyper::client::Client;
+use tokio::sync::RwLock;
+
+use crate::Config;
+use crate::error::Error;
+use crate::data::*;
+use crate::proto::*;
+use crate::rpc::*;
+
+const PING_INTERVAL: Duration = Duration::from_secs(10);
+const PING_TIMEOUT: Duration = Duration::from_secs(2);
+const MAX_FAILED_PINGS: usize = 3;
+
+pub struct System {
+ pub config: Config,
+ pub id: UUID,
+
+ pub rpc_client: Client<hyper::client::HttpConnector, hyper::Body>,
+
+ pub members: RwLock<Members>,
+}
+
+pub struct Members {
+ pub present: Vec<UUID>,
+ pub status: HashMap<UUID, NodeStatus>,
+
+ pub config: HashMap<UUID, NodeConfig>,
+ pub config_version: u64,
+}
+
+pub struct NodeStatus {
+ pub addr: SocketAddr,
+ remaining_ping_attempts: usize,
+}
+
+pub struct NodeConfig {
+ pub n_tokens: u32,
+}
+
+impl System {
+ pub fn new(config: Config, id: UUID) -> Self {
+ System{
+ config,
+ id,
+ rpc_client: Client::new(),
+ members: RwLock::new(Members{
+ present: Vec::new(),
+ status: HashMap::new(),
+ config: HashMap::new(),
+ config_version: 0,
+ }),
+ }
+ }
+
+ pub async fn broadcast(&self) -> Vec<UUID> {
+ self.members.read().await.present.clone()
+ }
+}
+
+pub async fn bootstrap(system: Arc<System>) {
+ rpc_call_many_addr(system.clone(),
+ &system.config.bootstrap_peers,
+ &Message::Ping(PingMessage{
+ id: system.id,
+ rpc_port: system.config.rpc_port,
+ present_hash: [0u8; 32],
+ config_version: 0,
+ }),
+ None,
+ PING_TIMEOUT).await;
+
+ unimplemented!() //TODO
+}
+
+pub async fn handle_ping(sys: Arc<System>, from: &SocketAddr, ping: &PingMessage) -> Result<Message, Error> {
+ unimplemented!() //TODO
+}
+
+pub async fn handle_advertise_node(sys: Arc<System>, ping: &AdvertiseNodeMessage) -> Result<Message, Error> {
+ unimplemented!() //TODO
+}
diff --git a/src/proto.rs b/src/proto.rs
index 029a58df..6cb12598 100644
--- a/src/proto.rs
+++ b/src/proto.rs
@@ -1,8 +1,28 @@
+use std::net::SocketAddr;
use serde::{Serialize, Deserialize};
+use crate::data::*;
+
#[derive(Debug, Serialize, Deserialize)]
pub enum Message {
Ok,
Error(String),
+ Ping(PingMessage),
+ AdvertiseNode(AdvertiseNodeMessage),
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct PingMessage {
+ pub id: UUID,
+ pub rpc_port: u16,
+
+ pub present_hash: Hash,
+ pub config_version: u64,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct AdvertiseNodeMessage {
+ pub id: UUID,
+ pub addr: SocketAddr,
}
diff --git a/src/rpc.rs b/src/rpc.rs
index 314b64a9..4ba32c4c 100644
--- a/src/rpc.rs
+++ b/src/rpc.rs
@@ -1,42 +1,167 @@
+use std::net::SocketAddr;
+use std::sync::Arc;
+use std::time::Duration;
+
use bytes::IntoBuf;
use hyper::service::{make_service_fn, service_fn};
+use hyper::server::conn::AddrStream;
use hyper::{Body, Method, Request, Response, Server, StatusCode};
use futures::future::Future;
+use futures::stream::futures_unordered::FuturesUnordered;
+use futures::stream::StreamExt;
+use crate::data::*;
use crate::error::Error;
use crate::proto::Message;
-use crate::System;
+use crate::membership::System;
+use crate::membership;
+
+// ---- CLIENT PART ----
+
+pub async fn rpc_call_many(sys: Arc<System>,
+ to: &[UUID],
+ msg: &Message,
+ stop_after: Option<usize>,
+ timeout: Duration)
+ -> Vec<Result<Message, Error>>
+{
+ let resp_stream = to.iter()
+ .map(|to| rpc_call(sys.clone(), to, msg, timeout))
+ .collect::<FuturesUnordered<_>>();
+
+ collect_rpc_results(resp_stream, stop_after).await
+}
+
+pub async fn rpc_call_many_addr(sys: Arc<System>,
+ to: &[SocketAddr],
+ msg: &Message,
+ stop_after: Option<usize>,
+ timeout: Duration)
+ -> Vec<Result<Message, Error>>
+{
+ let resp_stream = to.iter()
+ .map(|to| rpc_call_addr(sys.clone(), to, msg, timeout))
+ .collect::<FuturesUnordered<_>>();
+
+ collect_rpc_results(resp_stream, stop_after).await
+}
+
+async fn collect_rpc_results(mut resp_stream: FuturesUnordered<impl Future<Output=Result<Message, Error>>>,
+ stop_after: Option<usize>)
+ -> Vec<Result<Message, Error>>
+{
+ let mut results = vec![];
+ let mut n_ok = 0;
+ while let Some(resp) = resp_stream.next().await {
+ if resp.is_ok() {
+ n_ok += 1
+ }
+ results.push(resp);
+ if let Some(n) = stop_after {
+ if n_ok >= n {
+ break
+ }
+ }
+ }
+ results
+}
+
+// ----
+pub async fn rpc_call(sys: Arc<System>,
+ to: &UUID,
+ msg: &Message,
+ timeout: Duration)
+ -> Result<Message, Error>
+{
+ let addr = {
+ let members = sys.members.read().await;
+ match members.status.get(to) {
+ Some(status) => status.addr.clone(),
+ None => return Err(Error::Message(format!("Peer ID not found"))),
+ }
+ };
+ rpc_call_addr(sys, &addr, msg, timeout).await
+}
-/// This is our service handler. It receives a Request, routes on its
-/// path, and returns a Future of a Response.
-async fn echo(req: Request<Body>) -> Result<Response<Body>, Error> {
+pub async fn rpc_call_addr(sys: Arc<System>,
+ to_addr: &SocketAddr,
+ msg: &Message,
+ timeout: Duration)
+ -> Result<Message, Error>
+{
+ let uri = format!("http://{}/", to_addr);
+ let req = Request::builder()
+ .method(Method::POST)
+ .uri(uri)
+ .body(Body::from(rmp_serde::encode::to_vec_named(msg)?))?;
+
+ let resp_fut = sys.rpc_client.request(req);
+ let resp = tokio::time::timeout(timeout, resp_fut).await??;
+
+ if resp.status() == StatusCode::OK {
+ let body = hyper::body::to_bytes(resp.into_body()).await?;
+ let msg = rmp_serde::decode::from_read::<_, Message>(body.into_buf())?;
+ match msg {
+ Message::Error(e) => Err(Error::RPCError(e)),
+ x => Ok(x)
+ }
+ } else {
+ Err(Error::RPCError(format!("Status code {}", resp.status())))
+ }
+}
+
+// ---- SERVER PART ----
+
+fn err_to_msg(x: Result<Message, Error>) -> Message {
+ match x {
+ Err(e) => Message::Error(format!("{}", e)),
+ Ok(msg) => msg,
+ }
+}
+
+async fn handler(sys: Arc<System>, req: Request<Body>, addr: SocketAddr) -> Result<Response<Body>, Error> {
if req.method() != &Method::POST {
let mut bad_request = Response::default();
*bad_request.status_mut() = StatusCode::BAD_REQUEST;
return Ok(bad_request);
}
-
let whole_body = hyper::body::to_bytes(req.into_body()).await?;
- let msg = rmp_serde::decode::from_read::<_, Message>(whole_body.into_buf());
+ let msg = rmp_serde::decode::from_read::<_, Message>(whole_body.into_buf())?;
+
+ eprintln!("RPC from {}: {:?}", addr, msg);
+
+ let resp = err_to_msg(match &msg {
+ Message::Ping(ping) => membership::handle_ping(sys, &addr, ping).await,
+ Message::AdvertiseNode(adv) => membership::handle_advertise_node(sys, adv).await,
+ _ => Ok(Message::Error(format!("Unexpected message: {:?}", msg))),
+ });
- let resp = Message::Ok;
Ok(Response::new(Body::from(
rmp_serde::encode::to_vec_named(&resp)?
)))
}
-pub async fn run_rpc_server(sys: &System, rpc_port: u16, shutdown_signal: impl Future<Output=()>) -> Result<(), hyper::Error> {
- let addr = ([0, 0, 0, 0], rpc_port).into();
+pub async fn run_rpc_server(sys: Arc<System>, shutdown_signal: impl Future<Output=()>) -> Result<(), hyper::Error> {
+ let bind_addr = ([0, 0, 0, 0], sys.config.rpc_port).into();
- let service = make_service_fn(|_| async { Ok::<_, Error>(service_fn(echo)) });
+ let service = make_service_fn(|conn: &AddrStream| {
+ let client_addr = conn.remote_addr();
+ let sys = sys.clone();
+ async move {
+ Ok::<_, Error>(service_fn(move |req: Request<Body>| {
+ let sys = sys.clone();
+ handler(sys, req, client_addr)
+ }))
+ }
+ });
- let server = Server::bind(&addr).serve(service);
+ let server = Server::bind(&bind_addr).serve(service) ;
let graceful = server.with_graceful_shutdown(shutdown_signal);
- println!("Listening on http://{}", addr);
+ println!("RPC server listening on http://{}", bind_addr);
graceful.await
}