aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-07-22 15:20:00 +0200
committerAlex Auvolat <alex@adnab.me>2022-07-29 12:25:02 +0200
commit8e7e680afe39f48fe15f365c9ef3fee57596e119 (patch)
treecc465dcd31776c1b8a865b22b2de923fca26efdb /src
parent16f6a1a65d4b973ea13cd00bbfdd7e225041e447 (diff)
downloadgarage-8e7e680afe39f48fe15f365c9ef3fee57596e119.tar.gz
garage-8e7e680afe39f48fe15f365c9ef3fee57596e119.zip
First adaptation to WIP netapp with streaming body
Diffstat (limited to 'src')
-rw-r--r--src/block/manager.rs19
-rw-r--r--src/garage/Cargo.toml5
-rw-r--r--src/garage/admin.rs4
-rw-r--r--src/garage/cli/cmd.rs6
-rw-r--r--src/garage/cli/layout.rs6
-rw-r--r--src/model/Cargo.toml5
-rw-r--r--src/rpc/Cargo.toml5
-rw-r--r--src/rpc/rpc_helper.rs71
-rw-r--r--src/rpc/system.rs7
-rw-r--r--src/table/schema.rs2
-rw-r--r--src/util/Cargo.toml5
11 files changed, 60 insertions, 75 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs
index be53ec6e..408de148 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -8,7 +8,6 @@ use async_trait::async_trait;
use bytes::Bytes;
use serde::{Deserialize, Serialize};
-use futures::future::*;
use tokio::fs;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::select;
@@ -637,24 +636,24 @@ impl BlockManager {
}
who.retain(|id| *id != self.system.id);
- let msg = Arc::new(BlockRpc::NeedBlockQuery(*hash));
- let who_needs_fut = who.iter().map(|to| {
- self.system.rpc.call_arc(
+ let who_needs_resps = self
+ .system
+ .rpc
+ .call_many(
&self.endpoint,
- *to,
- msg.clone(),
+ &who,
+ BlockRpc::NeedBlockQuery(*hash),
RequestStrategy::with_priority(PRIO_BACKGROUND)
.with_timeout(NEED_BLOCK_QUERY_TIMEOUT),
)
- });
- let who_needs_resps = join_all(who_needs_fut).await;
+ .await?;
let mut need_nodes = vec![];
- for (node, needed) in who.iter().zip(who_needs_resps.into_iter()) {
+ for (node, needed) in who_needs_resps.into_iter() {
match needed.err_context("NeedBlockQuery RPC")? {
BlockRpc::NeedBlockReply(needed) => {
if needed {
- need_nodes.push(*node);
+ need_nodes.push(node);
}
}
m => {
diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml
index 2cb8ec46..5a872c7a 100644
--- a/src/garage/Cargo.toml
+++ b/src/garage/Cargo.toml
@@ -50,9 +50,8 @@ futures = "0.3"
futures-util = "0.3"
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
-#netapp = { version = "0.3.2", git = "https://git.deuxfleurs.fr/lx/netapp" }
-#netapp = { version = "0.4", path = "../../../netapp" }
-netapp = "0.4"
+#netapp = "0.4"
+netapp = { version = "0.4.4", git = "https://git.deuxfleurs.fr/lx/netapp", branch = "stream-body", features = ["telemetry"] }
opentelemetry = { version = "0.17", features = [ "rt-tokio" ] }
opentelemetry-prometheus = "0.10"
diff --git a/src/garage/admin.rs b/src/garage/admin.rs
index 71ee608c..64a448fc 100644
--- a/src/garage/admin.rs
+++ b/src/garage/admin.rs
@@ -681,7 +681,7 @@ impl AdminRpcHandler {
.endpoint
.call(
&node,
- &AdminRpc::LaunchRepair(opt_to_send.clone()),
+ AdminRpc::LaunchRepair(opt_to_send.clone()),
PRIO_NORMAL,
)
.await;
@@ -721,7 +721,7 @@ impl AdminRpcHandler {
let node_id = (*node).into();
match self
.endpoint
- .call(&node_id, &AdminRpc::Stats(opt), PRIO_NORMAL)
+ .call(&node_id, AdminRpc::Stats(opt), PRIO_NORMAL)
.await?
{
Ok(AdminRpc::Ok(s)) => writeln!(&mut ret, "{}", s).unwrap(),
diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs
index 1aa2c2ff..c8b96489 100644
--- a/src/garage/cli/cmd.rs
+++ b/src/garage/cli/cmd.rs
@@ -47,7 +47,7 @@ pub async fn cli_command_dispatch(
pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) -> Result<(), Error> {
let status = match rpc_cli
- .call(&rpc_host, &SystemRpc::GetKnownNodes, PRIO_NORMAL)
+ .call(&rpc_host, SystemRpc::GetKnownNodes, PRIO_NORMAL)
.await??
{
SystemRpc::ReturnKnownNodes(nodes) => nodes,
@@ -149,7 +149,7 @@ pub async fn cmd_connect(
args: ConnectNodeOpt,
) -> Result<(), Error> {
match rpc_cli
- .call(&rpc_host, &SystemRpc::Connect(args.node), PRIO_NORMAL)
+ .call(&rpc_host, SystemRpc::Connect(args.node), PRIO_NORMAL)
.await??
{
SystemRpc::Ok => {
@@ -165,7 +165,7 @@ pub async fn cmd_admin(
rpc_host: NodeID,
args: AdminRpc,
) -> Result<(), HelperError> {
- match rpc_cli.call(&rpc_host, &args, PRIO_NORMAL).await?? {
+ match rpc_cli.call(&rpc_host, args, PRIO_NORMAL).await?? {
AdminRpc::Ok(msg) => {
println!("{}", msg);
}
diff --git a/src/garage/cli/layout.rs b/src/garage/cli/layout.rs
index db0af57c..3884bb92 100644
--- a/src/garage/cli/layout.rs
+++ b/src/garage/cli/layout.rs
@@ -36,7 +36,7 @@ pub async fn cmd_assign_role(
args: AssignRoleOpt,
) -> Result<(), Error> {
let status = match rpc_cli
- .call(&rpc_host, &SystemRpc::GetKnownNodes, PRIO_NORMAL)
+ .call(&rpc_host, SystemRpc::GetKnownNodes, PRIO_NORMAL)
.await??
{
SystemRpc::ReturnKnownNodes(nodes) => nodes,
@@ -245,7 +245,7 @@ pub async fn fetch_layout(
rpc_host: NodeID,
) -> Result<ClusterLayout, Error> {
match rpc_cli
- .call(&rpc_host, &SystemRpc::PullClusterLayout, PRIO_NORMAL)
+ .call(&rpc_host, SystemRpc::PullClusterLayout, PRIO_NORMAL)
.await??
{
SystemRpc::AdvertiseClusterLayout(t) => Ok(t),
@@ -261,7 +261,7 @@ pub async fn send_layout(
rpc_cli
.call(
&rpc_host,
- &SystemRpc::AdvertiseClusterLayout(layout),
+ SystemRpc::AdvertiseClusterLayout(layout),
PRIO_NORMAL,
)
.await??;
diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml
index d908dc01..a97bce4d 100644
--- a/src/model/Cargo.toml
+++ b/src/model/Cargo.toml
@@ -40,9 +40,8 @@ futures-util = "0.3"
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
opentelemetry = "0.17"
-#netapp = { version = "0.3.0", git = "https://git.deuxfleurs.fr/lx/netapp" }
-#netapp = { version = "0.4", path = "../../../netapp" }
-netapp = "0.4"
+#netapp = "0.4"
+netapp = { version = "0.4.4", git = "https://git.deuxfleurs.fr/lx/netapp", branch = "stream-body", features = ["telemetry"] }
[features]
k2v = [ "garage_util/k2v" ]
diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml
index 73328993..5d5151cd 100644
--- a/src/rpc/Cargo.toml
+++ b/src/rpc/Cargo.toml
@@ -46,9 +46,8 @@ tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi
tokio-stream = { version = "0.1", features = ["net"] }
opentelemetry = "0.17"
-#netapp = { version = "0.3.0", git = "https://git.deuxfleurs.fr/lx/netapp" }
-#netapp = { version = "0.4", path = "../../../netapp", features = ["telemetry"] }
-netapp = { version = "0.4.4", features = ["telemetry"] }
+#netapp = { version = "0.4.4", features = ["telemetry"] }
+netapp = { version = "0.4.4", git = "https://git.deuxfleurs.fr/lx/netapp", branch = "stream-body", features = ["telemetry"] }
hyper = { version = "0.14", features = ["client", "http1", "runtime", "tcp"] }
diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs
index 34717d3b..079cdc70 100644
--- a/src/rpc/rpc_helper.rs
+++ b/src/rpc/rpc_helper.rs
@@ -15,9 +15,9 @@ use opentelemetry::{
Context,
};
-pub use netapp::endpoint::{Endpoint, EndpointHandler, Message as Rpc};
+pub use netapp::endpoint::{Endpoint, EndpointHandler};
+pub use netapp::message::{Message as Rpc, *};
use netapp::peering::fullmesh::FullMeshPeeringStrategy;
-pub use netapp::proto::*;
pub use netapp::{NetApp, NodeID};
use garage_util::background::BackgroundRunner;
@@ -30,10 +30,8 @@ use crate::ring::Ring;
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
-// Try to never have more than 200MB of outgoing requests
-// buffered at the same time. Other requests are queued until
-// space is freed.
-const REQUEST_BUFFER_SIZE: usize = 200 * 1024 * 1024;
+// Don't allow more than 100 concurrent outgoing RPCs.
+const MAX_CONCURRENT_REQUESTS: usize = 100;
/// Strategy to apply when making RPC
#[derive(Copy, Clone)]
@@ -95,7 +93,7 @@ impl RpcHelper {
background: Arc<BackgroundRunner>,
ring: watch::Receiver<Arc<Ring>>,
) -> Self {
- let sem = Arc::new(Semaphore::new(REQUEST_BUFFER_SIZE));
+ let sem = Arc::new(Semaphore::new(MAX_CONCURRENT_REQUESTS));
let metrics = RpcMetrics::new(sem.clone());
@@ -109,29 +107,16 @@ impl RpcHelper {
}))
}
- pub async fn call<M, H, S>(
+ pub async fn call<M, N, H, S>(
&self,
endpoint: &Endpoint<M, H>,
to: Uuid,
- msg: M,
- strat: RequestStrategy,
- ) -> Result<S, Error>
- where
- M: Rpc<Response = Result<S, Error>>,
- H: EndpointHandler<M>,
- {
- self.call_arc(endpoint, to, Arc::new(msg), strat).await
- }
-
- pub async fn call_arc<M, H, S>(
- &self,
- endpoint: &Endpoint<M, H>,
- to: Uuid,
- msg: Arc<M>,
+ msg: N,
strat: RequestStrategy,
) -> Result<S, Error>
where
M: Rpc<Response = Result<S, Error>>,
+ N: IntoReq<M> + Send,
H: EndpointHandler<M>,
{
let metric_tags = [
@@ -140,11 +125,10 @@ impl RpcHelper {
KeyValue::new("to", format!("{:?}", to)),
];
- let msg_size = rmp_to_vec_all_named(&msg)?.len() as u32;
let permit = self
.0
.request_buffer_semaphore
- .acquire_many(msg_size)
+ .acquire()
.record_duration(&self.0.metrics.rpc_queueing_time, &metric_tags)
.await?;
@@ -152,7 +136,7 @@ impl RpcHelper {
let node_id = to.into();
let rpc_call = endpoint
- .call(&node_id, msg, strat.rs_priority)
+ .call_streaming(&node_id, msg, strat.rs_priority)
.record_duration(&self.0.metrics.rpc_duration, &metric_tags);
select! {
@@ -162,7 +146,7 @@ impl RpcHelper {
if res.is_err() {
self.0.metrics.rpc_netapp_error_counter.add(1, &metric_tags);
}
- let res = res?;
+ let res = res?.into_msg();
if res.is_err() {
self.0.metrics.rpc_garage_error_counter.add(1, &metric_tags);
@@ -178,37 +162,41 @@ impl RpcHelper {
}
}
- pub async fn call_many<M, H, S>(
+ pub async fn call_many<M, N, H, S>(
&self,
endpoint: &Endpoint<M, H>,
to: &[Uuid],
- msg: M,
+ msg: N,
strat: RequestStrategy,
- ) -> Vec<(Uuid, Result<S, Error>)>
+ ) -> Result<Vec<(Uuid, Result<S, Error>)>, Error>
where
M: Rpc<Response = Result<S, Error>>,
+ N: IntoReq<M>,
H: EndpointHandler<M>,
{
- let msg = Arc::new(msg);
+ let msg = msg.into_req().map_err(netapp::error::Error::from)?;
+
let resps = join_all(
to.iter()
- .map(|to| self.call_arc(endpoint, *to, msg.clone(), strat)),
+ .map(|to| self.call(endpoint, *to, msg.clone(), strat)),
)
.await;
- to.iter()
+ Ok(to
+ .iter()
.cloned()
.zip(resps.into_iter())
- .collect::<Vec<_>>()
+ .collect::<Vec<_>>())
}
- pub async fn broadcast<M, H, S>(
+ pub async fn broadcast<M, N, H, S>(
&self,
endpoint: &Endpoint<M, H>,
- msg: M,
+ msg: N,
strat: RequestStrategy,
- ) -> Vec<(Uuid, Result<S, Error>)>
+ ) -> Result<Vec<(Uuid, Result<S, Error>)>, Error>
where
M: Rpc<Response = Result<S, Error>>,
+ N: IntoReq<M>,
H: EndpointHandler<M>,
{
let to = self
@@ -262,20 +250,21 @@ impl RpcHelper {
.await
}
- async fn try_call_many_internal<M, H, S>(
+ async fn try_call_many_internal<M, N, H, S>(
&self,
endpoint: &Arc<Endpoint<M, H>>,
to: &[Uuid],
- msg: M,
+ msg: N,
strategy: RequestStrategy,
quorum: usize,
) -> Result<Vec<S>, Error>
where
M: Rpc<Response = Result<S, Error>> + 'static,
+ N: IntoReq<M>,
H: EndpointHandler<M> + 'static,
S: Send + 'static,
{
- let msg = Arc::new(msg);
+ let msg = msg.into_req().map_err(netapp::error::Error::from)?;
// Build future for each request
// They are not started now: they are added below in a FuturesUnordered
@@ -285,7 +274,7 @@ impl RpcHelper {
let msg = msg.clone();
let endpoint2 = endpoint.clone();
(to, async move {
- self2.call_arc(&endpoint2, to, msg, strategy).await
+ self2.call(&endpoint2, to, msg, strategy).await
})
});
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index f9f2970b..04ef2f69 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -16,8 +16,8 @@ use tokio::sync::watch;
use tokio::sync::Mutex;
use netapp::endpoint::{Endpoint, EndpointHandler};
+use netapp::message::*;
use netapp::peering::fullmesh::FullMeshPeeringStrategy;
-use netapp::proto::*;
use netapp::util::parse_and_resolve_peer_addr;
use netapp::{NetApp, NetworkKey, NodeID, NodeKey};
@@ -544,7 +544,7 @@ impl System {
SystemRpc::AdvertiseClusterLayout(layout),
RequestStrategy::with_priority(PRIO_HIGH),
)
- .await;
+ .await?;
Ok(())
});
self.background.spawn(self.clone().save_cluster_layout());
@@ -559,7 +559,8 @@ impl System {
self.update_local_status();
let local_status: NodeStatus = self.local_status.load().as_ref().clone();
- self.rpc
+ let _ = self
+ .rpc
.broadcast(
&self.system_endpoint,
SystemRpc::AdvertiseStatus(local_status),
diff --git a/src/table/schema.rs b/src/table/schema.rs
index 74f57798..f37e98d8 100644
--- a/src/table/schema.rs
+++ b/src/table/schema.rs
@@ -60,7 +60,7 @@ pub trait Entry<P: PartitionKey, S: SortKey>:
}
/// Trait for the schema used in a table
-pub trait TableSchema: Send + Sync {
+pub trait TableSchema: Send + Sync + 'static {
/// The name of the table in the database
const TABLE_NAME: &'static str;
diff --git a/src/util/Cargo.toml b/src/util/Cargo.toml
index 7d79f21a..89064592 100644
--- a/src/util/Cargo.toml
+++ b/src/util/Cargo.toml
@@ -36,9 +36,8 @@ toml = "0.5"
futures = "0.3"
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
-#netapp = { version = "0.3.0", git = "https://git.deuxfleurs.fr/lx/netapp" }
-#netapp = { version = "0.4", path = "../../../netapp" }
-netapp = "0.4"
+#netapp = "0.4"
+netapp = { version = "0.4.4", git = "https://git.deuxfleurs.fr/lx/netapp", branch = "stream-body", features = ["telemetry"] }
http = "0.2"
hyper = "0.14"