aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2020-04-05 23:33:42 +0200
committerAlex Auvolat <alex@adnab.me>2020-04-05 23:33:42 +0200
commit7102db1d544bec663a8492b24c455168d0b83f08 (patch)
treecc308cbeefc3f48b55149e85ec737867c24a498e /src
downloadgarage-7102db1d544bec663a8492b24c455168d0b83f08.tar.gz
garage-7102db1d544bec663a8492b24c455168d0b83f08.zip
First commit: skeleton for something great
Diffstat (limited to 'src')
-rw-r--r--src/api.rs65
-rw-r--r--src/data.rs75
-rw-r--r--src/error.rs19
-rw-r--r--src/main.rs76
-rw-r--r--src/proto.rs8
-rw-r--r--src/rpc.rs42
6 files changed, 285 insertions, 0 deletions
diff --git a/src/api.rs b/src/api.rs
new file mode 100644
index 00000000..9ee17047
--- /dev/null
+++ b/src/api.rs
@@ -0,0 +1,65 @@
+use futures_util::TryStreamExt;
+use hyper::service::{make_service_fn, service_fn};
+use hyper::{Body, Method, Request, Response, Server, StatusCode};
+use futures::future::Future;
+
+use crate::error::Error;
+use crate::System;
+
+/// This is our service handler. It receives a Request, routes on its
+/// path, and returns a Future of a Response.
+async fn echo(req: Request<Body>) -> Result<Response<Body>, Error> {
+ match (req.method(), req.uri().path()) {
+ // Serve some instructions at /
+ (&Method::GET, "/") => Ok(Response::new(Body::from(
+ "Try POSTing data to /echo such as: `curl localhost:3000/echo -XPOST -d 'hello world'`",
+ ))),
+
+ // Simply echo the body back to the client.
+ (&Method::POST, "/echo") => Ok(Response::new(req.into_body())),
+
+ // Convert to uppercase before sending back to client using a stream.
+ (&Method::POST, "/echo/uppercase") => {
+ let chunk_stream = req.into_body().map_ok(|chunk| {
+ chunk
+ .iter()
+ .map(|byte| byte.to_ascii_uppercase())
+ .collect::<Vec<u8>>()
+ });
+ Ok(Response::new(Body::wrap_stream(chunk_stream)))
+ }
+
+ // Reverse the entire body before sending back to the client.
+ //
+ // Since we don't know the end yet, we can't simply stream
+ // the chunks as they arrive as we did with the above uppercase endpoint.
+ // So here we do `.await` on the future, waiting on concatenating the full body,
+ // then afterwards the content can be reversed. Only then can we return a `Response`.
+ (&Method::POST, "/echo/reversed") => {
+ let whole_body = hyper::body::to_bytes(req.into_body()).await?;
+
+ let reversed_body = whole_body.iter().rev().cloned().collect::<Vec<u8>>();
+ Ok(Response::new(Body::from(reversed_body)))
+ }
+
+ // Return the 404 Not Found for other routes.
+ _ => {
+ let mut not_found = Response::default();
+ *not_found.status_mut() = StatusCode::NOT_FOUND;
+ Ok(not_found)
+ }
+ }
+}
+
+pub async fn run_api_server(sys: &System, api_port: u16, shutdown_signal: impl Future<Output=()>) -> Result<(), hyper::Error> {
+ let addr = ([0, 0, 0, 0], api_port).into();
+
+ let service = make_service_fn(|_| async { Ok::<_, Error>(service_fn(echo)) });
+
+ let server = Server::bind(&addr).serve(service);
+
+ let graceful = server.with_graceful_shutdown(shutdown_signal);
+ println!("Listening on http://{}", addr);
+
+ graceful.await
+}
diff --git a/src/data.rs b/src/data.rs
new file mode 100644
index 00000000..651a9d45
--- /dev/null
+++ b/src/data.rs
@@ -0,0 +1,75 @@
+use std::net::SocketAddr;
+
+use serde::{Serialize, Deserialize};
+
+pub type UUID = [u8; 32];
+pub type Hash = [u8; 32];
+
+// Membership management
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct NodeStatus {
+ id: UUID,
+ time: u64,
+ addr: SocketAddr,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct NodeConfig {
+ id: UUID,
+ n_tokens: u32,
+}
+
+#[derive(Default, Debug, Clone, Serialize, Deserialize)]
+pub struct NetworkMembers {
+ pings: Vec<NodeStatus>,
+ desired_state: Vec<NodeConfig>,
+ desired_state_version: u64,
+}
+
+// Data management
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct SplitpointMeta {
+ bucket: String,
+ key: String,
+
+ timestamp: u64,
+ uuid: UUID,
+ deleted: bool,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct VersionMeta {
+ bucket: String,
+ key: String,
+
+ timestamp: u64,
+ uuid: UUID,
+ deleted: bool,
+
+ mime_type: String,
+ size: u64,
+ is_complete: bool,
+
+ data: VersionData,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub enum VersionData {
+ Inline(Vec<u8>),
+ FirstBlock(Hash),
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct BlockMeta {
+ version_uuid: UUID,
+ offset: u64,
+ hash: Hash,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct BlockReverseMeta {
+ versions: Vec<UUID>,
+ deleted_versions: Vec<UUID>,
+}
diff --git a/src/error.rs b/src/error.rs
new file mode 100644
index 00000000..9929a896
--- /dev/null
+++ b/src/error.rs
@@ -0,0 +1,19 @@
+use err_derive::Error;
+use std::io;
+
+#[derive(Debug, Error)]
+pub enum Error {
+ #[error(display = "IO error")]
+ Io(#[error(source)] io::Error),
+
+ #[error(display = "Hyper error")]
+ Hyper(#[error(source)] hyper::Error),
+
+ #[error(display = "Messagepack encode error")]
+ RMPEncode(#[error(source)] rmp_serde::encode::Error),
+ #[error(display = "Messagepack decode error")]
+ RMPDecode(#[error(source)] rmp_serde::decode::Error),
+
+ #[error(display = "")]
+ Msg(String),
+}
diff --git a/src/main.rs b/src/main.rs
new file mode 100644
index 00000000..711717d7
--- /dev/null
+++ b/src/main.rs
@@ -0,0 +1,76 @@
+mod error;
+mod data;
+mod proto;
+mod rpc;
+mod api;
+
+use structopt::StructOpt;
+use futures::channel::oneshot;
+use tokio::sync::Mutex;
+use hyper::client::Client;
+
+use data::*;
+
+
+#[derive(StructOpt, Debug)]
+#[structopt(name = "garage")]
+pub struct Opt {
+ #[structopt(long = "api-port", default_value = "3900")]
+ api_port: u16,
+
+ #[structopt(long = "rpc-port", default_value = "3901")]
+ rpc_port: u16,
+}
+
+pub struct System {
+ pub opt: Opt,
+
+ pub rpc_client: Client<hyper::client::HttpConnector, hyper::Body>,
+
+ pub network_members: Mutex<NetworkMembers>,
+}
+
+async fn shutdown_signal(chans: Vec<oneshot::Sender<()>>) {
+ // Wait for the CTRL+C signal
+ tokio::signal::ctrl_c()
+ .await
+ .expect("failed to install CTRL+C signal handler");
+ for ch in chans {
+ ch.send(()).unwrap();
+ }
+}
+
+async fn wait_from(chan: oneshot::Receiver<()>) -> () {
+ chan.await.unwrap()
+}
+
+#[tokio::main]
+async fn main() {
+ let opt = Opt::from_args();
+ let rpc_port = opt.rpc_port;
+ let api_port = opt.api_port;
+
+ let sys = System{
+ opt,
+ rpc_client: Client::new(),
+ network_members: Mutex::new(NetworkMembers::default()),
+ };
+
+ let (tx1, rx1) = oneshot::channel();
+ let (tx2, rx2) = oneshot::channel();
+
+ tokio::spawn(shutdown_signal(vec![tx1, tx2]));
+
+ let rpc_server = rpc::run_rpc_server(&sys, rpc_port, wait_from(rx1));
+ let api_server = api::run_api_server(&sys, api_port, wait_from(rx2));
+
+ let (e1, e2) = futures::join![rpc_server, api_server];
+
+ if let Err(e) = e1 {
+ eprintln!("RPC server error: {}", e)
+ }
+
+ if let Err(e) = e2 {
+ eprintln!("API server error: {}", e)
+ }
+}
diff --git a/src/proto.rs b/src/proto.rs
new file mode 100644
index 00000000..029a58df
--- /dev/null
+++ b/src/proto.rs
@@ -0,0 +1,8 @@
+use serde::{Serialize, Deserialize};
+
+#[derive(Debug, Serialize, Deserialize)]
+pub enum Message {
+ Ok,
+ Error(String),
+
+}
diff --git a/src/rpc.rs b/src/rpc.rs
new file mode 100644
index 00000000..314b64a9
--- /dev/null
+++ b/src/rpc.rs
@@ -0,0 +1,42 @@
+use bytes::IntoBuf;
+use hyper::service::{make_service_fn, service_fn};
+use hyper::{Body, Method, Request, Response, Server, StatusCode};
+use futures::future::Future;
+
+use crate::error::Error;
+use crate::proto::Message;
+use crate::System;
+
+
+/// This is our service handler. It receives a Request, routes on its
+/// path, and returns a Future of a Response.
+async fn echo(req: Request<Body>) -> Result<Response<Body>, Error> {
+ if req.method() != &Method::POST {
+ let mut bad_request = Response::default();
+ *bad_request.status_mut() = StatusCode::BAD_REQUEST;
+ return Ok(bad_request);
+ }
+
+
+ let whole_body = hyper::body::to_bytes(req.into_body()).await?;
+ let msg = rmp_serde::decode::from_read::<_, Message>(whole_body.into_buf());
+
+ let resp = Message::Ok;
+ Ok(Response::new(Body::from(
+ rmp_serde::encode::to_vec_named(&resp)?
+ )))
+}
+
+
+pub async fn run_rpc_server(sys: &System, rpc_port: u16, shutdown_signal: impl Future<Output=()>) -> Result<(), hyper::Error> {
+ let addr = ([0, 0, 0, 0], rpc_port).into();
+
+ let service = make_service_fn(|_| async { Ok::<_, Error>(service_fn(echo)) });
+
+ let server = Server::bind(&addr).serve(service);
+
+ let graceful = server.with_graceful_shutdown(shutdown_signal);
+ println!("Listening on http://{}", addr);
+
+ graceful.await
+}