diff options
author | Alex Auvolat <alex@adnab.me> | 2022-07-22 15:20:00 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2022-07-29 12:25:02 +0200 |
commit | 8e7e680afe39f48fe15f365c9ef3fee57596e119 (patch) | |
tree | cc465dcd31776c1b8a865b22b2de923fca26efdb | |
parent | 16f6a1a65d4b973ea13cd00bbfdd7e225041e447 (diff) | |
download | garage-8e7e680afe39f48fe15f365c9ef3fee57596e119.tar.gz garage-8e7e680afe39f48fe15f365c9ef3fee57596e119.zip |
First adaptation to WIP netapp with streaming body
-rw-r--r-- | Cargo.lock | 74 | ||||
-rw-r--r-- | src/block/manager.rs | 19 | ||||
-rw-r--r-- | src/garage/Cargo.toml | 5 | ||||
-rw-r--r-- | src/garage/admin.rs | 4 | ||||
-rw-r--r-- | src/garage/cli/cmd.rs | 6 | ||||
-rw-r--r-- | src/garage/cli/layout.rs | 6 | ||||
-rw-r--r-- | src/model/Cargo.toml | 5 | ||||
-rw-r--r-- | src/rpc/Cargo.toml | 5 | ||||
-rw-r--r-- | src/rpc/rpc_helper.rs | 71 | ||||
-rw-r--r-- | src/rpc/system.rs | 7 | ||||
-rw-r--r-- | src/table/schema.rs | 2 | ||||
-rw-r--r-- | src/util/Cargo.toml | 5 |
12 files changed, 97 insertions, 112 deletions
@@ -154,7 +154,7 @@ dependencies = [ "aws-smithy-types", "aws-smithy-xml", "aws-types", - "bytes 1.1.0", + "bytes 1.2.0", "http", "md5", "tokio-stream", @@ -184,7 +184,7 @@ checksum = "51d371fb688d909e5b866ff1f297bbec4621eed4f9fcdac566fcc33541f0c6a6" dependencies = [ "aws-smithy-eventstream", "aws-smithy-http", - "bytes 1.1.0", + "bytes 1.2.0", "form_urlencoded", "hex", "http", @@ -218,7 +218,7 @@ dependencies = [ "aws-smithy-http", "aws-smithy-http-tower", "aws-smithy-types", - "bytes 1.1.0", + "bytes 1.2.0", "fastrand", "http", "http-body", @@ -239,7 +239,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f972226c639e0dc1eca2cb0220c1b5799e2bfc62eda37845b662c5d0cb972371" dependencies = [ "aws-smithy-types", - "bytes 1.1.0", + "bytes 1.2.0", "crc32fast", ] @@ -251,7 +251,7 @@ checksum = "12c787e24b757634453a60ff05948aa1b450f5b3a7a2094f22acff8a5022635b" dependencies = [ "aws-smithy-eventstream", "aws-smithy-types", - "bytes 1.1.0", + "bytes 1.2.0", "bytes-utils", "futures-core", "http", @@ -271,7 +271,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "64f80a2c56fc09fc9a2da3c63f286ec2a89465433219f8165e14e522283a5eb8" dependencies = [ "aws-smithy-http", - "bytes 1.1.0", + "bytes 1.2.0", "http", "http-body", "pin-project 1.0.10", @@ -391,9 +391,9 @@ checksum = "e0dcbc35f504eb6fc275a6d20e4ebcda18cf50d40ba6fabff8c711fa16cb3b16" [[package]] name = "bytes" -version = "1.1.0" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8" +checksum = "f0b3de4a0c5e67e16066a0715723abd91edc2f9001d09c46e1dca929351e130e" [[package]] name = "bytes-utils" @@ -401,7 +401,7 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1934a3ef9cac8efde4966a92781e77713e1ba329f1d42e446c7d7eba340d8ef1" dependencies = [ - "bytes 1.1.0", + "bytes 1.2.0", "either", ] @@ -982,7 +982,7 @@ dependencies = [ "async-trait", "aws-sdk-s3", "base64", - "bytes 1.1.0", + "bytes 1.2.0", "bytesize", "chrono", "futures", @@ -1026,7 +1026,7 @@ version = "0.7.0" dependencies = [ "async-trait", "base64", - "bytes 1.1.0", + "bytes 1.2.0", "chrono", "crypto-common", "err-derive 0.3.1", @@ -1071,7 +1071,7 @@ version = "0.7.0" dependencies = [ "arc-swap", "async-trait", - "bytes 1.1.0", + "bytes 1.2.0", "futures", "futures-util", "garage_db", @@ -1166,7 +1166,7 @@ checksum = "81e693aa4582cfe7a7ce70c07880e3662544b5d0cd68bc4b59c53febfbb8d1ec" dependencies = [ "arc-swap", "async-trait", - "bytes 1.1.0", + "bytes 1.2.0", "futures", "futures-util", "garage_util 0.5.1", @@ -1191,7 +1191,7 @@ version = "0.7.0" dependencies = [ "arc-swap", "async-trait", - "bytes 1.1.0", + "bytes 1.2.0", "futures", "futures-util", "garage_util 0.7.0", @@ -1224,7 +1224,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5c3557f3757e2acd29eaee86804d4e6c38d2abda81b4b349d8a0d2277044265c" dependencies = [ "async-trait", - "bytes 1.1.0", + "bytes 1.2.0", "futures", "futures-util", "garage_rpc 0.5.1", @@ -1244,7 +1244,7 @@ name = "garage_table" version = "0.7.0" dependencies = [ "async-trait", - "bytes 1.1.0", + "bytes 1.2.0", "futures", "futures-util", "garage_db", @@ -1390,7 +1390,7 @@ version = "0.3.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "62eeb471aa3e3c9197aa4bfeabfe02982f6dc96f750486c0bb0009ac58b26d2b" dependencies = [ - "bytes 1.1.0", + "bytes 1.2.0", "fnv", "futures-core", "futures-sink", @@ -1525,7 +1525,7 @@ version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "31f4c6746584866f0feabcc69893c5b51beef3831656a968ed7ae254cdc4fd03" dependencies = [ - "bytes 1.1.0", + "bytes 1.2.0", "fnv", "itoa", ] @@ -1536,7 +1536,7 @@ version = "0.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1ff4f84919677303da5f147645dbea6b1881f368d03ac84e1dc09031ebd7b2c6" dependencies = [ - "bytes 1.1.0", + "bytes 1.2.0", "http", "pin-project-lite", ] @@ -1580,7 +1580,7 @@ version = "0.14.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b26ae0a80afebe130861d90abf98e3814a4f28a4c6ffeb5ab8ebb2be311e0ef2" dependencies = [ - "bytes 1.1.0", + "bytes 1.2.0", "futures-channel", "futures-core", "futures-util", @@ -1633,7 +1633,7 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" dependencies = [ - "bytes 1.1.0", + "bytes 1.2.0", "hyper", "native-tls", "tokio", @@ -1781,7 +1781,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4f8de9873b904e74b3533f77493731ee26742418077503683db44e1b3c54aa5c" dependencies = [ "base64", - "bytes 1.1.0", + "bytes 1.2.0", "chrono", "http", "percent-encoding", @@ -1810,7 +1810,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8cd12d68768b54bbd50547f4c7b57b73cff680ef8da3ba409463ee995cf0d707" dependencies = [ "base64", - "bytes 1.1.0", + "bytes 1.2.0", "chrono", "dirs-next", "either", @@ -2081,7 +2081,7 @@ version = "2.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5f8f35e687561d5c1667590911e6698a8cb714a134a7505718a182e7bc9d3836" dependencies = [ - "bytes 1.1.0", + "bytes 1.2.0", "encoding_rs", "futures-util", "http", @@ -2142,12 +2142,11 @@ dependencies = [ [[package]] name = "netapp" version = "0.4.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c6419a4b836774192e13fedb05c0e5f414ee8df9ca0c467456a0bacde06c29ee" +source = "git+https://git.deuxfleurs.fr/lx/netapp?branch=stream-body#cbc21e40acfc420a3e452a1fd488c6a96694b0f2" dependencies = [ "arc-swap", "async-trait", - "bytes 0.6.0", + "bytes 1.2.0", "cfg-if 1.0.0", "err-derive 0.2.4", "futures", @@ -2157,6 +2156,7 @@ dependencies = [ "log", "opentelemetry", "opentelemetry-contrib", + "pin-project 1.0.10", "rand 0.5.6", "rmp-serde 0.14.4", "serde", @@ -2640,7 +2640,7 @@ version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "444879275cb4fd84958b1a1d5420d15e6fcf7c235fe47f053c9c2a80aceb6001" dependencies = [ - "bytes 1.1.0", + "bytes 1.2.0", "prost-derive", ] @@ -2650,7 +2650,7 @@ version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "62941722fb675d463659e49c4f3fe1fe792ff24fe5bbaa9c08cd3b98a1c354f5" dependencies = [ - "bytes 1.1.0", + "bytes 1.2.0", "heck 0.3.3", "itertools 0.10.3", "lazy_static", @@ -2683,7 +2683,7 @@ version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "534b7a0e836e3c482d2693070f982e39e7611da9695d4d1f5a4b186b51faef0a" dependencies = [ - "bytes 1.1.0", + "bytes 1.2.0", "prost", ] @@ -2986,7 +2986,7 @@ checksum = "1db30db44ea73551326269adcf7a2169428a054f14faf9e1768f2163494f2fa2" dependencies = [ "async-trait", "base64", - "bytes 1.1.0", + "bytes 1.2.0", "crc32fast", "futures", "http", @@ -3028,7 +3028,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a5ae95491c8b4847931e291b151127eccd6ff8ca13f33603eb3d0035ecb05272" dependencies = [ "base64", - "bytes 1.1.0", + "bytes 1.2.0", "chrono", "digest 0.9.0", "futures", @@ -3594,7 +3594,7 @@ version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2af73ac49756f3f7c01172e34a23e5d0216f6c32333757c2c61feb2bbff5a5ee" dependencies = [ - "bytes 1.1.0", + "bytes 1.2.0", "libc", "memchr", "mio", @@ -3667,7 +3667,7 @@ version = "0.6.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9e99e1983e5d376cd8eb4b66604d2e99e79f5bd988c3055891dcd8c9e2604cc0" dependencies = [ - "bytes 1.1.0", + "bytes 1.2.0", "futures-core", "futures-io", "futures-sink", @@ -3683,7 +3683,7 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "64910e1b9c1901aaf5375561e35b9c057d95ff41a44ede043a03e09279eabaf1" dependencies = [ - "bytes 1.1.0", + "bytes 1.2.0", "futures-core", "futures-sink", "log", @@ -3709,7 +3709,7 @@ dependencies = [ "async-stream", "async-trait", "base64", - "bytes 1.1.0", + "bytes 1.2.0", "futures-core", "futures-util", "h2", @@ -3770,7 +3770,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "81eca72647e58054bbfa41e6f297c23436f1c60aff6e5eb38455a0f9ca420bb5" dependencies = [ "base64", - "bytes 1.1.0", + "bytes 1.2.0", "futures-core", "futures-util", "http", diff --git a/src/block/manager.rs b/src/block/manager.rs index be53ec6e..408de148 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -8,7 +8,6 @@ use async_trait::async_trait; use bytes::Bytes; use serde::{Deserialize, Serialize}; -use futures::future::*; use tokio::fs; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::select; @@ -637,24 +636,24 @@ impl BlockManager { } who.retain(|id| *id != self.system.id); - let msg = Arc::new(BlockRpc::NeedBlockQuery(*hash)); - let who_needs_fut = who.iter().map(|to| { - self.system.rpc.call_arc( + let who_needs_resps = self + .system + .rpc + .call_many( &self.endpoint, - *to, - msg.clone(), + &who, + BlockRpc::NeedBlockQuery(*hash), RequestStrategy::with_priority(PRIO_BACKGROUND) .with_timeout(NEED_BLOCK_QUERY_TIMEOUT), ) - }); - let who_needs_resps = join_all(who_needs_fut).await; + .await?; let mut need_nodes = vec![]; - for (node, needed) in who.iter().zip(who_needs_resps.into_iter()) { + for (node, needed) in who_needs_resps.into_iter() { match needed.err_context("NeedBlockQuery RPC")? { BlockRpc::NeedBlockReply(needed) => { if needed { - need_nodes.push(*node); + need_nodes.push(node); } } m => { diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml index 2cb8ec46..5a872c7a 100644 --- a/src/garage/Cargo.toml +++ b/src/garage/Cargo.toml @@ -50,9 +50,8 @@ futures = "0.3" futures-util = "0.3" tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } -#netapp = { version = "0.3.2", git = "https://git.deuxfleurs.fr/lx/netapp" } -#netapp = { version = "0.4", path = "../../../netapp" } -netapp = "0.4" +#netapp = "0.4" +netapp = { version = "0.4.4", git = "https://git.deuxfleurs.fr/lx/netapp", branch = "stream-body", features = ["telemetry"] } opentelemetry = { version = "0.17", features = [ "rt-tokio" ] } opentelemetry-prometheus = "0.10" diff --git a/src/garage/admin.rs b/src/garage/admin.rs index 71ee608c..64a448fc 100644 --- a/src/garage/admin.rs +++ b/src/garage/admin.rs @@ -681,7 +681,7 @@ impl AdminRpcHandler { .endpoint .call( &node, - &AdminRpc::LaunchRepair(opt_to_send.clone()), + AdminRpc::LaunchRepair(opt_to_send.clone()), PRIO_NORMAL, ) .await; @@ -721,7 +721,7 @@ impl AdminRpcHandler { let node_id = (*node).into(); match self .endpoint - .call(&node_id, &AdminRpc::Stats(opt), PRIO_NORMAL) + .call(&node_id, AdminRpc::Stats(opt), PRIO_NORMAL) .await? { Ok(AdminRpc::Ok(s)) => writeln!(&mut ret, "{}", s).unwrap(), diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs index 1aa2c2ff..c8b96489 100644 --- a/src/garage/cli/cmd.rs +++ b/src/garage/cli/cmd.rs @@ -47,7 +47,7 @@ pub async fn cli_command_dispatch( pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) -> Result<(), Error> { let status = match rpc_cli - .call(&rpc_host, &SystemRpc::GetKnownNodes, PRIO_NORMAL) + .call(&rpc_host, SystemRpc::GetKnownNodes, PRIO_NORMAL) .await?? { SystemRpc::ReturnKnownNodes(nodes) => nodes, @@ -149,7 +149,7 @@ pub async fn cmd_connect( args: ConnectNodeOpt, ) -> Result<(), Error> { match rpc_cli - .call(&rpc_host, &SystemRpc::Connect(args.node), PRIO_NORMAL) + .call(&rpc_host, SystemRpc::Connect(args.node), PRIO_NORMAL) .await?? { SystemRpc::Ok => { @@ -165,7 +165,7 @@ pub async fn cmd_admin( rpc_host: NodeID, args: AdminRpc, ) -> Result<(), HelperError> { - match rpc_cli.call(&rpc_host, &args, PRIO_NORMAL).await?? { + match rpc_cli.call(&rpc_host, args, PRIO_NORMAL).await?? { AdminRpc::Ok(msg) => { println!("{}", msg); } diff --git a/src/garage/cli/layout.rs b/src/garage/cli/layout.rs index db0af57c..3884bb92 100644 --- a/src/garage/cli/layout.rs +++ b/src/garage/cli/layout.rs @@ -36,7 +36,7 @@ pub async fn cmd_assign_role( args: AssignRoleOpt, ) -> Result<(), Error> { let status = match rpc_cli - .call(&rpc_host, &SystemRpc::GetKnownNodes, PRIO_NORMAL) + .call(&rpc_host, SystemRpc::GetKnownNodes, PRIO_NORMAL) .await?? { SystemRpc::ReturnKnownNodes(nodes) => nodes, @@ -245,7 +245,7 @@ pub async fn fetch_layout( rpc_host: NodeID, ) -> Result<ClusterLayout, Error> { match rpc_cli - .call(&rpc_host, &SystemRpc::PullClusterLayout, PRIO_NORMAL) + .call(&rpc_host, SystemRpc::PullClusterLayout, PRIO_NORMAL) .await?? { SystemRpc::AdvertiseClusterLayout(t) => Ok(t), @@ -261,7 +261,7 @@ pub async fn send_layout( rpc_cli .call( &rpc_host, - &SystemRpc::AdvertiseClusterLayout(layout), + SystemRpc::AdvertiseClusterLayout(layout), PRIO_NORMAL, ) .await??; diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml index d908dc01..a97bce4d 100644 --- a/src/model/Cargo.toml +++ b/src/model/Cargo.toml @@ -40,9 +40,8 @@ futures-util = "0.3" tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } opentelemetry = "0.17" -#netapp = { version = "0.3.0", git = "https://git.deuxfleurs.fr/lx/netapp" } -#netapp = { version = "0.4", path = "../../../netapp" } -netapp = "0.4" +#netapp = "0.4" +netapp = { version = "0.4.4", git = "https://git.deuxfleurs.fr/lx/netapp", branch = "stream-body", features = ["telemetry"] } [features] k2v = [ "garage_util/k2v" ] diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml index 73328993..5d5151cd 100644 --- a/src/rpc/Cargo.toml +++ b/src/rpc/Cargo.toml @@ -46,9 +46,8 @@ tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi tokio-stream = { version = "0.1", features = ["net"] } opentelemetry = "0.17" -#netapp = { version = "0.3.0", git = "https://git.deuxfleurs.fr/lx/netapp" } -#netapp = { version = "0.4", path = "../../../netapp", features = ["telemetry"] } -netapp = { version = "0.4.4", features = ["telemetry"] } +#netapp = { version = "0.4.4", features = ["telemetry"] } +netapp = { version = "0.4.4", git = "https://git.deuxfleurs.fr/lx/netapp", branch = "stream-body", features = ["telemetry"] } hyper = { version = "0.14", features = ["client", "http1", "runtime", "tcp"] } diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index 34717d3b..079cdc70 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -15,9 +15,9 @@ use opentelemetry::{ Context, }; -pub use netapp::endpoint::{Endpoint, EndpointHandler, Message as Rpc}; +pub use netapp::endpoint::{Endpoint, EndpointHandler}; +pub use netapp::message::{Message as Rpc, *}; use netapp::peering::fullmesh::FullMeshPeeringStrategy; -pub use netapp::proto::*; pub use netapp::{NetApp, NodeID}; use garage_util::background::BackgroundRunner; @@ -30,10 +30,8 @@ use crate::ring::Ring; const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10); -// Try to never have more than 200MB of outgoing requests -// buffered at the same time. Other requests are queued until -// space is freed. -const REQUEST_BUFFER_SIZE: usize = 200 * 1024 * 1024; +// Don't allow more than 100 concurrent outgoing RPCs. +const MAX_CONCURRENT_REQUESTS: usize = 100; /// Strategy to apply when making RPC #[derive(Copy, Clone)] @@ -95,7 +93,7 @@ impl RpcHelper { background: Arc<BackgroundRunner>, ring: watch::Receiver<Arc<Ring>>, ) -> Self { - let sem = Arc::new(Semaphore::new(REQUEST_BUFFER_SIZE)); + let sem = Arc::new(Semaphore::new(MAX_CONCURRENT_REQUESTS)); let metrics = RpcMetrics::new(sem.clone()); @@ -109,29 +107,16 @@ impl RpcHelper { })) } - pub async fn call<M, H, S>( + pub async fn call<M, N, H, S>( &self, endpoint: &Endpoint<M, H>, to: Uuid, - msg: M, - strat: RequestStrategy, - ) -> Result<S, Error> - where - M: Rpc<Response = Result<S, Error>>, - H: EndpointHandler<M>, - { - self.call_arc(endpoint, to, Arc::new(msg), strat).await - } - - pub async fn call_arc<M, H, S>( - &self, - endpoint: &Endpoint<M, H>, - to: Uuid, - msg: Arc<M>, + msg: N, strat: RequestStrategy, ) -> Result<S, Error> where M: Rpc<Response = Result<S, Error>>, + N: IntoReq<M> + Send, H: EndpointHandler<M>, { let metric_tags = [ @@ -140,11 +125,10 @@ impl RpcHelper { KeyValue::new("to", format!("{:?}", to)), ]; - let msg_size = rmp_to_vec_all_named(&msg)?.len() as u32; let permit = self .0 .request_buffer_semaphore - .acquire_many(msg_size) + .acquire() .record_duration(&self.0.metrics.rpc_queueing_time, &metric_tags) .await?; @@ -152,7 +136,7 @@ impl RpcHelper { let node_id = to.into(); let rpc_call = endpoint - .call(&node_id, msg, strat.rs_priority) + .call_streaming(&node_id, msg, strat.rs_priority) .record_duration(&self.0.metrics.rpc_duration, &metric_tags); select! { @@ -162,7 +146,7 @@ impl RpcHelper { if res.is_err() { self.0.metrics.rpc_netapp_error_counter.add(1, &metric_tags); } - let res = res?; + let res = res?.into_msg(); if res.is_err() { self.0.metrics.rpc_garage_error_counter.add(1, &metric_tags); @@ -178,37 +162,41 @@ impl RpcHelper { } } - pub async fn call_many<M, H, S>( + pub async fn call_many<M, N, H, S>( &self, endpoint: &Endpoint<M, H>, to: &[Uuid], - msg: M, + msg: N, strat: RequestStrategy, - ) -> Vec<(Uuid, Result<S, Error>)> + ) -> Result<Vec<(Uuid, Result<S, Error>)>, Error> where M: Rpc<Response = Result<S, Error>>, + N: IntoReq<M>, H: EndpointHandler<M>, { - let msg = Arc::new(msg); + let msg = msg.into_req().map_err(netapp::error::Error::from)?; + let resps = join_all( to.iter() - .map(|to| self.call_arc(endpoint, *to, msg.clone(), strat)), + .map(|to| self.call(endpoint, *to, msg.clone(), strat)), ) .await; - to.iter() + Ok(to + .iter() .cloned() .zip(resps.into_iter()) - .collect::<Vec<_>>() + .collect::<Vec<_>>()) } - pub async fn broadcast<M, H, S>( + pub async fn broadcast<M, N, H, S>( &self, endpoint: &Endpoint<M, H>, - msg: M, + msg: N, strat: RequestStrategy, - ) -> Vec<(Uuid, Result<S, Error>)> + ) -> Result<Vec<(Uuid, Result<S, Error>)>, Error> where M: Rpc<Response = Result<S, Error>>, + N: IntoReq<M>, H: EndpointHandler<M>, { let to = self @@ -262,20 +250,21 @@ impl RpcHelper { .await } - async fn try_call_many_internal<M, H, S>( + async fn try_call_many_internal<M, N, H, S>( &self, endpoint: &Arc<Endpoint<M, H>>, to: &[Uuid], - msg: M, + msg: N, strategy: RequestStrategy, quorum: usize, ) -> Result<Vec<S>, Error> where M: Rpc<Response = Result<S, Error>> + 'static, + N: IntoReq<M>, H: EndpointHandler<M> + 'static, S: Send + 'static, { - let msg = Arc::new(msg); + let msg = msg.into_req().map_err(netapp::error::Error::from)?; // Build future for each request // They are not started now: they are added below in a FuturesUnordered @@ -285,7 +274,7 @@ impl RpcHelper { let msg = msg.clone(); let endpoint2 = endpoint.clone(); (to, async move { - self2.call_arc(&endpoint2, to, msg, strategy).await + self2.call(&endpoint2, to, msg, strategy).await }) }); diff --git a/src/rpc/system.rs b/src/rpc/system.rs index f9f2970b..04ef2f69 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -16,8 +16,8 @@ use tokio::sync::watch; use tokio::sync::Mutex; use netapp::endpoint::{Endpoint, EndpointHandler}; +use netapp::message::*; use netapp::peering::fullmesh::FullMeshPeeringStrategy; -use netapp::proto::*; use netapp::util::parse_and_resolve_peer_addr; use netapp::{NetApp, NetworkKey, NodeID, NodeKey}; @@ -544,7 +544,7 @@ impl System { SystemRpc::AdvertiseClusterLayout(layout), RequestStrategy::with_priority(PRIO_HIGH), ) - .await; + .await?; Ok(()) }); self.background.spawn(self.clone().save_cluster_layout()); @@ -559,7 +559,8 @@ impl System { self.update_local_status(); let local_status: NodeStatus = self.local_status.load().as_ref().clone(); - self.rpc + let _ = self + .rpc .broadcast( &self.system_endpoint, SystemRpc::AdvertiseStatus(local_status), diff --git a/src/table/schema.rs b/src/table/schema.rs index 74f57798..f37e98d8 100644 --- a/src/table/schema.rs +++ b/src/table/schema.rs @@ -60,7 +60,7 @@ pub trait Entry<P: PartitionKey, S: SortKey>: } /// Trait for the schema used in a table -pub trait TableSchema: Send + Sync { +pub trait TableSchema: Send + Sync + 'static { /// The name of the table in the database const TABLE_NAME: &'static str; diff --git a/src/util/Cargo.toml b/src/util/Cargo.toml index 7d79f21a..89064592 100644 --- a/src/util/Cargo.toml +++ b/src/util/Cargo.toml @@ -36,9 +36,8 @@ toml = "0.5" futures = "0.3" tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } -#netapp = { version = "0.3.0", git = "https://git.deuxfleurs.fr/lx/netapp" } -#netapp = { version = "0.4", path = "../../../netapp" } -netapp = "0.4" +#netapp = "0.4" +netapp = { version = "0.4.4", git = "https://git.deuxfleurs.fr/lx/netapp", branch = "stream-body", features = ["telemetry"] } http = "0.2" hyper = "0.14" |