From 8e7e680afe39f48fe15f365c9ef3fee57596e119 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 22 Jul 2022 15:20:00 +0200 Subject: First adaptation to WIP netapp with streaming body --- Cargo.lock | 74 ++++++++++++++++++++++++------------------------ src/block/manager.rs | 19 ++++++------- src/garage/Cargo.toml | 5 ++-- src/garage/admin.rs | 4 +-- src/garage/cli/cmd.rs | 6 ++-- src/garage/cli/layout.rs | 6 ++-- src/model/Cargo.toml | 5 ++-- src/rpc/Cargo.toml | 5 ++-- src/rpc/rpc_helper.rs | 71 ++++++++++++++++++++-------------------------- src/rpc/system.rs | 7 +++-- src/table/schema.rs | 2 +- src/util/Cargo.toml | 5 ++-- 12 files changed, 97 insertions(+), 112 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d54cabd0..8edffc6a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -154,7 +154,7 @@ dependencies = [ "aws-smithy-types", "aws-smithy-xml", "aws-types", - "bytes 1.1.0", + "bytes 1.2.0", "http", "md5", "tokio-stream", @@ -184,7 +184,7 @@ checksum = "51d371fb688d909e5b866ff1f297bbec4621eed4f9fcdac566fcc33541f0c6a6" dependencies = [ "aws-smithy-eventstream", "aws-smithy-http", - "bytes 1.1.0", + "bytes 1.2.0", "form_urlencoded", "hex", "http", @@ -218,7 +218,7 @@ dependencies = [ "aws-smithy-http", "aws-smithy-http-tower", "aws-smithy-types", - "bytes 1.1.0", + "bytes 1.2.0", "fastrand", "http", "http-body", @@ -239,7 +239,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f972226c639e0dc1eca2cb0220c1b5799e2bfc62eda37845b662c5d0cb972371" dependencies = [ "aws-smithy-types", - "bytes 1.1.0", + "bytes 1.2.0", "crc32fast", ] @@ -251,7 +251,7 @@ checksum = "12c787e24b757634453a60ff05948aa1b450f5b3a7a2094f22acff8a5022635b" dependencies = [ "aws-smithy-eventstream", "aws-smithy-types", - "bytes 1.1.0", + "bytes 1.2.0", "bytes-utils", "futures-core", "http", @@ -271,7 +271,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "64f80a2c56fc09fc9a2da3c63f286ec2a89465433219f8165e14e522283a5eb8" dependencies = [ "aws-smithy-http", - "bytes 1.1.0", + "bytes 1.2.0", "http", "http-body", "pin-project 1.0.10", @@ -391,9 +391,9 @@ checksum = "e0dcbc35f504eb6fc275a6d20e4ebcda18cf50d40ba6fabff8c711fa16cb3b16" [[package]] name = "bytes" -version = "1.1.0" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8" +checksum = "f0b3de4a0c5e67e16066a0715723abd91edc2f9001d09c46e1dca929351e130e" [[package]] name = "bytes-utils" @@ -401,7 +401,7 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1934a3ef9cac8efde4966a92781e77713e1ba329f1d42e446c7d7eba340d8ef1" dependencies = [ - "bytes 1.1.0", + "bytes 1.2.0", "either", ] @@ -982,7 +982,7 @@ dependencies = [ "async-trait", "aws-sdk-s3", "base64", - "bytes 1.1.0", + "bytes 1.2.0", "bytesize", "chrono", "futures", @@ -1026,7 +1026,7 @@ version = "0.7.0" dependencies = [ "async-trait", "base64", - "bytes 1.1.0", + "bytes 1.2.0", "chrono", "crypto-common", "err-derive 0.3.1", @@ -1071,7 +1071,7 @@ version = "0.7.0" dependencies = [ "arc-swap", "async-trait", - "bytes 1.1.0", + "bytes 1.2.0", "futures", "futures-util", "garage_db", @@ -1166,7 +1166,7 @@ checksum = "81e693aa4582cfe7a7ce70c07880e3662544b5d0cd68bc4b59c53febfbb8d1ec" dependencies = [ "arc-swap", "async-trait", - "bytes 1.1.0", + "bytes 1.2.0", "futures", "futures-util", "garage_util 0.5.1", @@ -1191,7 +1191,7 @@ version = "0.7.0" dependencies = [ "arc-swap", "async-trait", - "bytes 1.1.0", + "bytes 1.2.0", "futures", "futures-util", "garage_util 0.7.0", @@ -1224,7 +1224,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5c3557f3757e2acd29eaee86804d4e6c38d2abda81b4b349d8a0d2277044265c" dependencies = [ "async-trait", - "bytes 1.1.0", + "bytes 1.2.0", "futures", "futures-util", "garage_rpc 0.5.1", @@ -1244,7 +1244,7 @@ name = "garage_table" version = "0.7.0" dependencies = [ "async-trait", - "bytes 1.1.0", + "bytes 1.2.0", "futures", "futures-util", "garage_db", @@ -1390,7 +1390,7 @@ version = "0.3.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "62eeb471aa3e3c9197aa4bfeabfe02982f6dc96f750486c0bb0009ac58b26d2b" dependencies = [ - "bytes 1.1.0", + "bytes 1.2.0", "fnv", "futures-core", "futures-sink", @@ -1525,7 +1525,7 @@ version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "31f4c6746584866f0feabcc69893c5b51beef3831656a968ed7ae254cdc4fd03" dependencies = [ - "bytes 1.1.0", + "bytes 1.2.0", "fnv", "itoa", ] @@ -1536,7 +1536,7 @@ version = "0.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1ff4f84919677303da5f147645dbea6b1881f368d03ac84e1dc09031ebd7b2c6" dependencies = [ - "bytes 1.1.0", + "bytes 1.2.0", "http", "pin-project-lite", ] @@ -1580,7 +1580,7 @@ version = "0.14.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b26ae0a80afebe130861d90abf98e3814a4f28a4c6ffeb5ab8ebb2be311e0ef2" dependencies = [ - "bytes 1.1.0", + "bytes 1.2.0", "futures-channel", "futures-core", "futures-util", @@ -1633,7 +1633,7 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" dependencies = [ - "bytes 1.1.0", + "bytes 1.2.0", "hyper", "native-tls", "tokio", @@ -1781,7 +1781,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4f8de9873b904e74b3533f77493731ee26742418077503683db44e1b3c54aa5c" dependencies = [ "base64", - "bytes 1.1.0", + "bytes 1.2.0", "chrono", "http", "percent-encoding", @@ -1810,7 +1810,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8cd12d68768b54bbd50547f4c7b57b73cff680ef8da3ba409463ee995cf0d707" dependencies = [ "base64", - "bytes 1.1.0", + "bytes 1.2.0", "chrono", "dirs-next", "either", @@ -2081,7 +2081,7 @@ version = "2.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5f8f35e687561d5c1667590911e6698a8cb714a134a7505718a182e7bc9d3836" dependencies = [ - "bytes 1.1.0", + "bytes 1.2.0", "encoding_rs", "futures-util", "http", @@ -2142,12 +2142,11 @@ dependencies = [ [[package]] name = "netapp" version = "0.4.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c6419a4b836774192e13fedb05c0e5f414ee8df9ca0c467456a0bacde06c29ee" +source = "git+https://git.deuxfleurs.fr/lx/netapp?branch=stream-body#cbc21e40acfc420a3e452a1fd488c6a96694b0f2" dependencies = [ "arc-swap", "async-trait", - "bytes 0.6.0", + "bytes 1.2.0", "cfg-if 1.0.0", "err-derive 0.2.4", "futures", @@ -2157,6 +2156,7 @@ dependencies = [ "log", "opentelemetry", "opentelemetry-contrib", + "pin-project 1.0.10", "rand 0.5.6", "rmp-serde 0.14.4", "serde", @@ -2640,7 +2640,7 @@ version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "444879275cb4fd84958b1a1d5420d15e6fcf7c235fe47f053c9c2a80aceb6001" dependencies = [ - "bytes 1.1.0", + "bytes 1.2.0", "prost-derive", ] @@ -2650,7 +2650,7 @@ version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "62941722fb675d463659e49c4f3fe1fe792ff24fe5bbaa9c08cd3b98a1c354f5" dependencies = [ - "bytes 1.1.0", + "bytes 1.2.0", "heck 0.3.3", "itertools 0.10.3", "lazy_static", @@ -2683,7 +2683,7 @@ version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "534b7a0e836e3c482d2693070f982e39e7611da9695d4d1f5a4b186b51faef0a" dependencies = [ - "bytes 1.1.0", + "bytes 1.2.0", "prost", ] @@ -2986,7 +2986,7 @@ checksum = "1db30db44ea73551326269adcf7a2169428a054f14faf9e1768f2163494f2fa2" dependencies = [ "async-trait", "base64", - "bytes 1.1.0", + "bytes 1.2.0", "crc32fast", "futures", "http", @@ -3028,7 +3028,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a5ae95491c8b4847931e291b151127eccd6ff8ca13f33603eb3d0035ecb05272" dependencies = [ "base64", - "bytes 1.1.0", + "bytes 1.2.0", "chrono", "digest 0.9.0", "futures", @@ -3594,7 +3594,7 @@ version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2af73ac49756f3f7c01172e34a23e5d0216f6c32333757c2c61feb2bbff5a5ee" dependencies = [ - "bytes 1.1.0", + "bytes 1.2.0", "libc", "memchr", "mio", @@ -3667,7 +3667,7 @@ version = "0.6.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9e99e1983e5d376cd8eb4b66604d2e99e79f5bd988c3055891dcd8c9e2604cc0" dependencies = [ - "bytes 1.1.0", + "bytes 1.2.0", "futures-core", "futures-io", "futures-sink", @@ -3683,7 +3683,7 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "64910e1b9c1901aaf5375561e35b9c057d95ff41a44ede043a03e09279eabaf1" dependencies = [ - "bytes 1.1.0", + "bytes 1.2.0", "futures-core", "futures-sink", "log", @@ -3709,7 +3709,7 @@ dependencies = [ "async-stream", "async-trait", "base64", - "bytes 1.1.0", + "bytes 1.2.0", "futures-core", "futures-util", "h2", @@ -3770,7 +3770,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "81eca72647e58054bbfa41e6f297c23436f1c60aff6e5eb38455a0f9ca420bb5" dependencies = [ "base64", - "bytes 1.1.0", + "bytes 1.2.0", "futures-core", "futures-util", "http", diff --git a/src/block/manager.rs b/src/block/manager.rs index be53ec6e..408de148 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -8,7 +8,6 @@ use async_trait::async_trait; use bytes::Bytes; use serde::{Deserialize, Serialize}; -use futures::future::*; use tokio::fs; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::select; @@ -637,24 +636,24 @@ impl BlockManager { } who.retain(|id| *id != self.system.id); - let msg = Arc::new(BlockRpc::NeedBlockQuery(*hash)); - let who_needs_fut = who.iter().map(|to| { - self.system.rpc.call_arc( + let who_needs_resps = self + .system + .rpc + .call_many( &self.endpoint, - *to, - msg.clone(), + &who, + BlockRpc::NeedBlockQuery(*hash), RequestStrategy::with_priority(PRIO_BACKGROUND) .with_timeout(NEED_BLOCK_QUERY_TIMEOUT), ) - }); - let who_needs_resps = join_all(who_needs_fut).await; + .await?; let mut need_nodes = vec![]; - for (node, needed) in who.iter().zip(who_needs_resps.into_iter()) { + for (node, needed) in who_needs_resps.into_iter() { match needed.err_context("NeedBlockQuery RPC")? { BlockRpc::NeedBlockReply(needed) => { if needed { - need_nodes.push(*node); + need_nodes.push(node); } } m => { diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml index 2cb8ec46..5a872c7a 100644 --- a/src/garage/Cargo.toml +++ b/src/garage/Cargo.toml @@ -50,9 +50,8 @@ futures = "0.3" futures-util = "0.3" tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } -#netapp = { version = "0.3.2", git = "https://git.deuxfleurs.fr/lx/netapp" } -#netapp = { version = "0.4", path = "../../../netapp" } -netapp = "0.4" +#netapp = "0.4" +netapp = { version = "0.4.4", git = "https://git.deuxfleurs.fr/lx/netapp", branch = "stream-body", features = ["telemetry"] } opentelemetry = { version = "0.17", features = [ "rt-tokio" ] } opentelemetry-prometheus = "0.10" diff --git a/src/garage/admin.rs b/src/garage/admin.rs index 71ee608c..64a448fc 100644 --- a/src/garage/admin.rs +++ b/src/garage/admin.rs @@ -681,7 +681,7 @@ impl AdminRpcHandler { .endpoint .call( &node, - &AdminRpc::LaunchRepair(opt_to_send.clone()), + AdminRpc::LaunchRepair(opt_to_send.clone()), PRIO_NORMAL, ) .await; @@ -721,7 +721,7 @@ impl AdminRpcHandler { let node_id = (*node).into(); match self .endpoint - .call(&node_id, &AdminRpc::Stats(opt), PRIO_NORMAL) + .call(&node_id, AdminRpc::Stats(opt), PRIO_NORMAL) .await? { Ok(AdminRpc::Ok(s)) => writeln!(&mut ret, "{}", s).unwrap(), diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs index 1aa2c2ff..c8b96489 100644 --- a/src/garage/cli/cmd.rs +++ b/src/garage/cli/cmd.rs @@ -47,7 +47,7 @@ pub async fn cli_command_dispatch( pub async fn cmd_status(rpc_cli: &Endpoint, rpc_host: NodeID) -> Result<(), Error> { let status = match rpc_cli - .call(&rpc_host, &SystemRpc::GetKnownNodes, PRIO_NORMAL) + .call(&rpc_host, SystemRpc::GetKnownNodes, PRIO_NORMAL) .await?? { SystemRpc::ReturnKnownNodes(nodes) => nodes, @@ -149,7 +149,7 @@ pub async fn cmd_connect( args: ConnectNodeOpt, ) -> Result<(), Error> { match rpc_cli - .call(&rpc_host, &SystemRpc::Connect(args.node), PRIO_NORMAL) + .call(&rpc_host, SystemRpc::Connect(args.node), PRIO_NORMAL) .await?? { SystemRpc::Ok => { @@ -165,7 +165,7 @@ pub async fn cmd_admin( rpc_host: NodeID, args: AdminRpc, ) -> Result<(), HelperError> { - match rpc_cli.call(&rpc_host, &args, PRIO_NORMAL).await?? { + match rpc_cli.call(&rpc_host, args, PRIO_NORMAL).await?? { AdminRpc::Ok(msg) => { println!("{}", msg); } diff --git a/src/garage/cli/layout.rs b/src/garage/cli/layout.rs index db0af57c..3884bb92 100644 --- a/src/garage/cli/layout.rs +++ b/src/garage/cli/layout.rs @@ -36,7 +36,7 @@ pub async fn cmd_assign_role( args: AssignRoleOpt, ) -> Result<(), Error> { let status = match rpc_cli - .call(&rpc_host, &SystemRpc::GetKnownNodes, PRIO_NORMAL) + .call(&rpc_host, SystemRpc::GetKnownNodes, PRIO_NORMAL) .await?? { SystemRpc::ReturnKnownNodes(nodes) => nodes, @@ -245,7 +245,7 @@ pub async fn fetch_layout( rpc_host: NodeID, ) -> Result { match rpc_cli - .call(&rpc_host, &SystemRpc::PullClusterLayout, PRIO_NORMAL) + .call(&rpc_host, SystemRpc::PullClusterLayout, PRIO_NORMAL) .await?? { SystemRpc::AdvertiseClusterLayout(t) => Ok(t), @@ -261,7 +261,7 @@ pub async fn send_layout( rpc_cli .call( &rpc_host, - &SystemRpc::AdvertiseClusterLayout(layout), + SystemRpc::AdvertiseClusterLayout(layout), PRIO_NORMAL, ) .await??; diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml index d908dc01..a97bce4d 100644 --- a/src/model/Cargo.toml +++ b/src/model/Cargo.toml @@ -40,9 +40,8 @@ futures-util = "0.3" tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } opentelemetry = "0.17" -#netapp = { version = "0.3.0", git = "https://git.deuxfleurs.fr/lx/netapp" } -#netapp = { version = "0.4", path = "../../../netapp" } -netapp = "0.4" +#netapp = "0.4" +netapp = { version = "0.4.4", git = "https://git.deuxfleurs.fr/lx/netapp", branch = "stream-body", features = ["telemetry"] } [features] k2v = [ "garage_util/k2v" ] diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml index 73328993..5d5151cd 100644 --- a/src/rpc/Cargo.toml +++ b/src/rpc/Cargo.toml @@ -46,9 +46,8 @@ tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi tokio-stream = { version = "0.1", features = ["net"] } opentelemetry = "0.17" -#netapp = { version = "0.3.0", git = "https://git.deuxfleurs.fr/lx/netapp" } -#netapp = { version = "0.4", path = "../../../netapp", features = ["telemetry"] } -netapp = { version = "0.4.4", features = ["telemetry"] } +#netapp = { version = "0.4.4", features = ["telemetry"] } +netapp = { version = "0.4.4", git = "https://git.deuxfleurs.fr/lx/netapp", branch = "stream-body", features = ["telemetry"] } hyper = { version = "0.14", features = ["client", "http1", "runtime", "tcp"] } diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index 34717d3b..079cdc70 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -15,9 +15,9 @@ use opentelemetry::{ Context, }; -pub use netapp::endpoint::{Endpoint, EndpointHandler, Message as Rpc}; +pub use netapp::endpoint::{Endpoint, EndpointHandler}; +pub use netapp::message::{Message as Rpc, *}; use netapp::peering::fullmesh::FullMeshPeeringStrategy; -pub use netapp::proto::*; pub use netapp::{NetApp, NodeID}; use garage_util::background::BackgroundRunner; @@ -30,10 +30,8 @@ use crate::ring::Ring; const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10); -// Try to never have more than 200MB of outgoing requests -// buffered at the same time. Other requests are queued until -// space is freed. -const REQUEST_BUFFER_SIZE: usize = 200 * 1024 * 1024; +// Don't allow more than 100 concurrent outgoing RPCs. +const MAX_CONCURRENT_REQUESTS: usize = 100; /// Strategy to apply when making RPC #[derive(Copy, Clone)] @@ -95,7 +93,7 @@ impl RpcHelper { background: Arc, ring: watch::Receiver>, ) -> Self { - let sem = Arc::new(Semaphore::new(REQUEST_BUFFER_SIZE)); + let sem = Arc::new(Semaphore::new(MAX_CONCURRENT_REQUESTS)); let metrics = RpcMetrics::new(sem.clone()); @@ -109,29 +107,16 @@ impl RpcHelper { })) } - pub async fn call( + pub async fn call( &self, endpoint: &Endpoint, to: Uuid, - msg: M, - strat: RequestStrategy, - ) -> Result - where - M: Rpc>, - H: EndpointHandler, - { - self.call_arc(endpoint, to, Arc::new(msg), strat).await - } - - pub async fn call_arc( - &self, - endpoint: &Endpoint, - to: Uuid, - msg: Arc, + msg: N, strat: RequestStrategy, ) -> Result where M: Rpc>, + N: IntoReq + Send, H: EndpointHandler, { let metric_tags = [ @@ -140,11 +125,10 @@ impl RpcHelper { KeyValue::new("to", format!("{:?}", to)), ]; - let msg_size = rmp_to_vec_all_named(&msg)?.len() as u32; let permit = self .0 .request_buffer_semaphore - .acquire_many(msg_size) + .acquire() .record_duration(&self.0.metrics.rpc_queueing_time, &metric_tags) .await?; @@ -152,7 +136,7 @@ impl RpcHelper { let node_id = to.into(); let rpc_call = endpoint - .call(&node_id, msg, strat.rs_priority) + .call_streaming(&node_id, msg, strat.rs_priority) .record_duration(&self.0.metrics.rpc_duration, &metric_tags); select! { @@ -162,7 +146,7 @@ impl RpcHelper { if res.is_err() { self.0.metrics.rpc_netapp_error_counter.add(1, &metric_tags); } - let res = res?; + let res = res?.into_msg(); if res.is_err() { self.0.metrics.rpc_garage_error_counter.add(1, &metric_tags); @@ -178,37 +162,41 @@ impl RpcHelper { } } - pub async fn call_many( + pub async fn call_many( &self, endpoint: &Endpoint, to: &[Uuid], - msg: M, + msg: N, strat: RequestStrategy, - ) -> Vec<(Uuid, Result)> + ) -> Result)>, Error> where M: Rpc>, + N: IntoReq, H: EndpointHandler, { - let msg = Arc::new(msg); + let msg = msg.into_req().map_err(netapp::error::Error::from)?; + let resps = join_all( to.iter() - .map(|to| self.call_arc(endpoint, *to, msg.clone(), strat)), + .map(|to| self.call(endpoint, *to, msg.clone(), strat)), ) .await; - to.iter() + Ok(to + .iter() .cloned() .zip(resps.into_iter()) - .collect::>() + .collect::>()) } - pub async fn broadcast( + pub async fn broadcast( &self, endpoint: &Endpoint, - msg: M, + msg: N, strat: RequestStrategy, - ) -> Vec<(Uuid, Result)> + ) -> Result)>, Error> where M: Rpc>, + N: IntoReq, H: EndpointHandler, { let to = self @@ -262,20 +250,21 @@ impl RpcHelper { .await } - async fn try_call_many_internal( + async fn try_call_many_internal( &self, endpoint: &Arc>, to: &[Uuid], - msg: M, + msg: N, strategy: RequestStrategy, quorum: usize, ) -> Result, Error> where M: Rpc> + 'static, + N: IntoReq, H: EndpointHandler + 'static, S: Send + 'static, { - let msg = Arc::new(msg); + let msg = msg.into_req().map_err(netapp::error::Error::from)?; // Build future for each request // They are not started now: they are added below in a FuturesUnordered @@ -285,7 +274,7 @@ impl RpcHelper { let msg = msg.clone(); let endpoint2 = endpoint.clone(); (to, async move { - self2.call_arc(&endpoint2, to, msg, strategy).await + self2.call(&endpoint2, to, msg, strategy).await }) }); diff --git a/src/rpc/system.rs b/src/rpc/system.rs index f9f2970b..04ef2f69 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -16,8 +16,8 @@ use tokio::sync::watch; use tokio::sync::Mutex; use netapp::endpoint::{Endpoint, EndpointHandler}; +use netapp::message::*; use netapp::peering::fullmesh::FullMeshPeeringStrategy; -use netapp::proto::*; use netapp::util::parse_and_resolve_peer_addr; use netapp::{NetApp, NetworkKey, NodeID, NodeKey}; @@ -544,7 +544,7 @@ impl System { SystemRpc::AdvertiseClusterLayout(layout), RequestStrategy::with_priority(PRIO_HIGH), ) - .await; + .await?; Ok(()) }); self.background.spawn(self.clone().save_cluster_layout()); @@ -559,7 +559,8 @@ impl System { self.update_local_status(); let local_status: NodeStatus = self.local_status.load().as_ref().clone(); - self.rpc + let _ = self + .rpc .broadcast( &self.system_endpoint, SystemRpc::AdvertiseStatus(local_status), diff --git a/src/table/schema.rs b/src/table/schema.rs index 74f57798..f37e98d8 100644 --- a/src/table/schema.rs +++ b/src/table/schema.rs @@ -60,7 +60,7 @@ pub trait Entry: } /// Trait for the schema used in a table -pub trait TableSchema: Send + Sync { +pub trait TableSchema: Send + Sync + 'static { /// The name of the table in the database const TABLE_NAME: &'static str; diff --git a/src/util/Cargo.toml b/src/util/Cargo.toml index 7d79f21a..89064592 100644 --- a/src/util/Cargo.toml +++ b/src/util/Cargo.toml @@ -36,9 +36,8 @@ toml = "0.5" futures = "0.3" tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } -#netapp = { version = "0.3.0", git = "https://git.deuxfleurs.fr/lx/netapp" } -#netapp = { version = "0.4", path = "../../../netapp" } -netapp = "0.4" +#netapp = "0.4" +netapp = { version = "0.4.4", git = "https://git.deuxfleurs.fr/lx/netapp", branch = "stream-body", features = ["telemetry"] } http = "0.2" hyper = "0.14" -- cgit v1.2.3 From a35d4da721db3550a2833d8576d4283bc999e8df Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 22 Jul 2022 16:45:45 +0200 Subject: update netapp to 0.5 --- Cargo.lock | 12 ++++++------ src/garage/Cargo.toml | 2 +- src/model/Cargo.toml | 2 +- src/rpc/Cargo.toml | 2 +- src/util/Cargo.toml | 2 +- 5 files changed, 10 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8edffc6a..ef72f911 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1000,7 +1000,7 @@ dependencies = [ "http", "hyper", "kuska-sodiumoxide", - "netapp 0.4.4", + "netapp 0.5.0", "opentelemetry", "opentelemetry-otlp", "opentelemetry-prometheus", @@ -1147,7 +1147,7 @@ dependencies = [ "garage_table 0.7.0", "garage_util 0.7.0", "hex", - "netapp 0.4.4", + "netapp 0.5.0", "opentelemetry", "rand 0.8.5", "rmp-serde 0.15.5", @@ -1202,7 +1202,7 @@ dependencies = [ "k8s-openapi", "kube", "kuska-sodiumoxide", - "netapp 0.4.4", + "netapp 0.5.0", "openssl", "opentelemetry", "pnet_datalink", @@ -1301,7 +1301,7 @@ dependencies = [ "hex", "http", "hyper", - "netapp 0.4.4", + "netapp 0.5.0", "opentelemetry", "rand 0.8.5", "rmp-serde 0.15.5", @@ -2141,8 +2141,8 @@ dependencies = [ [[package]] name = "netapp" -version = "0.4.4" -source = "git+https://git.deuxfleurs.fr/lx/netapp?branch=stream-body#cbc21e40acfc420a3e452a1fd488c6a96694b0f2" +version = "0.5.0" +source = "git+https://git.deuxfleurs.fr/lx/netapp?branch=stream-body#ab80ade4f0034cbdcf15a99c674730f85eb06870" dependencies = [ "arc-swap", "async-trait", diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml index 5a872c7a..1e96f1f3 100644 --- a/src/garage/Cargo.toml +++ b/src/garage/Cargo.toml @@ -51,7 +51,7 @@ futures-util = "0.3" tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } #netapp = "0.4" -netapp = { version = "0.4.4", git = "https://git.deuxfleurs.fr/lx/netapp", branch = "stream-body", features = ["telemetry"] } +netapp = { version = "0.5", git = "https://git.deuxfleurs.fr/lx/netapp", branch = "stream-body", features = ["telemetry"] } opentelemetry = { version = "0.17", features = [ "rt-tokio" ] } opentelemetry-prometheus = "0.10" diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml index a97bce4d..73011e0d 100644 --- a/src/model/Cargo.toml +++ b/src/model/Cargo.toml @@ -41,7 +41,7 @@ tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi opentelemetry = "0.17" #netapp = "0.4" -netapp = { version = "0.4.4", git = "https://git.deuxfleurs.fr/lx/netapp", branch = "stream-body", features = ["telemetry"] } +netapp = { version = "0.5", git = "https://git.deuxfleurs.fr/lx/netapp", branch = "stream-body", features = ["telemetry"] } [features] k2v = [ "garage_util/k2v" ] diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml index 5d5151cd..5986b7bf 100644 --- a/src/rpc/Cargo.toml +++ b/src/rpc/Cargo.toml @@ -47,7 +47,7 @@ tokio-stream = { version = "0.1", features = ["net"] } opentelemetry = "0.17" #netapp = { version = "0.4.4", features = ["telemetry"] } -netapp = { version = "0.4.4", git = "https://git.deuxfleurs.fr/lx/netapp", branch = "stream-body", features = ["telemetry"] } +netapp = { version = "0.5.0", git = "https://git.deuxfleurs.fr/lx/netapp", branch = "stream-body", features = ["telemetry"] } hyper = { version = "0.14", features = ["client", "http1", "runtime", "tcp"] } diff --git a/src/util/Cargo.toml b/src/util/Cargo.toml index 89064592..a70f68b9 100644 --- a/src/util/Cargo.toml +++ b/src/util/Cargo.toml @@ -37,7 +37,7 @@ futures = "0.3" tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } #netapp = "0.4" -netapp = { version = "0.4.4", git = "https://git.deuxfleurs.fr/lx/netapp", branch = "stream-body", features = ["telemetry"] } +netapp = { version = "0.5", git = "https://git.deuxfleurs.fr/lx/netapp", branch = "stream-body", features = ["telemetry"] } http = "0.2" hyper = "0.14" -- cgit v1.2.3 From 605a630333c8ee60c55fe011a375c01277bba173 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 22 Jul 2022 18:20:27 +0200 Subject: Use streaming in block manager --- Cargo.lock | 18 +++- src/api/s3/copy.rs | 12 ++- src/api/s3/get.rs | 29 ++++-- src/block/Cargo.toml | 3 + src/block/block.rs | 37 ++++++-- src/block/manager.rs | 249 +++++++++++++++++++++++++++++++++++++++----------- src/rpc/rpc_helper.rs | 24 +++-- 7 files changed, 284 insertions(+), 88 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ef72f911..9d12f523 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -50,6 +50,20 @@ dependencies = [ "serde_json", ] +[[package]] +name = "async-compression" +version = "0.3.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00461f243d703f6999c8e7494f077799f1362720a55ae49a90ffe6214032fc0b" +dependencies = [ + "futures-core", + "memchr", + "pin-project-lite", + "tokio", + "zstd", + "zstd-safe", +] + [[package]] name = "async-stream" version = "0.3.3" @@ -1070,6 +1084,7 @@ name = "garage_block" version = "0.7.0" dependencies = [ "arc-swap", + "async-compression", "async-trait", "bytes 1.2.0", "futures", @@ -1085,6 +1100,7 @@ dependencies = [ "serde", "serde_bytes", "tokio", + "tokio-util 0.6.9", "tracing", "zstd", ] @@ -1292,7 +1308,7 @@ version = "0.7.0" dependencies = [ "async-trait", "blake2", - "bytes 1.1.0", + "bytes 1.2.0", "chrono", "digest 0.10.3", "err-derive 0.3.1", diff --git a/src/api/s3/copy.rs b/src/api/s3/copy.rs index 4415a037..54a565e0 100644 --- a/src/api/s3/copy.rs +++ b/src/api/s3/copy.rs @@ -5,6 +5,7 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH}; use futures::{stream, stream::Stream, StreamExt, TryFutureExt}; use md5::{Digest as Md5Digest, Md5}; +use bytes::Bytes; use hyper::{Body, Request, Response}; use serde::Serialize; @@ -311,7 +312,7 @@ pub async fn handle_upload_part_copy( stream::once(async move { let data = garage3.block_manager.rpc_get_block(&block_hash).await?; match range_to_copy { - Some(r) => Ok((data[r].to_vec(), None)), + Some(r) => Ok((data.slice(r), None)), None => Ok((data, Some(block_hash))), } }) @@ -556,7 +557,7 @@ impl CopyPreconditionHeaders { } } -type BlockStreamItemOk = (Vec, Option); +type BlockStreamItemOk = (Bytes, Option); type BlockStreamItem = Result; struct Defragmenter> { @@ -589,7 +590,7 @@ impl> Defragmenter { if self.buffer.is_empty() { let (next_block, next_block_hash) = self.block_stream.next().await.unwrap()?; - self.buffer = next_block; + self.buffer = next_block.to_vec(); // TODO TOO MUCH COPY self.hash = next_block_hash; } else if self.buffer.len() + peeked_next_block.len() > self.block_size { break; @@ -600,7 +601,10 @@ impl> Defragmenter { } } - Ok((std::mem::take(&mut self.buffer), self.hash.take())) + Ok(( + Bytes::from(std::mem::take(&mut self.buffer)), + self.hash.take(), + )) } } diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs index 7fa1a177..7d118f89 100644 --- a/src/api/s3/get.rs +++ b/src/api/s3/get.rs @@ -242,10 +242,13 @@ pub async fn handle_get( Ok(resp_builder.body(body)?) } ObjectVersionData::FirstBlock(_, first_block_hash) => { - let read_first_block = garage.block_manager.rpc_get_block(first_block_hash); + let read_first_block = garage + .block_manager + .rpc_get_block_streaming(first_block_hash); let get_next_blocks = garage.version_table.get(&last_v.uuid, &EmptyKey); - let (first_block, version) = futures::try_join!(read_first_block, get_next_blocks)?; + let (first_block_stream, version) = + futures::try_join!(read_first_block, get_next_blocks)?; let version = version.ok_or(Error::NoSuchKey)?; let mut blocks = version @@ -254,24 +257,32 @@ pub async fn handle_get( .iter() .map(|(_, vb)| (vb.hash, None)) .collect::>(); - blocks[0].1 = Some(first_block); + blocks[0].1 = Some(first_block_stream); let body_stream = futures::stream::iter(blocks) - .map(move |(hash, data_opt)| { + .map(move |(hash, stream_opt)| { let garage = garage.clone(); async move { - if let Some(data) = data_opt { - Ok(Bytes::from(data)) + if let Some(stream) = stream_opt { + stream } else { garage .block_manager - .rpc_get_block(&hash) + .rpc_get_block_streaming(&hash) .await - .map(Bytes::from) + .unwrap_or_else(|_| { + Box::pin(futures::stream::once(async move { + Err(std::io::Error::new( + std::io::ErrorKind::Other, + "Could not get next block", + )) + })) + }) } } }) - .buffered(2); + .buffered(3) + .flatten(); let body = hyper::body::Body::wrap_stream(body_stream); Ok(resp_builder.body(body)?) diff --git a/src/block/Cargo.toml b/src/block/Cargo.toml index 2555a44a..3e6f7bc0 100644 --- a/src/block/Cargo.toml +++ b/src/block/Cargo.toml @@ -27,6 +27,8 @@ bytes = "1.0" hex = "0.4" tracing = "0.1.30" rand = "0.8" + +async-compression = { version = "0.3", features = ["tokio", "zstd"] } zstd = { version = "0.9", default-features = false } rmp-serde = "0.15" @@ -36,3 +38,4 @@ serde_bytes = "0.11" futures = "0.3" futures-util = "0.3" tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } +tokio-util = { version = "0.6", features = ["io"] } diff --git a/src/block/block.rs b/src/block/block.rs index f17bd2c0..935aa900 100644 --- a/src/block/block.rs +++ b/src/block/block.rs @@ -5,13 +5,18 @@ use zstd::stream::{decode_all as zstd_decode, Encoder}; use garage_util::data::*; use garage_util::error::*; +#[derive(Debug, Serialize, Deserialize, Copy, Clone)] +pub enum DataBlockHeader { + Plain, + Compressed, +} + /// A possibly compressed block of data -#[derive(Debug, Serialize, Deserialize)] pub enum DataBlock { /// Uncompressed data - Plain(#[serde(with = "serde_bytes")] Vec), + Plain(Bytes), /// Data compressed with zstd - Compressed(#[serde(with = "serde_bytes")] Vec), + Compressed(Bytes), } impl DataBlock { @@ -31,7 +36,7 @@ impl DataBlock { /// Get the buffer, possibly decompressing it, and verify it's integrity. /// For Plain block, data is compared to hash, for Compressed block, zstd checksumming system /// is used instead. - pub fn verify_get(self, hash: Hash) -> Result, Error> { + pub fn verify_get(self, hash: Hash) -> Result { match self { DataBlock::Plain(data) => { if blake2sum(&data) == hash { @@ -40,9 +45,9 @@ impl DataBlock { Err(Error::CorruptData(hash)) } } - DataBlock::Compressed(data) => { - zstd_decode(&data[..]).map_err(|_| Error::CorruptData(hash)) - } + DataBlock::Compressed(data) => zstd_decode(&data[..]) + .map_err(|_| Error::CorruptData(hash)) + .map(Bytes::from), } } @@ -66,14 +71,28 @@ impl DataBlock { tokio::task::spawn_blocking(move || { if let Some(level) = level { if let Ok(data) = zstd_encode(&data[..], level) { - return DataBlock::Compressed(data); + return DataBlock::Compressed(data.into()); } } - DataBlock::Plain(data.to_vec()) // TODO: remove to_vec here + DataBlock::Plain(data) }) .await .unwrap() } + + pub fn into_parts(self) -> (DataBlockHeader, Bytes) { + match self { + DataBlock::Plain(data) => (DataBlockHeader::Plain, data), + DataBlock::Compressed(data) => (DataBlockHeader::Compressed, data), + } + } + + pub fn from_parts(h: DataBlockHeader, bytes: Bytes) -> Self { + match h { + DataBlockHeader::Plain => DataBlock::Plain(bytes), + DataBlockHeader::Compressed => DataBlock::Compressed(bytes), + } + } } fn zstd_encode(mut source: R, level: i32) -> std::io::Result> { diff --git a/src/block/manager.rs b/src/block/manager.rs index 408de148..bb01c300 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -1,5 +1,6 @@ use std::convert::TryInto; use std::path::PathBuf; +use std::pin::Pin; use std::sync::Arc; use std::time::Duration; @@ -8,8 +9,10 @@ use async_trait::async_trait; use bytes::Bytes; use serde::{Deserialize, Serialize}; +use futures::{Stream, TryStreamExt}; +use futures_util::stream::StreamExt; use tokio::fs; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader}; use tokio::select; use tokio::sync::{mpsc, watch, Mutex, Notify}; @@ -18,6 +21,8 @@ use opentelemetry::{ Context, KeyValue, }; +use garage_rpc::rpc_helper::netapp::stream::{stream_asyncread, ByteStream}; + use garage_db as db; use garage_db::counted_tree_hack::CountedTree; @@ -70,7 +75,7 @@ pub enum BlockRpc { /// block PutBlock { hash: Hash, - data: DataBlock, + header: DataBlockHeader, }, /// Ask other node if they should have this block, but don't actually have it NeedBlockQuery(Hash), @@ -174,56 +179,146 @@ impl BlockManager { } /// Ask nodes that might have a (possibly compressed) block for it + /// Return it as a stream with a header + async fn rpc_get_raw_block_streaming( + &self, + hash: &Hash, + ) -> Result<(DataBlockHeader, ByteStream), Error> { + let who = self.replication.read_nodes(hash); + + for node in who.iter() { + let node_id = NodeID::from(*node); + let rpc = + self.endpoint + .call_streaming(&node_id, BlockRpc::GetBlock(*hash), PRIO_NORMAL); + tokio::select! { + res = rpc => { + let res = match res { + Ok(res) => res, + Err(e) => { + debug!("Node {:?} returned error: {}", node, e); + continue; + } + }; + let (header, stream) = match res.into_parts() { + (Ok(BlockRpc::PutBlock { hash: _, header }), Some(stream)) => (header, stream), + _ => { + debug!("Node {:?} returned a malformed response", node); + continue; + } + }; + return Ok((header, stream)); + } + _ = tokio::time::sleep(BLOCK_RW_TIMEOUT) => { + debug!("Node {:?} didn't return block in time, trying next.", node); + } + }; + } + + Err(Error::Message(format!( + "Unable to read block {:?}: no node returned a valid block", + hash + ))) + } + + /// Ask nodes that might have a (possibly compressed) block for it + /// Return its entire body async fn rpc_get_raw_block(&self, hash: &Hash) -> Result { let who = self.replication.read_nodes(hash); - let resps = self - .system - .rpc - .try_call_many( - &self.endpoint, - &who[..], - BlockRpc::GetBlock(*hash), - RequestStrategy::with_priority(PRIO_NORMAL) - .with_quorum(1) - .with_timeout(BLOCK_RW_TIMEOUT) - .interrupt_after_quorum(true), - ) - .await?; - for resp in resps { - if let BlockRpc::PutBlock { data, .. } = resp { - return Ok(data); - } + for node in who.iter() { + let node_id = NodeID::from(*node); + let rpc = + self.endpoint + .call_streaming(&node_id, BlockRpc::GetBlock(*hash), PRIO_NORMAL); + tokio::select! { + res = rpc => { + let res = match res { + Ok(res) => res, + Err(e) => { + debug!("Node {:?} returned error: {}", node, e); + continue; + } + }; + let (header, stream) = match res.into_parts() { + (Ok(BlockRpc::PutBlock { hash: _, header }), Some(stream)) => (header, stream), + _ => { + debug!("Node {:?} returned a malformed response", node); + continue; + } + }; + match read_stream_to_end(stream).await { + Ok(bytes) => return Ok(DataBlock::from_parts(header, bytes)), + Err(e) => { + debug!("Error reading stream from node {:?}: {}", node, e); + } + } + } + _ = tokio::time::sleep(BLOCK_RW_TIMEOUT) => { + debug!("Node {:?} didn't return block in time, trying next.", node); + } + }; } + Err(Error::Message(format!( - "Unable to read block {:?}: no valid blocks returned", + "Unable to read block {:?}: no node returned a valid block", hash ))) } // ---- Public interface ---- + /// Ask nodes that might have a block for it, + /// return it as a stream + pub async fn rpc_get_block_streaming( + &self, + hash: &Hash, + ) -> Result< + Pin> + Send + Sync + 'static>>, + Error, + > { + let (header, stream) = self.rpc_get_raw_block_streaming(hash).await?; + match header { + DataBlockHeader::Plain => Ok(Box::pin(stream.map_err(|_| { + std::io::Error::new(std::io::ErrorKind::Other, "netapp stream error") + }))), + DataBlockHeader::Compressed => { + // Too many things, I hate it. + let reader = stream_asyncread(stream); + let reader = BufReader::new(reader); + let reader = async_compression::tokio::bufread::ZstdDecoder::new(reader); + Ok(Box::pin(tokio_util::io::ReaderStream::new(reader))) + } + } + } + /// Ask nodes that might have a block for it - pub async fn rpc_get_block(&self, hash: &Hash) -> Result, Error> { + pub async fn rpc_get_block(&self, hash: &Hash) -> Result { self.rpc_get_raw_block(hash).await?.verify_get(*hash) } /// Send block to nodes that should have it pub async fn rpc_put_block(&self, hash: Hash, data: Bytes) -> Result<(), Error> { let who = self.replication.write_nodes(&hash); - let data = DataBlock::from_buffer(data, self.compression_level).await; + + let (header, bytes) = DataBlock::from_buffer(data, self.compression_level) + .await + .into_parts(); + let put_block_rpc = + Req::new(BlockRpc::PutBlock { hash, header })?.with_stream_from_buffer(bytes); + self.system .rpc .try_call_many( &self.endpoint, &who[..], - // TODO: remove to_vec() here - BlockRpc::PutBlock { hash, data }, + put_block_rpc, RequestStrategy::with_priority(PRIO_NORMAL) .with_quorum(self.replication.write_quorum()) .with_timeout(BLOCK_RW_TIMEOUT), ) .await?; + Ok(()) } @@ -308,13 +403,25 @@ impl BlockManager { // ---- Reading and writing blocks locally ---- + async fn handle_put_block( + &self, + hash: Hash, + header: DataBlockHeader, + stream: Option, + ) -> Result<(), Error> { + let stream = stream.ok_or_message("missing stream")?; + let bytes = read_stream_to_end(stream).await?; + let data = DataBlock::from_parts(header, bytes); + self.write_block(&hash, &data).await + } + /// Write a block to disk - async fn write_block(&self, hash: &Hash, data: &DataBlock) -> Result { + async fn write_block(&self, hash: &Hash, data: &DataBlock) -> Result<(), Error> { let tracer = opentelemetry::global::tracer("garage"); let write_size = data.inner_buffer().len() as u64; - let res = self.mutation_lock[hash.as_slice()[0] as usize] + self.mutation_lock[hash.as_slice()[0] as usize] .lock() .with_context(Context::current_with_span( tracer.start("Acquire mutation_lock"), @@ -329,21 +436,31 @@ impl BlockManager { self.metrics.bytes_written.add(write_size); - Ok(res) + Ok(()) } - /// Read block from disk, verifying it's integrity - pub(crate) async fn read_block(&self, hash: &Hash) -> Result { - let data = self - .read_block_internal(hash) - .bound_record_duration(&self.metrics.block_read_duration) - .await?; + async fn handle_get_block(&self, hash: &Hash) -> Resp { + let block = match self.read_block(hash).await { + Ok(data) => data, + Err(e) => return Resp::new(Err(e)), + }; + + let (header, data) = block.into_parts(); - self.metrics - .bytes_read - .add(data.inner_buffer().len() as u64); + self.metrics.bytes_read.add(data.len() as u64); - Ok(BlockRpc::PutBlock { hash: *hash, data }) + Resp::new(Ok(BlockRpc::PutBlock { + hash: *hash, + header, + })) + .with_stream_from_buffer(data) + } + + /// Read block from disk, verifying it's integrity + pub(crate) async fn read_block(&self, hash: &Hash) -> Result { + self.read_block_internal(hash) + .bound_record_duration(&self.metrics.block_read_duration) + .await } async fn read_block_internal(&self, hash: &Hash) -> Result { @@ -366,9 +483,9 @@ impl BlockManager { drop(f); let data = if compressed { - DataBlock::Compressed(data) + DataBlock::Compressed(data.into()) } else { - DataBlock::Plain(data) + DataBlock::Plain(data.into()) }; if data.verify(*hash).is_err() { @@ -675,7 +792,13 @@ impl BlockManager { .add(1, &[KeyValue::new("to", format!("{:?}", node))]); } - let put_block_message = self.read_block(hash).await?; + let block = self.read_block(hash).await?; + let (header, bytes) = block.into_parts(); + let put_block_message = Req::new(BlockRpc::PutBlock { + hash: *hash, + header, + })? + .with_stream_from_buffer(bytes); self.system .rpc .try_call_many( @@ -723,17 +846,19 @@ impl BlockManager { } #[async_trait] -impl EndpointHandler for BlockManager { - async fn handle( - self: &Arc, - message: &BlockRpc, - _from: NodeID, - ) -> Result { - match message { - BlockRpc::PutBlock { hash, data } => self.write_block(hash, data).await, - BlockRpc::GetBlock(h) => self.read_block(h).await, - BlockRpc::NeedBlockQuery(h) => self.need_block(h).await.map(BlockRpc::NeedBlockReply), - m => Err(Error::unexpected_rpc_message(m)), +impl StreamingEndpointHandler for BlockManager { + async fn handle(self: &Arc, mut message: Req, _from: NodeID) -> Resp { + match message.msg() { + BlockRpc::PutBlock { hash, header } => Resp::new( + self.handle_put_block(*hash, *header, message.take_stream()) + .await + .map(|_| BlockRpc::Ok), + ), + BlockRpc::GetBlock(h) => self.handle_get_block(h).await, + BlockRpc::NeedBlockQuery(h) => { + Resp::new(self.need_block(h).await.map(BlockRpc::NeedBlockReply)) + } + m => Resp::new(Err(Error::unexpected_rpc_message(m))), } } } @@ -831,7 +956,7 @@ impl BlockManagerLocked { hash: &Hash, data: &DataBlock, mgr: &BlockManager, - ) -> Result { + ) -> Result<(), Error> { let compressed = data.is_compressed(); let data = data.inner_buffer(); @@ -842,8 +967,8 @@ impl BlockManagerLocked { fs::create_dir_all(&directory).await?; let to_delete = match (mgr.is_block_compressed(hash).await, compressed) { - (Ok(true), _) => return Ok(BlockRpc::Ok), - (Ok(false), false) => return Ok(BlockRpc::Ok), + (Ok(true), _) => return Ok(()), + (Ok(false), false) => return Ok(()), (Ok(false), true) => { let path_to_delete = path.clone(); path.set_extension("zst"); @@ -882,7 +1007,7 @@ impl BlockManagerLocked { dir.sync_all().await?; drop(dir); - Ok(BlockRpc::Ok) + Ok(()) } async fn move_block_to_corrupted(&self, hash: &Hash, mgr: &BlockManager) -> Result<(), Error> { @@ -963,3 +1088,17 @@ impl ErrorCounter { self.last_try + self.delay_msec() } } + +async fn read_stream_to_end(mut stream: ByteStream) -> Result { + let mut parts: Vec = vec![]; + while let Some(part) = stream.next().await { + parts.push(part.ok_or_message("error in stream")?); + } + + Ok(parts + .iter() + .map(|x| &x[..]) + .collect::>() + .concat() + .into()) +} diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index 079cdc70..6e098446 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -15,10 +15,13 @@ use opentelemetry::{ Context, }; -pub use netapp::endpoint::{Endpoint, EndpointHandler}; -pub use netapp::message::{Message as Rpc, *}; +pub use netapp::endpoint::{Endpoint, EndpointHandler, StreamingEndpointHandler}; +use netapp::message::IntoReq; +pub use netapp::message::{ + Message as Rpc, Req, RequestPriority, Resp, PRIO_BACKGROUND, PRIO_HIGH, PRIO_NORMAL, +}; use netapp::peering::fullmesh::FullMeshPeeringStrategy; -pub use netapp::{NetApp, NodeID}; +pub use netapp::{self, NetApp, NodeID}; use garage_util::background::BackgroundRunner; use garage_util::data::*; @@ -117,7 +120,7 @@ impl RpcHelper { where M: Rpc>, N: IntoReq + Send, - H: EndpointHandler, + H: StreamingEndpointHandler, { let metric_tags = [ KeyValue::new("rpc_endpoint", endpoint.path().to_string()), @@ -172,7 +175,7 @@ impl RpcHelper { where M: Rpc>, N: IntoReq, - H: EndpointHandler, + H: StreamingEndpointHandler, { let msg = msg.into_req().map_err(netapp::error::Error::from)?; @@ -197,7 +200,7 @@ impl RpcHelper { where M: Rpc>, N: IntoReq, - H: EndpointHandler, + H: StreamingEndpointHandler, { let to = self .0 @@ -211,16 +214,17 @@ impl RpcHelper { /// Make a RPC call to multiple servers, returning either a Vec of responses, /// or an error if quorum could not be reached due to too many errors - pub async fn try_call_many( + pub async fn try_call_many( &self, endpoint: &Arc>, to: &[Uuid], - msg: M, + msg: N, strategy: RequestStrategy, ) -> Result, Error> where M: Rpc> + 'static, - H: EndpointHandler + 'static, + N: IntoReq, + H: StreamingEndpointHandler + 'static, S: Send + 'static, { let quorum = strategy.rs_quorum.unwrap_or(to.len()); @@ -261,7 +265,7 @@ impl RpcHelper { where M: Rpc> + 'static, N: IntoReq, - H: EndpointHandler + 'static, + H: StreamingEndpointHandler + 'static, S: Send + 'static, { let msg = msg.into_req().map_err(netapp::error::Error::from)?; -- cgit v1.2.3 From 68087ee13dc22dbaeb0c1fa8dcb4bdbaa82098a6 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 22 Jul 2022 19:06:56 +0200 Subject: Fix clippy --- src/api/s3/copy.rs | 5 +---- src/api/s3/get.rs | 1 - 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/src/api/s3/copy.rs b/src/api/s3/copy.rs index 54a565e0..b54cbd23 100644 --- a/src/api/s3/copy.rs +++ b/src/api/s3/copy.rs @@ -366,10 +366,7 @@ pub async fn handle_upload_part_copy( // we need to insert that data as a new block. async move { if must_upload { - garage2 - .block_manager - .rpc_put_block(final_hash, data.into()) - .await + garage2.block_manager.rpc_put_block(final_hash, data).await } else { Ok(()) } diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs index 7d118f89..c7621ade 100644 --- a/src/api/s3/get.rs +++ b/src/api/s3/get.rs @@ -450,7 +450,6 @@ fn body_from_blocks_range( let garage = garage.clone(); async move { let data = garage.block_manager.rpc_get_block(&block.hash).await?; - let data = Bytes::from(data); let start_in_block = if true_offset > begin { 0 } else { -- cgit v1.2.3 From 33750c04ed680732a2aa6a695dd5f4fea6d9a393 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 22 Jul 2022 19:10:23 +0200 Subject: Update cargo.nix --- Cargo.nix | 176 ++++++++++++++++++++++++++++++++++++-------------------------- 1 file changed, 103 insertions(+), 73 deletions(-) diff --git a/Cargo.nix b/Cargo.nix index 144967ee..ec828ae9 100644 --- a/Cargo.nix +++ b/Cargo.nix @@ -125,6 +125,28 @@ in }; }); + "registry+https://github.com/rust-lang/crates.io-index".async-compression."0.3.10" = overridableMkRustCrate (profileName: rec { + name = "async-compression"; + version = "0.3.10"; + registry = "registry+https://github.com/rust-lang/crates.io-index"; + src = fetchCratesIo { inherit name version; sha256 = "00461f243d703f6999c8e7494f077799f1362720a55ae49a90ffe6214032fc0b"; }; + features = builtins.concatLists [ + [ "default" ] + [ "libzstd" ] + [ "tokio" ] + [ "zstd" ] + [ "zstd-safe" ] + ]; + dependencies = { + futures_core = rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures-core."0.3.21" { inherit profileName; }; + memchr = rustPackages."registry+https://github.com/rust-lang/crates.io-index".memchr."2.4.1" { inherit profileName; }; + pin_project_lite = rustPackages."registry+https://github.com/rust-lang/crates.io-index".pin-project-lite."0.2.8" { inherit profileName; }; + tokio = rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio."1.17.0" { inherit profileName; }; + libzstd = rustPackages."registry+https://github.com/rust-lang/crates.io-index".zstd."0.9.2+zstd.1.5.1" { inherit profileName; }; + zstd_safe = rustPackages."registry+https://github.com/rust-lang/crates.io-index".zstd-safe."4.1.3+zstd.1.5.1" { inherit profileName; }; + }; + }); + "registry+https://github.com/rust-lang/crates.io-index".async-stream."0.3.3" = overridableMkRustCrate (profileName: rec { name = "async-stream"; version = "0.3.3"; @@ -242,7 +264,7 @@ in aws_smithy_types = rustPackages."registry+https://github.com/rust-lang/crates.io-index".aws-smithy-types."0.38.0" { inherit profileName; }; aws_smithy_xml = rustPackages."registry+https://github.com/rust-lang/crates.io-index".aws-smithy-xml."0.38.0" { inherit profileName; }; aws_types = rustPackages."registry+https://github.com/rust-lang/crates.io-index".aws-types."0.8.0" { inherit profileName; }; - bytes = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.1.0" { inherit profileName; }; + bytes = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.2.0" { inherit profileName; }; http = rustPackages."registry+https://github.com/rust-lang/crates.io-index".http."0.2.6" { inherit profileName; }; md5 = rustPackages."registry+https://github.com/rust-lang/crates.io-index".md5."0.7.0" { inherit profileName; }; tokio_stream = rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio-stream."0.1.8" { inherit profileName; }; @@ -288,7 +310,7 @@ in dependencies = { aws_smithy_eventstream = rustPackages."registry+https://github.com/rust-lang/crates.io-index".aws-smithy-eventstream."0.38.0" { inherit profileName; }; aws_smithy_http = rustPackages."registry+https://github.com/rust-lang/crates.io-index".aws-smithy-http."0.38.0" { inherit profileName; }; - bytes = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.1.0" { inherit profileName; }; + bytes = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.2.0" { inherit profileName; }; form_urlencoded = rustPackages."registry+https://github.com/rust-lang/crates.io-index".form_urlencoded."1.0.1" { inherit profileName; }; hex = rustPackages."registry+https://github.com/rust-lang/crates.io-index".hex."0.4.3" { inherit profileName; }; http = rustPackages."registry+https://github.com/rust-lang/crates.io-index".http."0.2.6" { inherit profileName; }; @@ -335,7 +357,7 @@ in aws_smithy_http = rustPackages."registry+https://github.com/rust-lang/crates.io-index".aws-smithy-http."0.38.0" { inherit profileName; }; aws_smithy_http_tower = rustPackages."registry+https://github.com/rust-lang/crates.io-index".aws-smithy-http-tower."0.38.0" { inherit profileName; }; aws_smithy_types = rustPackages."registry+https://github.com/rust-lang/crates.io-index".aws-smithy-types."0.38.0" { inherit profileName; }; - bytes = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.1.0" { inherit profileName; }; + bytes = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.2.0" { inherit profileName; }; fastrand = rustPackages."registry+https://github.com/rust-lang/crates.io-index".fastrand."1.7.0" { inherit profileName; }; http = rustPackages."registry+https://github.com/rust-lang/crates.io-index".http."0.2.6" { inherit profileName; }; http_body = rustPackages."registry+https://github.com/rust-lang/crates.io-index".http-body."0.4.4" { inherit profileName; }; @@ -357,7 +379,7 @@ in src = fetchCratesIo { inherit name version; sha256 = "f972226c639e0dc1eca2cb0220c1b5799e2bfc62eda37845b662c5d0cb972371"; }; dependencies = { aws_smithy_types = rustPackages."registry+https://github.com/rust-lang/crates.io-index".aws-smithy-types."0.38.0" { inherit profileName; }; - bytes = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.1.0" { inherit profileName; }; + bytes = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.2.0" { inherit profileName; }; crc32fast = rustPackages."registry+https://github.com/rust-lang/crates.io-index".crc32fast."1.3.2" { inherit profileName; }; }; }); @@ -377,7 +399,7 @@ in dependencies = { aws_smithy_eventstream = rustPackages."registry+https://github.com/rust-lang/crates.io-index".aws-smithy-eventstream."0.38.0" { inherit profileName; }; aws_smithy_types = rustPackages."registry+https://github.com/rust-lang/crates.io-index".aws-smithy-types."0.38.0" { inherit profileName; }; - bytes = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.1.0" { inherit profileName; }; + bytes = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.2.0" { inherit profileName; }; bytes_utils = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes-utils."0.1.2" { inherit profileName; }; futures_core = rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures-core."0.3.21" { inherit profileName; }; http = rustPackages."registry+https://github.com/rust-lang/crates.io-index".http."0.2.6" { inherit profileName; }; @@ -398,7 +420,7 @@ in src = fetchCratesIo { inherit name version; sha256 = "64f80a2c56fc09fc9a2da3c63f286ec2a89465433219f8165e14e522283a5eb8"; }; dependencies = { aws_smithy_http = rustPackages."registry+https://github.com/rust-lang/crates.io-index".aws-smithy-http."0.38.0" { inherit profileName; }; - bytes = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.1.0" { inherit profileName; }; + bytes = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.2.0" { inherit profileName; }; http = rustPackages."registry+https://github.com/rust-lang/crates.io-index".http."0.2.6" { inherit profileName; }; http_body = rustPackages."registry+https://github.com/rust-lang/crates.io-index".http-body."0.4.4" { inherit profileName; }; pin_project = rustPackages."registry+https://github.com/rust-lang/crates.io-index".pin-project."1.0.10" { inherit profileName; }; @@ -558,11 +580,11 @@ in ]; }); - "registry+https://github.com/rust-lang/crates.io-index".bytes."1.1.0" = overridableMkRustCrate (profileName: rec { + "registry+https://github.com/rust-lang/crates.io-index".bytes."1.2.0" = overridableMkRustCrate (profileName: rec { name = "bytes"; - version = "1.1.0"; + version = "1.2.0"; registry = "registry+https://github.com/rust-lang/crates.io-index"; - src = fetchCratesIo { inherit name version; sha256 = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8"; }; + src = fetchCratesIo { inherit name version; sha256 = "f0b3de4a0c5e67e16066a0715723abd91edc2f9001d09c46e1dca929351e130e"; }; features = builtins.concatLists [ [ "default" ] [ "std" ] @@ -579,7 +601,7 @@ in [ "std" ] ]; dependencies = { - bytes = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.1.0" { inherit profileName; }; + bytes = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.2.0" { inherit profileName; }; either = rustPackages."registry+https://github.com/rust-lang/crates.io-index".either."1.6.1" { inherit profileName; }; }; }); @@ -759,7 +781,7 @@ in registry = "registry+https://github.com/rust-lang/crates.io-index"; src = fetchCratesIo { inherit name version; sha256 = "59a6001667ab124aebae2a495118e11d30984c3a653e99d86d58971708cf5e4b"; }; dependencies = { - ${ if hostPlatform.config == "aarch64-linux-android" || hostPlatform.config == "aarch64-apple-darwin" || hostPlatform.parsed.cpu.name == "aarch64" && hostPlatform.parsed.kernel.name == "linux" then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; }; + ${ if hostPlatform.config == "aarch64-linux-android" || hostPlatform.parsed.cpu.name == "aarch64" && hostPlatform.parsed.kernel.name == "linux" || hostPlatform.config == "aarch64-apple-darwin" then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; }; }; }); @@ -1403,7 +1425,7 @@ in ]; dependencies = { async_trait = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".async-trait."0.1.52" { profileName = "__noProfile"; }; - bytes = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.1.0" { inherit profileName; }; + bytes = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.2.0" { inherit profileName; }; bytesize = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytesize."1.1.0" { inherit profileName; }; futures = rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures."0.3.21" { inherit profileName; }; futures_util = rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures-util."0.3.21" { inherit profileName; }; @@ -1417,7 +1439,7 @@ in garage_web = rustPackages."unknown".garage_web."0.7.0" { inherit profileName; }; hex = rustPackages."registry+https://github.com/rust-lang/crates.io-index".hex."0.4.3" { inherit profileName; }; sodiumoxide = rustPackages."registry+https://github.com/rust-lang/crates.io-index".kuska-sodiumoxide."0.2.5-0" { inherit profileName; }; - netapp = rustPackages."registry+https://github.com/rust-lang/crates.io-index".netapp."0.4.4" { inherit profileName; }; + netapp = rustPackages."git+https://git.deuxfleurs.fr/lx/netapp".netapp."0.5.0" { inherit profileName; }; opentelemetry = rustPackages."registry+https://github.com/rust-lang/crates.io-index".opentelemetry."0.17.0" { inherit profileName; }; opentelemetry_otlp = rustPackages."registry+https://github.com/rust-lang/crates.io-index".opentelemetry-otlp."0.10.0" { inherit profileName; }; opentelemetry_prometheus = rustPackages."registry+https://github.com/rust-lang/crates.io-index".opentelemetry-prometheus."0.10.0" { inherit profileName; }; @@ -1458,7 +1480,7 @@ in dependencies = { ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_web" then "async_trait" else null } = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".async-trait."0.1.52" { profileName = "__noProfile"; }; ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_web" then "base64" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".base64."0.13.0" { inherit profileName; }; - ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_web" then "bytes" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.1.0" { inherit profileName; }; + ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_web" then "bytes" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.2.0" { inherit profileName; }; ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_web" then "chrono" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".chrono."0.4.19" { inherit profileName; }; ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_web" then "crypto_common" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".crypto-common."0.1.6" { inherit profileName; }; ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_web" then "err_derive" else null } = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".err-derive."0.3.1" { profileName = "__noProfile"; }; @@ -1505,8 +1527,9 @@ in src = fetchCrateLocal (workspaceSrc + "/src/block"); dependencies = { arc_swap = rustPackages."registry+https://github.com/rust-lang/crates.io-index".arc-swap."1.5.0" { inherit profileName; }; + async_compression = rustPackages."registry+https://github.com/rust-lang/crates.io-index".async-compression."0.3.10" { inherit profileName; }; async_trait = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".async-trait."0.1.52" { profileName = "__noProfile"; }; - bytes = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.1.0" { inherit profileName; }; + bytes = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.2.0" { inherit profileName; }; futures = rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures."0.3.21" { inherit profileName; }; futures_util = rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures-util."0.3.21" { inherit profileName; }; garage_db = rustPackages."unknown".garage_db."0.8.0" { inherit profileName; }; @@ -1520,6 +1543,7 @@ in serde = rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde."1.0.137" { inherit profileName; }; serde_bytes = rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde_bytes."0.11.5" { inherit profileName; }; tokio = rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio."1.17.0" { inherit profileName; }; + tokio_util = rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio-util."0.6.9" { inherit profileName; }; tracing = rustPackages."registry+https://github.com/rust-lang/crates.io-index".tracing."0.1.32" { inherit profileName; }; zstd = rustPackages."registry+https://github.com/rust-lang/crates.io-index".zstd."0.9.2+zstd.1.5.1" { inherit profileName; }; }; @@ -1599,7 +1623,7 @@ in ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_web" then "garage_table" else null } = rustPackages."unknown".garage_table."0.7.0" { inherit profileName; }; ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_web" then "garage_util" else null } = rustPackages."unknown".garage_util."0.7.0" { inherit profileName; }; ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_web" then "hex" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".hex."0.4.3" { inherit profileName; }; - ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_web" then "netapp" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".netapp."0.4.4" { inherit profileName; }; + ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_web" then "netapp" else null } = rustPackages."git+https://git.deuxfleurs.fr/lx/netapp".netapp."0.5.0" { inherit profileName; }; ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_web" then "opentelemetry" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".opentelemetry."0.17.0" { inherit profileName; }; ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_web" then "rand" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand."0.8.5" { inherit profileName; }; ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_web" then "rmp_serde" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rmp-serde."0.15.5" { inherit profileName; }; @@ -1619,7 +1643,7 @@ in dependencies = { arc_swap = rustPackages."registry+https://github.com/rust-lang/crates.io-index".arc-swap."1.5.0" { inherit profileName; }; async_trait = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".async-trait."0.1.52" { profileName = "__noProfile"; }; - bytes = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.1.0" { inherit profileName; }; + bytes = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.2.0" { inherit profileName; }; futures = rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures."0.3.21" { inherit profileName; }; futures_util = rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures-util."0.3.21" { inherit profileName; }; garage_util = rustPackages."registry+https://github.com/rust-lang/crates.io-index".garage_util."0.5.1" { inherit profileName; }; @@ -1654,7 +1678,7 @@ in dependencies = { ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_web" then "arc_swap" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".arc-swap."1.5.0" { inherit profileName; }; ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_web" then "async_trait" else null } = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".async-trait."0.1.52" { profileName = "__noProfile"; }; - ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_web" then "bytes" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.1.0" { inherit profileName; }; + ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_web" then "bytes" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.2.0" { inherit profileName; }; ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_web" then "futures" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures."0.3.21" { inherit profileName; }; ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_web" then "futures_util" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures-util."0.3.21" { inherit profileName; }; ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_web" then "garage_util" else null } = rustPackages."unknown".garage_util."0.7.0" { inherit profileName; }; @@ -1665,7 +1689,7 @@ in ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_rpc" then "k8s_openapi" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".k8s-openapi."0.13.1" { inherit profileName; }; ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_rpc" then "kube" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".kube."0.62.0" { inherit profileName; }; ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_web" then "sodiumoxide" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".kuska-sodiumoxide."0.2.5-0" { inherit profileName; }; - ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_web" then "netapp" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".netapp."0.4.4" { inherit profileName; }; + ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_web" then "netapp" else null } = rustPackages."git+https://git.deuxfleurs.fr/lx/netapp".netapp."0.5.0" { inherit profileName; }; ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_rpc" then "openssl" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".openssl."0.10.38" { inherit profileName; }; ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_web" then "opentelemetry" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".opentelemetry."0.17.0" { inherit profileName; }; ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_web" then "pnet_datalink" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".pnet_datalink."0.28.0" { inherit profileName; }; @@ -1688,7 +1712,7 @@ in src = fetchCratesIo { inherit name version; sha256 = "5c3557f3757e2acd29eaee86804d4e6c38d2abda81b4b349d8a0d2277044265c"; }; dependencies = { async_trait = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".async-trait."0.1.52" { profileName = "__noProfile"; }; - bytes = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.1.0" { inherit profileName; }; + bytes = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.2.0" { inherit profileName; }; futures = rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures."0.3.21" { inherit profileName; }; futures_util = rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures-util."0.3.21" { inherit profileName; }; garage_rpc = rustPackages."registry+https://github.com/rust-lang/crates.io-index".garage_rpc."0.5.1" { inherit profileName; }; @@ -1711,7 +1735,7 @@ in src = fetchCrateLocal (workspaceSrc + "/src/table"); dependencies = { async_trait = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".async-trait."0.1.52" { profileName = "__noProfile"; }; - bytes = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.1.0" { inherit profileName; }; + bytes = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.2.0" { inherit profileName; }; futures = rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures."0.3.21" { inherit profileName; }; futures_util = rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures-util."0.3.21" { inherit profileName; }; garage_db = rustPackages."unknown".garage_db."0.8.0" { inherit profileName; }; @@ -1766,7 +1790,7 @@ in dependencies = { ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "async_trait" else null } = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".async-trait."0.1.52" { profileName = "__noProfile"; }; ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "blake2" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".blake2."0.9.2" { inherit profileName; }; - ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "bytes" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.1.0" { inherit profileName; }; + ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "bytes" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.2.0" { inherit profileName; }; ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "chrono" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".chrono."0.4.19" { inherit profileName; }; ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "digest" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".digest."0.10.3" { inherit profileName; }; ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "err_derive" else null } = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".err-derive."0.3.1" { profileName = "__noProfile"; }; @@ -1775,7 +1799,7 @@ in ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "hex" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".hex."0.4.3" { inherit profileName; }; ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "http" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".http."0.2.6" { inherit profileName; }; ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "hyper" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".hyper."0.14.18" { inherit profileName; }; - ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "netapp" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".netapp."0.4.4" { inherit profileName; }; + ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "netapp" else null } = rustPackages."git+https://git.deuxfleurs.fr/lx/netapp".netapp."0.5.0" { inherit profileName; }; ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "opentelemetry" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".opentelemetry."0.17.0" { inherit profileName; }; ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "rand" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand."0.8.5" { inherit profileName; }; ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "rmp_serde" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rmp-serde."0.15.5" { inherit profileName; }; @@ -1881,7 +1905,7 @@ in registry = "registry+https://github.com/rust-lang/crates.io-index"; src = fetchCratesIo { inherit name version; sha256 = "62eeb471aa3e3c9197aa4bfeabfe02982f6dc96f750486c0bb0009ac58b26d2b"; }; dependencies = { - bytes = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.1.0" { inherit profileName; }; + bytes = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.2.0" { inherit profileName; }; fnv = rustPackages."registry+https://github.com/rust-lang/crates.io-index".fnv."1.0.7" { inherit profileName; }; futures_core = rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures-core."0.3.21" { inherit profileName; }; futures_sink = rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures-sink."0.3.21" { inherit profileName; }; @@ -2062,7 +2086,7 @@ in registry = "registry+https://github.com/rust-lang/crates.io-index"; src = fetchCratesIo { inherit name version; sha256 = "31f4c6746584866f0feabcc69893c5b51beef3831656a968ed7ae254cdc4fd03"; }; dependencies = { - bytes = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.1.0" { inherit profileName; }; + bytes = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.2.0" { inherit profileName; }; fnv = rustPackages."registry+https://github.com/rust-lang/crates.io-index".fnv."1.0.7" { inherit profileName; }; itoa = rustPackages."registry+https://github.com/rust-lang/crates.io-index".itoa."1.0.1" { inherit profileName; }; }; @@ -2074,7 +2098,7 @@ in registry = "registry+https://github.com/rust-lang/crates.io-index"; src = fetchCratesIo { inherit name version; sha256 = "1ff4f84919677303da5f147645dbea6b1881f368d03ac84e1dc09031ebd7b2c6"; }; dependencies = { - bytes = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.1.0" { inherit profileName; }; + bytes = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.2.0" { inherit profileName; }; http = rustPackages."registry+https://github.com/rust-lang/crates.io-index".http."0.2.6" { inherit profileName; }; pin_project_lite = rustPackages."registry+https://github.com/rust-lang/crates.io-index".pin-project-lite."0.2.8" { inherit profileName; }; }; @@ -2141,7 +2165,7 @@ in (lib.optional (rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client") "tcp") ]; dependencies = { - ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "bytes" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.1.0" { inherit profileName; }; + ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "bytes" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.2.0" { inherit profileName; }; ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "futures_channel" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures-channel."0.3.21" { inherit profileName; }; ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "futures_core" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures-core."0.3.21" { inherit profileName; }; ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "futures_util" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures-util."0.3.21" { inherit profileName; }; @@ -2204,7 +2228,7 @@ in registry = "registry+https://github.com/rust-lang/crates.io-index"; src = fetchCratesIo { inherit name version; sha256 = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905"; }; dependencies = { - bytes = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.1.0" { inherit profileName; }; + bytes = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.2.0" { inherit profileName; }; hyper = rustPackages."registry+https://github.com/rust-lang/crates.io-index".hyper."0.14.18" { inherit profileName; }; native_tls = rustPackages."registry+https://github.com/rust-lang/crates.io-index".native-tls."0.2.8" { inherit profileName; }; tokio = rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio."1.17.0" { inherit profileName; }; @@ -2406,7 +2430,7 @@ in ]; dependencies = { base64 = rustPackages."registry+https://github.com/rust-lang/crates.io-index".base64."0.13.0" { inherit profileName; }; - bytes = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.1.0" { inherit profileName; }; + bytes = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.2.0" { inherit profileName; }; chrono = rustPackages."registry+https://github.com/rust-lang/crates.io-index".chrono."0.4.19" { inherit profileName; }; http = rustPackages."registry+https://github.com/rust-lang/crates.io-index".http."0.2.6" { inherit profileName; }; percent_encoding = rustPackages."registry+https://github.com/rust-lang/crates.io-index".percent-encoding."2.1.0" { inherit profileName; }; @@ -2477,7 +2501,7 @@ in ]; dependencies = { base64 = rustPackages."registry+https://github.com/rust-lang/crates.io-index".base64."0.13.0" { inherit profileName; }; - bytes = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.1.0" { inherit profileName; }; + bytes = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.2.0" { inherit profileName; }; chrono = rustPackages."registry+https://github.com/rust-lang/crates.io-index".chrono."0.4.19" { inherit profileName; }; dirs = rustPackages."registry+https://github.com/rust-lang/crates.io-index".dirs-next."2.0.0" { inherit profileName; }; either = rustPackages."registry+https://github.com/rust-lang/crates.io-index".either."1.6.1" { inherit profileName; }; @@ -2846,7 +2870,7 @@ in [ "default" ] ]; dependencies = { - bytes = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.1.0" { inherit profileName; }; + bytes = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.2.0" { inherit profileName; }; encoding_rs = rustPackages."registry+https://github.com/rust-lang/crates.io-index".encoding_rs."0.8.30" { inherit profileName; }; futures_util = rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures-util."0.3.21" { inherit profileName; }; http = rustPackages."registry+https://github.com/rust-lang/crates.io-index".http."0.2.6" { inherit profileName; }; @@ -2913,37 +2937,43 @@ in }; }); - "registry+https://github.com/rust-lang/crates.io-index".netapp."0.4.4" = overridableMkRustCrate (profileName: rec { + "git+https://git.deuxfleurs.fr/lx/netapp".netapp."0.5.0" = overridableMkRustCrate (profileName: rec { name = "netapp"; - version = "0.4.4"; - registry = "registry+https://github.com/rust-lang/crates.io-index"; - src = fetchCratesIo { inherit name version; sha256 = "c6419a4b836774192e13fedb05c0e5f414ee8df9ca0c467456a0bacde06c29ee"; }; + version = "0.5.0"; + registry = "git+https://git.deuxfleurs.fr/lx/netapp"; + src = fetchCrateGit { + url = https://git.deuxfleurs.fr/lx/netapp; + name = "netapp"; + version = "0.5.0"; + rev = "ab80ade4f0034cbdcf15a99c674730f85eb06870"; + ref = "stream-body";}; features = builtins.concatLists [ - (lib.optional (rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client") "default") - (lib.optional (rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_web") "opentelemetry") - (lib.optional (rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_web") "opentelemetry-contrib") - (lib.optional (rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_web") "rand") - (lib.optional (rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_web") "telemetry") + [ "default" ] + [ "opentelemetry" ] + [ "opentelemetry-contrib" ] + [ "rand" ] + [ "telemetry" ] ]; dependencies = { - ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "arc_swap" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".arc-swap."1.5.0" { inherit profileName; }; - ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "async_trait" else null } = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".async-trait."0.1.52" { profileName = "__noProfile"; }; - ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "bytes" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."0.6.0" { inherit profileName; }; - ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "cfg_if" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".cfg-if."1.0.0" { inherit profileName; }; - ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "err_derive" else null } = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".err-derive."0.2.4" { profileName = "__noProfile"; }; - ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "futures" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures."0.3.21" { inherit profileName; }; - ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "hex" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".hex."0.4.3" { inherit profileName; }; - ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "kuska_handshake" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".kuska-handshake."0.2.0" { inherit profileName; }; - ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "sodiumoxide" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".kuska-sodiumoxide."0.2.5-0" { inherit profileName; }; - ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "log" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".log."0.4.16" { inherit profileName; }; - ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_web" then "opentelemetry" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".opentelemetry."0.17.0" { inherit profileName; }; - ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_web" then "opentelemetry_contrib" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".opentelemetry-contrib."0.9.0" { inherit profileName; }; - ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_web" then "rand" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand."0.5.6" { inherit profileName; }; - ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "rmp_serde" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rmp-serde."0.14.4" { inherit profileName; }; - ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "serde" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde."1.0.137" { inherit profileName; }; - ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "tokio" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio."1.17.0" { inherit profileName; }; - ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "tokio_stream" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio-stream."0.1.8" { inherit profileName; }; - ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "tokio_util" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio-util."0.6.9" { inherit profileName; }; + arc_swap = rustPackages."registry+https://github.com/rust-lang/crates.io-index".arc-swap."1.5.0" { inherit profileName; }; + async_trait = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".async-trait."0.1.52" { profileName = "__noProfile"; }; + bytes = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.2.0" { inherit profileName; }; + cfg_if = rustPackages."registry+https://github.com/rust-lang/crates.io-index".cfg-if."1.0.0" { inherit profileName; }; + err_derive = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".err-derive."0.2.4" { profileName = "__noProfile"; }; + futures = rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures."0.3.21" { inherit profileName; }; + hex = rustPackages."registry+https://github.com/rust-lang/crates.io-index".hex."0.4.3" { inherit profileName; }; + kuska_handshake = rustPackages."registry+https://github.com/rust-lang/crates.io-index".kuska-handshake."0.2.0" { inherit profileName; }; + sodiumoxide = rustPackages."registry+https://github.com/rust-lang/crates.io-index".kuska-sodiumoxide."0.2.5-0" { inherit profileName; }; + log = rustPackages."registry+https://github.com/rust-lang/crates.io-index".log."0.4.16" { inherit profileName; }; + opentelemetry = rustPackages."registry+https://github.com/rust-lang/crates.io-index".opentelemetry."0.17.0" { inherit profileName; }; + opentelemetry_contrib = rustPackages."registry+https://github.com/rust-lang/crates.io-index".opentelemetry-contrib."0.9.0" { inherit profileName; }; + pin_project = rustPackages."registry+https://github.com/rust-lang/crates.io-index".pin-project."1.0.10" { inherit profileName; }; + rand = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand."0.5.6" { inherit profileName; }; + rmp_serde = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rmp-serde."0.14.4" { inherit profileName; }; + serde = rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde."1.0.137" { inherit profileName; }; + tokio = rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio."1.17.0" { inherit profileName; }; + tokio_stream = rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio-stream."0.1.8" { inherit profileName; }; + tokio_util = rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio-util."0.6.9" { inherit profileName; }; }; }); @@ -3572,7 +3602,7 @@ in [ "std" ] ]; dependencies = { - bytes = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.1.0" { inherit profileName; }; + bytes = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.2.0" { inherit profileName; }; prost_derive = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".prost-derive."0.9.0" { profileName = "__noProfile"; }; }; }); @@ -3583,7 +3613,7 @@ in registry = "registry+https://github.com/rust-lang/crates.io-index"; src = fetchCratesIo { inherit name version; sha256 = "62941722fb675d463659e49c4f3fe1fe792ff24fe5bbaa9c08cd3b98a1c354f5"; }; dependencies = { - bytes = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.1.0" { inherit profileName; }; + bytes = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.2.0" { inherit profileName; }; heck = rustPackages."registry+https://github.com/rust-lang/crates.io-index".heck."0.3.3" { inherit profileName; }; itertools = rustPackages."registry+https://github.com/rust-lang/crates.io-index".itertools."0.10.3" { inherit profileName; }; lazy_static = rustPackages."registry+https://github.com/rust-lang/crates.io-index".lazy_static."1.4.0" { inherit profileName; }; @@ -3620,7 +3650,7 @@ in registry = "registry+https://github.com/rust-lang/crates.io-index"; src = fetchCratesIo { inherit name version; sha256 = "534b7a0e836e3c482d2693070f982e39e7611da9695d4d1f5a4b186b51faef0a"; }; dependencies = { - bytes = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.1.0" { inherit profileName; }; + bytes = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.2.0" { inherit profileName; }; prost = rustPackages."registry+https://github.com/rust-lang/crates.io-index".prost."0.9.0" { inherit profileName; }; }; }); @@ -3990,7 +4020,7 @@ in ]; dependencies = { ${ if hostPlatform.parsed.kernel.name == "android" || hostPlatform.parsed.kernel.name == "linux" then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; }; - ${ if hostPlatform.parsed.kernel.name == "android" || hostPlatform.parsed.kernel.name == "linux" || hostPlatform.parsed.kernel.name == "dragonfly" || hostPlatform.parsed.kernel.name == "freebsd" || hostPlatform.parsed.kernel.name == "illumos" || hostPlatform.parsed.kernel.name == "netbsd" || hostPlatform.parsed.kernel.name == "openbsd" || hostPlatform.parsed.kernel.name == "solaris" then "once_cell" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".once_cell."1.10.0" { inherit profileName; }; + ${ if hostPlatform.parsed.kernel.name == "dragonfly" || hostPlatform.parsed.kernel.name == "freebsd" || hostPlatform.parsed.kernel.name == "illumos" || hostPlatform.parsed.kernel.name == "netbsd" || hostPlatform.parsed.kernel.name == "openbsd" || hostPlatform.parsed.kernel.name == "solaris" || hostPlatform.parsed.kernel.name == "android" || hostPlatform.parsed.kernel.name == "linux" then "once_cell" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".once_cell."1.10.0" { inherit profileName; }; ${ if hostPlatform.parsed.cpu.name == "i686" || hostPlatform.parsed.cpu.name == "x86_64" || (hostPlatform.parsed.cpu.name == "aarch64" || hostPlatform.parsed.cpu.name == "armv6l" || hostPlatform.parsed.cpu.name == "armv7l") && (hostPlatform.parsed.kernel.name == "android" || hostPlatform.parsed.kernel.name == "fuchsia" || hostPlatform.parsed.kernel.name == "linux") then "spin" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".spin."0.5.2" { inherit profileName; }; untrusted = rustPackages."registry+https://github.com/rust-lang/crates.io-index".untrusted."0.7.1" { inherit profileName; }; ${ if hostPlatform.parsed.cpu.name == "wasm32" && hostPlatform.parsed.vendor.name == "unknown" && hostPlatform.parsed.kernel.name == "unknown" && hostPlatform.parsed.abi.name == "" then "web_sys" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".web-sys."0.3.56" { inherit profileName; }; @@ -4063,7 +4093,7 @@ in dependencies = { async_trait = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".async-trait."0.1.52" { profileName = "__noProfile"; }; base64 = rustPackages."registry+https://github.com/rust-lang/crates.io-index".base64."0.13.0" { inherit profileName; }; - bytes = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.1.0" { inherit profileName; }; + bytes = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.2.0" { inherit profileName; }; crc32fast = rustPackages."registry+https://github.com/rust-lang/crates.io-index".crc32fast."1.3.2" { inherit profileName; }; futures = rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures."0.3.21" { inherit profileName; }; http = rustPackages."registry+https://github.com/rust-lang/crates.io-index".http."0.2.6" { inherit profileName; }; @@ -4109,7 +4139,7 @@ in src = fetchCratesIo { inherit name version; sha256 = "a5ae95491c8b4847931e291b151127eccd6ff8ca13f33603eb3d0035ecb05272"; }; dependencies = { base64 = rustPackages."registry+https://github.com/rust-lang/crates.io-index".base64."0.13.0" { inherit profileName; }; - bytes = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.1.0" { inherit profileName; }; + bytes = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.2.0" { inherit profileName; }; chrono = rustPackages."registry+https://github.com/rust-lang/crates.io-index".chrono."0.4.19" { inherit profileName; }; digest = rustPackages."registry+https://github.com/rust-lang/crates.io-index".digest."0.9.0" { inherit profileName; }; futures = rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures."0.3.21" { inherit profileName; }; @@ -4885,7 +4915,7 @@ in (lib.optional (rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client") "winapi") ]; dependencies = { - ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "bytes" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.1.0" { inherit profileName; }; + ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "bytes" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.2.0" { inherit profileName; }; ${ if (rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client") && hostPlatform.isUnix then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; }; ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "memchr" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".memchr."2.4.1" { inherit profileName; }; ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "mio" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".mio."0.8.2" { inherit profileName; }; @@ -4971,14 +5001,14 @@ in features = builtins.concatLists [ (lib.optional (rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client") "codec") (lib.optional (rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client") "compat") - (lib.optional (rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client") "default") + (lib.optional (rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client") "default") (lib.optional (rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client") "futures-io") - (lib.optional (rootFeatures' ? "garage" || rootFeatures' ? "garage_rpc") "io") + (lib.optional (rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client") "io") (lib.optional (rootFeatures' ? "garage" || rootFeatures' ? "garage_rpc") "slab") (lib.optional (rootFeatures' ? "garage" || rootFeatures' ? "garage_rpc") "time") ]; dependencies = { - ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "bytes" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.1.0" { inherit profileName; }; + ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "bytes" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.2.0" { inherit profileName; }; ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "futures_core" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures-core."0.3.21" { inherit profileName; }; ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "futures_io" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures-io."0.3.21" { inherit profileName; }; ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "futures_sink" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures-sink."0.3.21" { inherit profileName; }; @@ -4995,7 +5025,7 @@ in registry = "registry+https://github.com/rust-lang/crates.io-index"; src = fetchCratesIo { inherit name version; sha256 = "64910e1b9c1901aaf5375561e35b9c057d95ff41a44ede043a03e09279eabaf1"; }; dependencies = { - bytes = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.1.0" { inherit profileName; }; + bytes = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.2.0" { inherit profileName; }; futures_core = rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures-core."0.3.21" { inherit profileName; }; futures_sink = rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures-sink."0.3.21" { inherit profileName; }; log = rustPackages."registry+https://github.com/rust-lang/crates.io-index".log."0.4.16" { inherit profileName; }; @@ -5041,7 +5071,7 @@ in async_stream = rustPackages."registry+https://github.com/rust-lang/crates.io-index".async-stream."0.3.3" { inherit profileName; }; async_trait = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".async-trait."0.1.52" { profileName = "__noProfile"; }; base64 = rustPackages."registry+https://github.com/rust-lang/crates.io-index".base64."0.13.0" { inherit profileName; }; - bytes = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.1.0" { inherit profileName; }; + bytes = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.2.0" { inherit profileName; }; futures_core = rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures-core."0.3.21" { inherit profileName; }; futures_util = rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures-util."0.3.21" { inherit profileName; }; h2 = rustPackages."registry+https://github.com/rust-lang/crates.io-index".h2."0.3.12" { inherit profileName; }; @@ -5143,7 +5173,7 @@ in ]; dependencies = { base64 = rustPackages."registry+https://github.com/rust-lang/crates.io-index".base64."0.13.0" { inherit profileName; }; - bytes = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.1.0" { inherit profileName; }; + bytes = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.2.0" { inherit profileName; }; futures_core = rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures-core."0.3.21" { inherit profileName; }; futures_util = rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures-util."0.3.21" { inherit profileName; }; http = rustPackages."registry+https://github.com/rust-lang/crates.io-index".http."0.2.6" { inherit profileName; }; @@ -5622,11 +5652,11 @@ in [ "default" ] ]; dependencies = { - ${ if hostPlatform.config == "aarch64-pc-windows-msvc" || hostPlatform.config == "aarch64-uwp-windows-msvc" then "windows_aarch64_msvc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_aarch64_msvc."0.32.0" { inherit profileName; }; + ${ if hostPlatform.config == "aarch64-uwp-windows-msvc" || hostPlatform.config == "aarch64-pc-windows-msvc" then "windows_aarch64_msvc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_aarch64_msvc."0.32.0" { inherit profileName; }; ${ if hostPlatform.config == "i686-uwp-windows-gnu" || hostPlatform.config == "i686-pc-windows-gnu" then "windows_i686_gnu" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_i686_gnu."0.32.0" { inherit profileName; }; ${ if hostPlatform.config == "i686-uwp-windows-msvc" || hostPlatform.config == "i686-pc-windows-msvc" then "windows_i686_msvc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_i686_msvc."0.32.0" { inherit profileName; }; ${ if hostPlatform.config == "x86_64-pc-windows-gnu" || hostPlatform.config == "x86_64-uwp-windows-gnu" then "windows_x86_64_gnu" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_x86_64_gnu."0.32.0" { inherit profileName; }; - ${ if hostPlatform.config == "x86_64-uwp-windows-msvc" || hostPlatform.config == "x86_64-pc-windows-msvc" then "windows_x86_64_msvc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_x86_64_msvc."0.32.0" { inherit profileName; }; + ${ if hostPlatform.config == "x86_64-pc-windows-msvc" || hostPlatform.config == "x86_64-uwp-windows-msvc" then "windows_x86_64_msvc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_x86_64_msvc."0.32.0" { inherit profileName; }; }; }); -- cgit v1.2.3 From 126b0373074600695a46ac24933b2330ee076ea9 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 25 Jul 2022 11:02:55 +0200 Subject: update netapp --- Cargo.lock | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 9d12f523..0d00bca1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2158,7 +2158,7 @@ dependencies = [ [[package]] name = "netapp" version = "0.5.0" -source = "git+https://git.deuxfleurs.fr/lx/netapp?branch=stream-body#ab80ade4f0034cbdcf15a99c674730f85eb06870" +source = "git+https://git.deuxfleurs.fr/lx/netapp?branch=stream-body#fed0542313824df295a7e322a9aebe8ba62f97b9" dependencies = [ "arc-swap", "async-trait", -- cgit v1.2.3 From f0ee3056d3357221d71ad7d346e419dc26b9e063 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 25 Jul 2022 11:13:12 +0200 Subject: Update cargo.nix --- Cargo.nix | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/Cargo.nix b/Cargo.nix index ec828ae9..a6c42b40 100644 --- a/Cargo.nix +++ b/Cargo.nix @@ -781,7 +781,7 @@ in registry = "registry+https://github.com/rust-lang/crates.io-index"; src = fetchCratesIo { inherit name version; sha256 = "59a6001667ab124aebae2a495118e11d30984c3a653e99d86d58971708cf5e4b"; }; dependencies = { - ${ if hostPlatform.config == "aarch64-linux-android" || hostPlatform.parsed.cpu.name == "aarch64" && hostPlatform.parsed.kernel.name == "linux" || hostPlatform.config == "aarch64-apple-darwin" then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; }; + ${ if hostPlatform.parsed.cpu.name == "aarch64" && hostPlatform.parsed.kernel.name == "linux" || hostPlatform.config == "aarch64-apple-darwin" || hostPlatform.config == "aarch64-linux-android" then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; }; }; }); @@ -2832,7 +2832,7 @@ in [ "os-poll" ] ]; dependencies = { - ${ if hostPlatform.parsed.kernel.name == "wasi" || hostPlatform.isUnix then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; }; + ${ if hostPlatform.isUnix || hostPlatform.parsed.kernel.name == "wasi" then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; }; log = rustPackages."registry+https://github.com/rust-lang/crates.io-index".log."0.4.16" { inherit profileName; }; ${ if hostPlatform.isWindows then "miow" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".miow."0.3.7" { inherit profileName; }; ${ if hostPlatform.isWindows then "ntapi" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".ntapi."0.3.7" { inherit profileName; }; @@ -2945,7 +2945,7 @@ in url = https://git.deuxfleurs.fr/lx/netapp; name = "netapp"; version = "0.5.0"; - rev = "ab80ade4f0034cbdcf15a99c674730f85eb06870"; + rev = "fed0542313824df295a7e322a9aebe8ba62f97b9"; ref = "stream-body";}; features = builtins.concatLists [ [ "default" ] @@ -4632,7 +4632,7 @@ in ]; dependencies = { bitflags = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bitflags."1.3.2" { inherit profileName; }; - ${ if hostPlatform.parsed.kernel.name == "android" || hostPlatform.parsed.kernel.name == "linux" then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; }; + ${ if hostPlatform.parsed.kernel.name == "linux" || hostPlatform.parsed.kernel.name == "android" then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; }; ${ if !(hostPlatform.parsed.kernel.name == "linux" || hostPlatform.parsed.kernel.name == "android") then "parking_lot" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".parking_lot."0.11.2" { inherit profileName; }; ${ if !(hostPlatform.parsed.kernel.name == "linux" || hostPlatform.parsed.kernel.name == "android") then "parking_lot_core" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".parking_lot_core."0.8.5" { inherit profileName; }; static_init_macro = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".static_init_macro."1.0.2" { profileName = "__noProfile"; }; @@ -5652,10 +5652,10 @@ in [ "default" ] ]; dependencies = { - ${ if hostPlatform.config == "aarch64-uwp-windows-msvc" || hostPlatform.config == "aarch64-pc-windows-msvc" then "windows_aarch64_msvc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_aarch64_msvc."0.32.0" { inherit profileName; }; - ${ if hostPlatform.config == "i686-uwp-windows-gnu" || hostPlatform.config == "i686-pc-windows-gnu" then "windows_i686_gnu" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_i686_gnu."0.32.0" { inherit profileName; }; - ${ if hostPlatform.config == "i686-uwp-windows-msvc" || hostPlatform.config == "i686-pc-windows-msvc" then "windows_i686_msvc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_i686_msvc."0.32.0" { inherit profileName; }; - ${ if hostPlatform.config == "x86_64-pc-windows-gnu" || hostPlatform.config == "x86_64-uwp-windows-gnu" then "windows_x86_64_gnu" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_x86_64_gnu."0.32.0" { inherit profileName; }; + ${ if hostPlatform.config == "aarch64-pc-windows-msvc" || hostPlatform.config == "aarch64-uwp-windows-msvc" then "windows_aarch64_msvc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_aarch64_msvc."0.32.0" { inherit profileName; }; + ${ if hostPlatform.config == "i686-pc-windows-gnu" || hostPlatform.config == "i686-uwp-windows-gnu" then "windows_i686_gnu" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_i686_gnu."0.32.0" { inherit profileName; }; + ${ if hostPlatform.config == "i686-pc-windows-msvc" || hostPlatform.config == "i686-uwp-windows-msvc" then "windows_i686_msvc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_i686_msvc."0.32.0" { inherit profileName; }; + ${ if hostPlatform.config == "x86_64-uwp-windows-gnu" || hostPlatform.config == "x86_64-pc-windows-gnu" then "windows_x86_64_gnu" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_x86_64_gnu."0.32.0" { inherit profileName; }; ${ if hostPlatform.config == "x86_64-pc-windows-msvc" || hostPlatform.config == "x86_64-uwp-windows-msvc" then "windows_x86_64_msvc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_x86_64_msvc."0.32.0" { inherit profileName; }; }; }); -- cgit v1.2.3 From e935861854deed5d1ca66767fc51d9849201a4dd Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 25 Jul 2022 18:19:35 +0200 Subject: Factor out node request order selection logic & use in manager --- Cargo.lock | 2 +- script/dev-cluster.sh | 2 +- src/block/manager.rs | 2 ++ src/rpc/rpc_helper.rs | 95 ++++++++++++++++++++++++++++++--------------------- 4 files changed, 60 insertions(+), 41 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0d00bca1..e3d8373f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2158,7 +2158,7 @@ dependencies = [ [[package]] name = "netapp" version = "0.5.0" -source = "git+https://git.deuxfleurs.fr/lx/netapp?branch=stream-body#fed0542313824df295a7e322a9aebe8ba62f97b9" +source = "git+https://git.deuxfleurs.fr/lx/netapp?branch=stream-body#74e57016f63b6052cf6d539812859c3a46138eee" dependencies = [ "arc-swap", "async-trait", diff --git a/script/dev-cluster.sh b/script/dev-cluster.sh index fa0a950e..c7fbe08d 100755 --- a/script/dev-cluster.sh +++ b/script/dev-cluster.sh @@ -11,7 +11,7 @@ PATH="${GARAGE_DEBUG}:${GARAGE_RELEASE}:${NIX_RELEASE}:$PATH" FANCYCOLORS=("41m" "42m" "44m" "45m" "100m" "104m") export RUST_BACKTRACE=1 -export RUST_LOG=garage=info,garage_api=debug +export RUST_LOG=garage=info,garage_api=debug,netapp=trace MAIN_LABEL="\e[${FANCYCOLORS[0]}[main]\e[49m" WHICH_GARAGE=$(which garage || exit 1) diff --git a/src/block/manager.rs b/src/block/manager.rs index bb01c300..80c52510 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -185,6 +185,7 @@ impl BlockManager { hash: &Hash, ) -> Result<(DataBlockHeader, ByteStream), Error> { let who = self.replication.read_nodes(hash); + //let who = self.system.rpc.request_order(&who); for node in who.iter() { let node_id = NodeID::from(*node); @@ -225,6 +226,7 @@ impl BlockManager { /// Return its entire body async fn rpc_get_raw_block(&self, hash: &Hash) -> Result { let who = self.replication.read_nodes(hash); + //let who = self.system.rpc.request_order(&who); for node in who.iter() { let node_id = NodeID::from(*node); diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index 6e098446..ddabd636 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -292,47 +292,19 @@ impl RpcHelper { // to reach a quorum, priorizing nodes with the lowest latency. // When there are errors, we start new requests to compensate. - // Retrieve some status variables that we will use to sort requests - let peer_list = self.0.fullmesh.get_peer_list(); - let ring: Arc = self.0.ring.borrow().clone(); - let our_zone = match ring.layout.node_role(&self.0.our_node_id) { - Some(pc) => &pc.zone, - None => "", - }; - - // Augment requests with some information used to sort them. - // The tuples are as follows: - // (is another node?, is another zone?, latency, node ID, request future) - // We store all of these tuples in a vec that we can sort. - // By sorting this vec, we priorize ourself, then nodes in the same zone, - // and within a same zone we priorize nodes with the lowest latency. - let mut requests = requests - .map(|(to, fut)| { - let peer_zone = match ring.layout.node_role(&to) { - Some(pc) => &pc.zone, - None => "", - }; - let peer_avg_ping = peer_list - .iter() - .find(|x| x.id.as_ref() == to.as_slice()) - .and_then(|pi| pi.avg_ping) - .unwrap_or_else(|| Duration::from_secs(1)); - ( - to != self.0.our_node_id, - peer_zone != our_zone, - peer_avg_ping, - to, - fut, - ) - }) + // Reorder requests to priorize closeness / low latency + let request_order = self.request_order(to); + let mut ord_requests = vec![(); request_order.len()] + .into_iter() + .map(|_| None) .collect::>(); - - // Sort requests by (priorize ourself, priorize same zone, priorize low latency) - requests - .sort_by_key(|(diffnode, diffzone, ping, _to, _fut)| (*diffnode, *diffzone, *ping)); + for (to, fut) in requests { + let i = request_order.iter().position(|x| *x == to).unwrap(); + ord_requests[i] = Some((to, fut)); + } // Make an iterator to take requests in their sorted order - let mut requests = requests.into_iter(); + let mut requests = ord_requests.into_iter().map(Option::unwrap); // resp_stream will contain all of the requests that are currently in flight. // (for the moment none, they will be added in the loop below) @@ -343,7 +315,7 @@ impl RpcHelper { // If the current set of requests that are running is not enough to possibly // reach quorum, start some new requests. while successes.len() + resp_stream.len() < quorum { - if let Some((_, _, _, req_to, fut)) = requests.next() { + if let Some((req_to, fut)) = requests.next() { let tracer = opentelemetry::global::tracer("garage"); let span = tracer.start(format!("RPC to {:?}", req_to)); resp_stream.push(tokio::spawn( @@ -413,4 +385,49 @@ impl RpcHelper { Err(Error::Quorum(quorum, successes.len(), to.len(), errors)) } } + + pub fn request_order(&self, nodes: &[Uuid]) -> Vec { + // Retrieve some status variables that we will use to sort requests + let peer_list = self.0.fullmesh.get_peer_list(); + let ring: Arc = self.0.ring.borrow().clone(); + let our_zone = match ring.layout.node_role(&self.0.our_node_id) { + Some(pc) => &pc.zone, + None => "", + }; + + // Augment requests with some information used to sort them. + // The tuples are as follows: + // (is another node?, is another zone?, latency, node ID, request future) + // We store all of these tuples in a vec that we can sort. + // By sorting this vec, we priorize ourself, then nodes in the same zone, + // and within a same zone we priorize nodes with the lowest latency. + let mut nodes = nodes + .iter() + .map(|to| { + let peer_zone = match ring.layout.node_role(&to) { + Some(pc) => &pc.zone, + None => "", + }; + let peer_avg_ping = peer_list + .iter() + .find(|x| x.id.as_ref() == to.as_slice()) + .and_then(|pi| pi.avg_ping) + .unwrap_or_else(|| Duration::from_secs(1)); + ( + *to != self.0.our_node_id, + peer_zone != our_zone, + peer_avg_ping, + *to, + ) + }) + .collect::>(); + + // Sort requests by (priorize ourself, priorize same zone, priorize low latency) + nodes.sort_by_key(|(diffnode, diffzone, ping, _to)| (*diffnode, *diffzone, *ping)); + + nodes + .into_iter() + .map(|(_, _, _, to)| to) + .collect::>() + } } -- cgit v1.2.3 From 5d065b8a0f66d7898746b4b419a1a55cfccc9bbf Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 29 Aug 2022 17:24:53 +0200 Subject: cargo2nix fix to fetchCrateGit --- nix/common.nix | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/nix/common.nix b/nix/common.nix index 8396a6de..aa59cdc0 100644 --- a/nix/common.nix +++ b/nix/common.nix @@ -8,10 +8,10 @@ rec { sha256 = "1xy9zpypqfxs5gcq5dcla4bfkhxmh5nzn9dyqkr03lqycm9wg5cr"; }; cargo2nixSrc = fetchGit { - # As of 2022-03-17 - url = "https://github.com/superboum/cargo2nix"; - ref = "dedup_propagate"; - rev = "486675c67249e735dd7eb68e1b9feac9db102be7"; + # As of 2022-08-29, stacking two patches: superboum@dedup_propagate and Alexis211@fix_fetchcrategit + url = "https://github.com/Alexis211/cargo2nix"; + ref = "fix_fetchcrategit"; + rev = "4b31c0cc05b6394916d46e9289f51263d81973b9"; }; /* -- cgit v1.2.3 From 322dafc761295df45c081183c5fc059a750a3249 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 29 Aug 2022 17:32:45 +0200 Subject: Try to fix clippy --- src/rpc/rpc_helper.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index ddabd636..216fffd4 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -404,7 +404,7 @@ impl RpcHelper { let mut nodes = nodes .iter() .map(|to| { - let peer_zone = match ring.layout.node_role(&to) { + let peer_zone = match ring.layout.node_role(to) { Some(pc) => &pc.zone, None => "", }; -- cgit v1.2.3 From e598231ca442715af8321ff5520e5b3d59609ed9 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 31 Aug 2022 19:27:25 +0200 Subject: update netapp git commit --- Cargo.lock | 2 +- Cargo.nix | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e3bebfb8..cfc92d21 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2176,7 +2176,7 @@ dependencies = [ [[package]] name = "netapp" version = "0.5.0" -source = "git+https://git.deuxfleurs.fr/lx/netapp?branch=stream-body#74e57016f63b6052cf6d539812859c3a46138eee" +source = "git+https://git.deuxfleurs.fr/lx/netapp?branch=stream-body#b55f61c38b01da01314d99ced543aba713dbd2a9" dependencies = [ "arc-swap", "async-trait", diff --git a/Cargo.nix b/Cargo.nix index 6d35fb9e..7760ee1f 100644 --- a/Cargo.nix +++ b/Cargo.nix @@ -2965,7 +2965,7 @@ in url = https://git.deuxfleurs.fr/lx/netapp; name = "netapp"; version = "0.5.0"; - rev = "74e57016f63b6052cf6d539812859c3a46138eee"; + rev = "b55f61c38b01da01314d99ced543aba713dbd2a9"; ref = "stream-body";}; features = builtins.concatLists [ [ "default" ] @@ -4055,7 +4055,7 @@ in ]; dependencies = { ${ if hostPlatform.parsed.kernel.name == "android" || hostPlatform.parsed.kernel.name == "linux" then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; }; - ${ if hostPlatform.parsed.kernel.name == "android" || hostPlatform.parsed.kernel.name == "linux" || hostPlatform.parsed.kernel.name == "dragonfly" || hostPlatform.parsed.kernel.name == "freebsd" || hostPlatform.parsed.kernel.name == "illumos" || hostPlatform.parsed.kernel.name == "netbsd" || hostPlatform.parsed.kernel.name == "openbsd" || hostPlatform.parsed.kernel.name == "solaris" then "once_cell" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".once_cell."1.10.0" { inherit profileName; }; + ${ if hostPlatform.parsed.kernel.name == "dragonfly" || hostPlatform.parsed.kernel.name == "freebsd" || hostPlatform.parsed.kernel.name == "illumos" || hostPlatform.parsed.kernel.name == "netbsd" || hostPlatform.parsed.kernel.name == "openbsd" || hostPlatform.parsed.kernel.name == "solaris" || hostPlatform.parsed.kernel.name == "android" || hostPlatform.parsed.kernel.name == "linux" then "once_cell" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".once_cell."1.10.0" { inherit profileName; }; ${ if hostPlatform.parsed.cpu.name == "i686" || hostPlatform.parsed.cpu.name == "x86_64" || (hostPlatform.parsed.cpu.name == "aarch64" || hostPlatform.parsed.cpu.name == "armv6l" || hostPlatform.parsed.cpu.name == "armv7l") && (hostPlatform.parsed.kernel.name == "android" || hostPlatform.parsed.kernel.name == "fuchsia" || hostPlatform.parsed.kernel.name == "linux") then "spin" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".spin."0.5.2" { inherit profileName; }; untrusted = rustPackages."registry+https://github.com/rust-lang/crates.io-index".untrusted."0.7.1" { inherit profileName; }; ${ if hostPlatform.parsed.cpu.name == "wasm32" && hostPlatform.parsed.vendor.name == "unknown" && hostPlatform.parsed.kernel.name == "unknown" && hostPlatform.parsed.abi.name == "" then "web_sys" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".web-sys."0.3.56" { inherit profileName; }; @@ -4677,7 +4677,7 @@ in ]; dependencies = { bitflags = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bitflags."1.3.2" { inherit profileName; }; - ${ if hostPlatform.parsed.kernel.name == "android" || hostPlatform.parsed.kernel.name == "linux" then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; }; + ${ if hostPlatform.parsed.kernel.name == "linux" || hostPlatform.parsed.kernel.name == "android" then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; }; ${ if !(hostPlatform.parsed.kernel.name == "linux" || hostPlatform.parsed.kernel.name == "android") then "parking_lot" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".parking_lot."0.11.2" { inherit profileName; }; ${ if !(hostPlatform.parsed.kernel.name == "linux" || hostPlatform.parsed.kernel.name == "android") then "parking_lot_core" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".parking_lot_core."0.8.5" { inherit profileName; }; static_init_macro = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".static_init_macro."1.0.2" { profileName = "__noProfile"; }; @@ -5774,11 +5774,11 @@ in [ "default" ] ]; dependencies = { - ${ if hostPlatform.config == "aarch64-uwp-windows-msvc" || hostPlatform.config == "aarch64-pc-windows-msvc" then "windows_aarch64_msvc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_aarch64_msvc."0.32.0" { inherit profileName; }; + ${ if hostPlatform.config == "aarch64-pc-windows-msvc" || hostPlatform.config == "aarch64-uwp-windows-msvc" then "windows_aarch64_msvc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_aarch64_msvc."0.32.0" { inherit profileName; }; ${ if hostPlatform.config == "i686-pc-windows-gnu" || hostPlatform.config == "i686-uwp-windows-gnu" then "windows_i686_gnu" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_i686_gnu."0.32.0" { inherit profileName; }; - ${ if hostPlatform.config == "i686-pc-windows-msvc" || hostPlatform.config == "i686-uwp-windows-msvc" then "windows_i686_msvc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_i686_msvc."0.32.0" { inherit profileName; }; + ${ if hostPlatform.config == "i686-uwp-windows-msvc" || hostPlatform.config == "i686-pc-windows-msvc" then "windows_i686_msvc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_i686_msvc."0.32.0" { inherit profileName; }; ${ if hostPlatform.config == "x86_64-pc-windows-gnu" || hostPlatform.config == "x86_64-uwp-windows-gnu" then "windows_x86_64_gnu" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_x86_64_gnu."0.32.0" { inherit profileName; }; - ${ if hostPlatform.config == "x86_64-pc-windows-msvc" || hostPlatform.config == "x86_64-uwp-windows-msvc" then "windows_x86_64_msvc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_x86_64_msvc."0.32.0" { inherit profileName; }; + ${ if hostPlatform.config == "x86_64-uwp-windows-msvc" || hostPlatform.config == "x86_64-pc-windows-msvc" then "windows_x86_64_msvc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_x86_64_msvc."0.32.0" { inherit profileName; }; }; }); -- cgit v1.2.3 From 70231d68b27054c2185b73b5ceee1c445baaaa2d Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 31 Aug 2022 19:44:27 +0200 Subject: Fix bytes_read counter --- src/block/manager.rs | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/src/block/manager.rs b/src/block/manager.rs index 80c52510..b8fe4c74 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -449,8 +449,6 @@ impl BlockManager { let (header, data) = block.into_parts(); - self.metrics.bytes_read.add(data.len() as u64); - Resp::new(Ok(BlockRpc::PutBlock { hash: *hash, header, @@ -460,9 +458,16 @@ impl BlockManager { /// Read block from disk, verifying it's integrity pub(crate) async fn read_block(&self, hash: &Hash) -> Result { - self.read_block_internal(hash) + let data = self + .read_block_internal(hash) .bound_record_duration(&self.metrics.block_read_duration) - .await + .await?; + + self.metrics + .bytes_read + .add(data.inner_buffer().len() as u64); + + Ok(data) } async fn read_block_internal(&self, hash: &Hash) -> Result { -- cgit v1.2.3 From 4b726b09410b3b5ea9cde80d2a445d7914432e3c Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 1 Sep 2022 09:47:28 +0200 Subject: netapp recv with unbounded channel removes deadlock --- Cargo.lock | 2 +- Cargo.nix | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index cfc92d21..7b5f6984 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2176,7 +2176,7 @@ dependencies = [ [[package]] name = "netapp" version = "0.5.0" -source = "git+https://git.deuxfleurs.fr/lx/netapp?branch=stream-body#b55f61c38b01da01314d99ced543aba713dbd2a9" +source = "git+https://git.deuxfleurs.fr/lx/netapp?branch=stream-body#3fd30c6e280fba41377c8b563352d756e8bc1caf" dependencies = [ "arc-swap", "async-trait", diff --git a/Cargo.nix b/Cargo.nix index 7760ee1f..79f6ca7e 100644 --- a/Cargo.nix +++ b/Cargo.nix @@ -791,7 +791,7 @@ in registry = "registry+https://github.com/rust-lang/crates.io-index"; src = fetchCratesIo { inherit name version; sha256 = "59a6001667ab124aebae2a495118e11d30984c3a653e99d86d58971708cf5e4b"; }; dependencies = { - ${ if hostPlatform.parsed.cpu.name == "aarch64" && hostPlatform.parsed.kernel.name == "linux" || hostPlatform.config == "aarch64-apple-darwin" || hostPlatform.config == "aarch64-linux-android" then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; }; + ${ if hostPlatform.config == "aarch64-linux-android" || hostPlatform.parsed.cpu.name == "aarch64" && hostPlatform.parsed.kernel.name == "linux" || hostPlatform.config == "aarch64-apple-darwin" then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; }; }; }); @@ -2965,7 +2965,7 @@ in url = https://git.deuxfleurs.fr/lx/netapp; name = "netapp"; version = "0.5.0"; - rev = "b55f61c38b01da01314d99ced543aba713dbd2a9"; + rev = "3fd30c6e280fba41377c8b563352d756e8bc1caf"; ref = "stream-body";}; features = builtins.concatLists [ [ "default" ] @@ -5776,9 +5776,9 @@ in dependencies = { ${ if hostPlatform.config == "aarch64-pc-windows-msvc" || hostPlatform.config == "aarch64-uwp-windows-msvc" then "windows_aarch64_msvc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_aarch64_msvc."0.32.0" { inherit profileName; }; ${ if hostPlatform.config == "i686-pc-windows-gnu" || hostPlatform.config == "i686-uwp-windows-gnu" then "windows_i686_gnu" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_i686_gnu."0.32.0" { inherit profileName; }; - ${ if hostPlatform.config == "i686-uwp-windows-msvc" || hostPlatform.config == "i686-pc-windows-msvc" then "windows_i686_msvc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_i686_msvc."0.32.0" { inherit profileName; }; - ${ if hostPlatform.config == "x86_64-pc-windows-gnu" || hostPlatform.config == "x86_64-uwp-windows-gnu" then "windows_x86_64_gnu" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_x86_64_gnu."0.32.0" { inherit profileName; }; - ${ if hostPlatform.config == "x86_64-uwp-windows-msvc" || hostPlatform.config == "x86_64-pc-windows-msvc" then "windows_x86_64_msvc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_x86_64_msvc."0.32.0" { inherit profileName; }; + ${ if hostPlatform.config == "i686-pc-windows-msvc" || hostPlatform.config == "i686-uwp-windows-msvc" then "windows_i686_msvc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_i686_msvc."0.32.0" { inherit profileName; }; + ${ if hostPlatform.config == "x86_64-uwp-windows-gnu" || hostPlatform.config == "x86_64-pc-windows-gnu" then "windows_x86_64_gnu" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_x86_64_gnu."0.32.0" { inherit profileName; }; + ${ if hostPlatform.config == "x86_64-pc-windows-msvc" || hostPlatform.config == "x86_64-uwp-windows-msvc" then "windows_x86_64_msvc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_x86_64_msvc."0.32.0" { inherit profileName; }; }; }); -- cgit v1.2.3 From bc977f9a7a7a5bd87ccf5fe96d64b397591f8ba0 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 1 Sep 2022 12:58:20 +0200 Subject: Update to Netapp with OrderTag support and exploit OrderTags --- Cargo.lock | 2 +- src/api/s3/copy.rs | 10 ++++++++-- src/api/s3/get.rs | 21 ++++++++++++++------ src/block/manager.rs | 55 ++++++++++++++++++++++++++++++++++++--------------- src/rpc/rpc_helper.rs | 2 +- 5 files changed, 64 insertions(+), 26 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7b5f6984..bcb8f215 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2176,7 +2176,7 @@ dependencies = [ [[package]] name = "netapp" version = "0.5.0" -source = "git+https://git.deuxfleurs.fr/lx/netapp?branch=stream-body#3fd30c6e280fba41377c8b563352d756e8bc1caf" +source = "git+https://git.deuxfleurs.fr/lx/netapp?branch=stream-body#4a59b73d7bfd0f136f654e874afb5d2a9bf4df2e" dependencies = [ "arc-swap", "async-trait", diff --git a/src/api/s3/copy.rs b/src/api/s3/copy.rs index b54cbd23..10cf5935 100644 --- a/src/api/s3/copy.rs +++ b/src/api/s3/copy.rs @@ -9,6 +9,7 @@ use bytes::Bytes; use hyper::{Body, Request, Response}; use serde::Serialize; +use garage_rpc::rpc_helper::OrderTag; use garage_table::*; use garage_util::data::*; use garage_util::time::*; @@ -306,11 +307,16 @@ pub async fn handle_upload_part_copy( // if and only if the block returned is a block that already existed // in the Garage data store (thus we don't need to save it again). let garage2 = garage.clone(); + let order_stream = OrderTag::stream(); let source_blocks = stream::iter(blocks_to_copy) - .flat_map(|(block_hash, range_to_copy)| { + .enumerate() + .flat_map(|(i, (block_hash, range_to_copy))| { let garage3 = garage2.clone(); stream::once(async move { - let data = garage3.block_manager.rpc_get_block(&block_hash).await?; + let data = garage3 + .block_manager + .rpc_get_block(&block_hash, Some(order_stream.order(i as u64))) + .await?; match range_to_copy { Some(r) => Ok((data.slice(r), None)), None => Ok((data, Some(block_hash))), diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs index c7621ade..dfc284fe 100644 --- a/src/api/s3/get.rs +++ b/src/api/s3/get.rs @@ -10,6 +10,7 @@ use http::header::{ use hyper::body::Bytes; use hyper::{Body, Request, Response, StatusCode}; +use garage_rpc::rpc_helper::OrderTag; use garage_table::EmptyKey; use garage_util::data::*; @@ -242,9 +243,11 @@ pub async fn handle_get( Ok(resp_builder.body(body)?) } ObjectVersionData::FirstBlock(_, first_block_hash) => { + let order_stream = OrderTag::stream(); + let read_first_block = garage .block_manager - .rpc_get_block_streaming(first_block_hash); + .rpc_get_block_streaming(first_block_hash, Some(order_stream.order(0))); let get_next_blocks = garage.version_table.get(&last_v.uuid, &EmptyKey); let (first_block_stream, version) = @@ -260,7 +263,8 @@ pub async fn handle_get( blocks[0].1 = Some(first_block_stream); let body_stream = futures::stream::iter(blocks) - .map(move |(hash, stream_opt)| { + .enumerate() + .map(move |(i, (hash, stream_opt))| { let garage = garage.clone(); async move { if let Some(stream) = stream_opt { @@ -268,7 +272,7 @@ pub async fn handle_get( } else { garage .block_manager - .rpc_get_block_streaming(&hash) + .rpc_get_block_streaming(&hash, Some(order_stream.order(i as u64))) .await .unwrap_or_else(|_| { Box::pin(futures::stream::once(async move { @@ -281,7 +285,7 @@ pub async fn handle_get( } } }) - .buffered(3) + .buffered(2) .flatten(); let body = hyper::body::Body::wrap_stream(body_stream); @@ -445,11 +449,16 @@ fn body_from_blocks_range( true_offset += b.size; } + let order_stream = OrderTag::stream(); let body_stream = futures::stream::iter(blocks) - .map(move |(block, true_offset)| { + .enumerate() + .map(move |(i, (block, true_offset))| { let garage = garage.clone(); async move { - let data = garage.block_manager.rpc_get_block(&block.hash).await?; + let data = garage + .block_manager + .rpc_get_block(&block.hash, Some(order_stream.order(i as u64))) + .await?; let start_in_block = if true_offset > begin { 0 } else { diff --git a/src/block/manager.rs b/src/block/manager.rs index b8fe4c74..b9f6fc0f 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -33,6 +33,7 @@ use garage_util::metrics::RecordDuration; use garage_util::time::*; use garage_util::tranquilizer::Tranquilizer; +use garage_rpc::rpc_helper::OrderTag; use garage_rpc::system::System; use garage_rpc::*; @@ -70,7 +71,7 @@ pub(crate) const BLOCK_GC_DELAY: Duration = Duration::from_secs(600); pub enum BlockRpc { Ok, /// Message to ask for a block of data, by hash - GetBlock(Hash), + GetBlock(Hash, Option), /// Message to send a block of data, either because requested, of for first delivery of new /// block PutBlock { @@ -183,15 +184,18 @@ impl BlockManager { async fn rpc_get_raw_block_streaming( &self, hash: &Hash, + order_tag: Option, ) -> Result<(DataBlockHeader, ByteStream), Error> { let who = self.replication.read_nodes(hash); //let who = self.system.rpc.request_order(&who); for node in who.iter() { let node_id = NodeID::from(*node); - let rpc = - self.endpoint - .call_streaming(&node_id, BlockRpc::GetBlock(*hash), PRIO_NORMAL); + let rpc = self.endpoint.call_streaming( + &node_id, + BlockRpc::GetBlock(*hash, order_tag), + PRIO_NORMAL, + ); tokio::select! { res = rpc => { let res = match res { @@ -224,15 +228,21 @@ impl BlockManager { /// Ask nodes that might have a (possibly compressed) block for it /// Return its entire body - async fn rpc_get_raw_block(&self, hash: &Hash) -> Result { + async fn rpc_get_raw_block( + &self, + hash: &Hash, + order_tag: Option, + ) -> Result { let who = self.replication.read_nodes(hash); //let who = self.system.rpc.request_order(&who); for node in who.iter() { let node_id = NodeID::from(*node); - let rpc = - self.endpoint - .call_streaming(&node_id, BlockRpc::GetBlock(*hash), PRIO_NORMAL); + let rpc = self.endpoint.call_streaming( + &node_id, + BlockRpc::GetBlock(*hash, order_tag), + PRIO_NORMAL, + ); tokio::select! { res = rpc => { let res = match res { @@ -275,11 +285,12 @@ impl BlockManager { pub async fn rpc_get_block_streaming( &self, hash: &Hash, + order_tag: Option, ) -> Result< Pin> + Send + Sync + 'static>>, Error, > { - let (header, stream) = self.rpc_get_raw_block_streaming(hash).await?; + let (header, stream) = self.rpc_get_raw_block_streaming(hash, order_tag).await?; match header { DataBlockHeader::Plain => Ok(Box::pin(stream.map_err(|_| { std::io::Error::new(std::io::ErrorKind::Other, "netapp stream error") @@ -295,8 +306,14 @@ impl BlockManager { } /// Ask nodes that might have a block for it - pub async fn rpc_get_block(&self, hash: &Hash) -> Result { - self.rpc_get_raw_block(hash).await?.verify_get(*hash) + pub async fn rpc_get_block( + &self, + hash: &Hash, + order_tag: Option, + ) -> Result { + self.rpc_get_raw_block(hash, order_tag) + .await? + .verify_get(*hash) } /// Send block to nodes that should have it @@ -441,7 +458,7 @@ impl BlockManager { Ok(()) } - async fn handle_get_block(&self, hash: &Hash) -> Resp { + async fn handle_get_block(&self, hash: &Hash, order_tag: Option) -> Resp { let block = match self.read_block(hash).await { Ok(data) => data, Err(e) => return Resp::new(Err(e)), @@ -449,11 +466,17 @@ impl BlockManager { let (header, data) = block.into_parts(); - Resp::new(Ok(BlockRpc::PutBlock { + let resp = Resp::new(Ok(BlockRpc::PutBlock { hash: *hash, header, })) - .with_stream_from_buffer(data) + .with_stream_from_buffer(data); + + if let Some(order_tag) = order_tag { + resp.with_order_tag(order_tag) + } else { + resp + } } /// Read block from disk, verifying it's integrity @@ -841,7 +864,7 @@ impl BlockManager { hash ); - let block_data = self.rpc_get_raw_block(hash).await?; + let block_data = self.rpc_get_raw_block(hash, None).await?; self.metrics.resync_recv_counter.add(1); @@ -861,7 +884,7 @@ impl StreamingEndpointHandler for BlockManager { .await .map(|_| BlockRpc::Ok), ), - BlockRpc::GetBlock(h) => self.handle_get_block(h).await, + BlockRpc::GetBlock(h, order_tag) => self.handle_get_block(h, *order_tag).await, BlockRpc::NeedBlockQuery(h) => { Resp::new(self.need_block(h).await.map(BlockRpc::NeedBlockReply)) } diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index 216fffd4..6c79c502 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -18,7 +18,7 @@ use opentelemetry::{ pub use netapp::endpoint::{Endpoint, EndpointHandler, StreamingEndpointHandler}; use netapp::message::IntoReq; pub use netapp::message::{ - Message as Rpc, Req, RequestPriority, Resp, PRIO_BACKGROUND, PRIO_HIGH, PRIO_NORMAL, + Message as Rpc, OrderTag, Req, RequestPriority, Resp, PRIO_BACKGROUND, PRIO_HIGH, PRIO_NORMAL, }; use netapp::peering::fullmesh::FullMeshPeeringStrategy; pub use netapp::{self, NetApp, NodeID}; -- cgit v1.2.3 From f3bf34b6a18c547c5fb29346787648048c093d52 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 1 Sep 2022 14:23:54 +0200 Subject: update netapp: straming + fix-ping --- Cargo.lock | 2 +- Cargo.nix | 17 ++++++++--------- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bcb8f215..4c31d697 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2176,7 +2176,7 @@ dependencies = [ [[package]] name = "netapp" version = "0.5.0" -source = "git+https://git.deuxfleurs.fr/lx/netapp?branch=stream-body#4a59b73d7bfd0f136f654e874afb5d2a9bf4df2e" +source = "git+https://git.deuxfleurs.fr/lx/netapp?branch=stream-body#22d96929d5416750e1f5889ee6cc16b382293104" dependencies = [ "arc-swap", "async-trait", diff --git a/Cargo.nix b/Cargo.nix index 79f6ca7e..b42e1f95 100644 --- a/Cargo.nix +++ b/Cargo.nix @@ -791,7 +791,7 @@ in registry = "registry+https://github.com/rust-lang/crates.io-index"; src = fetchCratesIo { inherit name version; sha256 = "59a6001667ab124aebae2a495118e11d30984c3a653e99d86d58971708cf5e4b"; }; dependencies = { - ${ if hostPlatform.config == "aarch64-linux-android" || hostPlatform.parsed.cpu.name == "aarch64" && hostPlatform.parsed.kernel.name == "linux" || hostPlatform.config == "aarch64-apple-darwin" then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; }; + ${ if hostPlatform.parsed.cpu.name == "aarch64" && hostPlatform.parsed.kernel.name == "linux" || hostPlatform.config == "aarch64-linux-android" || hostPlatform.config == "aarch64-apple-darwin" then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; }; }; }); @@ -2852,7 +2852,7 @@ in [ "os-poll" ] ]; dependencies = { - ${ if hostPlatform.parsed.kernel.name == "wasi" || hostPlatform.isUnix then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; }; + ${ if hostPlatform.isUnix || hostPlatform.parsed.kernel.name == "wasi" then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; }; log = rustPackages."registry+https://github.com/rust-lang/crates.io-index".log."0.4.16" { inherit profileName; }; ${ if hostPlatform.isWindows then "miow" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".miow."0.3.7" { inherit profileName; }; ${ if hostPlatform.isWindows then "ntapi" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".ntapi."0.3.7" { inherit profileName; }; @@ -2965,13 +2965,12 @@ in url = https://git.deuxfleurs.fr/lx/netapp; name = "netapp"; version = "0.5.0"; - rev = "3fd30c6e280fba41377c8b563352d756e8bc1caf"; + rev = "22d96929d5416750e1f5889ee6cc16b382293104"; ref = "stream-body";}; features = builtins.concatLists [ [ "default" ] [ "opentelemetry" ] [ "opentelemetry-contrib" ] - [ "rand" ] [ "telemetry" ] ]; dependencies = { @@ -4055,7 +4054,7 @@ in ]; dependencies = { ${ if hostPlatform.parsed.kernel.name == "android" || hostPlatform.parsed.kernel.name == "linux" then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; }; - ${ if hostPlatform.parsed.kernel.name == "dragonfly" || hostPlatform.parsed.kernel.name == "freebsd" || hostPlatform.parsed.kernel.name == "illumos" || hostPlatform.parsed.kernel.name == "netbsd" || hostPlatform.parsed.kernel.name == "openbsd" || hostPlatform.parsed.kernel.name == "solaris" || hostPlatform.parsed.kernel.name == "android" || hostPlatform.parsed.kernel.name == "linux" then "once_cell" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".once_cell."1.10.0" { inherit profileName; }; + ${ if hostPlatform.parsed.kernel.name == "android" || hostPlatform.parsed.kernel.name == "linux" || hostPlatform.parsed.kernel.name == "dragonfly" || hostPlatform.parsed.kernel.name == "freebsd" || hostPlatform.parsed.kernel.name == "illumos" || hostPlatform.parsed.kernel.name == "netbsd" || hostPlatform.parsed.kernel.name == "openbsd" || hostPlatform.parsed.kernel.name == "solaris" then "once_cell" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".once_cell."1.10.0" { inherit profileName; }; ${ if hostPlatform.parsed.cpu.name == "i686" || hostPlatform.parsed.cpu.name == "x86_64" || (hostPlatform.parsed.cpu.name == "aarch64" || hostPlatform.parsed.cpu.name == "armv6l" || hostPlatform.parsed.cpu.name == "armv7l") && (hostPlatform.parsed.kernel.name == "android" || hostPlatform.parsed.kernel.name == "fuchsia" || hostPlatform.parsed.kernel.name == "linux") then "spin" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".spin."0.5.2" { inherit profileName; }; untrusted = rustPackages."registry+https://github.com/rust-lang/crates.io-index".untrusted."0.7.1" { inherit profileName; }; ${ if hostPlatform.parsed.cpu.name == "wasm32" && hostPlatform.parsed.vendor.name == "unknown" && hostPlatform.parsed.kernel.name == "unknown" && hostPlatform.parsed.abi.name == "" then "web_sys" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".web-sys."0.3.56" { inherit profileName; }; @@ -5774,11 +5773,11 @@ in [ "default" ] ]; dependencies = { - ${ if hostPlatform.config == "aarch64-pc-windows-msvc" || hostPlatform.config == "aarch64-uwp-windows-msvc" then "windows_aarch64_msvc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_aarch64_msvc."0.32.0" { inherit profileName; }; - ${ if hostPlatform.config == "i686-pc-windows-gnu" || hostPlatform.config == "i686-uwp-windows-gnu" then "windows_i686_gnu" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_i686_gnu."0.32.0" { inherit profileName; }; + ${ if hostPlatform.config == "aarch64-uwp-windows-msvc" || hostPlatform.config == "aarch64-pc-windows-msvc" then "windows_aarch64_msvc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_aarch64_msvc."0.32.0" { inherit profileName; }; + ${ if hostPlatform.config == "i686-uwp-windows-gnu" || hostPlatform.config == "i686-pc-windows-gnu" then "windows_i686_gnu" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_i686_gnu."0.32.0" { inherit profileName; }; ${ if hostPlatform.config == "i686-pc-windows-msvc" || hostPlatform.config == "i686-uwp-windows-msvc" then "windows_i686_msvc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_i686_msvc."0.32.0" { inherit profileName; }; - ${ if hostPlatform.config == "x86_64-uwp-windows-gnu" || hostPlatform.config == "x86_64-pc-windows-gnu" then "windows_x86_64_gnu" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_x86_64_gnu."0.32.0" { inherit profileName; }; - ${ if hostPlatform.config == "x86_64-pc-windows-msvc" || hostPlatform.config == "x86_64-uwp-windows-msvc" then "windows_x86_64_msvc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_x86_64_msvc."0.32.0" { inherit profileName; }; + ${ if hostPlatform.config == "x86_64-pc-windows-gnu" || hostPlatform.config == "x86_64-uwp-windows-gnu" then "windows_x86_64_gnu" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_x86_64_gnu."0.32.0" { inherit profileName; }; + ${ if hostPlatform.config == "x86_64-uwp-windows-msvc" || hostPlatform.config == "x86_64-pc-windows-msvc" then "windows_x86_64_msvc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_x86_64_msvc."0.32.0" { inherit profileName; }; }; }); -- cgit v1.2.3 From df094bd8075332bb765b8b44c9b19cf2485e9ca8 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 1 Sep 2022 16:30:44 +0200 Subject: Less strict timeouts --- Cargo.lock | 2 +- src/block/manager.rs | 8 ++++++-- src/rpc/rpc_helper.rs | 2 +- src/rpc/system.rs | 6 +++--- src/table/gc.rs | 3 ++- src/table/sync.rs | 3 ++- src/table/table.rs | 2 +- 7 files changed, 16 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4c31d697..632c2131 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2176,7 +2176,7 @@ dependencies = [ [[package]] name = "netapp" version = "0.5.0" -source = "git+https://git.deuxfleurs.fr/lx/netapp?branch=stream-body#22d96929d5416750e1f5889ee6cc16b382293104" +source = "git+https://git.deuxfleurs.fr/lx/netapp?branch=stream-body#f6ad1d0fab340e77fbfcb3488a98c342d334838e" dependencies = [ "arc-swap", "async-trait", diff --git a/src/block/manager.rs b/src/block/manager.rs index b9f6fc0f..00438648 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -48,10 +48,14 @@ use crate::repair::*; pub const INLINE_THRESHOLD: usize = 3072; // Timeout for RPCs that read and write blocks to remote nodes -const BLOCK_RW_TIMEOUT: Duration = Duration::from_secs(30); +const BLOCK_RW_TIMEOUT: Duration = Duration::from_secs(60); // Timeout for RPCs that ask other nodes whether they need a copy // of a given block before we delete it locally -const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(5); +// The timeout here is relatively low because we don't want to block +// the entire resync loop when some nodes are not responding. +// Nothing will be deleted if the nodes don't answer the queries, +// we will just retry later. +const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(15); // The delay between the time where a resync operation fails // and the time when it is retried, with exponential backoff diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index 6c79c502..e9575261 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -31,7 +31,7 @@ use garage_util::metrics::RecordDuration; use crate::metrics::RpcMetrics; use crate::ring::Ring; -const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10); +const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30); // Don't allow more than 100 concurrent outgoing RPCs. const MAX_CONCURRENT_REQUESTS: usize = 100; diff --git a/src/rpc/system.rs b/src/rpc/system.rs index 5858660e..d7ef2140 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -38,7 +38,7 @@ use crate::rpc_helper::*; const DISCOVERY_INTERVAL: Duration = Duration::from_secs(60); const STATUS_EXCHANGE_INTERVAL: Duration = Duration::from_secs(10); -const PING_TIMEOUT: Duration = Duration::from_secs(2); +const SYSTEM_RPC_TIMEOUT: Duration = Duration::from_secs(15); /// Version tag used for version check upon Netapp connection pub const GARAGE_VERSION_TAG: u64 = 0x6761726167650007; // garage 0x0007 @@ -561,7 +561,7 @@ impl System { .broadcast( &self.system_endpoint, SystemRpc::AdvertiseStatus(local_status), - RequestStrategy::with_priority(PRIO_HIGH).with_timeout(PING_TIMEOUT), + RequestStrategy::with_priority(PRIO_HIGH).with_timeout(SYSTEM_RPC_TIMEOUT), ) .await; @@ -685,7 +685,7 @@ impl System { &self.system_endpoint, peer, SystemRpc::PullClusterLayout, - RequestStrategy::with_priority(PRIO_HIGH).with_timeout(PING_TIMEOUT), + RequestStrategy::with_priority(PRIO_HIGH).with_timeout(SYSTEM_RPC_TIMEOUT), ) .await; if let Ok(SystemRpc::AdvertiseClusterLayout(layout)) = resp { diff --git a/src/table/gc.rs b/src/table/gc.rs index 12218d97..6cae9701 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -25,7 +25,8 @@ use crate::replication::*; use crate::schema::*; const TABLE_GC_BATCH_SIZE: usize = 1024; -const TABLE_GC_RPC_TIMEOUT: Duration = Duration::from_secs(30); +// Same timeout as NEED_BLOCK_QUERY_TIMEOUT in block manager +const TABLE_GC_RPC_TIMEOUT: Duration = Duration::from_secs(15); // GC delay for table entries: 1 day (24 hours) // (the delay before the entry is added in the GC todo list diff --git a/src/table/sync.rs b/src/table/sync.rs index b3756a5e..62b88a58 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -24,7 +24,8 @@ use crate::merkle::*; use crate::replication::*; use crate::*; -const TABLE_SYNC_RPC_TIMEOUT: Duration = Duration::from_secs(30); +// Sync RPC can contain a lot of data, so have a 1min timeout +const TABLE_SYNC_RPC_TIMEOUT: Duration = Duration::from_secs(60); // Do anti-entropy every 10 minutes const ANTI_ENTROPY_INTERVAL: Duration = Duration::from_secs(10 * 60); diff --git a/src/table/table.rs b/src/table/table.rs index 3c211728..51f3837f 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -31,7 +31,7 @@ use crate::schema::*; use crate::sync::*; use crate::util::*; -pub const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10); +pub const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(30); pub struct Table { pub system: Arc, -- cgit v1.2.3 From e648bf7b69a487ca3f43d4ba607bfc2e354a5832 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 1 Sep 2022 16:31:04 +0200 Subject: update cargo.nix --- Cargo.nix | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/Cargo.nix b/Cargo.nix index b42e1f95..9f477491 100644 --- a/Cargo.nix +++ b/Cargo.nix @@ -791,7 +791,7 @@ in registry = "registry+https://github.com/rust-lang/crates.io-index"; src = fetchCratesIo { inherit name version; sha256 = "59a6001667ab124aebae2a495118e11d30984c3a653e99d86d58971708cf5e4b"; }; dependencies = { - ${ if hostPlatform.parsed.cpu.name == "aarch64" && hostPlatform.parsed.kernel.name == "linux" || hostPlatform.config == "aarch64-linux-android" || hostPlatform.config == "aarch64-apple-darwin" then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; }; + ${ if hostPlatform.parsed.cpu.name == "aarch64" && hostPlatform.parsed.kernel.name == "linux" || hostPlatform.config == "aarch64-apple-darwin" || hostPlatform.config == "aarch64-linux-android" then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; }; }; }); @@ -2965,7 +2965,7 @@ in url = https://git.deuxfleurs.fr/lx/netapp; name = "netapp"; version = "0.5.0"; - rev = "22d96929d5416750e1f5889ee6cc16b382293104"; + rev = "f6ad1d0fab340e77fbfcb3488a98c342d334838e"; ref = "stream-body";}; features = builtins.concatLists [ [ "default" ] @@ -5773,11 +5773,11 @@ in [ "default" ] ]; dependencies = { - ${ if hostPlatform.config == "aarch64-uwp-windows-msvc" || hostPlatform.config == "aarch64-pc-windows-msvc" then "windows_aarch64_msvc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_aarch64_msvc."0.32.0" { inherit profileName; }; + ${ if hostPlatform.config == "aarch64-pc-windows-msvc" || hostPlatform.config == "aarch64-uwp-windows-msvc" then "windows_aarch64_msvc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_aarch64_msvc."0.32.0" { inherit profileName; }; ${ if hostPlatform.config == "i686-uwp-windows-gnu" || hostPlatform.config == "i686-pc-windows-gnu" then "windows_i686_gnu" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_i686_gnu."0.32.0" { inherit profileName; }; - ${ if hostPlatform.config == "i686-pc-windows-msvc" || hostPlatform.config == "i686-uwp-windows-msvc" then "windows_i686_msvc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_i686_msvc."0.32.0" { inherit profileName; }; + ${ if hostPlatform.config == "i686-uwp-windows-msvc" || hostPlatform.config == "i686-pc-windows-msvc" then "windows_i686_msvc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_i686_msvc."0.32.0" { inherit profileName; }; ${ if hostPlatform.config == "x86_64-pc-windows-gnu" || hostPlatform.config == "x86_64-uwp-windows-gnu" then "windows_x86_64_gnu" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_x86_64_gnu."0.32.0" { inherit profileName; }; - ${ if hostPlatform.config == "x86_64-uwp-windows-msvc" || hostPlatform.config == "x86_64-pc-windows-msvc" then "windows_x86_64_msvc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_x86_64_msvc."0.32.0" { inherit profileName; }; + ${ if hostPlatform.config == "x86_64-pc-windows-msvc" || hostPlatform.config == "x86_64-uwp-windows-msvc" then "windows_x86_64_msvc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_x86_64_msvc."0.32.0" { inherit profileName; }; }; }); -- cgit v1.2.3 From 99b532b85bf35b5acf621c229fb991825f3d994c Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 1 Sep 2022 16:35:43 +0200 Subject: Apply PRIO_SECONDARY to block data transfers --- src/block/manager.rs | 6 +++--- src/rpc/rpc_helper.rs | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/block/manager.rs b/src/block/manager.rs index 00438648..a9def3b0 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -198,7 +198,7 @@ impl BlockManager { let rpc = self.endpoint.call_streaming( &node_id, BlockRpc::GetBlock(*hash, order_tag), - PRIO_NORMAL, + PRIO_NORMAL | PRIO_SECONDARY, ); tokio::select! { res = rpc => { @@ -245,7 +245,7 @@ impl BlockManager { let rpc = self.endpoint.call_streaming( &node_id, BlockRpc::GetBlock(*hash, order_tag), - PRIO_NORMAL, + PRIO_NORMAL | PRIO_SECONDARY, ); tokio::select! { res = rpc => { @@ -336,7 +336,7 @@ impl BlockManager { &self.endpoint, &who[..], put_block_rpc, - RequestStrategy::with_priority(PRIO_NORMAL) + RequestStrategy::with_priority(PRIO_NORMAL | PRIO_SECONDARY) .with_quorum(self.replication.write_quorum()) .with_timeout(BLOCK_RW_TIMEOUT), ) diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index e9575261..aa204c5e 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -18,7 +18,7 @@ use opentelemetry::{ pub use netapp::endpoint::{Endpoint, EndpointHandler, StreamingEndpointHandler}; use netapp::message::IntoReq; pub use netapp::message::{ - Message as Rpc, OrderTag, Req, RequestPriority, Resp, PRIO_BACKGROUND, PRIO_HIGH, PRIO_NORMAL, + Message as Rpc, OrderTag, Req, RequestPriority, Resp, PRIO_BACKGROUND, PRIO_HIGH, PRIO_NORMAL, PRIO_SECONDARY }; use netapp::peering::fullmesh::FullMeshPeeringStrategy; pub use netapp::{self, NetApp, NodeID}; -- cgit v1.2.3 From 1ef87ac4cb676113e86fc16a9eb27546d9a737bd Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 2 Sep 2022 13:38:29 +0200 Subject: cargo fmt --- src/rpc/rpc_helper.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index aa204c5e..19abb4c5 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -18,7 +18,8 @@ use opentelemetry::{ pub use netapp::endpoint::{Endpoint, EndpointHandler, StreamingEndpointHandler}; use netapp::message::IntoReq; pub use netapp::message::{ - Message as Rpc, OrderTag, Req, RequestPriority, Resp, PRIO_BACKGROUND, PRIO_HIGH, PRIO_NORMAL, PRIO_SECONDARY + Message as Rpc, OrderTag, Req, RequestPriority, Resp, PRIO_BACKGROUND, PRIO_HIGH, PRIO_NORMAL, + PRIO_SECONDARY, }; use netapp::peering::fullmesh::FullMeshPeeringStrategy; pub use netapp::{self, NetApp, NodeID}; -- cgit v1.2.3 From 13b5f28c7e8dec12b1db61735931b3830a3c893f Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 2 Sep 2022 13:46:42 +0200 Subject: Make use of BytesBuf from new Netapp --- src/api/s3/put.rs | 43 ++++++++++--------------------------------- 1 file changed, 10 insertions(+), 33 deletions(-) diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs index dc0530df..97b8e4e3 100644 --- a/src/api/s3/put.rs +++ b/src/api/s3/put.rs @@ -1,4 +1,4 @@ -use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque}; +use std::collections::{BTreeMap, BTreeSet, HashMap}; use std::sync::Arc; use futures::prelude::*; @@ -13,6 +13,7 @@ use opentelemetry::{ Context, }; +use garage_rpc::netapp::bytes_buf::BytesBuf; use garage_table::*; use garage_util::async_hash::*; use garage_util::data::*; @@ -108,7 +109,7 @@ pub(crate) async fn save_stream> + Unpin>( size, etag: data_md5sum_hex.clone(), }, - first_block, + first_block.to_vec(), )), }; @@ -136,7 +137,6 @@ pub(crate) async fn save_stream> + Unpin>( garage.version_table.insert(&version).await?; // Transfer data and verify checksum - let first_block = Bytes::from(first_block); let first_block_hash = async_blake2sum(first_block.clone()).await; let tx_result = (|| async { @@ -318,7 +318,6 @@ async fn read_and_put_blocks> + Unpin>( chunker.next(), )?; if let Some(block) = next_block { - let block = Bytes::from(block); let (_, _, block_hash) = futures::future::join3( md5hasher.update(block.clone()), sha256hasher.update(block.clone()), @@ -387,8 +386,7 @@ struct StreamChunker>> { stream: S, read_all: bool, block_size: usize, - buf: VecDeque, - buf_len: usize, + buf: BytesBuf, } impl> + Unpin> StreamChunker { @@ -397,45 +395,25 @@ impl> + Unpin> StreamChunker { stream, read_all: false, block_size, - buf: VecDeque::with_capacity(8), - buf_len: 0, + buf: BytesBuf::new(), } } - async fn next(&mut self) -> Result>, Error> { - while !self.read_all && self.buf_len < self.block_size { + async fn next(&mut self) -> Result, Error> { + while !self.read_all && self.buf.len() < self.block_size { if let Some(block) = self.stream.next().await { let bytes = block?; trace!("Body next: {} bytes", bytes.len()); - self.buf_len += bytes.len(); - self.buf.push_back(bytes); + self.buf.extend(bytes); } else { self.read_all = true; } } - if self.buf_len == 0 { + if self.buf.is_empty() { Ok(None) } else { - let mut slices = Vec::with_capacity(self.buf.len()); - let mut taken = 0; - while self.buf_len > 0 && taken < self.block_size { - let front = self.buf.pop_front().unwrap(); - if taken + front.len() <= self.block_size { - taken += front.len(); - self.buf_len -= front.len(); - slices.push(front); - } else { - let front_take = self.block_size - taken; - slices.push(front.slice(..front_take)); - self.buf.push_front(front.slice(front_take..)); - self.buf_len -= front_take; - break; - } - } - Ok(Some( - slices.iter().map(|x| &x[..]).collect::>().concat(), - )) + Ok(Some(self.buf.take_max(self.block_size))) } } } @@ -545,7 +523,6 @@ pub async fn handle_put_part( // Copy block to store let version = Version::new(version_uuid, bucket_id, key, false); - let first_block = Bytes::from(first_block); let first_block_hash = async_blake2sum(first_block.clone()).await; let (_, data_md5sum, data_sha256sum) = read_and_put_blocks( -- cgit v1.2.3 From c2cc08852bcbd94bad5c15c39e7145c0496d7241 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 6 Sep 2022 19:31:42 +0200 Subject: Reenable node ordering --- src/block/manager.rs | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/src/block/manager.rs b/src/block/manager.rs index a9def3b0..66a454b0 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -9,7 +9,7 @@ use async_trait::async_trait; use bytes::Bytes; use serde::{Deserialize, Serialize}; -use futures::{Stream, TryStreamExt}; +use futures::Stream; use futures_util::stream::StreamExt; use tokio::fs; use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader}; @@ -191,7 +191,7 @@ impl BlockManager { order_tag: Option, ) -> Result<(DataBlockHeader, ByteStream), Error> { let who = self.replication.read_nodes(hash); - //let who = self.system.rpc.request_order(&who); + let who = self.system.rpc.request_order(&who); for node in who.iter() { let node_id = NodeID::from(*node); @@ -238,7 +238,7 @@ impl BlockManager { order_tag: Option, ) -> Result { let who = self.replication.read_nodes(hash); - //let who = self.system.rpc.request_order(&who); + let who = self.system.rpc.request_order(&who); for node in who.iter() { let node_id = NodeID::from(*node); @@ -296,9 +296,7 @@ impl BlockManager { > { let (header, stream) = self.rpc_get_raw_block_streaming(hash, order_tag).await?; match header { - DataBlockHeader::Plain => Ok(Box::pin(stream.map_err(|_| { - std::io::Error::new(std::io::ErrorKind::Other, "netapp stream error") - }))), + DataBlockHeader::Plain => Ok(Box::pin(stream)), DataBlockHeader::Compressed => { // Too many things, I hate it. let reader = stream_asyncread(stream); -- cgit v1.2.3 From 4024822585783368993ac26807d076d8c312bb35 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 6 Sep 2022 19:45:00 +0200 Subject: Update netapp to lastest git version with LAS scheduling --- Cargo.lock | 24 ++++++--------------- Cargo.nix | 73 ++++++++++++++++++++++---------------------------------------- 2 files changed, 32 insertions(+), 65 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 632c2131..6716ea12 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2176,13 +2176,13 @@ dependencies = [ [[package]] name = "netapp" version = "0.5.0" -source = "git+https://git.deuxfleurs.fr/lx/netapp?branch=stream-body#f6ad1d0fab340e77fbfcb3488a98c342d334838e" +source = "git+https://git.deuxfleurs.fr/lx/netapp?branch=stream-body#0f799a7768997c37e3e1b6861c097c4cd934acde" dependencies = [ "arc-swap", "async-trait", "bytes 1.2.0", "cfg-if 1.0.0", - "err-derive 0.2.4", + "err-derive 0.3.1", "futures", "hex", "kuska-handshake", @@ -2191,12 +2191,12 @@ dependencies = [ "opentelemetry", "opentelemetry-contrib", "pin-project 1.0.10", - "rand 0.5.6", - "rmp-serde 0.14.4", + "rand 0.8.5", + "rmp-serde 0.15.5", "serde", "tokio", "tokio-stream", - "tokio-util 0.6.9", + "tokio-util 0.7.0", ] [[package]] @@ -2752,19 +2752,6 @@ dependencies = [ "proc-macro2", ] -[[package]] -name = "rand" -version = "0.5.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c618c47cd3ebd209790115ab837de41425723956ad3ce2e6a7f09890947cacb9" -dependencies = [ - "cloudabi", - "fuchsia-cprng", - "libc", - "rand_core 0.3.1", - "winapi", -] - [[package]] name = "rand" version = "0.6.5" @@ -3746,6 +3733,7 @@ checksum = "64910e1b9c1901aaf5375561e35b9c057d95ff41a44ede043a03e09279eabaf1" dependencies = [ "bytes 1.2.0", "futures-core", + "futures-io", "futures-sink", "log", "pin-project-lite", diff --git a/Cargo.nix b/Cargo.nix index 9f477491..9877f729 100644 --- a/Cargo.nix +++ b/Cargo.nix @@ -791,7 +791,7 @@ in registry = "registry+https://github.com/rust-lang/crates.io-index"; src = fetchCratesIo { inherit name version; sha256 = "59a6001667ab124aebae2a495118e11d30984c3a653e99d86d58971708cf5e4b"; }; dependencies = { - ${ if hostPlatform.parsed.cpu.name == "aarch64" && hostPlatform.parsed.kernel.name == "linux" || hostPlatform.config == "aarch64-apple-darwin" || hostPlatform.config == "aarch64-linux-android" then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; }; + ${ if hostPlatform.parsed.cpu.name == "aarch64" && hostPlatform.parsed.kernel.name == "linux" || hostPlatform.config == "aarch64-linux-android" || hostPlatform.config == "aarch64-apple-darwin" then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; }; }; }); @@ -2852,7 +2852,7 @@ in [ "os-poll" ] ]; dependencies = { - ${ if hostPlatform.isUnix || hostPlatform.parsed.kernel.name == "wasi" then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; }; + ${ if hostPlatform.parsed.kernel.name == "wasi" || hostPlatform.isUnix then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; }; log = rustPackages."registry+https://github.com/rust-lang/crates.io-index".log."0.4.16" { inherit profileName; }; ${ if hostPlatform.isWindows then "miow" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".miow."0.3.7" { inherit profileName; }; ${ if hostPlatform.isWindows then "ntapi" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".ntapi."0.3.7" { inherit profileName; }; @@ -2965,7 +2965,7 @@ in url = https://git.deuxfleurs.fr/lx/netapp; name = "netapp"; version = "0.5.0"; - rev = "f6ad1d0fab340e77fbfcb3488a98c342d334838e"; + rev = "0f799a7768997c37e3e1b6861c097c4cd934acde"; ref = "stream-body";}; features = builtins.concatLists [ [ "default" ] @@ -2978,7 +2978,7 @@ in async_trait = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".async-trait."0.1.52" { profileName = "__noProfile"; }; bytes = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.2.0" { inherit profileName; }; cfg_if = rustPackages."registry+https://github.com/rust-lang/crates.io-index".cfg-if."1.0.0" { inherit profileName; }; - err_derive = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".err-derive."0.2.4" { profileName = "__noProfile"; }; + err_derive = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".err-derive."0.3.1" { profileName = "__noProfile"; }; futures = rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures."0.3.21" { inherit profileName; }; hex = rustPackages."registry+https://github.com/rust-lang/crates.io-index".hex."0.4.3" { inherit profileName; }; kuska_handshake = rustPackages."registry+https://github.com/rust-lang/crates.io-index".kuska-handshake."0.2.0" { inherit profileName; }; @@ -2987,12 +2987,12 @@ in opentelemetry = rustPackages."registry+https://github.com/rust-lang/crates.io-index".opentelemetry."0.17.0" { inherit profileName; }; opentelemetry_contrib = rustPackages."registry+https://github.com/rust-lang/crates.io-index".opentelemetry-contrib."0.9.0" { inherit profileName; }; pin_project = rustPackages."registry+https://github.com/rust-lang/crates.io-index".pin-project."1.0.10" { inherit profileName; }; - rand = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand."0.5.6" { inherit profileName; }; - rmp_serde = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rmp-serde."0.14.4" { inherit profileName; }; + rand = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand."0.8.5" { inherit profileName; }; + rmp_serde = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rmp-serde."0.15.5" { inherit profileName; }; serde = rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde."1.0.137" { inherit profileName; }; tokio = rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio."1.17.0" { inherit profileName; }; tokio_stream = rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio-stream."0.1.8" { inherit profileName; }; - tokio_util = rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio-util."0.6.9" { inherit profileName; }; + tokio_util = rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio-util."0.7.0" { inherit profileName; }; }; }); @@ -3718,29 +3718,6 @@ in }; }); - "registry+https://github.com/rust-lang/crates.io-index".rand."0.5.6" = overridableMkRustCrate (profileName: rec { - name = "rand"; - version = "0.5.6"; - registry = "registry+https://github.com/rust-lang/crates.io-index"; - src = fetchCratesIo { inherit name version; sha256 = "c618c47cd3ebd209790115ab837de41425723956ad3ce2e6a7f09890947cacb9"; }; - features = builtins.concatLists [ - [ "alloc" ] - [ "cloudabi" ] - [ "default" ] - [ "fuchsia-cprng" ] - [ "libc" ] - [ "std" ] - [ "winapi" ] - ]; - dependencies = { - ${ if hostPlatform.parsed.kernel.name == "cloudabi" then "cloudabi" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".cloudabi."0.0.3" { inherit profileName; }; - ${ if hostPlatform.parsed.kernel.name == "fuchsia" then "fuchsia_cprng" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".fuchsia-cprng."0.1.1" { inherit profileName; }; - ${ if hostPlatform.isUnix then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; }; - rand_core = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand_core."0.3.1" { inherit profileName; }; - ${ if hostPlatform.isWindows then "winapi" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".winapi."0.3.9" { inherit profileName; }; - }; - }); - "registry+https://github.com/rust-lang/crates.io-index".rand."0.6.5" = overridableMkRustCrate (profileName: rec { name = "rand"; version = "0.6.5"; @@ -3823,10 +3800,6 @@ in version = "0.3.1"; registry = "registry+https://github.com/rust-lang/crates.io-index"; src = fetchCratesIo { inherit name version; sha256 = "7a6fdeb83b075e8266dcc8762c22776f6877a63111121f5f8c7411e5be7eed4b"; }; - features = builtins.concatLists [ - [ "alloc" ] - [ "std" ] - ]; dependencies = { rand_core = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand_core."0.4.2" { inherit profileName; }; }; @@ -4676,7 +4649,7 @@ in ]; dependencies = { bitflags = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bitflags."1.3.2" { inherit profileName; }; - ${ if hostPlatform.parsed.kernel.name == "linux" || hostPlatform.parsed.kernel.name == "android" then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; }; + ${ if hostPlatform.parsed.kernel.name == "android" || hostPlatform.parsed.kernel.name == "linux" then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; }; ${ if !(hostPlatform.parsed.kernel.name == "linux" || hostPlatform.parsed.kernel.name == "android") then "parking_lot" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".parking_lot."0.11.2" { inherit profileName; }; ${ if !(hostPlatform.parsed.kernel.name == "linux" || hostPlatform.parsed.kernel.name == "android") then "parking_lot_core" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".parking_lot_core."0.8.5" { inherit profileName; }; static_init_macro = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".static_init_macro."1.0.2" { profileName = "__noProfile"; }; @@ -5054,22 +5027,22 @@ in src = fetchCratesIo { inherit name version; sha256 = "9e99e1983e5d376cd8eb4b66604d2e99e79f5bd988c3055891dcd8c9e2604cc0"; }; features = builtins.concatLists [ (lib.optional (rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client") "codec") - (lib.optional (rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client") "compat") + (lib.optional (rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_web") "compat") (lib.optional (rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client") "default") - (lib.optional (rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client") "futures-io") - (lib.optional (rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client") "io") + (lib.optional (rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_web") "futures-io") + (lib.optional (rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_web") "io") (lib.optional (rootFeatures' ? "garage" || rootFeatures' ? "garage_rpc") "slab") (lib.optional (rootFeatures' ? "garage" || rootFeatures' ? "garage_rpc") "time") ]; dependencies = { - ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "bytes" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.2.0" { inherit profileName; }; - ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "futures_core" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures-core."0.3.21" { inherit profileName; }; - ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "futures_io" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures-io."0.3.21" { inherit profileName; }; - ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "futures_sink" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures-sink."0.3.21" { inherit profileName; }; - ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "log" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".log."0.4.16" { inherit profileName; }; - ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "pin_project_lite" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".pin-project-lite."0.2.8" { inherit profileName; }; + ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "bytes" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.2.0" { inherit profileName; }; + ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "futures_core" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures-core."0.3.21" { inherit profileName; }; + ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_web" then "futures_io" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures-io."0.3.21" { inherit profileName; }; + ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "futures_sink" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures-sink."0.3.21" { inherit profileName; }; + ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "log" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".log."0.4.16" { inherit profileName; }; + ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "pin_project_lite" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".pin-project-lite."0.2.8" { inherit profileName; }; ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_rpc" then "slab" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".slab."0.4.5" { inherit profileName; }; - ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "tokio" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio."1.17.0" { inherit profileName; }; + ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "tokio" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio."1.17.0" { inherit profileName; }; }; }); @@ -5078,9 +5051,15 @@ in version = "0.7.0"; registry = "registry+https://github.com/rust-lang/crates.io-index"; src = fetchCratesIo { inherit name version; sha256 = "64910e1b9c1901aaf5375561e35b9c057d95ff41a44ede043a03e09279eabaf1"; }; + features = builtins.concatLists [ + [ "compat" ] + [ "futures-io" ] + [ "io" ] + ]; dependencies = { bytes = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.2.0" { inherit profileName; }; futures_core = rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures-core."0.3.21" { inherit profileName; }; + futures_io = rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures-io."0.3.21" { inherit profileName; }; futures_sink = rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures-sink."0.3.21" { inherit profileName; }; log = rustPackages."registry+https://github.com/rust-lang/crates.io-index".log."0.4.16" { inherit profileName; }; pin_project_lite = rustPackages."registry+https://github.com/rust-lang/crates.io-index".pin-project-lite."0.2.8" { inherit profileName; }; @@ -5774,9 +5753,9 @@ in ]; dependencies = { ${ if hostPlatform.config == "aarch64-pc-windows-msvc" || hostPlatform.config == "aarch64-uwp-windows-msvc" then "windows_aarch64_msvc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_aarch64_msvc."0.32.0" { inherit profileName; }; - ${ if hostPlatform.config == "i686-uwp-windows-gnu" || hostPlatform.config == "i686-pc-windows-gnu" then "windows_i686_gnu" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_i686_gnu."0.32.0" { inherit profileName; }; + ${ if hostPlatform.config == "i686-pc-windows-gnu" || hostPlatform.config == "i686-uwp-windows-gnu" then "windows_i686_gnu" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_i686_gnu."0.32.0" { inherit profileName; }; ${ if hostPlatform.config == "i686-uwp-windows-msvc" || hostPlatform.config == "i686-pc-windows-msvc" then "windows_i686_msvc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_i686_msvc."0.32.0" { inherit profileName; }; - ${ if hostPlatform.config == "x86_64-pc-windows-gnu" || hostPlatform.config == "x86_64-uwp-windows-gnu" then "windows_x86_64_gnu" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_x86_64_gnu."0.32.0" { inherit profileName; }; + ${ if hostPlatform.config == "x86_64-uwp-windows-gnu" || hostPlatform.config == "x86_64-pc-windows-gnu" then "windows_x86_64_gnu" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_x86_64_gnu."0.32.0" { inherit profileName; }; ${ if hostPlatform.config == "x86_64-pc-windows-msvc" || hostPlatform.config == "x86_64-uwp-windows-msvc" then "windows_x86_64_msvc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_x86_64_msvc."0.32.0" { inherit profileName; }; }; }); -- cgit v1.2.3 From 907054775dc71a10a92ab96112889db9113130ab Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 6 Sep 2022 22:25:23 +0200 Subject: Faster copy, better get error message --- src/api/s3/copy.rs | 12 +++++------- src/api/s3/get.rs | 4 ++-- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/src/api/s3/copy.rs b/src/api/s3/copy.rs index 10cf5935..a1a8c9a4 100644 --- a/src/api/s3/copy.rs +++ b/src/api/s3/copy.rs @@ -9,6 +9,7 @@ use bytes::Bytes; use hyper::{Body, Request, Response}; use serde::Serialize; +use garage_rpc::netapp::bytes_buf::BytesBuf; use garage_rpc::rpc_helper::OrderTag; use garage_table::*; use garage_util::data::*; @@ -566,7 +567,7 @@ type BlockStreamItem = Result; struct Defragmenter> { block_size: usize, block_stream: Pin>>, - buffer: Vec, + buffer: BytesBuf, hash: Option, } @@ -575,7 +576,7 @@ impl> Defragmenter { Self { block_size, block_stream, - buffer: vec![], + buffer: BytesBuf::new(), hash: None, } } @@ -593,7 +594,7 @@ impl> Defragmenter { if self.buffer.is_empty() { let (next_block, next_block_hash) = self.block_stream.next().await.unwrap()?; - self.buffer = next_block.to_vec(); // TODO TOO MUCH COPY + self.buffer.extend(next_block); self.hash = next_block_hash; } else if self.buffer.len() + peeked_next_block.len() > self.block_size { break; @@ -604,10 +605,7 @@ impl> Defragmenter { } } - Ok(( - Bytes::from(std::mem::take(&mut self.buffer)), - self.hash.take(), - )) + Ok((self.buffer.take_all(), self.hash.take())) } } diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs index dfc284fe..dd95f6e7 100644 --- a/src/api/s3/get.rs +++ b/src/api/s3/get.rs @@ -274,11 +274,11 @@ pub async fn handle_get( .block_manager .rpc_get_block_streaming(&hash, Some(order_stream.order(i as u64))) .await - .unwrap_or_else(|_| { + .unwrap_or_else(|e| { Box::pin(futures::stream::once(async move { Err(std::io::Error::new( std::io::ErrorKind::Other, - "Could not get next block", + format!("Could not get block {}: {}", i, e), )) })) }) -- cgit v1.2.3 From b823151a0bba7ee6c5f0f96c6b06355572528d94 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 12 Sep 2022 16:57:38 +0200 Subject: improvements in block manager --- src/block/manager.rs | 29 ++++++++++++++++------------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/src/block/manager.rs b/src/block/manager.rs index b9cd09e7..ec694fc8 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -11,7 +11,7 @@ use futures::Stream; use futures_util::stream::StreamExt; use tokio::fs; use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader}; -use tokio::sync::{mpsc, Mutex}; +use tokio::sync::{mpsc, Mutex, MutexGuard}; use opentelemetry::{ trace::{FutureExt as OtelFutureExt, TraceContextExt, Tracer}, @@ -261,7 +261,7 @@ impl BlockManager { > { let (header, stream) = self.rpc_get_raw_block_streaming(hash, order_tag).await?; match header { - DataBlockHeader::Plain => Ok(Box::pin(stream)), + DataBlockHeader::Plain => Ok(stream), DataBlockHeader::Compressed => { // Too many things, I hate it. let reader = stream_asyncread(stream); @@ -389,11 +389,7 @@ impl BlockManager { let write_size = data.inner_buffer().len() as u64; - self.mutation_lock[hash.as_slice()[0] as usize] - .lock() - .with_context(Context::current_with_span( - tracer.start("Acquire mutation_lock"), - )) + self.lock_mutate(hash) .await .write_block(hash, data, self) .bound_record_duration(&self.metrics.block_write_duration) @@ -470,8 +466,7 @@ impl BlockManager { if data.verify(*hash).is_err() { self.metrics.corruption_counter.add(1); - self.mutation_lock[hash.as_slice()[0] as usize] - .lock() + self.lock_mutate(hash) .await .move_block_to_corrupted(hash, self) .await?; @@ -484,8 +479,7 @@ impl BlockManager { /// Check if this node has a block and whether it needs it pub(crate) async fn check_block_status(&self, hash: &Hash) -> Result { - self.mutation_lock[hash.as_slice()[0] as usize] - .lock() + self.lock_mutate(hash) .await .check_block_status(hash, self) .await @@ -499,8 +493,7 @@ impl BlockManager { /// Delete block if it is not needed anymore pub(crate) async fn delete_if_unneeded(&self, hash: &Hash) -> Result<(), Error> { - self.mutation_lock[hash.as_slice()[0] as usize] - .lock() + self.lock_mutate(hash) .await .delete_if_unneeded(hash, self) .await @@ -532,6 +525,16 @@ impl BlockManager { path.set_extension(""); fs::metadata(&path).await.map(|_| false).map_err(Into::into) } + + async fn lock_mutate(&self, hash: &Hash) -> MutexGuard<'_, BlockManagerLocked> { + let tracer = opentelemetry::global::tracer("garage"); + self.mutation_lock[hash.as_slice()[0] as usize] + .lock() + .with_context(Context::current_with_span( + tracer.start("Acquire mutation_lock"), + )) + .await + } } #[async_trait] -- cgit v1.2.3 From 28a4af73ca8c0f82314157939fc98c46f338e84a Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 13 Sep 2022 13:11:44 +0200 Subject: Use netapp 0.5 published from crates.io --- Cargo.lock | 3 ++- Cargo.nix | 73 ++++++++++++++++++++++++--------------------------- src/garage/Cargo.toml | 3 +-- src/model/Cargo.toml | 3 +-- src/rpc/Cargo.toml | 3 +-- src/util/Cargo.toml | 3 +-- 6 files changed, 40 insertions(+), 48 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a5f04e14..aaac3f1b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2033,7 +2033,8 @@ dependencies = [ [[package]] name = "netapp" version = "0.5.0" -source = "git+https://git.deuxfleurs.fr/lx/netapp?branch=stream-body#0f799a7768997c37e3e1b6861c097c4cd934acde" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14fa05cce4c65d4cc5277b2525c5054fca0e51db82416d9f43fb916575233774" dependencies = [ "arc-swap", "async-trait", diff --git a/Cargo.nix b/Cargo.nix index 77df67d4..161c58cc 100644 --- a/Cargo.nix +++ b/Cargo.nix @@ -780,7 +780,7 @@ in registry = "registry+https://github.com/rust-lang/crates.io-index"; src = fetchCratesIo { inherit name version; sha256 = "59a6001667ab124aebae2a495118e11d30984c3a653e99d86d58971708cf5e4b"; }; dependencies = { - ${ if hostPlatform.config == "aarch64-linux-android" || hostPlatform.config == "aarch64-apple-darwin" || hostPlatform.parsed.cpu.name == "aarch64" && hostPlatform.parsed.kernel.name == "linux" then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; }; + ${ if hostPlatform.parsed.cpu.name == "aarch64" && hostPlatform.parsed.kernel.name == "linux" || hostPlatform.config == "aarch64-apple-darwin" || hostPlatform.config == "aarch64-linux-android" then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; }; }; }); @@ -1428,7 +1428,7 @@ in garage_web = rustPackages."unknown".garage_web."0.8.0" { inherit profileName; }; hex = rustPackages."registry+https://github.com/rust-lang/crates.io-index".hex."0.4.3" { inherit profileName; }; sodiumoxide = rustPackages."registry+https://github.com/rust-lang/crates.io-index".kuska-sodiumoxide."0.2.5-0" { inherit profileName; }; - netapp = rustPackages."git+https://git.deuxfleurs.fr/lx/netapp".netapp."0.5.0" { inherit profileName; }; + netapp = rustPackages."registry+https://github.com/rust-lang/crates.io-index".netapp."0.5.0" { inherit profileName; }; opentelemetry = rustPackages."registry+https://github.com/rust-lang/crates.io-index".opentelemetry."0.17.0" { inherit profileName; }; opentelemetry_otlp = rustPackages."registry+https://github.com/rust-lang/crates.io-index".opentelemetry-otlp."0.10.0" { inherit profileName; }; opentelemetry_prometheus = rustPackages."registry+https://github.com/rust-lang/crates.io-index".opentelemetry-prometheus."0.10.0" { inherit profileName; }; @@ -1599,7 +1599,7 @@ in ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_web" then "garage_table" else null } = rustPackages."unknown".garage_table."0.8.0" { inherit profileName; }; ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_web" then "garage_util" else null } = rustPackages."unknown".garage_util."0.8.0" { inherit profileName; }; ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_web" then "hex" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".hex."0.4.3" { inherit profileName; }; - ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_web" then "netapp" else null } = rustPackages."git+https://git.deuxfleurs.fr/lx/netapp".netapp."0.5.0" { inherit profileName; }; + ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_web" then "netapp" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".netapp."0.5.0" { inherit profileName; }; ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_web" then "opentelemetry" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".opentelemetry."0.17.0" { inherit profileName; }; ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_web" then "rand" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand."0.8.5" { inherit profileName; }; ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_web" then "rmp_serde" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rmp-serde."0.15.5" { inherit profileName; }; @@ -1637,7 +1637,7 @@ in ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_rpc" then "k8s_openapi" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".k8s-openapi."0.13.1" { inherit profileName; }; ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_rpc" then "kube" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".kube."0.62.0" { inherit profileName; }; ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_web" then "sodiumoxide" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".kuska-sodiumoxide."0.2.5-0" { inherit profileName; }; - ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_web" then "netapp" else null } = rustPackages."git+https://git.deuxfleurs.fr/lx/netapp".netapp."0.5.0" { inherit profileName; }; + ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_web" then "netapp" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".netapp."0.5.0" { inherit profileName; }; ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_rpc" then "openssl" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".openssl."0.10.38" { inherit profileName; }; ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_web" then "opentelemetry" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".opentelemetry."0.17.0" { inherit profileName; }; ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_web" then "pnet_datalink" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".pnet_datalink."0.28.0" { inherit profileName; }; @@ -1700,7 +1700,7 @@ in ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "http" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".http."0.2.6" { inherit profileName; }; ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "hyper" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".hyper."0.14.18" { inherit profileName; }; ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "lazy_static" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".lazy_static."1.4.0" { inherit profileName; }; - ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "netapp" else null } = rustPackages."git+https://git.deuxfleurs.fr/lx/netapp".netapp."0.5.0" { inherit profileName; }; + ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "netapp" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".netapp."0.5.0" { inherit profileName; }; ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "opentelemetry" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".opentelemetry."0.17.0" { inherit profileName; }; ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "rand" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand."0.8.5" { inherit profileName; }; ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "rmp_serde" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rmp-serde."0.15.5" { inherit profileName; }; @@ -2742,7 +2742,7 @@ in [ "os-poll" ] ]; dependencies = { - ${ if hostPlatform.parsed.kernel.name == "wasi" || hostPlatform.isUnix then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; }; + ${ if hostPlatform.isUnix || hostPlatform.parsed.kernel.name == "wasi" then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; }; log = rustPackages."registry+https://github.com/rust-lang/crates.io-index".log."0.4.16" { inherit profileName; }; ${ if hostPlatform.isWindows then "miow" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".miow."0.3.7" { inherit profileName; }; ${ if hostPlatform.isWindows then "ntapi" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".ntapi."0.3.7" { inherit profileName; }; @@ -2821,42 +2821,37 @@ in }; }); - "git+https://git.deuxfleurs.fr/lx/netapp".netapp."0.5.0" = overridableMkRustCrate (profileName: rec { + "registry+https://github.com/rust-lang/crates.io-index".netapp."0.5.0" = overridableMkRustCrate (profileName: rec { name = "netapp"; version = "0.5.0"; - registry = "git+https://git.deuxfleurs.fr/lx/netapp"; - src = fetchCrateGit { - url = https://git.deuxfleurs.fr/lx/netapp; - name = "netapp"; - version = "0.5.0"; - rev = "0f799a7768997c37e3e1b6861c097c4cd934acde"; - ref = "stream-body";}; + registry = "registry+https://github.com/rust-lang/crates.io-index"; + src = fetchCratesIo { inherit name version; sha256 = "14fa05cce4c65d4cc5277b2525c5054fca0e51db82416d9f43fb916575233774"; }; features = builtins.concatLists [ - [ "default" ] - [ "opentelemetry" ] - [ "opentelemetry-contrib" ] - [ "telemetry" ] + (lib.optional (rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client") "default") + (lib.optional (rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_web") "opentelemetry") + (lib.optional (rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_web") "opentelemetry-contrib") + (lib.optional (rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_web") "telemetry") ]; dependencies = { - arc_swap = rustPackages."registry+https://github.com/rust-lang/crates.io-index".arc-swap."1.5.0" { inherit profileName; }; - async_trait = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".async-trait."0.1.52" { profileName = "__noProfile"; }; - bytes = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.2.0" { inherit profileName; }; - cfg_if = rustPackages."registry+https://github.com/rust-lang/crates.io-index".cfg-if."1.0.0" { inherit profileName; }; - err_derive = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".err-derive."0.3.1" { profileName = "__noProfile"; }; - futures = rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures."0.3.21" { inherit profileName; }; - hex = rustPackages."registry+https://github.com/rust-lang/crates.io-index".hex."0.4.3" { inherit profileName; }; - kuska_handshake = rustPackages."registry+https://github.com/rust-lang/crates.io-index".kuska-handshake."0.2.0" { inherit profileName; }; - sodiumoxide = rustPackages."registry+https://github.com/rust-lang/crates.io-index".kuska-sodiumoxide."0.2.5-0" { inherit profileName; }; - log = rustPackages."registry+https://github.com/rust-lang/crates.io-index".log."0.4.16" { inherit profileName; }; - opentelemetry = rustPackages."registry+https://github.com/rust-lang/crates.io-index".opentelemetry."0.17.0" { inherit profileName; }; - opentelemetry_contrib = rustPackages."registry+https://github.com/rust-lang/crates.io-index".opentelemetry-contrib."0.9.0" { inherit profileName; }; - pin_project = rustPackages."registry+https://github.com/rust-lang/crates.io-index".pin-project."1.0.10" { inherit profileName; }; - rand = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand."0.8.5" { inherit profileName; }; - rmp_serde = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rmp-serde."0.15.5" { inherit profileName; }; - serde = rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde."1.0.137" { inherit profileName; }; - tokio = rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio."1.17.0" { inherit profileName; }; - tokio_stream = rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio-stream."0.1.8" { inherit profileName; }; - tokio_util = rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio-util."0.7.0" { inherit profileName; }; + ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "arc_swap" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".arc-swap."1.5.0" { inherit profileName; }; + ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "async_trait" else null } = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".async-trait."0.1.52" { profileName = "__noProfile"; }; + ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "bytes" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.2.0" { inherit profileName; }; + ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "cfg_if" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".cfg-if."1.0.0" { inherit profileName; }; + ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "err_derive" else null } = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".err-derive."0.3.1" { profileName = "__noProfile"; }; + ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "futures" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures."0.3.21" { inherit profileName; }; + ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "hex" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".hex."0.4.3" { inherit profileName; }; + ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "kuska_handshake" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".kuska-handshake."0.2.0" { inherit profileName; }; + ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "sodiumoxide" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".kuska-sodiumoxide."0.2.5-0" { inherit profileName; }; + ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "log" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".log."0.4.16" { inherit profileName; }; + ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_web" then "opentelemetry" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".opentelemetry."0.17.0" { inherit profileName; }; + ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_web" then "opentelemetry_contrib" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".opentelemetry-contrib."0.9.0" { inherit profileName; }; + ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "pin_project" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".pin-project."1.0.10" { inherit profileName; }; + ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "rand" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand."0.8.5" { inherit profileName; }; + ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "rmp_serde" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rmp-serde."0.15.5" { inherit profileName; }; + ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "serde" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde."1.0.137" { inherit profileName; }; + ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "tokio" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio."1.17.0" { inherit profileName; }; + ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "tokio_stream" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio-stream."0.1.8" { inherit profileName; }; + ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "tokio_util" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio-util."0.7.0" { inherit profileName; }; }; }); @@ -4501,7 +4496,7 @@ in ]; dependencies = { bitflags = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bitflags."1.3.2" { inherit profileName; }; - ${ if hostPlatform.parsed.kernel.name == "android" || hostPlatform.parsed.kernel.name == "linux" then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; }; + ${ if hostPlatform.parsed.kernel.name == "linux" || hostPlatform.parsed.kernel.name == "android" then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; }; ${ if !(hostPlatform.parsed.kernel.name == "linux" || hostPlatform.parsed.kernel.name == "android") then "parking_lot" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".parking_lot."0.11.2" { inherit profileName; }; ${ if !(hostPlatform.parsed.kernel.name == "linux" || hostPlatform.parsed.kernel.name == "android") then "parking_lot_core" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".parking_lot_core."0.8.5" { inherit profileName; }; static_init_macro = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".static_init_macro."1.0.2" { profileName = "__noProfile"; }; @@ -5604,7 +5599,7 @@ in ${ if hostPlatform.config == "aarch64-uwp-windows-msvc" || hostPlatform.config == "aarch64-pc-windows-msvc" then "windows_aarch64_msvc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_aarch64_msvc."0.32.0" { inherit profileName; }; ${ if hostPlatform.config == "i686-pc-windows-gnu" || hostPlatform.config == "i686-uwp-windows-gnu" then "windows_i686_gnu" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_i686_gnu."0.32.0" { inherit profileName; }; ${ if hostPlatform.config == "i686-pc-windows-msvc" || hostPlatform.config == "i686-uwp-windows-msvc" then "windows_i686_msvc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_i686_msvc."0.32.0" { inherit profileName; }; - ${ if hostPlatform.config == "x86_64-pc-windows-gnu" || hostPlatform.config == "x86_64-uwp-windows-gnu" then "windows_x86_64_gnu" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_x86_64_gnu."0.32.0" { inherit profileName; }; + ${ if hostPlatform.config == "x86_64-uwp-windows-gnu" || hostPlatform.config == "x86_64-pc-windows-gnu" then "windows_x86_64_gnu" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_x86_64_gnu."0.32.0" { inherit profileName; }; ${ if hostPlatform.config == "x86_64-uwp-windows-msvc" || hostPlatform.config == "x86_64-pc-windows-msvc" then "windows_x86_64_msvc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_x86_64_msvc."0.32.0" { inherit profileName; }; }; }); diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml index dcb3b78e..5ce40ff2 100644 --- a/src/garage/Cargo.toml +++ b/src/garage/Cargo.toml @@ -50,8 +50,7 @@ futures = "0.3" futures-util = "0.3" tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } -#netapp = "0.4" -netapp = { version = "0.5", git = "https://git.deuxfleurs.fr/lx/netapp", branch = "stream-body", features = ["telemetry"] } +netapp = "0.5" opentelemetry = { version = "0.17", features = [ "rt-tokio" ] } opentelemetry-prometheus = { version = "0.10", optional = true } diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml index d6e2adfe..2c2e2bfe 100644 --- a/src/model/Cargo.toml +++ b/src/model/Cargo.toml @@ -39,8 +39,7 @@ futures-util = "0.3" tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } opentelemetry = "0.17" -#netapp = "0.4" -netapp = { version = "0.5", git = "https://git.deuxfleurs.fr/lx/netapp", branch = "stream-body", features = ["telemetry"] } +netapp = "0.5" [features] k2v = [ "garage_util/k2v" ] diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml index d7136401..079cfe34 100644 --- a/src/rpc/Cargo.toml +++ b/src/rpc/Cargo.toml @@ -45,8 +45,7 @@ tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi tokio-stream = { version = "0.1", features = ["net"] } opentelemetry = "0.17" -#netapp = { version = "0.4.5", features = ["telemetry"] } -netapp = { version = "0.5.0", git = "https://git.deuxfleurs.fr/lx/netapp", branch = "stream-body", features = ["telemetry"] } +netapp = { version = "0.5.0", features = ["telemetry"] } hyper = { version = "0.14", features = ["client", "http1", "runtime", "tcp"] } diff --git a/src/util/Cargo.toml b/src/util/Cargo.toml index c648e13b..8e978fc2 100644 --- a/src/util/Cargo.toml +++ b/src/util/Cargo.toml @@ -39,8 +39,7 @@ toml = "0.5" futures = "0.3" tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } -#netapp = "0.4" -netapp = { version = "0.5", git = "https://git.deuxfleurs.fr/lx/netapp", branch = "stream-body", features = ["telemetry"] } +netapp = "0.5" http = "0.2" hyper = "0.14" -- cgit v1.2.3 From ff30891999b5be5421b80b89da1037e943179d2d Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 13 Sep 2022 15:13:07 +0200 Subject: Use streaming block API for get with Range requests --- src/api/s3/get.rs | 93 +++++++++++++++++++++++++++++++++++-------------------- 1 file changed, 60 insertions(+), 33 deletions(-) diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs index dd95f6e7..ae4c287d 100644 --- a/src/api/s3/get.rs +++ b/src/api/s3/get.rs @@ -7,10 +7,9 @@ use http::header::{ ACCEPT_RANGES, CONTENT_LENGTH, CONTENT_RANGE, CONTENT_TYPE, ETAG, IF_MODIFIED_SINCE, IF_NONE_MATCH, LAST_MODIFIED, RANGE, }; -use hyper::body::Bytes; use hyper::{Body, Request, Response, StatusCode}; -use garage_rpc::rpc_helper::OrderTag; +use garage_rpc::rpc_helper::{netapp::stream::ByteStream, OrderTag}; use garage_table::EmptyKey; use garage_util::data::*; @@ -274,14 +273,7 @@ pub async fn handle_get( .block_manager .rpc_get_block_streaming(&hash, Some(order_stream.order(i as u64))) .await - .unwrap_or_else(|e| { - Box::pin(futures::stream::once(async move { - Err(std::io::Error::new( - std::io::ErrorKind::Other, - format!("Could not get block {}: {}", i, e), - )) - })) - }) + .unwrap_or_else(|e| error_stream(i, e)) } } }) @@ -437,44 +429,79 @@ fn body_from_blocks_range( all_blocks.len(), 4 + ((end - begin) / std::cmp::max(all_blocks[0].1.size as u64, 1024)) as usize, )); - let mut true_offset = 0; + let mut block_offset: u64 = 0; for (_, b) in all_blocks.iter() { - if true_offset >= end { + if block_offset >= end { break; } // Keep only blocks that have an intersection with the requested range - if true_offset < end && true_offset + b.size > begin { - blocks.push((*b, true_offset)); + if block_offset < end && block_offset + b.size > begin { + blocks.push((*b, block_offset)); } - true_offset += b.size; + block_offset += b.size as u64; } let order_stream = OrderTag::stream(); let body_stream = futures::stream::iter(blocks) .enumerate() - .map(move |(i, (block, true_offset))| { + .map(move |(i, (block, block_offset))| { let garage = garage.clone(); async move { - let data = garage + garage .block_manager - .rpc_get_block(&block.hash, Some(order_stream.order(i as u64))) - .await?; - let start_in_block = if true_offset > begin { - 0 - } else { - begin - true_offset - }; - let end_in_block = if true_offset + block.size < end { - block.size - } else { - end - true_offset - }; - Result::::Ok( - data.slice(start_in_block as usize..end_in_block as usize), - ) + .rpc_get_block_streaming(&block.hash, Some(order_stream.order(i as u64))) + .await + .unwrap_or_else(|e| error_stream(i, e)) + .scan(block_offset, move |chunk_offset, chunk| { + let r = match chunk { + Ok(chunk_bytes) => { + let chunk_len = chunk_bytes.len() as u64; + let r = if *chunk_offset >= end { + // The current chunk is after the part we want to read. + // Returning None here will stop the scan, the rest of the + // stream will be ignored + None + } else if *chunk_offset + chunk_len <= begin { + // The current chunk is before the part we want to read. + // We return a None that will be removed by the filter_map + // below. + Some(None) + } else { + // The chunk has an intersection with the requested range + let start_in_chunk = if *chunk_offset > begin { + 0 + } else { + begin - *chunk_offset + }; + let end_in_chunk = if *chunk_offset + chunk_len < end { + chunk_len + } else { + end - *chunk_offset + }; + Some(Some(Ok(chunk_bytes + .slice(start_in_chunk as usize..end_in_chunk as usize)))) + }; + *chunk_offset += chunk_bytes.len() as u64; + r + } + Err(e) => Some(Some(Err(e))), + }; + futures::future::ready(r) + }) + .filter_map(futures::future::ready) } }) - .buffered(2); + .buffered(2) + .flatten(); hyper::body::Body::wrap_stream(body_stream) } + +fn error_stream(i: usize, e: garage_util::error::Error) -> ByteStream { + Box::pin(futures::stream::once(async move { + Err(std::io::Error::new( + std::io::ErrorKind::Other, + format!("Could not get block {}: {}", i, e), + )) + })) +} -- cgit v1.2.3