From 3477864142ed09c36abea1111937b829fb41c8a4 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 10 Apr 2020 22:01:48 +0200 Subject: Fix the Sync issue. Details: So the HTTP client future of Hyper is not Sync, thus the stream that read blocks wasn't either. However Hyper's default Body type requires a stream to be Sync for wrap_stream. Solution: reimplement a custom HTTP body type. --- src/api_server.rs | 281 ++++++++++++++++++++++++++++++------------ src/block.rs | 10 +- src/data.rs | 24 ++-- src/error.rs | 6 +- src/main.rs | 111 ++++++++++------- src/membership.rs | 340 ++++++++++++++++++++++++++++----------------------- src/object_table.rs | 20 +-- src/proto.rs | 8 +- src/rpc_client.rs | 71 +++++------ src/rpc_server.rs | 53 ++++---- src/server.rs | 59 +++++---- src/table.rs | 94 +++++++++----- src/version_table.rs | 7 +- 13 files changed, 657 insertions(+), 427 deletions(-) (limited to 'src') diff --git a/src/api_server.rs b/src/api_server.rs index a5e6e322..fec89e93 100644 --- a/src/api_server.rs +++ b/src/api_server.rs @@ -1,26 +1,35 @@ -use std::sync::Arc; -use std::net::SocketAddr; +use core::pin::Pin; +use core::task::{Context, Poll}; + use std::collections::VecDeque; +use std::net::SocketAddr; +use std::sync::Arc; +use futures::future::Future; +use futures::ready; use futures::stream::*; -use hyper::service::{make_service_fn, service_fn}; +use hyper::body::{Bytes, HttpBody}; use hyper::server::conn::AddrStream; +use hyper::service::{make_service_fn, service_fn}; use hyper::{Body, Method, Request, Response, Server, StatusCode}; -use hyper::body::Bytes; -use futures::future::Future; -use crate::error::Error; -use crate::data::*; use crate::data; +use crate::data::*; +use crate::error::Error; use crate::proto::*; use crate::rpc_client::*; use crate::server::Garage; use crate::table::EmptySortKey; -pub async fn run_api_server(garage: Arc, shutdown_signal: impl Future) -> Result<(), hyper::Error> { - let addr = ([0, 0, 0, 0], garage.system.config.api_port).into(); +type BodyType = Box + Send + Unpin>; + +pub async fn run_api_server( + garage: Arc, + shutdown_signal: impl Future, +) -> Result<(), hyper::Error> { + let addr = ([0, 0, 0, 0], garage.system.config.api_port).into(); - let service = make_service_fn(|conn: &AddrStream| { + let service = make_service_fn(|conn: &AddrStream| { let garage = garage.clone(); let client_addr = conn.remote_addr(); async move { @@ -31,59 +40,72 @@ pub async fn run_api_server(garage: Arc, shutdown_signal: impl Future, req: Request, addr: SocketAddr) -> Result, Error> { +async fn handler( + garage: Arc, + req: Request, + addr: SocketAddr, +) -> Result, Error> { match handler_inner(garage, req, addr).await { Ok(x) => Ok(x), Err(e) => { - let mut http_error = Response::new(Body::from(format!("{}\n", e))); + let body: BodyType = Box::new(BytesBody::from(format!("{}\n", e))); + let mut http_error = Response::new(body); *http_error.status_mut() = e.http_status_code(); Ok(http_error) } } } -async fn handler_inner(garage: Arc, req: Request, addr: SocketAddr) -> Result, Error> { +async fn handler_inner( + garage: Arc, + req: Request, + addr: SocketAddr, +) -> Result, Error> { eprintln!("{} {} {}", addr, req.method(), req.uri()); - let bucket = req.headers() + let bucket = req + .headers() .get(hyper::header::HOST) .map(|x| x.to_str().map_err(Error::from)) .unwrap_or(Err(Error::BadRequest(format!("Host: header missing"))))? .to_lowercase(); let key = req.uri().path().to_string(); - match req.method() { - &Method::GET => { - Ok(handle_get(garage, &bucket, &key).await?) - } + match req.method() { + &Method::GET => Ok(handle_get(garage, &bucket, &key).await?), &Method::PUT => { - let mime_type = req.headers() + let mime_type = req + .headers() .get(hyper::header::CONTENT_TYPE) .map(|x| x.to_str()) .unwrap_or(Ok("blob"))? .to_string(); - let version_uuid = handle_put(garage, &mime_type, &bucket, &key, req.into_body()).await?; - Ok(Response::new(Body::from( - format!("{:?}\n", version_uuid), - ))) + let version_uuid = + handle_put(garage, &mime_type, &bucket, &key, req.into_body()).await?; + Ok(Response::new(Box::new(BytesBody::from(format!( + "{:?}\n", + version_uuid + ))))) } - _ => Err(Error::BadRequest(format!("Invalid method"))), - } + _ => Err(Error::BadRequest(format!("Invalid method"))), + } } -async fn handle_put(garage: Arc, - mime_type: &str, - bucket: &str, key: &str, body: Body) - -> Result -{ +async fn handle_put( + garage: Arc, + mime_type: &str, + bucket: &str, + key: &str, + body: Body, +) -> Result { let version_uuid = gen_uuid(); let mut chunker = BodyChunker::new(body, garage.system.config.block_size); @@ -97,7 +119,7 @@ async fn handle_put(garage: Arc, key: key.into(), versions: Vec::new(), }; - object.versions.push(Box::new(ObjectVersion{ + object.versions.push(Box::new(ObjectVersion { uuid: version_uuid.clone(), timestamp: now_msec(), mime_type: mime_type.to_string(), @@ -110,7 +132,7 @@ async fn handle_put(garage: Arc, object.versions[0].data = ObjectVersionData::Inline(first_block); object.versions[0].is_complete = true; garage.object_table.insert(&object).await?; - return Ok(version_uuid) + return Ok(version_uuid); } let version = Version { @@ -126,15 +148,22 @@ async fn handle_put(garage: Arc, garage.object_table.insert(&object).await?; let mut next_offset = first_block.len(); - let mut put_curr_version_block = put_version_block(garage.clone(), &version, 0, first_block_hash.clone()); + let mut put_curr_version_block = + put_version_block(garage.clone(), &version, 0, first_block_hash.clone()); let mut put_curr_block = put_block(garage.clone(), first_block_hash, first_block); loop { - let (_, _, next_block) = futures::try_join!(put_curr_block, put_curr_version_block, chunker.next())?; + let (_, _, next_block) = + futures::try_join!(put_curr_block, put_curr_version_block, chunker.next())?; if let Some(block) = next_block { let block_hash = hash(&block[..]); let block_len = block.len(); - put_curr_version_block = put_version_block(garage.clone(), &version, next_offset as u64, block_hash.clone()); + put_curr_version_block = put_version_block( + garage.clone(), + &version, + next_offset as u64, + block_hash.clone(), + ); put_curr_block = put_block(garage.clone(), block_hash, block); next_offset += block_len; } else { @@ -150,27 +179,33 @@ async fn handle_put(garage: Arc, Ok(version_uuid) } -async fn put_version_block(garage: Arc, version: &Version, offset: u64, hash: Hash) -> Result<(), Error> { +async fn put_version_block( + garage: Arc, + version: &Version, + offset: u64, + hash: Hash, +) -> Result<(), Error> { let mut version = version.clone(); - version.blocks.push(VersionBlock{ - offset, - hash, - }); + version.blocks.push(VersionBlock { offset, hash }); garage.version_table.insert(&version).await?; Ok(()) } async fn put_block(garage: Arc, hash: Hash, data: Vec) -> Result<(), Error> { - let who = garage.system.members.read().await + let who = garage + .system + .members + .read() + .await .walk_ring(&hash, garage.system.config.meta_replication_factor); - rpc_try_call_many(garage.system.clone(), - &who[..], - &Message::PutBlock(PutBlockMessage{ - hash, - data, - }), - (garage.system.config.meta_replication_factor+1)/2, - DEFAULT_TIMEOUT).await?; + rpc_try_call_many( + garage.system.clone(), + &who[..], + &Message::PutBlock(PutBlockMessage { hash, data }), + (garage.system.config.meta_replication_factor + 1) / 2, + DEFAULT_TIMEOUT, + ) + .await?; Ok(()) } @@ -183,7 +218,7 @@ struct BodyChunker { impl BodyChunker { fn new(body: Body, block_size: usize) -> Self { - Self{ + Self { body, read_all: false, block_size, @@ -203,26 +238,36 @@ impl BodyChunker { if self.buf.len() == 0 { Ok(None) } else if self.buf.len() <= self.block_size { - let block = self.buf.drain(..) - .collect::>(); + let block = self.buf.drain(..).collect::>(); Ok(Some(block)) } else { - let block = self.buf.drain(..self.block_size) - .collect::>(); + let block = self.buf.drain(..self.block_size).collect::>(); Ok(Some(block)) } } } -async fn handle_get(garage: Arc, bucket: &str, key: &str) -> Result, Error> { - let mut object = match garage.object_table.get(&bucket.to_string(), &key.to_string()).await? { +async fn handle_get( + garage: Arc, + bucket: &str, + key: &str, +) -> Result, Error> { + let mut object = match garage + .object_table + .get(&bucket.to_string(), &key.to_string()) + .await? + { None => return Err(Error::NotFound), - Some(o) => o + Some(o) => o, }; - let last_v = match object.versions.drain(..) - .rev().filter(|v| v.is_complete) - .next() { + let last_v = match object + .versions + .drain(..) + .rev() + .filter(|v| v.is_complete) + .next() + { Some(v) => v, None => return Err(Error::NotFound), }; @@ -234,7 +279,8 @@ async fn handle_get(garage: Arc, bucket: &str, key: &str) -> Result Err(Error::NotFound), ObjectVersionData::Inline(bytes) => { - Ok(resp_builder.body(bytes.into())?) + let body: BodyType = Box::new(BytesBody::from(bytes)); + Ok(resp_builder.body(body)?) } ObjectVersionData::FirstBlock(first_block_hash) => { let read_first_block = get_block(garage.clone(), &first_block_hash); @@ -246,42 +292,119 @@ async fn handle_get(garage: Arc, bucket: &str, key: &str) -> Result return Err(Error::NotFound), }; - let mut blocks = version.blocks.iter() + let mut blocks = version + .blocks + .iter() .map(|vb| (vb.hash.clone(), None)) .collect::>(); blocks[0].1 = Some(first_block); - let block_futures = blocks.drain(..) - .map(move |(hash, data_opt)| async { + let body_stream = futures::stream::iter(blocks) + .map(move |(hash, data_opt)| { + let garage = garage.clone(); + async move { if let Some(data) = data_opt { - Ok(data) + Ok(Bytes::from(data)) } else { - get_block(garage.clone(), &hash).await - .map_err(|e| format!("{}", e)) + get_block(garage.clone(), &hash).await.map(Bytes::from) } - }); - let body_stream = futures::stream::iter(block_futures).buffered(2); - let body = Body::wrap_stream(body_stream); + } + }) + .buffered(2); + let body: BodyType = Box::new(NonSyncStreamBody { + stream: Box::pin(body_stream), + }); Ok(resp_builder.body(body)?) } } } async fn get_block(garage: Arc, hash: &Hash) -> Result, Error> { - let who = garage.system.members.read().await + let who = garage + .system + .members + .read() + .await .walk_ring(&hash, garage.system.config.meta_replication_factor); - let resps = rpc_try_call_many(garage.system.clone(), - &who[..], - &Message::GetBlock(hash.clone()), - 1, - DEFAULT_TIMEOUT).await?; + let resps = rpc_try_call_many( + garage.system.clone(), + &who[..], + &Message::GetBlock(hash.clone()), + 1, + DEFAULT_TIMEOUT, + ) + .await?; for resp in resps { if let Message::PutBlock(pbm) = resp { if data::hash(&pbm.data) == *hash { - return Ok(pbm.data) + return Ok(pbm.data); } } } Err(Error::Message(format!("No valid blocks returned"))) } + +pub struct NonSyncStreamBody { + pub stream: Pin> + Send>>, +} + +impl HttpBody for NonSyncStreamBody { + type Data = Bytes; + type Error = Error; + + fn poll_data( + mut self: Pin<&mut Self>, + cx: &mut Context, + ) -> Poll>> { + match ready!(self.stream.as_mut().poll_next(cx)) { + Some(res) => Poll::Ready(Some(res)), + None => Poll::Ready(None), + } + } + + fn poll_trailers( + self: Pin<&mut Self>, + cx: &mut Context, + ) -> Poll>, Self::Error>> { + Poll::Ready(Ok(None)) + } +} + +pub struct BytesBody { + pub bytes: Option, +} + +impl HttpBody for BytesBody { + type Data = Bytes; + type Error = Error; + + fn poll_data( + mut self: Pin<&mut Self>, + _cx: &mut Context, + ) -> Poll>> { + Poll::Ready(self.bytes.take().map(Ok)) + } + + fn poll_trailers( + self: Pin<&mut Self>, + _cx: &mut Context, + ) -> Poll>, Self::Error>> { + Poll::Ready(Ok(None)) + } +} + +impl From for BytesBody { + fn from(x: String) -> BytesBody { + BytesBody { + bytes: Some(Bytes::from(x.into_bytes())), + } + } +} +impl From> for BytesBody { + fn from(x: Vec) -> BytesBody { + BytesBody { + bytes: Some(Bytes::from(x)), + } + } +} diff --git a/src/block.rs b/src/block.rs index b9e7eee8..99a0121f 100644 --- a/src/block.rs +++ b/src/block.rs @@ -1,13 +1,13 @@ -use std::sync::Arc; use std::path::PathBuf; +use std::sync::Arc; use tokio::fs; use tokio::prelude::*; +use crate::data::*; use crate::error::Error; -use crate::server::Garage; use crate::proto::*; -use crate::data::*; +use crate::server::Garage; fn block_dir(garage: &Garage, hash: &Hash) -> PathBuf { let mut path = garage.system.config.data_dir.clone(); @@ -24,7 +24,7 @@ pub async fn write_block(garage: Arc, hash: &Hash, data: &[u8]) -> Resul path.push(hex::encode(hash)); if fs::metadata(&path).await.is_ok() { - return Ok(Message::Ok) + return Ok(Message::Ok); } let mut f = fs::File::create(path).await?; @@ -42,7 +42,7 @@ pub async fn read_block(garage: Arc, hash: &Hash) -> Result Visitor<'de> for FixedBytes32Visitor { res.copy_from_slice(value); Ok(res.into()) } else { - Err(E::custom(format!("Invalid byte string length {}, expected 32", value.len()))) + Err(E::custom(format!( + "Invalid byte string length {}, expected 32", + value.len() + ))) } } } @@ -88,7 +91,8 @@ pub fn gen_uuid() -> UUID { } pub fn now_msec() -> u64 { - SystemTime::now().duration_since(UNIX_EPOCH) + SystemTime::now() + .duration_since(UNIX_EPOCH) .expect("Fix your clock :o") .as_millis() as u64 } @@ -96,7 +100,8 @@ pub fn now_msec() -> u64 { // RMP serialization with names of fields and variants pub fn rmp_to_vec_all_named(val: &T) -> Result, rmp_serde::encode::Error> -where T: Serialize + ?Sized +where + T: Serialize + ?Sized, { let mut wr = Vec::with_capacity(128); let mut se = rmp_serde::Serializer::new(&mut wr) @@ -104,7 +109,6 @@ where T: Serialize + ?Sized .with_string_variants(); val.serialize(&mut se)?; Ok(wr) - } // Network management diff --git a/src/error.rs b/src/error.rs index 8b2fc419..5eb5a960 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,6 +1,6 @@ -use std::io; use err_derive::Error; use hyper::StatusCode; +use std::io; #[derive(Debug, Error)] pub enum Error { @@ -13,8 +13,8 @@ pub enum Error { #[error(display = "HTTP error: {}", _0)] HTTP(#[error(source)] http::Error), - #[error(display = "Invalid HTTP header value: {}", _0)] - HTTPHeader(#[error(source)] http::header::ToStrError), + #[error(display = "Invalid HTTP header value: {}", _0)] + HTTPHeader(#[error(source)] http::header::ToStrError), #[error(display = "Sled error: {}", _0)] Sled(#[error(source)] sled::Error), diff --git a/src/main.rs b/src/main.rs index 324f0d49..69fb2863 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,28 +1,28 @@ -mod error; mod data; +mod error; mod proto; mod membership; mod table; +mod block; mod object_table; mod version_table; -mod block; -mod server; -mod rpc_server; -mod rpc_client; mod api_server; +mod rpc_client; +mod rpc_server; +mod server; use std::collections::HashSet; use std::net::SocketAddr; use std::path::PathBuf; use structopt::StructOpt; -use error::Error; -use rpc_client::RpcClient; use data::*; +use error::Error; use proto::*; +use rpc_client::RpcClient; #[derive(StructOpt, Debug)] #[structopt(name = "garage")] @@ -69,7 +69,6 @@ pub struct ConfigureOpt { n_tokens: u32, } - #[tokio::main] async fn main() { let opt = Opt::from_args(); @@ -77,12 +76,8 @@ async fn main() { let rpc_cli = RpcClient::new(); let resp = match opt.cmd { - Command::Server(server_opt) => { - server::run_server(server_opt.config_file).await - } - Command::Status => { - cmd_status(rpc_cli, opt.rpc_host).await - } + Command::Server(server_opt) => server::run_server(server_opt.config_file).await, + Command::Status => cmd_status(rpc_cli, opt.rpc_host).await, Command::Configure(configure_opt) => { cmd_configure(rpc_cli, opt.rpc_host, configure_opt).await } @@ -94,28 +89,40 @@ async fn main() { } async fn cmd_status(rpc_cli: RpcClient, rpc_host: SocketAddr) -> Result<(), Error> { - let status = match rpc_cli.call(&rpc_host, - &Message::PullStatus, - DEFAULT_TIMEOUT).await? { + let status = match rpc_cli + .call(&rpc_host, &Message::PullStatus, DEFAULT_TIMEOUT) + .await? + { Message::AdvertiseNodesUp(nodes) => nodes, - resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))) + resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), }; - let config = match rpc_cli.call(&rpc_host, - &Message::PullConfig, - DEFAULT_TIMEOUT).await? { + let config = match rpc_cli + .call(&rpc_host, &Message::PullConfig, DEFAULT_TIMEOUT) + .await? + { Message::AdvertiseConfig(cfg) => cfg, - resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))) + resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), }; println!("Healthy nodes:"); for adv in status.iter() { if let Some(cfg) = config.members.get(&adv.id) { - println!("{}\t{}\t{}\t{}", hex::encode(&adv.id), cfg.datacenter, cfg.n_tokens, adv.addr); + println!( + "{}\t{}\t{}\t{}", + hex::encode(&adv.id), + cfg.datacenter, + cfg.n_tokens, + adv.addr + ); } } let status_keys = status.iter().map(|x| x.id.clone()).collect::>(); - if config.members.iter().any(|(id, _)| !status_keys.contains(id)) { + if config + .members + .iter() + .any(|(id, _)| !status_keys.contains(id)) + { println!("\nFailed nodes:"); for (id, cfg) in config.members.iter() { if !status.iter().any(|x| x.id == *id) { @@ -124,7 +131,10 @@ async fn cmd_status(rpc_cli: RpcClient, rpc_host: SocketAddr) -> Result<(), Erro } } - if status.iter().any(|adv| !config.members.contains_key(&adv.id)) { + if status + .iter() + .any(|adv| !config.members.contains_key(&adv.id)) + { println!("\nUnconfigured nodes:"); for adv in status.iter() { if !config.members.contains_key(&adv.id) { @@ -136,12 +146,17 @@ async fn cmd_status(rpc_cli: RpcClient, rpc_host: SocketAddr) -> Result<(), Erro Ok(()) } -async fn cmd_configure(rpc_cli: RpcClient, rpc_host: SocketAddr, args: ConfigureOpt) -> Result<(), Error> { - let status = match rpc_cli.call(&rpc_host, - &Message::PullStatus, - DEFAULT_TIMEOUT).await? { +async fn cmd_configure( + rpc_cli: RpcClient, + rpc_host: SocketAddr, + args: ConfigureOpt, +) -> Result<(), Error> { + let status = match rpc_cli + .call(&rpc_host, &Message::PullStatus, DEFAULT_TIMEOUT) + .await? + { Message::AdvertiseNodesUp(nodes) => nodes, - resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))) + resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), }; let mut candidates = vec![]; @@ -151,25 +166,35 @@ async fn cmd_configure(rpc_cli: RpcClient, rpc_host: SocketAddr, args: Configure } } if candidates.len() != 1 { - return Err(Error::Message(format!("{} matching nodes", candidates.len()))); + return Err(Error::Message(format!( + "{} matching nodes", + candidates.len() + ))); } - let mut config = match rpc_cli.call(&rpc_host, - &Message::PullConfig, - DEFAULT_TIMEOUT).await? { + let mut config = match rpc_cli + .call(&rpc_host, &Message::PullConfig, DEFAULT_TIMEOUT) + .await? + { Message::AdvertiseConfig(cfg) => cfg, - resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))) + resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), }; - config.members.insert(candidates[0].clone(), - NetworkConfigEntry{ - datacenter: args.datacenter, - n_tokens: args.n_tokens, - }); + config.members.insert( + candidates[0].clone(), + NetworkConfigEntry { + datacenter: args.datacenter, + n_tokens: args.n_tokens, + }, + ); config.version += 1; - rpc_cli.call(&rpc_host, - &Message::AdvertiseConfig(config), - DEFAULT_TIMEOUT).await?; + rpc_cli + .call( + &rpc_host, + &Message::AdvertiseConfig(config), + DEFAULT_TIMEOUT, + ) + .await?; Ok(()) } diff --git a/src/membership.rs b/src/membership.rs index 3f7a84c4..b713c7a4 100644 --- a/src/membership.rs +++ b/src/membership.rs @@ -1,22 +1,22 @@ -use std::sync::Arc; +use std::collections::HashMap; use std::hash::Hash as StdHash; use std::hash::Hasher; +use std::io::Read; +use std::net::{IpAddr, SocketAddr}; use std::path::PathBuf; -use std::io::{Read}; -use std::collections::HashMap; +use std::sync::Arc; use std::time::Duration; -use std::net::{IpAddr, SocketAddr}; -use sha2::{Sha256, Digest}; -use tokio::prelude::*; use futures::future::join_all; +use sha2::{Digest, Sha256}; +use tokio::prelude::*; use tokio::sync::RwLock; -use crate::server::Config; -use crate::error::Error; use crate::data::*; +use crate::error::Error; use crate::proto::*; use crate::rpc_client::*; +use crate::server::Config; const PING_INTERVAL: Duration = Duration::from_secs(10); const PING_TIMEOUT: Duration = Duration::from_secs(2); @@ -36,8 +36,8 @@ pub struct Members { pub status_hash: Hash, pub config: NetworkConfig, - pub ring: Vec, - pub n_datacenters: usize, + pub ring: Vec, + pub n_datacenters: usize, } pub struct NodeStatus { @@ -47,19 +47,21 @@ pub struct NodeStatus { #[derive(Debug)] pub struct RingEntry { - pub location: Hash, - pub node: UUID, - pub datacenter: u64, + pub location: Hash, + pub node: UUID, + pub datacenter: u64, } impl Members { fn handle_ping(&mut self, ip: IpAddr, info: &PingMessage) -> bool { let addr = SocketAddr::new(ip, info.rpc_port); - let old_status = self.status.insert(info.id.clone(), - NodeStatus{ + let old_status = self.status.insert( + info.id.clone(), + NodeStatus { addr: addr.clone(), remaining_ping_attempts: MAX_FAILED_PINGS, - }); + }, + ); match old_status { None => { eprintln!("Newly pingable node: {}", hex::encode(&info.id)); @@ -80,122 +82,129 @@ impl Members { hasher.input(format!("{} {}\n", hex::encode(&id), status.addr)); } eprintln!("END --"); - self.status_hash.as_slice_mut().copy_from_slice(&hasher.result()[..]); + self.status_hash + .as_slice_mut() + .copy_from_slice(&hasher.result()[..]); } - fn rebuild_ring(&mut self) { - let mut new_ring = vec![]; - let mut datacenters = vec![]; + fn rebuild_ring(&mut self) { + let mut new_ring = vec![]; + let mut datacenters = vec![]; - for (id, config) in self.config.members.iter() { - let mut dc_hasher = std::collections::hash_map::DefaultHasher::new(); - config.datacenter.hash(&mut dc_hasher); - let datacenter = dc_hasher.finish(); + for (id, config) in self.config.members.iter() { + let mut dc_hasher = std::collections::hash_map::DefaultHasher::new(); + config.datacenter.hash(&mut dc_hasher); + let datacenter = dc_hasher.finish(); - if !datacenters.contains(&datacenter) { - datacenters.push(datacenter); - } + if !datacenters.contains(&datacenter) { + datacenters.push(datacenter); + } - for i in 0..config.n_tokens { - let location = hash(format!("{} {}", hex::encode(&id), i).as_bytes()); + for i in 0..config.n_tokens { + let location = hash(format!("{} {}", hex::encode(&id), i).as_bytes()); - new_ring.push(RingEntry{ - location: location.into(), - node: id.clone(), - datacenter, - }) - } - } + new_ring.push(RingEntry { + location: location.into(), + node: id.clone(), + datacenter, + }) + } + } - new_ring.sort_unstable_by(|x, y| x.location.cmp(&y.location)); - self.ring = new_ring; - self.n_datacenters = datacenters.len(); + new_ring.sort_unstable_by(|x, y| x.location.cmp(&y.location)); + self.ring = new_ring; + self.n_datacenters = datacenters.len(); eprintln!("RING: --"); for e in self.ring.iter() { eprintln!("{:?}", e); } eprintln!("END --"); - } - - pub fn walk_ring(&self, from: &Hash, n: usize) -> Vec { - if n >= self.config.members.len() { - return self.config.members.keys().cloned().collect::>(); - } - - let start = match self.ring.binary_search_by(|x| x.location.cmp(from)) { - Ok(i) => i, - Err(i) => if i == 0 { - self.ring.len() - 1 - } else { - i - 1 - } - }; + } + + pub fn walk_ring(&self, from: &Hash, n: usize) -> Vec { + if n >= self.config.members.len() { + return self.config.members.keys().cloned().collect::>(); + } + + let start = match self.ring.binary_search_by(|x| x.location.cmp(from)) { + Ok(i) => i, + Err(i) => { + if i == 0 { + self.ring.len() - 1 + } else { + i - 1 + } + } + }; self.walk_ring_from_pos(start, n) - } + } fn walk_ring_from_pos(&self, start: usize, n: usize) -> Vec { - let mut ret = vec![]; - let mut datacenters = vec![]; + let mut ret = vec![]; + let mut datacenters = vec![]; - for delta in 0..self.ring.len() { - if ret.len() == n { - break; - } + for delta in 0..self.ring.len() { + if ret.len() == n { + break; + } - let i = (start + delta) % self.ring.len(); + let i = (start + delta) % self.ring.len(); - if datacenters.len() == self.n_datacenters && !ret.contains(&self.ring[i].node) { - ret.push(self.ring[i].node.clone()); - } else if !datacenters.contains(&self.ring[i].datacenter) { - ret.push(self.ring[i].node.clone()); - datacenters.push(self.ring[i].datacenter); - } - } + if datacenters.len() == self.n_datacenters && !ret.contains(&self.ring[i].node) { + ret.push(self.ring[i].node.clone()); + } else if !datacenters.contains(&self.ring[i].datacenter) { + ret.push(self.ring[i].node.clone()); + datacenters.push(self.ring[i].datacenter); + } + } - ret + ret } } fn read_network_config(metadata_dir: &PathBuf) -> Result { - let mut path = metadata_dir.clone(); - path.push("network_config"); + let mut path = metadata_dir.clone(); + path.push("network_config"); let mut file = std::fs::OpenOptions::new() .read(true) .open(path.as_path())?; - + let mut net_config_bytes = vec![]; file.read_to_end(&mut net_config_bytes)?; - let net_config = rmp_serde::decode::from_read_ref(&net_config_bytes[..])?; + let net_config = rmp_serde::decode::from_read_ref(&net_config_bytes[..])?; Ok(net_config) } impl System { pub fn new(config: Config, id: UUID) -> Self { - let net_config = match read_network_config(&config.metadata_dir) { - Ok(x) => x, - Err(e) => { - println!("No valid previous network configuration stored ({}), starting fresh.", e); - NetworkConfig{ + let net_config = match read_network_config(&config.metadata_dir) { + Ok(x) => x, + Err(e) => { + println!( + "No valid previous network configuration stored ({}), starting fresh.", + e + ); + NetworkConfig { members: HashMap::new(), version: 0, } - }, - }; - let mut members = Members{ - status: HashMap::new(), - status_hash: Hash::default(), - config: net_config, - ring: Vec::new(), - n_datacenters: 0, - }; + } + }; + let mut members = Members { + status: HashMap::new(), + status_hash: Hash::default(), + config: net_config, + ring: Vec::new(), + n_datacenters: 0, + }; members.recalculate_status_hash(); - members.rebuild_ring(); - System{ + members.rebuild_ring(); + System { config, id, rpc_client: RpcClient::new(), @@ -203,24 +212,26 @@ impl System { } } - async fn save_network_config(self: Arc) { - let mut path = self.config.metadata_dir.clone(); - path.push("network_config"); + async fn save_network_config(self: Arc) { + let mut path = self.config.metadata_dir.clone(); + path.push("network_config"); - let members = self.members.read().await; - let data = rmp_to_vec_all_named(&members.config) - .expect("Error while encoding network config"); - drop(members); + let members = self.members.read().await; + let data = + rmp_to_vec_all_named(&members.config).expect("Error while encoding network config"); + drop(members); - let mut f = tokio::fs::File::create(path.as_path()).await - .expect("Could not create network_config"); - f.write_all(&data[..]).await - .expect("Could not write network_config"); - } + let mut f = tokio::fs::File::create(path.as_path()) + .await + .expect("Could not create network_config"); + f.write_all(&data[..]) + .await + .expect("Could not write network_config"); + } pub async fn make_ping(&self) -> Message { let members = self.members.read().await; - Message::Ping(PingMessage{ + Message::Ping(PingMessage { id: self.id.clone(), rpc_port: self.config.rpc_port, status_hash: members.status_hash.clone(), @@ -230,13 +241,20 @@ impl System { pub async fn broadcast(self: Arc, msg: Message, timeout: Duration) { let members = self.members.read().await; - let to = members.status.keys().filter(|x| **x != self.id).cloned().collect::>(); + let to = members + .status + .keys() + .filter(|x| **x != self.id) + .cloned() + .collect::>(); drop(members); rpc_call_many(self.clone(), &to[..], &msg, timeout).await; } pub async fn bootstrap(self: Arc) { - let bootstrap_peers = self.config.bootstrap_peers + let bootstrap_peers = self + .config + .bootstrap_peers .iter() .map(|ip| (ip.clone(), None)) .collect::>(); @@ -247,16 +265,19 @@ impl System { pub async fn ping_nodes(self: Arc, peers: Vec<(SocketAddr, Option)>) { let ping_msg = self.make_ping().await; - let ping_resps = join_all( - peers.iter() - .map(|(addr, id_option)| { - let sys = self.clone(); - let ping_msg_ref = &ping_msg; - async move { - (id_option, addr.clone(), sys.rpc_client.call(&addr, ping_msg_ref, PING_TIMEOUT).await) - } - })).await; - + let ping_resps = join_all(peers.iter().map(|(addr, id_option)| { + let sys = self.clone(); + let ping_msg_ref = &ping_msg; + async move { + ( + id_option, + addr.clone(), + sys.rpc_client.call(&addr, ping_msg_ref, PING_TIMEOUT).await, + ) + } + })) + .await; + let mut members = self.members.write().await; let mut has_changes = false; @@ -267,7 +288,7 @@ impl System { let is_new = members.handle_ping(addr.ip(), &info); if is_new { has_changes = true; - to_advertise.push(AdvertisedNode{ + to_advertise.push(AdvertisedNode { id: info.id.clone(), addr: addr.clone(), }); @@ -279,9 +300,16 @@ impl System { tokio::spawn(self.clone().pull_config(info.id.clone())); } } else if let Some(id) = id_option { - let remaining_attempts = members.status.get(id).map(|x| x.remaining_ping_attempts).unwrap_or(0); + let remaining_attempts = members + .status + .get(id) + .map(|x| x.remaining_ping_attempts) + .unwrap_or(0); if remaining_attempts == 0 { - eprintln!("Removing node {} after too many failed pings", hex::encode(&id)); + eprintln!( + "Removing node {} after too many failed pings", + hex::encode(&id) + ); members.status.remove(&id); has_changes = true; } else { @@ -297,15 +325,16 @@ impl System { drop(members); if to_advertise.len() > 0 { - self.broadcast(Message::AdvertiseNodesUp(to_advertise), PING_TIMEOUT).await; + self.broadcast(Message::AdvertiseNodesUp(to_advertise), PING_TIMEOUT) + .await; } } - pub async fn handle_ping(self: Arc, - from: &SocketAddr, - ping: &PingMessage) - -> Result - { + pub async fn handle_ping( + self: Arc, + from: &SocketAddr, + ping: &PingMessage, + ) -> Result { let mut members = self.members.write().await; let is_new = members.handle_ping(from.ip(), ping); if is_new { @@ -329,7 +358,7 @@ impl System { let members = self.members.read().await; let mut mem = vec![]; for (node, status) in members.status.iter() { - mem.push(AdvertisedNode{ + mem.push(AdvertisedNode { id: node.clone(), addr: status.addr.clone(), }); @@ -342,10 +371,10 @@ impl System { Ok(Message::AdvertiseConfig(members.config.clone())) } - pub async fn handle_advertise_nodes_up(self: Arc, - adv: &[AdvertisedNode]) - -> Result - { + pub async fn handle_advertise_nodes_up( + self: Arc, + adv: &[AdvertisedNode], + ) -> Result { let mut to_ping = vec![]; let mut members = self.members.write().await; @@ -355,11 +384,13 @@ impl System { if node.id == self.id { // learn our own ip address let self_addr = SocketAddr::new(node.addr.ip(), self.config.rpc_port); - let old_self = members.status.insert(node.id.clone(), - NodeStatus{ + let old_self = members.status.insert( + node.id.clone(), + NodeStatus { addr: self_addr, remaining_ping_attempts: MAX_FAILED_PINGS, - }); + }, + ); has_changed = match old_self { None => true, Some(x) => x.addr != self_addr, @@ -380,18 +411,20 @@ impl System { Ok(Message::Ok) } - pub async fn handle_advertise_config(self: Arc, - adv: &NetworkConfig) - -> Result - { + pub async fn handle_advertise_config( + self: Arc, + adv: &NetworkConfig, + ) -> Result { let mut members = self.members.write().await; if adv.version > members.config.version { - members.config = adv.clone(); - members.rebuild_ring(); + members.rebuild_ring(); - tokio::spawn(self.clone().broadcast(Message::AdvertiseConfig(adv.clone()), PING_TIMEOUT)); - tokio::spawn(self.clone().save_network_config()); + tokio::spawn( + self.clone() + .broadcast(Message::AdvertiseConfig(adv.clone()), PING_TIMEOUT), + ); + tokio::spawn(self.clone().save_network_config()); } Ok(Message::Ok) @@ -400,12 +433,14 @@ impl System { pub async fn ping_loop(self: Arc) { loop { let restart_at = tokio::time::delay_for(PING_INTERVAL); - + let members = self.members.read().await; - let ping_addrs = members.status.iter() - .filter(|(id, _)| **id != self.id) - .map(|(id, status)| (status.addr.clone(), Some(id.clone()))) - .collect::>(); + let ping_addrs = members + .status + .iter() + .filter(|(id, _)| **id != self.id) + .map(|(id, status)| (status.addr.clone(), Some(id.clone()))) + .collect::>(); drop(members); self.clone().ping_nodes(ping_addrs).await; @@ -414,12 +449,12 @@ impl System { } } - pub fn pull_status(self: Arc, peer: UUID) -> impl futures::future::Future + Send + 'static { + pub fn pull_status( + self: Arc, + peer: UUID, + ) -> impl futures::future::Future + Send + 'static { async move { - let resp = rpc_call(self.clone(), - &peer, - &Message::PullStatus, - PING_TIMEOUT).await; + let resp = rpc_call(self.clone(), &peer, &Message::PullStatus, PING_TIMEOUT).await; if let Ok(Message::AdvertiseNodesUp(nodes)) = resp { let _: Result<_, _> = self.handle_advertise_nodes_up(&nodes).await; } @@ -427,10 +462,7 @@ impl System { } pub async fn pull_config(self: Arc, peer: UUID) { - let resp = rpc_call(self.clone(), - &peer, - &Message::PullConfig, - PING_TIMEOUT).await; + let resp = rpc_call(self.clone(), &peer, &Message::PullConfig, PING_TIMEOUT).await; if let Ok(Message::AdvertiseConfig(config)) = resp { let _: Result<_, _> = self.handle_advertise_config(&config).await; } diff --git a/src/object_table.rs b/src/object_table.rs index 37b9fc0a..392e0dc7 100644 --- a/src/object_table.rs +++ b/src/object_table.rs @@ -1,12 +1,11 @@ -use std::sync::Arc; -use serde::{Serialize, Deserialize}; use async_trait::async_trait; +use serde::{Deserialize, Serialize}; +use std::sync::Arc; use tokio::sync::RwLock; use crate::data::*; -use crate::table::*; use crate::server::Garage; - +use crate::table::*; #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct Object { @@ -35,7 +34,7 @@ pub struct ObjectVersion { #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub enum ObjectVersionData { DeleteMarker, - Inline(#[serde(with="serde_bytes")] Vec), + Inline(#[serde(with = "serde_bytes")] Vec), FirstBlock(Hash), } @@ -49,7 +48,9 @@ impl Entry for Object { fn merge(&mut self, other: &Self) { for other_v in other.versions.iter() { - match self.versions.binary_search_by(|v| (v.timestamp, &v.uuid).cmp(&(other_v.timestamp, &other_v.uuid))) { + match self.versions.binary_search_by(|v| { + (v.timestamp, &v.uuid).cmp(&(other_v.timestamp, &other_v.uuid)) + }) { Ok(i) => { let mut v = &mut self.versions[i]; if other_v.size > v.size { @@ -64,8 +65,11 @@ impl Entry for Object { } } } - let last_complete = self.versions - .iter().enumerate().rev() + let last_complete = self + .versions + .iter() + .enumerate() + .rev() .filter(|(_, v)| v.is_complete) .next() .map(|(vi, _)| vi); diff --git a/src/proto.rs b/src/proto.rs index df64a438..b39f49ed 100644 --- a/src/proto.rs +++ b/src/proto.rs @@ -1,6 +1,6 @@ -use std::time::Duration; +use serde::{Deserialize, Serialize}; use std::net::SocketAddr; -use serde::{Serialize, Deserialize}; +use std::time::Duration; use crate::data::*; @@ -11,7 +11,7 @@ pub enum Message { Ok, Error(String), - Ping(PingMessage), + Ping(PingMessage), PullStatus, PullConfig, AdvertiseNodesUp(Vec), @@ -42,6 +42,6 @@ pub struct AdvertisedNode { pub struct PutBlockMessage { pub hash: Hash, - #[serde(with="serde_bytes")] + #[serde(with = "serde_bytes")] pub data: Vec, } diff --git a/src/rpc_client.rs b/src/rpc_client.rs index 134f8e98..7587782e 100644 --- a/src/rpc_client.rs +++ b/src/rpc_client.rs @@ -3,23 +3,25 @@ use std::sync::Arc; use std::time::Duration; use bytes::IntoBuf; -use hyper::{Body, Method, Request, StatusCode}; -use hyper::client::Client; use futures::stream::futures_unordered::FuturesUnordered; use futures::stream::StreamExt; +use futures_util::future::FutureExt; +use hyper::client::Client; +use hyper::{Body, Method, Request, StatusCode}; use crate::data::*; use crate::error::Error; -use crate::proto::Message; use crate::membership::System; +use crate::proto::Message; -pub async fn rpc_call_many(sys: Arc, - to: &[UUID], - msg: &Message, - timeout: Duration) - -> Vec> -{ - let mut resp_stream = to.iter() +pub async fn rpc_call_many( + sys: Arc, + to: &[UUID], + msg: &Message, + timeout: Duration, +) -> Vec> { + let mut resp_stream = to + .iter() .map(|to| rpc_call(sys.clone(), to, msg, timeout)) .collect::>(); @@ -30,14 +32,15 @@ pub async fn rpc_call_many(sys: Arc, results } -pub async fn rpc_try_call_many(sys: Arc, - to: &[UUID], - msg: &Message, - stop_after: usize, - timeout: Duration) - -> Result, Error> -{ - let mut resp_stream = to.iter() +pub async fn rpc_try_call_many( + sys: Arc, + to: &[UUID], + msg: &Message, + stop_after: usize, + timeout: Duration, +) -> Result, Error> { + let mut resp_stream = to + .iter() .map(|to| rpc_call(sys.clone(), to, msg, timeout)) .collect::>(); @@ -49,7 +52,7 @@ pub async fn rpc_try_call_many(sys: Arc, Ok(msg) => { results.push(msg); if results.len() >= stop_after { - break + break; } } Err(e) => { @@ -69,12 +72,12 @@ pub async fn rpc_try_call_many(sys: Arc, } } -pub async fn rpc_call(sys: Arc, - to: &UUID, - msg: &Message, - timeout: Duration) - -> Result -{ +pub async fn rpc_call( + sys: Arc, + to: &UUID, + msg: &Message, + timeout: Duration, +) -> Result { let addr = { let members = sys.members.read().await; match members.status.get(to) { @@ -91,24 +94,24 @@ pub struct RpcClient { impl RpcClient { pub fn new() -> Self { - RpcClient{ + RpcClient { client: Client::new(), } } - pub async fn call(&self, - to_addr: &SocketAddr, - msg: &Message, - timeout: Duration) - -> Result - { + pub async fn call( + &self, + to_addr: &SocketAddr, + msg: &Message, + timeout: Duration, + ) -> Result { let uri = format!("http://{}/rpc", to_addr); let req = Request::builder() .method(Method::POST) .uri(uri) .body(Body::from(rmp_to_vec_all_named(msg)?))?; - let resp_fut = self.client.request(req); + let resp_fut = self.client.request(req).fuse(); let resp = tokio::time::timeout(timeout, resp_fut).await??; if resp.status() == StatusCode::OK { @@ -116,7 +119,7 @@ impl RpcClient { let msg = rmp_serde::decode::from_read::<_, Message>(body.into_buf())?; match msg { Message::Error(e) => Err(Error::RPCError(e)), - x => Ok(x) + x => Ok(x), } } else { Err(Error::RPCError(format!("Status code {}", resp.status()))) diff --git a/src/rpc_server.rs b/src/rpc_server.rs index 7d8df658..9eeac5f3 100644 --- a/src/rpc_server.rs +++ b/src/rpc_server.rs @@ -1,18 +1,18 @@ use std::net::SocketAddr; use std::sync::Arc; -use serde::Serialize; use bytes::IntoBuf; -use hyper::service::{make_service_fn, service_fn}; +use futures::future::Future; use hyper::server::conn::AddrStream; +use hyper::service::{make_service_fn, service_fn}; use hyper::{Body, Method, Request, Response, Server, StatusCode}; -use futures::future::Future; +use serde::Serialize; -use crate::error::Error; +use crate::block::*; use crate::data::rmp_to_vec_all_named; +use crate::error::Error; use crate::proto::Message; use crate::server::Garage; -use crate::block::*; fn debug_serialize(x: T) -> Result { let ss = serde_json::to_string(&x)?; @@ -30,7 +30,11 @@ fn err_to_msg(x: Result) -> Message { } } -async fn handler(garage: Arc, req: Request, addr: SocketAddr) -> Result, Error> { +async fn handler( + garage: Arc, + req: Request, + addr: SocketAddr, +) -> Result, Error> { if req.method() != &Method::POST { let mut bad_request = Response::default(); *bad_request.status_mut() = StatusCode::BAD_REQUEST; @@ -40,7 +44,12 @@ async fn handler(garage: Arc, req: Request, addr: SocketAddr) -> R let whole_body = hyper::body::to_bytes(req.into_body()).await?; let msg = rmp_serde::decode::from_read::<_, Message>(whole_body.into_buf())?; - eprintln!("RPC from {}: {} ({} bytes)", addr, debug_serialize(&msg)?, whole_body.len()); + eprintln!( + "RPC from {}: {} ({} bytes)", + addr, + debug_serialize(&msg)?, + whole_body.len() + ); let sys = garage.system.clone(); let resp = err_to_msg(match &msg { @@ -49,15 +58,13 @@ async fn handler(garage: Arc, req: Request, addr: SocketAddr) -> R Message::PullConfig => sys.handle_pull_config().await, Message::AdvertiseNodesUp(adv) => sys.handle_advertise_nodes_up(adv).await, Message::AdvertiseConfig(adv) => sys.handle_advertise_config(adv).await, - Message::PutBlock(m) => { - write_block(garage, &m.hash, &m.data).await - } - Message::GetBlock(h) => { - read_block(garage, &h).await - } + Message::PutBlock(m) => write_block(garage, &m.hash, &m.data).await, + Message::GetBlock(h) => read_block(garage, &h).await, Message::TableRPC(table, msg) => { if let Some(rpc_handler) = garage.table_rpc_handlers.get(table) { - rpc_handler.handle(&msg[..]).await + rpc_handler + .handle(&msg[..]) + .await .map(|rep| Message::TableRPC(table.to_string(), rep)) } else { Ok(Message::Error(format!("Unknown table: {}", table))) @@ -69,16 +76,16 @@ async fn handler(garage: Arc, req: Request, addr: SocketAddr) -> R eprintln!("reply to {}: {}", addr, debug_serialize(&resp)?); - Ok(Response::new(Body::from( - rmp_to_vec_all_named(&resp)? - ))) + Ok(Response::new(Body::from(rmp_to_vec_all_named(&resp)?))) } +pub async fn run_rpc_server( + garage: Arc, + shutdown_signal: impl Future, +) -> Result<(), hyper::Error> { + let bind_addr = ([0, 0, 0, 0], garage.system.config.rpc_port).into(); -pub async fn run_rpc_server(garage: Arc, shutdown_signal: impl Future) -> Result<(), hyper::Error> { - let bind_addr = ([0, 0, 0, 0], garage.system.config.rpc_port).into(); - - let service = make_service_fn(|conn: &AddrStream| { + let service = make_service_fn(|conn: &AddrStream| { let client_addr = conn.remote_addr(); let garage = garage.clone(); async move { @@ -89,10 +96,10 @@ pub async fn run_rpc_server(garage: Arc, shutdown_signal: impl Future Arc { let system = Arc::new(System::new(config, id)); - let meta_rep_param = TableReplicationParams{ + let meta_rep_param = TableReplicationParams { replication_factor: system.config.meta_replication_factor, - write_quorum: (system.config.meta_replication_factor+1)/2, - read_quorum: (system.config.meta_replication_factor+1)/2, + write_quorum: (system.config.meta_replication_factor + 1) / 2, + read_quorum: (system.config.meta_replication_factor + 1) / 2, timeout: DEFAULT_TIMEOUT, }; let object_table = Arc::new(Table::new( - ObjectTable{garage: RwLock::new(None)}, + ObjectTable { + garage: RwLock::new(None), + }, system.clone(), &db, "object".to_string(), - meta_rep_param.clone())); + meta_rep_param.clone(), + )); let version_table = Arc::new(Table::new( - VersionTable{garage: RwLock::new(None)}, + VersionTable { + garage: RwLock::new(None), + }, system.clone(), &db, "version".to_string(), - meta_rep_param.clone())); + meta_rep_param.clone(), + )); - let mut garage = Self{ + let mut garage = Self { db, system: system.clone(), fs_lock: Mutex::new(()), @@ -61,10 +67,12 @@ impl Garage { garage.table_rpc_handlers.insert( garage.object_table.name.clone(), - garage.object_table.clone().rpc_handler()); + garage.object_table.clone().rpc_handler(), + ); garage.table_rpc_handlers.insert( garage.version_table.name.clone(), - garage.version_table.clone().rpc_handler()); + garage.version_table.clone().rpc_handler(), + ); let garage = Arc::new(garage); @@ -103,7 +111,7 @@ fn read_config(config_file: PathBuf) -> Result { let mut file = std::fs::OpenOptions::new() .read(true) .open(config_file.as_path())?; - + let mut config = String::new(); file.read_to_string(&mut config)?; @@ -118,7 +126,7 @@ fn gen_node_id(metadata_dir: &PathBuf) -> Result { let mut d = vec![]; f.read_to_end(&mut d)?; if d.len() != 32 { - return Err(Error::Message(format!("Corrupt node_id file"))) + return Err(Error::Message(format!("Corrupt node_id file"))); } let mut id = [0u8; 32]; @@ -134,10 +142,10 @@ fn gen_node_id(metadata_dir: &PathBuf) -> Result { } async fn shutdown_signal(chans: Vec>) { - // Wait for the CTRL+C signal - tokio::signal::ctrl_c() - .await - .expect("failed to install CTRL+C signal handler"); + // Wait for the CTRL+C signal + tokio::signal::ctrl_c() + .await + .expect("failed to install CTRL+C signal handler"); println!("Received CTRL+C, shutting down."); for ch in chans { ch.send(()).unwrap(); @@ -149,16 +157,13 @@ async fn wait_from(chan: oneshot::Receiver<()>) -> () { } pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { - let config = read_config(config_file) - .expect("Unable to read config file"); + let config = read_config(config_file).expect("Unable to read config file"); let mut db_path = config.metadata_dir.clone(); db_path.push("db"); - let db = sled::open(db_path) - .expect("Unable to open DB"); + let db = sled::open(db_path).expect("Unable to open DB"); - let id = gen_node_id(&config.metadata_dir) - .expect("Unable to read or generate node ID"); + let id = gen_node_id(&config.metadata_dir).expect("Unable to read or generate node ID"); println!("Node ID: {}", hex::encode(&id)); let garage = Garage::new(config, id, db).await; diff --git a/src/table.rs b/src/table.rs index def0d8b8..9ba9d94a 100644 --- a/src/table.rs +++ b/src/table.rs @@ -1,15 +1,14 @@ -use std::time::Duration; -use std::sync::Arc; -use serde::{Serialize, Deserialize}; use async_trait::async_trait; +use serde::{Deserialize, Serialize}; +use std::sync::Arc; +use std::time::Duration; -use crate::error::Error; -use crate::proto::*; use crate::data::*; +use crate::error::Error; use crate::membership::System; +use crate::proto::*; use crate::rpc_client::*; - pub struct Table { pub instance: F, @@ -72,7 +71,9 @@ pub trait SortKey { fn sort_key(&self) -> &[u8]; } -pub trait Entry: PartialEq + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync { +pub trait Entry: + PartialEq + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync +{ fn partition_key(&self) -> &P; fn sort_key(&self) -> &S; @@ -114,10 +115,15 @@ pub trait TableFormat: Send + Sync { } impl Table { - pub fn new(instance: F, system: Arc, db: &sled::Db, name: String, param: TableReplicationParams) -> Self { - let store = db.open_tree(&name) - .expect("Unable to open DB tree"); - Self{ + pub fn new( + instance: F, + system: Arc, + db: &sled::Db, + name: String, + param: TableReplicationParams, + ) -> Self { + let store = db.open_tree(&name).expect("Unable to open DB tree"); + Self { instance, name, system, @@ -128,33 +134,39 @@ impl Table { } pub fn rpc_handler(self: Arc) -> Box { - Box::new(TableRpcHandlerAdapter::{ table: self }) + Box::new(TableRpcHandlerAdapter:: { table: self }) } pub async fn insert(&self, e: &F::E) -> Result<(), Error> { let hash = e.partition_key().hash(); - let who = self.system.members.read().await + let who = self + .system + .members + .read() + .await .walk_ring(&hash, self.param.replication_factor); eprintln!("insert who: {:?}", who); let rpc = &TableRPC::::Update(vec![e.clone()]); - - self.rpc_try_call_many(&who[..], - &rpc, - self.param.write_quorum).await?; + + self.rpc_try_call_many(&who[..], &rpc, self.param.write_quorum) + .await?; Ok(()) } pub async fn get(&self, partition_key: &F::P, sort_key: &F::S) -> Result, Error> { let hash = partition_key.hash(); - let who = self.system.members.read().await + let who = self + .system + .members + .read() + .await .walk_ring(&hash, self.param.replication_factor); eprintln!("get who: {:?}", who); let rpc = &TableRPC::::ReadEntry(partition_key.clone(), sort_key.clone()); - let resps = self.rpc_try_call_many(&who[..], - &rpc, - self.param.read_quorum) + let resps = self + .rpc_try_call_many(&who[..], &rpc, self.param.read_quorum) .await?; let mut ret = None; @@ -180,27 +192,37 @@ impl Table { if let Some(ret_entry) = &ret { if not_all_same { // Repair on read - let _: Result<_, _> = self.rpc_try_call_many( + let _: Result<_, _> = self + .rpc_try_call_many( &who[..], &TableRPC::::Update(vec![ret_entry.clone()]), - who.len()) + who.len(), + ) .await; } } Ok(ret) } - async fn rpc_try_call_many(&self, who: &[UUID], rpc: &TableRPC, quorum: usize) -> Result>, Error> { + async fn rpc_try_call_many( + &self, + who: &[UUID], + rpc: &TableRPC, + quorum: usize, + ) -> Result>, Error> { eprintln!("Table RPC to {:?}: {}", who, serde_json::to_string(&rpc)?); let rpc_bytes = rmp_to_vec_all_named(rpc)?; let rpc_msg = Message::TableRPC(self.name.to_string(), rpc_bytes); - let resps = rpc_try_call_many(self.system.clone(), - who, - &rpc_msg, - quorum, - self.param.timeout).await?; + let resps = rpc_try_call_many( + self.system.clone(), + who, + &rpc_msg, + quorum, + self.param.timeout, + ) + .await?; let mut resps_vals = vec![]; for resp in resps { @@ -210,9 +232,15 @@ impl Table { continue; } } - return Err(Error::Message(format!("Invalid reply to TableRPC: {:?}", resp))) + return Err(Error::Message(format!( + "Invalid reply to TableRPC: {:?}", + resp + ))); } - eprintln!("Table RPC responses: {}", serde_json::to_string(&resps_vals)?); + eprintln!( + "Table RPC responses: {}", + serde_json::to_string(&resps_vals)? + ); Ok(resps_vals) } @@ -226,7 +254,7 @@ impl Table { self.handle_update(pairs).await?; Ok(TableRPC::Ok) } - _ => Err(Error::RPCError(format!("Unexpected table RPC"))) + _ => Err(Error::RPCError(format!("Unexpected table RPC"))), } } @@ -254,7 +282,7 @@ impl Table { new_entry.merge(&update); (Some(old_entry), new_entry) } - None => (None, update.clone()) + None => (None, update.clone()), }; let new_bytes = rmp_to_vec_all_named(&new_entry) diff --git a/src/version_table.rs b/src/version_table.rs index 8c48d3af..28ee2e01 100644 --- a/src/version_table.rs +++ b/src/version_table.rs @@ -1,12 +1,11 @@ -use std::sync::Arc; -use serde::{Serialize, Deserialize}; use async_trait::async_trait; +use serde::{Deserialize, Serialize}; +use std::sync::Arc; use tokio::sync::RwLock; use crate::data::*; -use crate::table::*; use crate::server::Garage; - +use crate::table::*; #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct Version { -- cgit v1.2.3