diff options
Diffstat (limited to 'src/rpc/rpc_helper.rs')
-rw-r--r-- | src/rpc/rpc_helper.rs | 73 |
1 files changed, 60 insertions, 13 deletions
diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index 05fdcce4..ea3e5e76 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -33,8 +33,7 @@ use crate::metrics::RpcMetrics; const DEFAULT_TIMEOUT: Duration = Duration::from_secs(300); /// Strategy to apply when making RPC -#[derive(Copy, Clone)] -pub struct RequestStrategy { +pub struct RequestStrategy<T> { /// Min number of response to consider the request successful rs_quorum: Option<usize>, /// Send all requests at once @@ -43,6 +42,8 @@ pub struct RequestStrategy { rs_priority: RequestPriority, /// Custom timeout for this request rs_timeout: Timeout, + /// Data to drop when everything completes + rs_drop_on_complete: T, } #[derive(Copy, Clone)] @@ -52,7 +53,19 @@ enum Timeout { Custom(Duration), } -impl RequestStrategy { +impl Clone for RequestStrategy<()> { + fn clone(&self) -> Self { + RequestStrategy { + rs_quorum: self.rs_quorum, + rs_send_all_at_once: self.rs_send_all_at_once, + rs_priority: self.rs_priority, + rs_timeout: self.rs_timeout, + rs_drop_on_complete: (), + } + } +} + +impl RequestStrategy<()> { /// Create a RequestStrategy with default timeout and not interrupting when quorum reached pub fn with_priority(prio: RequestPriority) -> Self { RequestStrategy { @@ -60,8 +73,22 @@ impl RequestStrategy { rs_send_all_at_once: None, rs_priority: prio, rs_timeout: Timeout::Default, + rs_drop_on_complete: (), + } + } + /// Add an item to be dropped on completion + pub fn with_drop_on_completion<T>(self, drop_on_complete: T) -> RequestStrategy<T> { + RequestStrategy { + rs_quorum: self.rs_quorum, + rs_send_all_at_once: self.rs_send_all_at_once, + rs_priority: self.rs_priority, + rs_timeout: self.rs_timeout, + rs_drop_on_complete: drop_on_complete, } } +} + +impl<T> RequestStrategy<T> { /// Set quorum to be reached for request pub fn with_quorum(mut self, quorum: usize) -> Self { self.rs_quorum = Some(quorum); @@ -82,6 +109,19 @@ impl RequestStrategy { self.rs_timeout = Timeout::Custom(timeout); self } + /// Extract drop_on_complete item + fn extract_drop_on_complete(self) -> (RequestStrategy<()>, T) { + ( + RequestStrategy { + rs_quorum: self.rs_quorum, + rs_send_all_at_once: self.rs_send_all_at_once, + rs_priority: self.rs_priority, + rs_timeout: self.rs_timeout, + rs_drop_on_complete: (), + }, + self.rs_drop_on_complete, + ) + } } #[derive(Clone)] @@ -122,7 +162,7 @@ impl RpcHelper { endpoint: &Endpoint<M, H>, to: Uuid, msg: N, - strat: RequestStrategy, + strat: RequestStrategy<()>, ) -> Result<S, Error> where M: Rpc<Response = Result<S, Error>>, @@ -182,7 +222,7 @@ impl RpcHelper { endpoint: &Endpoint<M, H>, to: &[Uuid], msg: N, - strat: RequestStrategy, + strat: RequestStrategy<()>, ) -> Result<Vec<(Uuid, Result<S, Error>)>, Error> where M: Rpc<Response = Result<S, Error>>, @@ -197,7 +237,7 @@ impl RpcHelper { let resps = join_all( to.iter() - .map(|to| self.call(endpoint, *to, msg.clone(), strat)), + .map(|to| self.call(endpoint, *to, msg.clone(), strat.clone())), ) .with_context(Context::current_with_span(span)) .await; @@ -212,7 +252,7 @@ impl RpcHelper { &self, endpoint: &Endpoint<M, H>, msg: N, - strat: RequestStrategy, + strat: RequestStrategy<()>, ) -> Result<Vec<(Uuid, Result<S, Error>)>, Error> where M: Rpc<Response = Result<S, Error>>, @@ -252,7 +292,7 @@ impl RpcHelper { endpoint: &Arc<Endpoint<M, H>>, to: &[Uuid], msg: N, - strategy: RequestStrategy, + strategy: RequestStrategy<()>, ) -> Result<Vec<S>, Error> where M: Rpc<Response = Result<S, Error>> + 'static, @@ -285,7 +325,7 @@ impl RpcHelper { endpoint: &Arc<Endpoint<M, H>>, to: &[Uuid], msg: N, - strategy: RequestStrategy, + strategy: RequestStrategy<()>, quorum: usize, ) -> Result<Vec<S>, Error> where @@ -316,6 +356,7 @@ impl RpcHelper { let self2 = self.clone(); let msg = msg.clone(); let endpoint2 = endpoint.clone(); + let strategy = strategy.clone(); async move { self2.call(&endpoint2, to, msg, strategy).await } }); @@ -388,18 +429,19 @@ impl RpcHelper { /// changes, where data has to be written both in the old layout and in the /// new one as long as all nodes have not successfully tranisitionned and /// moved all data to the new layout. - pub async fn try_write_many_sets<M, N, H, S>( + pub async fn try_write_many_sets<M, N, H, S, T>( &self, endpoint: &Arc<Endpoint<M, H>>, to_sets: &[Vec<Uuid>], msg: N, - strategy: RequestStrategy, + strategy: RequestStrategy<T>, ) -> Result<Vec<S>, Error> where M: Rpc<Response = Result<S, Error>> + 'static, N: IntoReq<M>, H: StreamingEndpointHandler<M> + 'static, S: Send + 'static, + T: Send + 'static, { let quorum = strategy .rs_quorum @@ -423,12 +465,12 @@ impl RpcHelper { .await } - async fn try_write_many_sets_inner<M, N, H, S>( + async fn try_write_many_sets_inner<M, N, H, S, T>( &self, endpoint: &Arc<Endpoint<M, H>>, to_sets: &[Vec<Uuid>], msg: N, - strategy: RequestStrategy, + strategy: RequestStrategy<T>, quorum: usize, ) -> Result<Vec<S>, Error> where @@ -436,11 +478,14 @@ impl RpcHelper { N: IntoReq<M>, H: StreamingEndpointHandler<M> + 'static, S: Send + 'static, + T: Send + 'static, { // Peers may appear in many quorum sets. Here, build a list of peers, // mapping to the index of the quorum sets in which they appear. let mut result_tracker = QuorumSetResultTracker::new(to_sets, quorum); + let (strategy, drop_on_complete) = strategy.extract_drop_on_complete(); + // Send one request to each peer of the quorum sets let msg = msg.into_req().map_err(garage_net::error::Error::from)?; let requests = result_tracker.nodes.keys().map(|peer| { @@ -448,6 +493,7 @@ impl RpcHelper { let msg = msg.clone(); let endpoint2 = endpoint.clone(); let to = *peer; + let strategy = strategy.clone(); async move { (to, self2.call(&endpoint2, to, msg, strategy).await) } }); let mut resp_stream = requests.collect::<FuturesUnordered<_>>(); @@ -463,6 +509,7 @@ impl RpcHelper { // Continue all other requets in background tokio::spawn(async move { resp_stream.collect::<Vec<(Uuid, Result<_, _>)>>().await; + drop(drop_on_complete); }); return Ok(result_tracker.success_values()); |