aboutsummaryrefslogtreecommitdiff
path: root/src/rpc
diff options
context:
space:
mode:
Diffstat (limited to 'src/rpc')
-rw-r--r--src/rpc/rpc_helper.rs73
1 files changed, 60 insertions, 13 deletions
diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs
index 05fdcce4..ea3e5e76 100644
--- a/src/rpc/rpc_helper.rs
+++ b/src/rpc/rpc_helper.rs
@@ -33,8 +33,7 @@ use crate::metrics::RpcMetrics;
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(300);
/// Strategy to apply when making RPC
-#[derive(Copy, Clone)]
-pub struct RequestStrategy {
+pub struct RequestStrategy<T> {
/// Min number of response to consider the request successful
rs_quorum: Option<usize>,
/// Send all requests at once
@@ -43,6 +42,8 @@ pub struct RequestStrategy {
rs_priority: RequestPriority,
/// Custom timeout for this request
rs_timeout: Timeout,
+ /// Data to drop when everything completes
+ rs_drop_on_complete: T,
}
#[derive(Copy, Clone)]
@@ -52,7 +53,19 @@ enum Timeout {
Custom(Duration),
}
-impl RequestStrategy {
+impl Clone for RequestStrategy<()> {
+ fn clone(&self) -> Self {
+ RequestStrategy {
+ rs_quorum: self.rs_quorum,
+ rs_send_all_at_once: self.rs_send_all_at_once,
+ rs_priority: self.rs_priority,
+ rs_timeout: self.rs_timeout,
+ rs_drop_on_complete: (),
+ }
+ }
+}
+
+impl RequestStrategy<()> {
/// Create a RequestStrategy with default timeout and not interrupting when quorum reached
pub fn with_priority(prio: RequestPriority) -> Self {
RequestStrategy {
@@ -60,8 +73,22 @@ impl RequestStrategy {
rs_send_all_at_once: None,
rs_priority: prio,
rs_timeout: Timeout::Default,
+ rs_drop_on_complete: (),
+ }
+ }
+ /// Add an item to be dropped on completion
+ pub fn with_drop_on_completion<T>(self, drop_on_complete: T) -> RequestStrategy<T> {
+ RequestStrategy {
+ rs_quorum: self.rs_quorum,
+ rs_send_all_at_once: self.rs_send_all_at_once,
+ rs_priority: self.rs_priority,
+ rs_timeout: self.rs_timeout,
+ rs_drop_on_complete: drop_on_complete,
}
}
+}
+
+impl<T> RequestStrategy<T> {
/// Set quorum to be reached for request
pub fn with_quorum(mut self, quorum: usize) -> Self {
self.rs_quorum = Some(quorum);
@@ -82,6 +109,19 @@ impl RequestStrategy {
self.rs_timeout = Timeout::Custom(timeout);
self
}
+ /// Extract drop_on_complete item
+ fn extract_drop_on_complete(self) -> (RequestStrategy<()>, T) {
+ (
+ RequestStrategy {
+ rs_quorum: self.rs_quorum,
+ rs_send_all_at_once: self.rs_send_all_at_once,
+ rs_priority: self.rs_priority,
+ rs_timeout: self.rs_timeout,
+ rs_drop_on_complete: (),
+ },
+ self.rs_drop_on_complete,
+ )
+ }
}
#[derive(Clone)]
@@ -122,7 +162,7 @@ impl RpcHelper {
endpoint: &Endpoint<M, H>,
to: Uuid,
msg: N,
- strat: RequestStrategy,
+ strat: RequestStrategy<()>,
) -> Result<S, Error>
where
M: Rpc<Response = Result<S, Error>>,
@@ -182,7 +222,7 @@ impl RpcHelper {
endpoint: &Endpoint<M, H>,
to: &[Uuid],
msg: N,
- strat: RequestStrategy,
+ strat: RequestStrategy<()>,
) -> Result<Vec<(Uuid, Result<S, Error>)>, Error>
where
M: Rpc<Response = Result<S, Error>>,
@@ -197,7 +237,7 @@ impl RpcHelper {
let resps = join_all(
to.iter()
- .map(|to| self.call(endpoint, *to, msg.clone(), strat)),
+ .map(|to| self.call(endpoint, *to, msg.clone(), strat.clone())),
)
.with_context(Context::current_with_span(span))
.await;
@@ -212,7 +252,7 @@ impl RpcHelper {
&self,
endpoint: &Endpoint<M, H>,
msg: N,
- strat: RequestStrategy,
+ strat: RequestStrategy<()>,
) -> Result<Vec<(Uuid, Result<S, Error>)>, Error>
where
M: Rpc<Response = Result<S, Error>>,
@@ -252,7 +292,7 @@ impl RpcHelper {
endpoint: &Arc<Endpoint<M, H>>,
to: &[Uuid],
msg: N,
- strategy: RequestStrategy,
+ strategy: RequestStrategy<()>,
) -> Result<Vec<S>, Error>
where
M: Rpc<Response = Result<S, Error>> + 'static,
@@ -285,7 +325,7 @@ impl RpcHelper {
endpoint: &Arc<Endpoint<M, H>>,
to: &[Uuid],
msg: N,
- strategy: RequestStrategy,
+ strategy: RequestStrategy<()>,
quorum: usize,
) -> Result<Vec<S>, Error>
where
@@ -316,6 +356,7 @@ impl RpcHelper {
let self2 = self.clone();
let msg = msg.clone();
let endpoint2 = endpoint.clone();
+ let strategy = strategy.clone();
async move { self2.call(&endpoint2, to, msg, strategy).await }
});
@@ -388,18 +429,19 @@ impl RpcHelper {
/// changes, where data has to be written both in the old layout and in the
/// new one as long as all nodes have not successfully tranisitionned and
/// moved all data to the new layout.
- pub async fn try_write_many_sets<M, N, H, S>(
+ pub async fn try_write_many_sets<M, N, H, S, T>(
&self,
endpoint: &Arc<Endpoint<M, H>>,
to_sets: &[Vec<Uuid>],
msg: N,
- strategy: RequestStrategy,
+ strategy: RequestStrategy<T>,
) -> Result<Vec<S>, Error>
where
M: Rpc<Response = Result<S, Error>> + 'static,
N: IntoReq<M>,
H: StreamingEndpointHandler<M> + 'static,
S: Send + 'static,
+ T: Send + 'static,
{
let quorum = strategy
.rs_quorum
@@ -423,12 +465,12 @@ impl RpcHelper {
.await
}
- async fn try_write_many_sets_inner<M, N, H, S>(
+ async fn try_write_many_sets_inner<M, N, H, S, T>(
&self,
endpoint: &Arc<Endpoint<M, H>>,
to_sets: &[Vec<Uuid>],
msg: N,
- strategy: RequestStrategy,
+ strategy: RequestStrategy<T>,
quorum: usize,
) -> Result<Vec<S>, Error>
where
@@ -436,11 +478,14 @@ impl RpcHelper {
N: IntoReq<M>,
H: StreamingEndpointHandler<M> + 'static,
S: Send + 'static,
+ T: Send + 'static,
{
// Peers may appear in many quorum sets. Here, build a list of peers,
// mapping to the index of the quorum sets in which they appear.
let mut result_tracker = QuorumSetResultTracker::new(to_sets, quorum);
+ let (strategy, drop_on_complete) = strategy.extract_drop_on_complete();
+
// Send one request to each peer of the quorum sets
let msg = msg.into_req().map_err(garage_net::error::Error::from)?;
let requests = result_tracker.nodes.keys().map(|peer| {
@@ -448,6 +493,7 @@ impl RpcHelper {
let msg = msg.clone();
let endpoint2 = endpoint.clone();
let to = *peer;
+ let strategy = strategy.clone();
async move { (to, self2.call(&endpoint2, to, msg, strategy).await) }
});
let mut resp_stream = requests.collect::<FuturesUnordered<_>>();
@@ -463,6 +509,7 @@ impl RpcHelper {
// Continue all other requets in background
tokio::spawn(async move {
resp_stream.collect::<Vec<(Uuid, Result<_, _>)>>().await;
+ drop(drop_on_complete);
});
return Ok(result_tracker.success_values());