diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/api_server.rs | 4 | ||||
-rw-r--r-- | src/membership.rs | 2 | ||||
-rw-r--r-- | src/rpc_client.rs | 39 | ||||
-rw-r--r-- | src/rpc_server.rs | 2 | ||||
-rw-r--r-- | src/table.rs | 3 |
5 files changed, 31 insertions, 19 deletions
diff --git a/src/api_server.rs b/src/api_server.rs index 441fbe1c..f3f7165b 100644 --- a/src/api_server.rs +++ b/src/api_server.rs @@ -219,7 +219,7 @@ async fn put_block(garage: Arc<Garage>, hash: Hash, data: Vec<u8>) -> Result<(), rpc_try_call_many( garage.system.clone(), &who[..], - &Message::PutBlock(PutBlockMessage { hash, data }), + Message::PutBlock(PutBlockMessage { hash, data }), (garage.system.config.data_replication_factor + 1) / 2, BLOCK_RW_TIMEOUT, ) @@ -366,7 +366,7 @@ async fn get_block(garage: Arc<Garage>, hash: &Hash) -> Result<Vec<u8>, Error> { let resps = rpc_try_call_many( garage.system.clone(), &who[..], - &Message::GetBlock(hash.clone()), + Message::GetBlock(hash.clone()), 1, BLOCK_RW_TIMEOUT, ) diff --git a/src/membership.rs b/src/membership.rs index f511a4fd..b49607b7 100644 --- a/src/membership.rs +++ b/src/membership.rs @@ -270,7 +270,7 @@ impl System { .filter(|x| **x != self.id) .cloned() .collect::<Vec<_>>(); - rpc_call_many(self.clone(), &to[..], &msg, timeout).await; + rpc_call_many(self.clone(), &to[..], msg, timeout).await; } pub async fn bootstrap(self: Arc<Self>) { diff --git a/src/rpc_client.rs b/src/rpc_client.rs index a1c5dde0..81d20966 100644 --- a/src/rpc_client.rs +++ b/src/rpc_client.rs @@ -1,6 +1,7 @@ use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; +use std::borrow::Borrow; use bytes::IntoBuf; use futures::stream::futures_unordered::FuturesUnordered; @@ -19,12 +20,13 @@ use crate::tls_util; pub async fn rpc_call_many( sys: Arc<System>, to: &[UUID], - msg: &Message, + msg: Message, timeout: Duration, ) -> Vec<Result<Message, Error>> { + let msg = Arc::new(msg); let mut resp_stream = to .iter() - .map(|to| rpc_call(sys.clone(), to, msg, timeout)) + .map(|to| rpc_call(sys.clone(), to, msg.clone(), timeout)) .collect::<FuturesUnordered<_>>(); let mut results = vec![]; @@ -37,13 +39,15 @@ pub async fn rpc_call_many( pub async fn rpc_try_call_many( sys: Arc<System>, to: &[UUID], - msg: &Message, + msg: Message, stop_after: usize, timeout: Duration, ) -> Result<Vec<Message>, Error> { - let mut resp_stream = to - .iter() - .map(|to| rpc_call(sys.clone(), to, msg, timeout)) + let sys2 = sys.clone(); + let msg = Arc::new(msg); + let mut resp_stream = to.to_vec() + .into_iter() + .map(move |to| rpc_call(sys2.clone(), to.clone(), msg.clone(), timeout)) .collect::<FuturesUnordered<_>>(); let mut results = vec![]; @@ -64,6 +68,13 @@ pub async fn rpc_try_call_many( } if results.len() >= stop_after { + // Continue requests in background + // TODO: make this optionnal (only usefull for write requests) + sys.background.spawn(async move { + resp_stream.collect::<Vec<_>>().await; + Ok(()) + }); + Ok(results) } else { let mut msg = "Too many failures:".to_string(); @@ -74,17 +85,17 @@ pub async fn rpc_try_call_many( } } -pub async fn rpc_call( +pub async fn rpc_call<M: Borrow<Message>, N: Borrow<UUID>>( sys: Arc<System>, - to: &UUID, - msg: &Message, + to: N, + msg: M, timeout: Duration, ) -> Result<Message, Error> { let addr = { let status = sys.status.borrow().clone(); - match status.nodes.get(to) { + match status.nodes.get(to.borrow()) { Some(status) => status.addr.clone(), - None => return Err(Error::Message(format!("Peer ID not found: {:?}", to))), + None => return Err(Error::Message(format!("Peer ID not found: {:?}", to.borrow()))), } }; sys.rpc_client.call(&addr, msg, timeout).await @@ -119,10 +130,10 @@ impl RpcClient { } } - pub async fn call( + pub async fn call<M: Borrow<Message>>( &self, to_addr: &SocketAddr, - msg: &Message, + msg: M, timeout: Duration, ) -> Result<Message, Error> { let uri = match self { @@ -133,7 +144,7 @@ impl RpcClient { let req = Request::builder() .method(Method::POST) .uri(uri) - .body(Body::from(rmp_to_vec_all_named(msg)?))?; + .body(Body::from(rmp_to_vec_all_named(msg.borrow())?))?; let resp_fut = match self { RpcClient::HTTP(client) => client.request(req).fuse(), diff --git a/src/rpc_server.rs b/src/rpc_server.rs index 3527eda3..b18e366a 100644 --- a/src/rpc_server.rs +++ b/src/rpc_server.rs @@ -12,7 +12,7 @@ use tokio::net::{TcpListener, TcpStream}; use tokio_rustls::server::TlsStream; use tokio_rustls::TlsAcceptor; -use crate::data::{rmp_to_vec_all_named, debug_serialize}; +use crate::data::*; use crate::error::Error; use crate::proto::Message; use crate::server::Garage; diff --git a/src/table.rs b/src/table.rs index 162f98e6..33364514 100644 --- a/src/table.rs +++ b/src/table.rs @@ -280,7 +280,7 @@ impl<F: TableSchema + 'static> Table<F> { let resps = rpc_try_call_many( self.system.clone(), who, - &rpc_msg, + rpc_msg, quorum, self.param.timeout, ) @@ -384,6 +384,7 @@ impl<F: TableSchema + 'static> Table<F> { } pub async fn delete_range(&self, begin: &Hash, end: &Hash) -> Result<(), Error> { + eprintln!("({}) Deleting range {:?} - {:?}", self.name, begin, end); // TODO Ok(()) } |