diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/api.rs | 65 | ||||
-rw-r--r-- | src/data.rs | 75 | ||||
-rw-r--r-- | src/error.rs | 19 | ||||
-rw-r--r-- | src/main.rs | 76 | ||||
-rw-r--r-- | src/proto.rs | 8 | ||||
-rw-r--r-- | src/rpc.rs | 42 |
6 files changed, 285 insertions, 0 deletions
diff --git a/src/api.rs b/src/api.rs new file mode 100644 index 00000000..9ee17047 --- /dev/null +++ b/src/api.rs @@ -0,0 +1,65 @@ +use futures_util::TryStreamExt; +use hyper::service::{make_service_fn, service_fn}; +use hyper::{Body, Method, Request, Response, Server, StatusCode}; +use futures::future::Future; + +use crate::error::Error; +use crate::System; + +/// This is our service handler. It receives a Request, routes on its +/// path, and returns a Future of a Response. +async fn echo(req: Request<Body>) -> Result<Response<Body>, Error> { + match (req.method(), req.uri().path()) { + // Serve some instructions at / + (&Method::GET, "/") => Ok(Response::new(Body::from( + "Try POSTing data to /echo such as: `curl localhost:3000/echo -XPOST -d 'hello world'`", + ))), + + // Simply echo the body back to the client. + (&Method::POST, "/echo") => Ok(Response::new(req.into_body())), + + // Convert to uppercase before sending back to client using a stream. + (&Method::POST, "/echo/uppercase") => { + let chunk_stream = req.into_body().map_ok(|chunk| { + chunk + .iter() + .map(|byte| byte.to_ascii_uppercase()) + .collect::<Vec<u8>>() + }); + Ok(Response::new(Body::wrap_stream(chunk_stream))) + } + + // Reverse the entire body before sending back to the client. + // + // Since we don't know the end yet, we can't simply stream + // the chunks as they arrive as we did with the above uppercase endpoint. + // So here we do `.await` on the future, waiting on concatenating the full body, + // then afterwards the content can be reversed. Only then can we return a `Response`. + (&Method::POST, "/echo/reversed") => { + let whole_body = hyper::body::to_bytes(req.into_body()).await?; + + let reversed_body = whole_body.iter().rev().cloned().collect::<Vec<u8>>(); + Ok(Response::new(Body::from(reversed_body))) + } + + // Return the 404 Not Found for other routes. + _ => { + let mut not_found = Response::default(); + *not_found.status_mut() = StatusCode::NOT_FOUND; + Ok(not_found) + } + } +} + +pub async fn run_api_server(sys: &System, api_port: u16, shutdown_signal: impl Future<Output=()>) -> Result<(), hyper::Error> { + let addr = ([0, 0, 0, 0], api_port).into(); + + let service = make_service_fn(|_| async { Ok::<_, Error>(service_fn(echo)) }); + + let server = Server::bind(&addr).serve(service); + + let graceful = server.with_graceful_shutdown(shutdown_signal); + println!("Listening on http://{}", addr); + + graceful.await +} diff --git a/src/data.rs b/src/data.rs new file mode 100644 index 00000000..651a9d45 --- /dev/null +++ b/src/data.rs @@ -0,0 +1,75 @@ +use std::net::SocketAddr; + +use serde::{Serialize, Deserialize}; + +pub type UUID = [u8; 32]; +pub type Hash = [u8; 32]; + +// Membership management + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct NodeStatus { + id: UUID, + time: u64, + addr: SocketAddr, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct NodeConfig { + id: UUID, + n_tokens: u32, +} + +#[derive(Default, Debug, Clone, Serialize, Deserialize)] +pub struct NetworkMembers { + pings: Vec<NodeStatus>, + desired_state: Vec<NodeConfig>, + desired_state_version: u64, +} + +// Data management + +#[derive(Debug, Serialize, Deserialize)] +pub struct SplitpointMeta { + bucket: String, + key: String, + + timestamp: u64, + uuid: UUID, + deleted: bool, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct VersionMeta { + bucket: String, + key: String, + + timestamp: u64, + uuid: UUID, + deleted: bool, + + mime_type: String, + size: u64, + is_complete: bool, + + data: VersionData, +} + +#[derive(Debug, Serialize, Deserialize)] +pub enum VersionData { + Inline(Vec<u8>), + FirstBlock(Hash), +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct BlockMeta { + version_uuid: UUID, + offset: u64, + hash: Hash, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct BlockReverseMeta { + versions: Vec<UUID>, + deleted_versions: Vec<UUID>, +} diff --git a/src/error.rs b/src/error.rs new file mode 100644 index 00000000..9929a896 --- /dev/null +++ b/src/error.rs @@ -0,0 +1,19 @@ +use err_derive::Error; +use std::io; + +#[derive(Debug, Error)] +pub enum Error { + #[error(display = "IO error")] + Io(#[error(source)] io::Error), + + #[error(display = "Hyper error")] + Hyper(#[error(source)] hyper::Error), + + #[error(display = "Messagepack encode error")] + RMPEncode(#[error(source)] rmp_serde::encode::Error), + #[error(display = "Messagepack decode error")] + RMPDecode(#[error(source)] rmp_serde::decode::Error), + + #[error(display = "")] + Msg(String), +} diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 00000000..711717d7 --- /dev/null +++ b/src/main.rs @@ -0,0 +1,76 @@ +mod error; +mod data; +mod proto; +mod rpc; +mod api; + +use structopt::StructOpt; +use futures::channel::oneshot; +use tokio::sync::Mutex; +use hyper::client::Client; + +use data::*; + + +#[derive(StructOpt, Debug)] +#[structopt(name = "garage")] +pub struct Opt { + #[structopt(long = "api-port", default_value = "3900")] + api_port: u16, + + #[structopt(long = "rpc-port", default_value = "3901")] + rpc_port: u16, +} + +pub struct System { + pub opt: Opt, + + pub rpc_client: Client<hyper::client::HttpConnector, hyper::Body>, + + pub network_members: Mutex<NetworkMembers>, +} + +async fn shutdown_signal(chans: Vec<oneshot::Sender<()>>) { + // Wait for the CTRL+C signal + tokio::signal::ctrl_c() + .await + .expect("failed to install CTRL+C signal handler"); + for ch in chans { + ch.send(()).unwrap(); + } +} + +async fn wait_from(chan: oneshot::Receiver<()>) -> () { + chan.await.unwrap() +} + +#[tokio::main] +async fn main() { + let opt = Opt::from_args(); + let rpc_port = opt.rpc_port; + let api_port = opt.api_port; + + let sys = System{ + opt, + rpc_client: Client::new(), + network_members: Mutex::new(NetworkMembers::default()), + }; + + let (tx1, rx1) = oneshot::channel(); + let (tx2, rx2) = oneshot::channel(); + + tokio::spawn(shutdown_signal(vec![tx1, tx2])); + + let rpc_server = rpc::run_rpc_server(&sys, rpc_port, wait_from(rx1)); + let api_server = api::run_api_server(&sys, api_port, wait_from(rx2)); + + let (e1, e2) = futures::join![rpc_server, api_server]; + + if let Err(e) = e1 { + eprintln!("RPC server error: {}", e) + } + + if let Err(e) = e2 { + eprintln!("API server error: {}", e) + } +} diff --git a/src/proto.rs b/src/proto.rs new file mode 100644 index 00000000..029a58df --- /dev/null +++ b/src/proto.rs @@ -0,0 +1,8 @@ +use serde::{Serialize, Deserialize}; + +#[derive(Debug, Serialize, Deserialize)] +pub enum Message { + Ok, + Error(String), + +} diff --git a/src/rpc.rs b/src/rpc.rs new file mode 100644 index 00000000..314b64a9 --- /dev/null +++ b/src/rpc.rs @@ -0,0 +1,42 @@ +use bytes::IntoBuf; +use hyper::service::{make_service_fn, service_fn}; +use hyper::{Body, Method, Request, Response, Server, StatusCode}; +use futures::future::Future; + +use crate::error::Error; +use crate::proto::Message; +use crate::System; + + +/// This is our service handler. It receives a Request, routes on its +/// path, and returns a Future of a Response. +async fn echo(req: Request<Body>) -> Result<Response<Body>, Error> { + if req.method() != &Method::POST { + let mut bad_request = Response::default(); + *bad_request.status_mut() = StatusCode::BAD_REQUEST; + return Ok(bad_request); + } + + + let whole_body = hyper::body::to_bytes(req.into_body()).await?; + let msg = rmp_serde::decode::from_read::<_, Message>(whole_body.into_buf()); + + let resp = Message::Ok; + Ok(Response::new(Body::from( + rmp_serde::encode::to_vec_named(&resp)? + ))) +} + + +pub async fn run_rpc_server(sys: &System, rpc_port: u16, shutdown_signal: impl Future<Output=()>) -> Result<(), hyper::Error> { + let addr = ([0, 0, 0, 0], rpc_port).into(); + + let service = make_service_fn(|_| async { Ok::<_, Error>(service_fn(echo)) }); + + let server = Server::bind(&addr).serve(service); + + let graceful = server.with_graceful_shutdown(shutdown_signal); + println!("Listening on http://{}", addr); + + graceful.await +} |