diff options
author | Alex Auvolat <alex@adnab.me> | 2024-02-05 18:49:54 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2024-02-05 18:49:54 +0100 |
commit | 0bb5b77530ad432e4c77f13b395fe74613812337 (patch) | |
tree | 9734470cff2ba2ce848abc0fdb9b0032c37420b8 | |
parent | 6e69a1fffc715c752a399750c1e26aa46683dbb2 (diff) | |
download | garage-0bb5b77530ad432e4c77f13b395fe74613812337.tar.gz garage-0bb5b77530ad432e4c77f13b395fe74613812337.zip |
[dep-upgrade-202402] wip: port to http/hyper crates v1
-rw-r--r-- | Cargo.lock | 153 | ||||
-rw-r--r-- | Cargo.nix | 179 | ||||
-rw-r--r-- | Cargo.toml | 7 | ||||
-rw-r--r-- | src/api/Cargo.toml | 3 | ||||
-rw-r--r-- | src/api/admin/api_server.rs | 31 | ||||
-rw-r--r-- | src/api/admin/bucket.rs | 35 | ||||
-rw-r--r-- | src/api/admin/cluster.rs | 25 | ||||
-rw-r--r-- | src/api/admin/error.rs | 11 | ||||
-rw-r--r-- | src/api/admin/key.rs | 30 | ||||
-rw-r--r-- | src/api/generic_server.rs | 113 | ||||
-rw-r--r-- | src/api/helpers.rs | 27 | ||||
-rw-r--r-- | src/api/s3/api_server.rs | 18 | ||||
-rw-r--r-- | src/api/s3/bucket.rs | 32 | ||||
-rw-r--r-- | src/api/s3/copy.rs | 21 | ||||
-rw-r--r-- | src/api/s3/cors.rs | 45 | ||||
-rw-r--r-- | src/api/s3/delete.rs | 17 | ||||
-rw-r--r-- | src/api/s3/error.rs | 16 | ||||
-rw-r--r-- | src/api/s3/get.rs | 166 | ||||
-rw-r--r-- | src/api/s3/lifecycle.rs | 23 | ||||
-rw-r--r-- | src/api/s3/list.rs | 17 | ||||
-rw-r--r-- | src/api/s3/multipart.rs | 38 | ||||
-rw-r--r-- | src/api/s3/post_object.rs | 32 | ||||
-rw-r--r-- | src/api/s3/put.rs | 20 | ||||
-rw-r--r-- | src/api/s3/website.rs | 23 | ||||
-rw-r--r-- | src/api/signature/payload.rs | 4 | ||||
-rw-r--r-- | src/api/signature/streaming.rs | 32 | ||||
-rw-r--r-- | src/web/Cargo.toml | 3 | ||||
-rw-r--r-- | src/web/web_server.rs | 119 |
28 files changed, 764 insertions, 476 deletions
@@ -211,7 +211,7 @@ dependencies = [ "fastrand", "hex", "http 0.2.11", - "hyper", + "hyper 0.14.28", "ring 0.17.7", "time", "tokio", @@ -248,7 +248,7 @@ dependencies = [ "bytes", "fastrand", "http 0.2.11", - "http-body", + "http-body 0.4.6", "percent-encoding", "pin-project-lite", "tracing", @@ -298,7 +298,7 @@ dependencies = [ "aws-types", "bytes", "http 0.2.11", - "http-body", + "http-body 0.4.6", "once_cell", "percent-encoding", "regex-lite", @@ -426,7 +426,7 @@ dependencies = [ "crc32fast", "hex", "http 0.2.11", - "http-body", + "http-body 0.4.6", "md-5", "pin-project-lite", "sha1", @@ -458,7 +458,7 @@ dependencies = [ "bytes-utils", "futures-core", "http 0.2.11", - "http-body", + "http-body 0.4.6", "once_cell", "percent-encoding", "pin-project-lite", @@ -497,10 +497,10 @@ dependencies = [ "aws-smithy-types", "bytes", "fastrand", - "h2", + "h2 0.3.24", "http 0.2.11", - "http-body", - "hyper", + "http-body 0.4.6", + "hyper 0.14.28", "hyper-rustls 0.24.2", "once_cell", "pin-project-lite", @@ -537,7 +537,7 @@ dependencies = [ "bytes-utils", "futures-core", "http 0.2.11", - "http-body", + "http-body 0.4.6", "itoa", "num-integer", "pin-project-lite", @@ -1298,8 +1298,8 @@ dependencies = [ "git-version", "hex", "hmac", - "http 0.2.11", - "hyper", + "http 1.0.0", + "hyper 1.1.0", "k2v-client", "kuska-sodiumoxide", "mktemp", @@ -1343,11 +1343,12 @@ dependencies = [ "garage_util", "hex", "hmac", - "http 0.2.11", + "http 1.0.0", + "http-body-util", "http-range", "httpdate", - "hyper", - "hyperlocal", + "hyper 1.1.0", + "hyper-util", "idna", "md-5", "multer", @@ -1509,8 +1510,8 @@ dependencies = [ "garage_db", "hex", "hexdump", - "http 0.2.11", - "hyper", + "http 1.0.0", + "hyper 1.1.0", "lazy_static", "mktemp", "netapp", @@ -1537,9 +1538,10 @@ dependencies = [ "garage_model", "garage_table", "garage_util", - "http 0.2.11", - "hyper", - "hyperlocal", + "http 1.0.0", + "http-body-util", + "hyper 1.1.0", + "hyper-util", "opentelemetry", "percent-encoding", "tokio", @@ -1634,6 +1636,25 @@ dependencies = [ ] [[package]] +name = "h2" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31d030e59af851932b72ceebadf4a2b5986dba4c3b99dd2493f8273a0f151943" +dependencies = [ + "bytes", + "fnv", + "futures-core", + "futures-sink", + "futures-util", + "http 1.0.0", + "indexmap 2.2.2", + "slab", + "tokio", + "tokio-util 0.7.10", + "tracing", +] + +[[package]] name = "hashbrown" version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1785,6 +1806,29 @@ dependencies = [ ] [[package]] +name = "http-body" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1cac85db508abc24a2e48553ba12a996e87244a0395ce011e62b37158745d643" +dependencies = [ + "bytes", + "http 1.0.0", +] + +[[package]] +name = "http-body-util" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41cb79eb393015dadd30fc252023adb0b2400a0caee0fa2a077e6e21a551e840" +dependencies = [ + "bytes", + "futures-util", + "http 1.0.0", + "http-body 1.0.0", + "pin-project-lite", +] + +[[package]] name = "http-range" version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1818,9 +1862,9 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "h2", + "h2 0.3.24", "http 0.2.11", - "http-body", + "http-body 0.4.6", "httparse", "httpdate", "itoa", @@ -1833,13 +1877,33 @@ dependencies = [ ] [[package]] +name = "hyper" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb5aa53871fc917b1a9ed87b683a5d86db645e23acb32c2e0785a353e522fb75" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "h2 0.4.2", + "http 1.0.0", + "http-body 1.0.0", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "tokio", + "want", +] + +[[package]] name = "hyper-rustls" version = "0.23.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1788965e61b367cd03a62950836d5cd41560c3577d90e40e0819373194d1661c" dependencies = [ "http 0.2.11", - "hyper", + "hyper 0.14.28", "log", "rustls 0.20.9", "rustls-native-certs", @@ -1855,7 +1919,7 @@ checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590" dependencies = [ "futures-util", "http 0.2.11", - "hyper", + "hyper 0.14.28", "log", "rustls 0.21.10", "rustls-native-certs", @@ -1869,23 +1933,30 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" dependencies = [ - "hyper", + "hyper 0.14.28", "pin-project-lite", "tokio", "tokio-io-timeout", ] [[package]] -name = "hyperlocal" -version = "0.8.0" +name = "hyper-util" +version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0fafdf7b2b2de7c9784f76e02c0935e65a8117ec3b768644379983ab333ac98c" +checksum = "ca38ef113da30126bbff9cd1705f9273e15d45498615d138b0c20279ac7a76aa" dependencies = [ + "bytes", + "futures-channel", "futures-util", - "hex", - "hyper", - "pin-project", + "http 1.0.0", + "http-body 1.0.0", + "hyper 1.1.0", + "pin-project-lite", + "socket2", "tokio", + "tower", + "tower-service", + "tracing", ] [[package]] @@ -2051,8 +2122,8 @@ dependencies = [ "clap 4.4.18", "format_table", "hex", - "http 0.2.11", - "hyper", + "http 1.0.0", + "hyper 1.1.0", "hyper-rustls 0.24.2", "log", "percent-encoding", @@ -2108,8 +2179,8 @@ dependencies = [ "either", "futures", "http 0.2.11", - "http-body", - "hyper", + "http-body 0.4.6", + "hyper 0.14.28", "hyper-rustls 0.23.2", "hyper-timeout", "jsonpath_lib", @@ -3104,10 +3175,10 @@ dependencies = [ "encoding_rs", "futures-core", "futures-util", - "h2", + "h2 0.3.24", "http 0.2.11", - "http-body", - "hyper", + "http-body 0.4.6", + "hyper 0.14.28", "hyper-rustls 0.24.2", "ipnet", "js-sys", @@ -4014,10 +4085,10 @@ dependencies = [ "bytes", "futures-core", "futures-util", - "h2", + "h2 0.3.24", "http 0.2.11", - "http-body", - "hyper", + "http-body 0.4.6", + "hyper 0.14.28", "hyper-timeout", "percent-encoding", "pin-project", @@ -4077,7 +4148,7 @@ dependencies = [ "futures-core", "futures-util", "http 0.2.11", - "http-body", + "http-body 0.4.6", "http-range-header", "pin-project-lite", "tower-layer", @@ -33,7 +33,7 @@ args@{ ignoreLockHash, }: let - nixifiedLockHash = "8cb28c3a1c83ae5f6a9285b72c92830344521b18d10973434e2706b4033b8907"; + nixifiedLockHash = "b09e8e1592cb6ec8175708b13ee4a2578aa697c18a94d5a545328078ab263b2f"; workspaceSrc = if args.workspaceSrc == null then ./. else args.workspaceSrc; currentLockHash = builtins.hashFile "sha256" (workspaceSrc + /Cargo.lock); lockHashIgnored = if ignoreLockHash @@ -1916,8 +1916,8 @@ in base64 = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".base64."0.21.7" { inherit profileName; }).out; chrono = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".chrono."0.4.33" { inherit profileName; }).out; hmac = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".hmac."0.12.1" { inherit profileName; }).out; - http = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".http."0.2.11" { inherit profileName; }).out; - hyper = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".hyper."0.14.28" { inherit profileName; }).out; + http = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".http."1.0.0" { inherit profileName; }).out; + hyper = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".hyper."1.1.0" { inherit profileName; }).out; k2v_client = (rustPackages."unknown".k2v-client."0.0.4" { inherit profileName; }).out; mktemp = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".mktemp."0.5.1" { inherit profileName; }).out; serde_json = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde_json."1.0.113" { inherit profileName; }).out; @@ -1954,11 +1954,12 @@ in garage_util = (rustPackages."unknown".garage_util."0.9.1" { inherit profileName; }).out; hex = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".hex."0.4.3" { inherit profileName; }).out; hmac = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".hmac."0.12.1" { inherit profileName; }).out; - http = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".http."0.2.11" { inherit profileName; }).out; + http = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".http."1.0.0" { inherit profileName; }).out; + http_body_util = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".http-body-util."0.1.0" { inherit profileName; }).out; http_range = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".http-range."0.1.5" { inherit profileName; }).out; httpdate = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".httpdate."1.0.3" { inherit profileName; }).out; - hyper = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".hyper."0.14.28" { inherit profileName; }).out; - hyperlocal = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".hyperlocal."0.8.0" { inherit profileName; }).out; + hyper = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".hyper."1.1.0" { inherit profileName; }).out; + hyper_util = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".hyper-util."0.1.3" { inherit profileName; }).out; idna = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".idna."0.5.0" { inherit profileName; }).out; md5 = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".md-5."0.10.6" { inherit profileName; }).out; multer = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".multer."3.0.0" { inherit profileName; }).out; @@ -2172,8 +2173,8 @@ in garage_db = (rustPackages."unknown".garage_db."0.9.1" { inherit profileName; }).out; hex = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".hex."0.4.3" { inherit profileName; }).out; hexdump = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".hexdump."0.1.1" { inherit profileName; }).out; - http = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".http."0.2.11" { inherit profileName; }).out; - hyper = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".hyper."0.14.28" { inherit profileName; }).out; + http = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".http."1.0.0" { inherit profileName; }).out; + hyper = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".hyper."1.1.0" { inherit profileName; }).out; lazy_static = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".lazy_static."1.4.0" { inherit profileName; }).out; netapp = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".netapp."0.10.0" { inherit profileName; }).out; opentelemetry = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".opentelemetry."0.17.0" { inherit profileName; }).out; @@ -2207,9 +2208,10 @@ in garage_model = (rustPackages."unknown".garage_model."0.9.1" { inherit profileName; }).out; garage_table = (rustPackages."unknown".garage_table."0.9.1" { inherit profileName; }).out; garage_util = (rustPackages."unknown".garage_util."0.9.1" { inherit profileName; }).out; - http = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".http."0.2.11" { inherit profileName; }).out; - hyper = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".hyper."0.14.28" { inherit profileName; }).out; - hyperlocal = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".hyperlocal."0.8.0" { inherit profileName; }).out; + http = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".http."1.0.0" { inherit profileName; }).out; + http_body_util = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".http-body-util."0.1.0" { inherit profileName; }).out; + hyper = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".hyper."1.1.0" { inherit profileName; }).out; + hyper_util = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".hyper-util."0.1.3" { inherit profileName; }).out; opentelemetry = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".opentelemetry."0.17.0" { inherit profileName; }).out; percent_encoding = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".percent-encoding."2.3.1" { inherit profileName; }).out; tokio = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio."1.36.0" { inherit profileName; }).out; @@ -2324,6 +2326,26 @@ in }; }); + "registry+https://github.com/rust-lang/crates.io-index".h2."0.4.2" = overridableMkRustCrate (profileName: rec { + name = "h2"; + version = "0.4.2"; + registry = "registry+https://github.com/rust-lang/crates.io-index"; + src = fetchCratesIo { inherit name version; sha256 = "31d030e59af851932b72ceebadf4a2b5986dba4c3b99dd2493f8273a0f151943"; }; + dependencies = { + bytes = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.5.0" { inherit profileName; }).out; + fnv = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".fnv."1.0.7" { inherit profileName; }).out; + futures_core = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures-core."0.3.30" { inherit profileName; }).out; + futures_sink = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures-sink."0.3.30" { inherit profileName; }).out; + futures_util = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures-util."0.3.30" { inherit profileName; }).out; + http = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".http."1.0.0" { inherit profileName; }).out; + indexmap = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".indexmap."2.2.2" { inherit profileName; }).out; + slab = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".slab."0.4.9" { inherit profileName; }).out; + tokio = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio."1.36.0" { inherit profileName; }).out; + tokio_util = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio-util."0.7.10" { inherit profileName; }).out; + tracing = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".tracing."0.1.40" { inherit profileName; }).out; + }; + }); + "registry+https://github.com/rust-lang/crates.io-index".hashbrown."0.12.3" = overridableMkRustCrate (profileName: rec { name = "hashbrown"; version = "0.12.3"; @@ -2531,6 +2553,31 @@ in }; }); + "registry+https://github.com/rust-lang/crates.io-index".http-body."1.0.0" = overridableMkRustCrate (profileName: rec { + name = "http-body"; + version = "1.0.0"; + registry = "registry+https://github.com/rust-lang/crates.io-index"; + src = fetchCratesIo { inherit name version; sha256 = "1cac85db508abc24a2e48553ba12a996e87244a0395ce011e62b37158745d643"; }; + dependencies = { + bytes = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.5.0" { inherit profileName; }).out; + http = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".http."1.0.0" { inherit profileName; }).out; + }; + }); + + "registry+https://github.com/rust-lang/crates.io-index".http-body-util."0.1.0" = overridableMkRustCrate (profileName: rec { + name = "http-body-util"; + version = "0.1.0"; + registry = "registry+https://github.com/rust-lang/crates.io-index"; + src = fetchCratesIo { inherit name version; sha256 = "41cb79eb393015dadd30fc252023adb0b2400a0caee0fa2a077e6e21a551e840"; }; + dependencies = { + bytes = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.5.0" { inherit profileName; }).out; + futures_util = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures-util."0.3.30" { inherit profileName; }).out; + http = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".http."1.0.0" { inherit profileName; }).out; + http_body = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".http-body."1.0.0" { inherit profileName; }).out; + pin_project_lite = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".pin-project-lite."0.2.13" { inherit profileName; }).out; + }; + }); + "registry+https://github.com/rust-lang/crates.io-index".http-range."0.1.5" = overridableMkRustCrate (profileName: rec { name = "http-range"; version = "0.1.5"; @@ -2569,10 +2616,8 @@ in registry = "registry+https://github.com/rust-lang/crates.io-index"; src = fetchCratesIo { inherit name version; sha256 = "bf96e135eb83a2a8ddf766e426a841d8ddd7449d5f00d34ea02b41d2f19eef80"; }; features = builtins.concatLists [ - [ "backports" ] [ "client" ] - [ "default" ] - [ "deprecated" ] + (lib.optional (rootFeatures' ? "garage/kubernetes-discovery" || rootFeatures' ? "garage/opentelemetry-otlp" || rootFeatures' ? "garage/telemetry-otlp" || rootFeatures' ? "garage_rpc/kube" || rootFeatures' ? "garage_rpc/kubernetes-discovery") "default") (lib.optional (rootFeatures' ? "garage/opentelemetry-otlp" || rootFeatures' ? "garage/telemetry-otlp") "full") [ "h2" ] [ "http1" ] @@ -2603,6 +2648,34 @@ in }; }); + "registry+https://github.com/rust-lang/crates.io-index".hyper."1.1.0" = overridableMkRustCrate (profileName: rec { + name = "hyper"; + version = "1.1.0"; + registry = "registry+https://github.com/rust-lang/crates.io-index"; + src = fetchCratesIo { inherit name version; sha256 = "fb5aa53871fc917b1a9ed87b683a5d86db645e23acb32c2e0785a353e522fb75"; }; + features = builtins.concatLists [ + [ "client" ] + [ "default" ] + [ "http1" ] + [ "http2" ] + [ "server" ] + ]; + dependencies = { + bytes = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.5.0" { inherit profileName; }).out; + futures_channel = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures-channel."0.3.30" { inherit profileName; }).out; + futures_util = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures-util."0.3.30" { inherit profileName; }).out; + h2 = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".h2."0.4.2" { inherit profileName; }).out; + http = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".http."1.0.0" { inherit profileName; }).out; + http_body = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".http-body."1.0.0" { inherit profileName; }).out; + httparse = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".httparse."1.8.0" { inherit profileName; }).out; + httpdate = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".httpdate."1.0.3" { inherit profileName; }).out; + itoa = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".itoa."1.0.10" { inherit profileName; }).out; + pin_project_lite = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".pin-project-lite."0.2.13" { inherit profileName; }).out; + tokio = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio."1.36.0" { inherit profileName; }).out; + want = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".want."0.3.1" { inherit profileName; }).out; + }; + }); + "registry+https://github.com/rust-lang/crates.io-index".hyper-rustls."0.23.2" = overridableMkRustCrate (profileName: rec { name = "hyper-rustls"; version = "0.23.2"; @@ -2671,20 +2744,36 @@ in }; }); - "registry+https://github.com/rust-lang/crates.io-index".hyperlocal."0.8.0" = overridableMkRustCrate (profileName: rec { - name = "hyperlocal"; - version = "0.8.0"; + "registry+https://github.com/rust-lang/crates.io-index".hyper-util."0.1.3" = overridableMkRustCrate (profileName: rec { + name = "hyper-util"; + version = "0.1.3"; registry = "registry+https://github.com/rust-lang/crates.io-index"; - src = fetchCratesIo { inherit name version; sha256 = "0fafdf7b2b2de7c9784f76e02c0935e65a8117ec3b768644379983ab333ac98c"; }; + src = fetchCratesIo { inherit name version; sha256 = "ca38ef113da30126bbff9cd1705f9273e15d45498615d138b0c20279ac7a76aa"; }; features = builtins.concatLists [ + [ "client" ] + [ "client-legacy" ] + [ "default" ] + [ "full" ] + [ "http1" ] + [ "http2" ] [ "server" ] + [ "server-auto" ] + [ "service" ] + [ "tokio" ] ]; dependencies = { + bytes = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.5.0" { inherit profileName; }).out; + futures_channel = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures-channel."0.3.30" { inherit profileName; }).out; futures_util = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures-util."0.3.30" { inherit profileName; }).out; - hex = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".hex."0.4.3" { inherit profileName; }).out; - hyper = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".hyper."0.14.28" { inherit profileName; }).out; - pin_project = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".pin-project."1.1.4" { inherit profileName; }).out; + http = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".http."1.0.0" { inherit profileName; }).out; + http_body = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".http-body."1.0.0" { inherit profileName; }).out; + hyper = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".hyper."1.1.0" { inherit profileName; }).out; + pin_project_lite = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".pin-project-lite."0.2.13" { inherit profileName; }).out; + socket2 = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".socket2."0.5.5" { inherit profileName; }).out; tokio = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio."1.36.0" { inherit profileName; }).out; + tower = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".tower."0.4.13" { inherit profileName; }).out; + tower_service = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".tower-service."0.3.2" { inherit profileName; }).out; + tracing = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".tracing."0.1.40" { inherit profileName; }).out; }; }); @@ -2917,8 +3006,8 @@ in ${ if rootFeatures' ? "k2v-client/clap" || rootFeatures' ? "k2v-client/cli" then "clap" else null } = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".clap."4.4.18" { inherit profileName; }).out; ${ if rootFeatures' ? "k2v-client/cli" || rootFeatures' ? "k2v-client/format_table" then "format_table" else null } = (rustPackages."unknown".format_table."0.1.1" { inherit profileName; }).out; hex = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".hex."0.4.3" { inherit profileName; }).out; - http = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".http."0.2.11" { inherit profileName; }).out; - hyper = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".hyper."0.14.28" { inherit profileName; }).out; + http = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".http."1.0.0" { inherit profileName; }).out; + hyper = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".hyper."1.1.0" { inherit profileName; }).out; hyper_rustls = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".hyper-rustls."0.24.2" { inherit profileName; }).out; log = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".log."0.4.20" { inherit profileName; }).out; percent_encoding = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".percent-encoding."2.3.1" { inherit profileName; }).out; @@ -5523,7 +5612,7 @@ in [ "default" ] [ "fs" ] (lib.optional (rootFeatures' ? "garage/opentelemetry-otlp" || rootFeatures' ? "garage/telemetry-otlp") "full") - (lib.optional (rootFeatures' ? "garage/opentelemetry-otlp" || rootFeatures' ? "garage/telemetry-otlp" || rootFeatures' ? "k2v-client/cli") "io-std") + [ "io-std" ] [ "io-util" ] [ "libc" ] [ "macros" ] @@ -5795,43 +5884,43 @@ in registry = "registry+https://github.com/rust-lang/crates.io-index"; src = fetchCratesIo { inherit name version; sha256 = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c"; }; features = builtins.concatLists [ - (lib.optional (rootFeatures' ? "garage/kubernetes-discovery" || rootFeatures' ? "garage/opentelemetry-otlp" || rootFeatures' ? "garage/telemetry-otlp" || rootFeatures' ? "garage_rpc/kube" || rootFeatures' ? "garage_rpc/kubernetes-discovery") "__common") + [ "__common" ] (lib.optional (rootFeatures' ? "garage/opentelemetry-otlp" || rootFeatures' ? "garage/telemetry-otlp") "balance") (lib.optional (rootFeatures' ? "garage/kubernetes-discovery" || rootFeatures' ? "garage/opentelemetry-otlp" || rootFeatures' ? "garage/telemetry-otlp" || rootFeatures' ? "garage_rpc/kube" || rootFeatures' ? "garage_rpc/kubernetes-discovery") "buffer") - (lib.optional (rootFeatures' ? "garage/kubernetes-discovery" || rootFeatures' ? "garage/opentelemetry-otlp" || rootFeatures' ? "garage/telemetry-otlp" || rootFeatures' ? "garage_rpc/kube" || rootFeatures' ? "garage_rpc/kubernetes-discovery") "default") + [ "default" ] (lib.optional (rootFeatures' ? "garage/opentelemetry-otlp" || rootFeatures' ? "garage/telemetry-otlp") "discover") (lib.optional (rootFeatures' ? "garage/kubernetes-discovery" || rootFeatures' ? "garage_rpc/kube" || rootFeatures' ? "garage_rpc/kubernetes-discovery") "filter") - (lib.optional (rootFeatures' ? "garage/kubernetes-discovery" || rootFeatures' ? "garage/opentelemetry-otlp" || rootFeatures' ? "garage/telemetry-otlp" || rootFeatures' ? "garage_rpc/kube" || rootFeatures' ? "garage_rpc/kubernetes-discovery") "futures-core") - (lib.optional (rootFeatures' ? "garage/kubernetes-discovery" || rootFeatures' ? "garage/opentelemetry-otlp" || rootFeatures' ? "garage/telemetry-otlp" || rootFeatures' ? "garage_rpc/kube" || rootFeatures' ? "garage_rpc/kubernetes-discovery") "futures-util") + [ "futures-core" ] + [ "futures-util" ] (lib.optional (rootFeatures' ? "garage/opentelemetry-otlp" || rootFeatures' ? "garage/telemetry-otlp") "indexmap") (lib.optional (rootFeatures' ? "garage/opentelemetry-otlp" || rootFeatures' ? "garage/telemetry-otlp") "limit") (lib.optional (rootFeatures' ? "garage/opentelemetry-otlp" || rootFeatures' ? "garage/telemetry-otlp") "load") - (lib.optional (rootFeatures' ? "garage/kubernetes-discovery" || rootFeatures' ? "garage/opentelemetry-otlp" || rootFeatures' ? "garage/telemetry-otlp" || rootFeatures' ? "garage_rpc/kube" || rootFeatures' ? "garage_rpc/kubernetes-discovery") "log") - (lib.optional (rootFeatures' ? "garage/opentelemetry-otlp" || rootFeatures' ? "garage/telemetry-otlp") "make") - (lib.optional (rootFeatures' ? "garage/kubernetes-discovery" || rootFeatures' ? "garage/opentelemetry-otlp" || rootFeatures' ? "garage/telemetry-otlp" || rootFeatures' ? "garage_rpc/kube" || rootFeatures' ? "garage_rpc/kubernetes-discovery") "pin-project") - (lib.optional (rootFeatures' ? "garage/kubernetes-discovery" || rootFeatures' ? "garage/opentelemetry-otlp" || rootFeatures' ? "garage/telemetry-otlp" || rootFeatures' ? "garage_rpc/kube" || rootFeatures' ? "garage_rpc/kubernetes-discovery") "pin-project-lite") + [ "log" ] + [ "make" ] + [ "pin-project" ] + [ "pin-project-lite" ] (lib.optional (rootFeatures' ? "garage/opentelemetry-otlp" || rootFeatures' ? "garage/telemetry-otlp") "rand") (lib.optional (rootFeatures' ? "garage/opentelemetry-otlp" || rootFeatures' ? "garage/telemetry-otlp") "ready-cache") (lib.optional (rootFeatures' ? "garage/opentelemetry-otlp" || rootFeatures' ? "garage/telemetry-otlp") "slab") (lib.optional (rootFeatures' ? "garage/opentelemetry-otlp" || rootFeatures' ? "garage/telemetry-otlp") "timeout") - (lib.optional (rootFeatures' ? "garage/kubernetes-discovery" || rootFeatures' ? "garage/opentelemetry-otlp" || rootFeatures' ? "garage/telemetry-otlp" || rootFeatures' ? "garage_rpc/kube" || rootFeatures' ? "garage_rpc/kubernetes-discovery") "tokio") + [ "tokio" ] (lib.optional (rootFeatures' ? "garage/kubernetes-discovery" || rootFeatures' ? "garage/opentelemetry-otlp" || rootFeatures' ? "garage/telemetry-otlp" || rootFeatures' ? "garage_rpc/kube" || rootFeatures' ? "garage_rpc/kubernetes-discovery") "tokio-util") - (lib.optional (rootFeatures' ? "garage/kubernetes-discovery" || rootFeatures' ? "garage/opentelemetry-otlp" || rootFeatures' ? "garage/telemetry-otlp" || rootFeatures' ? "garage_rpc/kube" || rootFeatures' ? "garage_rpc/kubernetes-discovery") "tracing") - (lib.optional (rootFeatures' ? "garage/kubernetes-discovery" || rootFeatures' ? "garage/opentelemetry-otlp" || rootFeatures' ? "garage/telemetry-otlp" || rootFeatures' ? "garage_rpc/kube" || rootFeatures' ? "garage_rpc/kubernetes-discovery") "util") + [ "tracing" ] + [ "util" ] ]; dependencies = { - ${ if rootFeatures' ? "garage/kubernetes-discovery" || rootFeatures' ? "garage/opentelemetry-otlp" || rootFeatures' ? "garage/telemetry-otlp" || rootFeatures' ? "garage_rpc/kube" || rootFeatures' ? "garage_rpc/kubernetes-discovery" then "futures_core" else null } = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures-core."0.3.30" { inherit profileName; }).out; - ${ if rootFeatures' ? "garage/kubernetes-discovery" || rootFeatures' ? "garage/opentelemetry-otlp" || rootFeatures' ? "garage/telemetry-otlp" || rootFeatures' ? "garage_rpc/kube" || rootFeatures' ? "garage_rpc/kubernetes-discovery" then "futures_util" else null } = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures-util."0.3.30" { inherit profileName; }).out; + futures_core = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures-core."0.3.30" { inherit profileName; }).out; + futures_util = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures-util."0.3.30" { inherit profileName; }).out; ${ if rootFeatures' ? "garage/opentelemetry-otlp" || rootFeatures' ? "garage/telemetry-otlp" then "indexmap" else null } = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".indexmap."1.9.3" { inherit profileName; }).out; - ${ if rootFeatures' ? "garage/kubernetes-discovery" || rootFeatures' ? "garage/opentelemetry-otlp" || rootFeatures' ? "garage/telemetry-otlp" || rootFeatures' ? "garage_rpc/kube" || rootFeatures' ? "garage_rpc/kubernetes-discovery" then "pin_project" else null } = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".pin-project."1.1.4" { inherit profileName; }).out; - ${ if rootFeatures' ? "garage/kubernetes-discovery" || rootFeatures' ? "garage/opentelemetry-otlp" || rootFeatures' ? "garage/telemetry-otlp" || rootFeatures' ? "garage_rpc/kube" || rootFeatures' ? "garage_rpc/kubernetes-discovery" then "pin_project_lite" else null } = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".pin-project-lite."0.2.13" { inherit profileName; }).out; + pin_project = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".pin-project."1.1.4" { inherit profileName; }).out; + pin_project_lite = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".pin-project-lite."0.2.13" { inherit profileName; }).out; ${ if rootFeatures' ? "garage/opentelemetry-otlp" || rootFeatures' ? "garage/telemetry-otlp" then "rand" else null } = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand."0.8.5" { inherit profileName; }).out; ${ if rootFeatures' ? "garage/opentelemetry-otlp" || rootFeatures' ? "garage/telemetry-otlp" then "slab" else null } = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".slab."0.4.9" { inherit profileName; }).out; - ${ if rootFeatures' ? "garage/kubernetes-discovery" || rootFeatures' ? "garage/opentelemetry-otlp" || rootFeatures' ? "garage/telemetry-otlp" || rootFeatures' ? "garage_rpc/kube" || rootFeatures' ? "garage_rpc/kubernetes-discovery" then "tokio" else null } = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio."1.36.0" { inherit profileName; }).out; + tokio = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio."1.36.0" { inherit profileName; }).out; ${ if rootFeatures' ? "garage/kubernetes-discovery" || rootFeatures' ? "garage/opentelemetry-otlp" || rootFeatures' ? "garage/telemetry-otlp" || rootFeatures' ? "garage_rpc/kube" || rootFeatures' ? "garage_rpc/kubernetes-discovery" then "tokio_util" else null } = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio-util."0.7.10" { inherit profileName; }).out; - ${ if rootFeatures' ? "garage/kubernetes-discovery" || rootFeatures' ? "garage/opentelemetry-otlp" || rootFeatures' ? "garage/telemetry-otlp" || rootFeatures' ? "garage_rpc/kube" || rootFeatures' ? "garage_rpc/kubernetes-discovery" then "tower_layer" else null } = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".tower-layer."0.3.2" { inherit profileName; }).out; - ${ if rootFeatures' ? "garage/kubernetes-discovery" || rootFeatures' ? "garage/opentelemetry-otlp" || rootFeatures' ? "garage/telemetry-otlp" || rootFeatures' ? "garage_rpc/kube" || rootFeatures' ? "garage_rpc/kubernetes-discovery" then "tower_service" else null } = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".tower-service."0.3.2" { inherit profileName; }).out; - ${ if rootFeatures' ? "garage/kubernetes-discovery" || rootFeatures' ? "garage/opentelemetry-otlp" || rootFeatures' ? "garage/telemetry-otlp" || rootFeatures' ? "garage_rpc/kube" || rootFeatures' ? "garage_rpc/kubernetes-discovery" then "tracing" else null } = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".tracing."0.1.40" { inherit profileName; }).out; + tower_layer = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".tower-layer."0.3.2" { inherit profileName; }).out; + tower_service = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".tower-service."0.3.2" { inherit profileName; }).out; + tracing = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".tracing."0.1.40" { inherit profileName; }).out; }; }); @@ -5886,12 +5975,12 @@ in features = builtins.concatLists [ [ "attributes" ] [ "default" ] - (lib.optional (rootFeatures' ? "garage/kubernetes-discovery" || rootFeatures' ? "garage/opentelemetry-otlp" || rootFeatures' ? "garage/telemetry-otlp" || rootFeatures' ? "garage_rpc/kube" || rootFeatures' ? "garage_rpc/kubernetes-discovery") "log") + [ "log" ] [ "std" ] [ "tracing-attributes" ] ]; dependencies = { - ${ if rootFeatures' ? "garage/kubernetes-discovery" || rootFeatures' ? "garage/opentelemetry-otlp" || rootFeatures' ? "garage/telemetry-otlp" || rootFeatures' ? "garage_rpc/kube" || rootFeatures' ? "garage_rpc/kubernetes-discovery" then "log" else null } = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".log."0.4.20" { inherit profileName; }).out; + log = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".log."0.4.20" { inherit profileName; }).out; pin_project_lite = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".pin-project-lite."0.2.13" { inherit profileName; }).out; tracing_attributes = (buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".tracing-attributes."0.1.27" { profileName = "__noProfile"; }).out; tracing_core = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".tracing-core."0.1.32" { inherit profileName; }).out; @@ -93,11 +93,12 @@ schemars = "0.8" reqwest = { version = "0.11", default-features = false, features = ["rustls-tls-manual-roots", "json"] } form_urlencoded = "1.0.0" -http = "0.2" +http = "1.0" httpdate = "1.0" http-range = "0.1" -hyper = { version = "0.14", features = ["server", "http1", "runtime", "tcp", "stream", "backports", "deprecated"] } -hyperlocal = { version = "0.8.0", default-features = false, features = ["server"] } +http-body-util = "0.1" +hyper = { version = "1.0", features = ["server", "http1"] } +hyper-util = { verion = "0.1", features = [ "full" ]} multer = "3.0" percent-encoding = "2.2" roxmltree = "0.19" diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml index 7d6ad4af..011a64d8 100644 --- a/src/api/Cargo.toml +++ b/src/api/Cargo.toml @@ -44,8 +44,9 @@ form_urlencoded.workspace = true http.workspace = true httpdate.workspace = true http-range.workspace = true +http-body-util.workspace = true hyper.workspace = true -hyperlocal.workspace = true +hyper-util.workspace = true multer.workspace = true percent-encoding.workspace = true roxmltree.workspace = true diff --git a/src/api/admin/api_server.rs b/src/api/admin/api_server.rs index 0ce3ca0d..d5e1c777 100644 --- a/src/api/admin/api_server.rs +++ b/src/api/admin/api_server.rs @@ -5,7 +5,7 @@ use async_trait::async_trait; use futures::future::Future; use http::header::{ACCESS_CONTROL_ALLOW_METHODS, ACCESS_CONTROL_ALLOW_ORIGIN, ALLOW}; -use hyper::{Body, Request, Response, StatusCode}; +use hyper::{body::Incoming as IncomingBody, Request, Response, StatusCode}; use opentelemetry::trace::SpanRef; @@ -27,7 +27,9 @@ use crate::admin::error::*; use crate::admin::key::*; use crate::admin::router_v0; use crate::admin::router_v1::{Authorization, Endpoint}; -use crate::helpers::host_to_bucket; +use crate::helpers::*; + +pub type ResBody = BoxBody<Error>; pub struct AdminApiServer { garage: Arc<Garage>, @@ -71,16 +73,19 @@ impl AdminApiServer { .await } - fn handle_options(&self, _req: &Request<Body>) -> Result<Response<Body>, Error> { + fn handle_options(&self, _req: &Request<IncomingBody>) -> Result<Response<ResBody>, Error> { Ok(Response::builder() .status(StatusCode::NO_CONTENT) .header(ALLOW, "OPTIONS, GET, POST") .header(ACCESS_CONTROL_ALLOW_METHODS, "OPTIONS, GET, POST") .header(ACCESS_CONTROL_ALLOW_ORIGIN, "*") - .body(Body::empty())?) + .body(empty_body())?) } - async fn handle_check_domain(&self, req: Request<Body>) -> Result<Response<Body>, Error> { + async fn handle_check_domain( + &self, + req: Request<IncomingBody>, + ) -> Result<Response<ResBody>, Error> { let query_params: HashMap<String, String> = req .uri() .query() @@ -104,7 +109,7 @@ impl AdminApiServer { if self.check_domain(domain).await? { Ok(Response::builder() .status(StatusCode::OK) - .body(Body::from(format!( + .body(string_body(format!( "Domain '{domain}' is managed by Garage" )))?) } else { @@ -167,7 +172,7 @@ impl AdminApiServer { } } - fn handle_health(&self) -> Result<Response<Body>, Error> { + fn handle_health(&self) -> Result<Response<ResBody>, Error> { let health = self.garage.system.health(); let (status, status_str) = match health.status { @@ -189,10 +194,10 @@ impl AdminApiServer { Ok(Response::builder() .status(status) .header(http::header::CONTENT_TYPE, "text/plain") - .body(Body::from(status_str))?) + .body(string_body(status_str))?) } - fn handle_metrics(&self) -> Result<Response<Body>, Error> { + fn handle_metrics(&self) -> Result<Response<ResBody>, Error> { #[cfg(feature = "metrics")] { use opentelemetry::trace::Tracer; @@ -212,7 +217,7 @@ impl AdminApiServer { Ok(Response::builder() .status(StatusCode::OK) .header(http::header::CONTENT_TYPE, encoder.format_type()) - .body(Body::from(buffer))?) + .body(bytes_body(buffer.into()))?) } #[cfg(not(feature = "metrics"))] Err(Error::bad_request( @@ -229,7 +234,7 @@ impl ApiHandler for AdminApiServer { type Endpoint = Endpoint; type Error = Error; - fn parse_endpoint(&self, req: &Request<Body>) -> Result<Endpoint, Error> { + fn parse_endpoint(&self, req: &Request<IncomingBody>) -> Result<Endpoint, Error> { if req.uri().path().starts_with("/v0/") { let endpoint_v0 = router_v0::Endpoint::from_request(req)?; Endpoint::from_v0(endpoint_v0) @@ -240,9 +245,9 @@ impl ApiHandler for AdminApiServer { async fn handle( &self, - req: Request<Body>, + req: Request<IncomingBody>, endpoint: Endpoint, - ) -> Result<Response<Body>, Error> { + ) -> Result<Response<ResBody>, Error> { let expected_auth_header = match endpoint.authorization_type() { Authorization::None => None, diff --git a/src/api/admin/bucket.rs b/src/api/admin/bucket.rs index 17f46c30..0bfb87c5 100644 --- a/src/api/admin/bucket.rs +++ b/src/api/admin/bucket.rs @@ -1,7 +1,7 @@ use std::collections::HashMap; use std::sync::Arc; -use hyper::{Body, Request, Response, StatusCode}; +use hyper::{body::Incoming as IncomingBody, Request, Response, StatusCode}; use serde::{Deserialize, Serialize}; use garage_util::crdt::*; @@ -17,12 +17,13 @@ use garage_model::permission::*; use garage_model::s3::mpu_table; use garage_model::s3::object_table::*; +use crate::admin::api_server::ResBody; use crate::admin::error::*; use crate::admin::key::ApiBucketKeyPerm; use crate::common_error::CommonError; -use crate::helpers::{json_ok_response, parse_json_body}; +use crate::helpers::*; -pub async fn handle_list_buckets(garage: &Arc<Garage>) -> Result<Response<Body>, Error> { +pub async fn handle_list_buckets(garage: &Arc<Garage>) -> Result<Response<ResBody>, Error> { let buckets = garage .bucket_table .get_range( @@ -90,7 +91,7 @@ pub async fn handle_get_bucket_info( garage: &Arc<Garage>, id: Option<String>, global_alias: Option<String>, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let bucket_id = match (id, global_alias) { (Some(id), None) => parse_bucket_id(&id)?, (None, Some(ga)) => garage @@ -111,7 +112,7 @@ pub async fn handle_get_bucket_info( async fn bucket_info_results( garage: &Arc<Garage>, bucket_id: Uuid, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let bucket = garage .bucket_helper() .get_existing_bucket(bucket_id) @@ -268,8 +269,8 @@ struct GetBucketInfoKey { pub async fn handle_create_bucket( garage: &Arc<Garage>, - req: Request<Body>, -) -> Result<Response<Body>, Error> { + req: Request<IncomingBody>, +) -> Result<Response<ResBody>, Error> { let req = parse_json_body::<CreateBucketRequest>(req).await?; if let Some(ga) = &req.global_alias { @@ -360,7 +361,7 @@ struct CreateBucketLocalAlias { pub async fn handle_delete_bucket( garage: &Arc<Garage>, id: String, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let helper = garage.bucket_helper(); let bucket_id = parse_bucket_id(&id)?; @@ -403,14 +404,14 @@ pub async fn handle_delete_bucket( Ok(Response::builder() .status(StatusCode::NO_CONTENT) - .body(Body::empty())?) + .body(empty_body())?) } pub async fn handle_update_bucket( garage: &Arc<Garage>, id: String, - req: Request<Body>, -) -> Result<Response<Body>, Error> { + req: Request<IncomingBody>, +) -> Result<Response<ResBody>, Error> { let req = parse_json_body::<UpdateBucketRequest>(req).await?; let bucket_id = parse_bucket_id(&id)?; @@ -470,9 +471,9 @@ struct UpdateBucketWebsiteAccess { pub async fn handle_bucket_change_key_perm( garage: &Arc<Garage>, - req: Request<Body>, + req: Request<IncomingBody>, new_perm_flag: bool, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let req = parse_json_body::<BucketKeyPermChangeRequest>(req).await?; let bucket_id = parse_bucket_id(&req.bucket_id)?; @@ -526,7 +527,7 @@ pub async fn handle_global_alias_bucket( garage: &Arc<Garage>, bucket_id: String, alias: String, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let bucket_id = parse_bucket_id(&bucket_id)?; garage @@ -541,7 +542,7 @@ pub async fn handle_global_unalias_bucket( garage: &Arc<Garage>, bucket_id: String, alias: String, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let bucket_id = parse_bucket_id(&bucket_id)?; garage @@ -557,7 +558,7 @@ pub async fn handle_local_alias_bucket( bucket_id: String, access_key_id: String, alias: String, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let bucket_id = parse_bucket_id(&bucket_id)?; garage @@ -573,7 +574,7 @@ pub async fn handle_local_unalias_bucket( bucket_id: String, access_key_id: String, alias: String, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let bucket_id = parse_bucket_id(&bucket_id)?; garage diff --git a/src/api/admin/cluster.rs b/src/api/admin/cluster.rs index c8107b82..1ec8d6de 100644 --- a/src/api/admin/cluster.rs +++ b/src/api/admin/cluster.rs @@ -1,7 +1,7 @@ use std::net::SocketAddr; use std::sync::Arc; -use hyper::{Body, Request, Response}; +use hyper::{body::Incoming as IncomingBody, Request, Response}; use serde::{Deserialize, Serialize}; use garage_util::crdt::*; @@ -11,10 +11,11 @@ use garage_rpc::layout; use garage_model::garage::Garage; +use crate::admin::api_server::ResBody; use crate::admin::error::*; use crate::helpers::{json_ok_response, parse_json_body}; -pub async fn handle_get_cluster_status(garage: &Arc<Garage>) -> Result<Response<Body>, Error> { +pub async fn handle_get_cluster_status(garage: &Arc<Garage>) -> Result<Response<ResBody>, Error> { let res = GetClusterStatusResponse { node: hex::encode(garage.system.id), garage_version: garage_util::version::garage_version(), @@ -39,7 +40,7 @@ pub async fn handle_get_cluster_status(garage: &Arc<Garage>) -> Result<Response< Ok(json_ok_response(&res)?) } -pub async fn handle_get_cluster_health(garage: &Arc<Garage>) -> Result<Response<Body>, Error> { +pub async fn handle_get_cluster_health(garage: &Arc<Garage>) -> Result<Response<ResBody>, Error> { use garage_rpc::system::ClusterHealthStatus; let health = garage.system.health(); let health = ClusterHealth { @@ -61,8 +62,8 @@ pub async fn handle_get_cluster_health(garage: &Arc<Garage>) -> Result<Response< pub async fn handle_connect_cluster_nodes( garage: &Arc<Garage>, - req: Request<Body>, -) -> Result<Response<Body>, Error> { + req: Request<IncomingBody>, +) -> Result<Response<ResBody>, Error> { let req = parse_json_body::<Vec<String>>(req).await?; let res = futures::future::join_all(req.iter().map(|node| garage.system.connect(node))) @@ -83,7 +84,7 @@ pub async fn handle_connect_cluster_nodes( Ok(json_ok_response(&res)?) } -pub async fn handle_get_cluster_layout(garage: &Arc<Garage>) -> Result<Response<Body>, Error> { +pub async fn handle_get_cluster_layout(garage: &Arc<Garage>) -> Result<Response<ResBody>, Error> { let res = format_cluster_layout(&garage.system.get_cluster_layout()); Ok(json_ok_response(&res)?) @@ -203,8 +204,8 @@ struct KnownNodeResp { pub async fn handle_update_cluster_layout( garage: &Arc<Garage>, - req: Request<Body>, -) -> Result<Response<Body>, Error> { + req: Request<IncomingBody>, +) -> Result<Response<ResBody>, Error> { let updates = parse_json_body::<UpdateClusterLayoutRequest>(req).await?; let mut layout = garage.system.get_cluster_layout(); @@ -243,8 +244,8 @@ pub async fn handle_update_cluster_layout( pub async fn handle_apply_cluster_layout( garage: &Arc<Garage>, - req: Request<Body>, -) -> Result<Response<Body>, Error> { + req: Request<IncomingBody>, +) -> Result<Response<ResBody>, Error> { let param = parse_json_body::<ApplyRevertLayoutRequest>(req).await?; let layout = garage.system.get_cluster_layout(); @@ -261,8 +262,8 @@ pub async fn handle_apply_cluster_layout( pub async fn handle_revert_cluster_layout( garage: &Arc<Garage>, - req: Request<Body>, -) -> Result<Response<Body>, Error> { + req: Request<IncomingBody>, +) -> Result<Response<ResBody>, Error> { let param = parse_json_body::<ApplyRevertLayoutRequest>(req).await?; let layout = garage.system.get_cluster_layout(); diff --git a/src/api/admin/error.rs b/src/api/admin/error.rs index ed1a07bd..98cc7a9e 100644 --- a/src/api/admin/error.rs +++ b/src/api/admin/error.rs @@ -1,13 +1,13 @@ use err_derive::Error; use hyper::header::HeaderValue; -use hyper::{Body, HeaderMap, StatusCode}; +use hyper::{HeaderMap, StatusCode}; pub use garage_model::helper::error::Error as HelperError; use crate::common_error::CommonError; pub use crate::common_error::{CommonErrorDerivative, OkOrBadRequest, OkOrInternalError}; use crate::generic_server::ApiError; -use crate::helpers::CustomApiErrorBody; +use crate::helpers::{BytesBody, CustomApiErrorBody}; /// Errors of this crate #[derive(Debug, Error)] @@ -77,14 +77,14 @@ impl ApiError for Error { header_map.append(header::CONTENT_TYPE, "application/json".parse().unwrap()); } - fn http_body(&self, garage_region: &str, path: &str) -> Body { + fn http_body(&self, garage_region: &str, path: &str) -> BytesBody { let error = CustomApiErrorBody { code: self.code().to_string(), message: format!("{}", self), path: path.to_string(), region: garage_region.to_string(), }; - Body::from(serde_json::to_string_pretty(&error).unwrap_or_else(|_| { + let error_str = serde_json::to_string_pretty(&error).unwrap_or_else(|_| { r#" { "code": "InternalError", @@ -92,6 +92,7 @@ impl ApiError for Error { } "# .into() - })) + }); + BytesBody::from(bytes::Bytes::from(error_str.into_bytes())) } } diff --git a/src/api/admin/key.rs b/src/api/admin/key.rs index 8d1c6890..3e5d2cab 100644 --- a/src/api/admin/key.rs +++ b/src/api/admin/key.rs @@ -1,7 +1,7 @@ use std::collections::HashMap; use std::sync::Arc; -use hyper::{Body, Request, Response, StatusCode}; +use hyper::{body::Incoming as IncomingBody, Request, Response, StatusCode}; use serde::{Deserialize, Serialize}; use garage_table::*; @@ -9,10 +9,11 @@ use garage_table::*; use garage_model::garage::Garage; use garage_model::key_table::*; +use crate::admin::api_server::ResBody; use crate::admin::error::*; -use crate::helpers::{is_default, json_ok_response, parse_json_body}; +use crate::helpers::*; -pub async fn handle_list_keys(garage: &Arc<Garage>) -> Result<Response<Body>, Error> { +pub async fn handle_list_keys(garage: &Arc<Garage>) -> Result<Response<ResBody>, Error> { let res = garage .key_table .get_range( @@ -45,7 +46,7 @@ pub async fn handle_get_key_info( id: Option<String>, search: Option<String>, show_secret_key: bool, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let key = if let Some(id) = id { garage.key_helper().get_existing_key(&id).await? } else if let Some(search) = search { @@ -62,8 +63,8 @@ pub async fn handle_get_key_info( pub async fn handle_create_key( garage: &Arc<Garage>, - req: Request<Body>, -) -> Result<Response<Body>, Error> { + req: Request<IncomingBody>, +) -> Result<Response<ResBody>, Error> { let req = parse_json_body::<CreateKeyRequest>(req).await?; let key = Key::new(req.name.as_deref().unwrap_or("Unnamed key")); @@ -80,8 +81,8 @@ struct CreateKeyRequest { pub async fn handle_import_key( garage: &Arc<Garage>, - req: Request<Body>, -) -> Result<Response<Body>, Error> { + req: Request<IncomingBody>, +) -> Result<Response<ResBody>, Error> { let req = parse_json_body::<ImportKeyRequest>(req).await?; let prev_key = garage.key_table.get(&EmptyKey, &req.access_key_id).await?; @@ -111,8 +112,8 @@ struct ImportKeyRequest { pub async fn handle_update_key( garage: &Arc<Garage>, id: String, - req: Request<Body>, -) -> Result<Response<Body>, Error> { + req: Request<IncomingBody>, +) -> Result<Response<ResBody>, Error> { let req = parse_json_body::<UpdateKeyRequest>(req).await?; let mut key = garage.key_helper().get_existing_key(&id).await?; @@ -146,7 +147,10 @@ struct UpdateKeyRequest { deny: Option<KeyPerm>, } -pub async fn handle_delete_key(garage: &Arc<Garage>, id: String) -> Result<Response<Body>, Error> { +pub async fn handle_delete_key( + garage: &Arc<Garage>, + id: String, +) -> Result<Response<ResBody>, Error> { let mut key = garage.key_helper().get_existing_key(&id).await?; key.state.as_option().unwrap(); @@ -155,14 +159,14 @@ pub async fn handle_delete_key(garage: &Arc<Garage>, id: String) -> Result<Respo Ok(Response::builder() .status(StatusCode::NO_CONTENT) - .body(Body::empty())?) + .body(empty_body())?) } async fn key_info_results( garage: &Arc<Garage>, key: Key, show_secret: bool, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let mut relevant_buckets = HashMap::new(); let key_state = key.state.as_option().unwrap(); diff --git a/src/api/generic_server.rs b/src/api/generic_server.rs index fa346f48..832f2da3 100644 --- a/src/api/generic_server.rs +++ b/src/api/generic_server.rs @@ -6,15 +6,16 @@ use async_trait::async_trait; use futures::future::Future; +use http_body_util::BodyExt; use hyper::header::HeaderValue; -use hyper::server::conn::AddrStream; -use hyper::service::{make_service_fn, service_fn}; -use hyper::{Body, Request, Response, Server}; +use hyper::server::conn::http1; +use hyper::service::service_fn; +use hyper::{body::Incoming as IncomingBody, Request, Response}; use hyper::{HeaderMap, StatusCode}; +use hyper_util::rt::TokioIo; -use hyperlocal::UnixServerExt; - -use tokio::net::UnixStream; +use tokio::io::{AsyncRead, AsyncWrite}; +use tokio::net::{TcpListener, UnixListener}; use opentelemetry::{ global, @@ -28,6 +29,8 @@ use garage_util::forwarded_headers; use garage_util::metrics::{gen_trace_id, RecordDuration}; use garage_util::socket_address::UnixOrTCPSocketAddress; +use crate::helpers::{BoxBody, BytesBody}; + pub(crate) trait ApiEndpoint: Send + Sync + 'static { fn name(&self) -> &'static str; fn add_span_attributes(&self, span: SpanRef<'_>); @@ -36,7 +39,7 @@ pub(crate) trait ApiEndpoint: Send + Sync + 'static { pub trait ApiError: std::error::Error + Send + Sync + 'static { fn http_status_code(&self) -> StatusCode; fn add_http_headers(&self, header_map: &mut HeaderMap<HeaderValue>); - fn http_body(&self, garage_region: &str, path: &str) -> Body; + fn http_body(&self, garage_region: &str, path: &str) -> BytesBody; } #[async_trait] @@ -47,12 +50,12 @@ pub(crate) trait ApiHandler: Send + Sync + 'static { type Endpoint: ApiEndpoint; type Error: ApiError; - fn parse_endpoint(&self, r: &Request<Body>) -> Result<Self::Endpoint, Self::Error>; + fn parse_endpoint(&self, r: &Request<IncomingBody>) -> Result<Self::Endpoint, Self::Error>; async fn handle( &self, - req: Request<Body>, + req: Request<IncomingBody>, endpoint: Self::Endpoint, - ) -> Result<Response<Body>, Self::Error>; + ) -> Result<Response<BoxBody<Self::Error>>, Self::Error>; } pub(crate) struct ApiServer<A: ApiHandler> { @@ -101,72 +104,79 @@ impl<A: ApiHandler> ApiServer<A> { unix_bind_addr_mode: Option<u32>, shutdown_signal: impl Future<Output = ()>, ) -> Result<(), GarageError> { - let tcp_service = make_service_fn(|conn: &AddrStream| { - let this = self.clone(); - - let client_addr = conn.remote_addr(); - async move { - Ok::<_, GarageError>(service_fn(move |req: Request<Body>| { - let this = this.clone(); - - this.handler(req, client_addr.to_string()) - })) - } - }); - - let unix_service = make_service_fn(|_: &UnixStream| { - let this = self.clone(); - - let path = bind_addr.to_string(); - async move { - Ok::<_, GarageError>(service_fn(move |req: Request<Body>| { - let this = this.clone(); - - this.handler(req, path.clone()) - })) - } - }); - info!( "{} API server listening on {}", A::API_NAME_DISPLAY, bind_addr ); + tokio::pin!(shutdown_signal); + match bind_addr { UnixOrTCPSocketAddress::TCPSocket(addr) => { - Server::bind(&addr) - .serve(tcp_service) - .with_graceful_shutdown(shutdown_signal) - .await? + let listener = TcpListener::bind(addr).await?; + + loop { + let (stream, client_addr) = tokio::select! { + acc = listener.accept() => acc?, + _ = &mut shutdown_signal => break, + }; + + self.launch_handler(stream, client_addr.to_string()); + } } UnixOrTCPSocketAddress::UnixSocket(ref path) => { if path.exists() { fs::remove_file(path)? } - let bound = Server::bind_unix(path)?; + let listener = UnixListener::bind(path)?; fs::set_permissions( path, Permissions::from_mode(unix_bind_addr_mode.unwrap_or(0o222)), )?; - bound - .serve(unix_service) - .with_graceful_shutdown(shutdown_signal) - .await?; + loop { + let (stream, _) = tokio::select! { + acc = listener.accept() => acc?, + _ = &mut shutdown_signal => break, + }; + + self.launch_handler(stream, path.display().to_string()); + } } }; Ok(()) } + fn launch_handler<S>(self: &Arc<Self>, stream: S, client_addr: String) + where + S: AsyncRead + AsyncWrite + Send + Sync + 'static, + { + let this = self.clone(); + let io = TokioIo::new(stream); + + let serve = + move |req: Request<IncomingBody>| this.clone().handler(req, client_addr.to_string()); + + tokio::task::spawn(async move { + let io = Box::pin(io); + if let Err(e) = http1::Builder::new() + .serve_connection(io, service_fn(serve)) + .await + { + debug!("Error handling HTTP connection: {}", e); + } + }); + } + async fn handler( self: Arc<Self>, - req: Request<Body>, + req: Request<IncomingBody>, addr: String, - ) -> Result<Response<Body>, GarageError> { + ) -> Result<Response<BoxBody<A::Error>>, GarageError> { let uri = req.uri().clone(); if let Ok(forwarded_for_ip_addr) = @@ -205,7 +215,7 @@ impl<A: ApiHandler> ApiServer<A> { Ok(x) } Err(e) => { - let body: Body = e.http_body(&self.region, uri.path()); + let body = e.http_body(&self.region, uri.path()); let mut http_error_builder = Response::builder().status(e.http_status_code()); if let Some(header_map) = http_error_builder.headers_mut() { @@ -219,12 +229,15 @@ impl<A: ApiHandler> ApiServer<A> { } else { info!("Response: error {}, {}", e.http_status_code(), e); } - Ok(http_error) + Ok(http_error.map(|body| BoxBody::new(body.map_err(|_| unreachable!())))) } } } - async fn handler_stage2(&self, req: Request<Body>) -> Result<Response<Body>, A::Error> { + async fn handler_stage2( + &self, + req: Request<IncomingBody>, + ) -> Result<Response<BoxBody<A::Error>>, A::Error> { let endpoint = self.api_handler.parse_endpoint(&req)?; debug!("Endpoint: {}", endpoint.name()); diff --git a/src/api/helpers.rs b/src/api/helpers.rs index 8efaa231..541b2def 100644 --- a/src/api/helpers.rs +++ b/src/api/helpers.rs @@ -1,4 +1,5 @@ -use hyper::{body::HttpBody, Body, Request, Response}; +use http_body_util::{BodyExt, Full as FullBody}; +use hyper::{body::Incoming as IncomingBody, Request, Response}; use idna::domain_to_unicode; use serde::{Deserialize, Serialize}; @@ -138,18 +139,36 @@ pub fn key_after_prefix(pfx: &str) -> Option<String> { None } -pub async fn parse_json_body<T: for<'de> Deserialize<'de>>(req: Request<Body>) -> Result<T, Error> { +// =============== body helpers ================= + +pub type BytesBody = FullBody<bytes::Bytes>; +pub type BoxBody<E> = http_body_util::combinators::BoxBody<bytes::Bytes, E>; + +pub fn string_body<E>(s: String) -> BoxBody<E> { + bytes_body(bytes::Bytes::from(s.into_bytes())) +} +pub fn bytes_body<E>(b: bytes::Bytes) -> BoxBody<E> { + BoxBody::new(FullBody::new(b).map_err(|_| unreachable!())) +} +pub fn empty_body<E>() -> BoxBody<E> { + BoxBody::new(http_body_util::Empty::new().map_err(|_| unreachable!())) +} + +pub async fn parse_json_body<T>(req: Request<IncomingBody>) -> Result<T, Error> +where + T: for<'de> Deserialize<'de>, +{ let body = req.into_body().collect().await?.to_bytes(); let resp: T = serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?; Ok(resp) } -pub fn json_ok_response<T: Serialize>(res: &T) -> Result<Response<Body>, Error> { +pub fn json_ok_response<E, T: Serialize>(res: &T) -> Result<Response<BoxBody<E>>, Error> { let resp_json = serde_json::to_string_pretty(res).map_err(garage_util::error::Error::from)?; Ok(Response::builder() .status(hyper::StatusCode::OK) .header(http::header::CONTENT_TYPE, "application/json") - .body(Body::from(resp_json))?) + .body(string_body(resp_json))?) } pub fn is_default<T: Default + PartialEq>(v: &T) -> bool { diff --git a/src/api/s3/api_server.rs b/src/api/s3/api_server.rs index 887839dd..7717fd49 100644 --- a/src/api/s3/api_server.rs +++ b/src/api/s3/api_server.rs @@ -4,7 +4,7 @@ use async_trait::async_trait; use futures::future::Future; use hyper::header; -use hyper::{Body, Request, Response}; +use hyper::{body::Incoming as IncomingBody, Request, Response}; use opentelemetry::{trace::SpanRef, KeyValue}; @@ -34,6 +34,9 @@ use crate::s3::put::*; use crate::s3::router::Endpoint; use crate::s3::website::*; +pub use crate::signature::streaming::ReqBody; +pub type ResBody = BoxBody<Error>; + pub struct S3ApiServer { garage: Arc<Garage>, } @@ -57,10 +60,10 @@ impl S3ApiServer { async fn handle_request_without_bucket( &self, - _req: Request<Body>, + _req: Request<ReqBody>, api_key: Key, endpoint: Endpoint, - ) -> Result<Response<Body>, Error> { + ) -> Result<Response<ResBody>, Error> { match endpoint { Endpoint::ListBuckets => handle_list_buckets(&self.garage, &api_key).await, endpoint => Err(Error::NotImplemented(endpoint.name().to_owned())), @@ -76,7 +79,7 @@ impl ApiHandler for S3ApiServer { type Endpoint = S3ApiEndpoint; type Error = Error; - fn parse_endpoint(&self, req: &Request<Body>) -> Result<S3ApiEndpoint, Error> { + fn parse_endpoint(&self, req: &Request<IncomingBody>) -> Result<S3ApiEndpoint, Error> { let authority = req .headers() .get(header::HOST) @@ -104,9 +107,9 @@ impl ApiHandler for S3ApiServer { async fn handle( &self, - req: Request<Body>, + req: Request<IncomingBody>, endpoint: S3ApiEndpoint, - ) -> Result<Response<Body>, Error> { + ) -> Result<Response<ResBody>, Error> { let S3ApiEndpoint { bucket_name, endpoint, @@ -235,8 +238,7 @@ impl ApiHandler for S3ApiServer { } Endpoint::CreateBucket {} => unreachable!(), Endpoint::HeadBucket {} => { - let empty_body: Body = Body::from(vec![]); - let response = Response::builder().body(empty_body).unwrap(); + let response = Response::builder().body(empty_body()).unwrap(); Ok(response) } Endpoint::DeleteBucket {} => { diff --git a/src/api/s3/bucket.rs b/src/api/s3/bucket.rs index a2437524..fa2f1b6d 100644 --- a/src/api/s3/bucket.rs +++ b/src/api/s3/bucket.rs @@ -1,7 +1,8 @@ use std::collections::HashMap; use std::sync::Arc; -use hyper::{body::HttpBody, Body, Request, Response, StatusCode}; +use http_body_util::BodyExt; +use hyper::{Request, Response, StatusCode}; use garage_model::bucket_alias_table::*; use garage_model::bucket_table::Bucket; @@ -14,11 +15,13 @@ use garage_util::data::*; use garage_util::time::*; use crate::common_error::CommonError; +use crate::helpers::*; +use crate::s3::api_server::{ReqBody, ResBody}; use crate::s3::error::*; use crate::s3::xml as s3_xml; use crate::signature::verify_signed_content; -pub fn handle_get_bucket_location(garage: Arc<Garage>) -> Result<Response<Body>, Error> { +pub fn handle_get_bucket_location(garage: Arc<Garage>) -> Result<Response<ResBody>, Error> { let loc = s3_xml::LocationConstraint { xmlns: (), region: garage.config.s3_api.s3_region.to_string(), @@ -27,10 +30,10 @@ pub fn handle_get_bucket_location(garage: Arc<Garage>) -> Result<Response<Body>, Ok(Response::builder() .header("Content-Type", "application/xml") - .body(Body::from(xml.into_bytes()))?) + .body(string_body(xml))?) } -pub fn handle_get_bucket_versioning() -> Result<Response<Body>, Error> { +pub fn handle_get_bucket_versioning() -> Result<Response<ResBody>, Error> { let versioning = s3_xml::VersioningConfiguration { xmlns: (), status: None, @@ -40,10 +43,13 @@ pub fn handle_get_bucket_versioning() -> Result<Response<Body>, Error> { Ok(Response::builder() .header("Content-Type", "application/xml") - .body(Body::from(xml.into_bytes()))?) + .body(string_body(xml))?) } -pub async fn handle_list_buckets(garage: &Garage, api_key: &Key) -> Result<Response<Body>, Error> { +pub async fn handle_list_buckets( + garage: &Garage, + api_key: &Key, +) -> Result<Response<ResBody>, Error> { let key_p = api_key.params().ok_or_internal_error( "Key should not be in deleted state at this point (in handle_list_buckets)", )?; @@ -109,17 +115,17 @@ pub async fn handle_list_buckets(garage: &Garage, api_key: &Key) -> Result<Respo Ok(Response::builder() .header("Content-Type", "application/xml") - .body(Body::from(xml))?) + .body(string_body(xml))?) } pub async fn handle_create_bucket( garage: &Garage, - req: Request<Body>, + req: Request<ReqBody>, content_sha256: Option<Hash>, api_key: Key, bucket_name: String, -) -> Result<Response<Body>, Error> { - let body = req.into_body().collect().await?.to_bytes(); +) -> Result<Response<ResBody>, Error> { + let body = BodyExt::collect(req.into_body()).await?.to_bytes(); if let Some(content_sha256) = content_sha256 { verify_signed_content(content_sha256, &body[..])?; @@ -194,7 +200,7 @@ pub async fn handle_create_bucket( Ok(Response::builder() .header("Location", format!("/{}", bucket_name)) - .body(Body::empty()) + .body(empty_body()) .unwrap()) } @@ -203,7 +209,7 @@ pub async fn handle_delete_bucket( bucket_id: Uuid, bucket_name: String, api_key: Key, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let key_params = api_key .params() .ok_or_internal_error("Key should not be deleted at this point")?; @@ -282,7 +288,7 @@ pub async fn handle_delete_bucket( Ok(Response::builder() .status(StatusCode::NO_CONTENT) - .body(Body::empty())?) + .body(empty_body())?) } fn parse_create_bucket_xml(xml_bytes: &[u8]) -> Option<Option<String>> { diff --git a/src/api/s3/copy.rs b/src/api/s3/copy.rs index 68b4f0c9..ba9bfc88 100644 --- a/src/api/s3/copy.rs +++ b/src/api/s3/copy.rs @@ -6,7 +6,7 @@ use futures::{stream, stream::Stream, StreamExt}; use md5::{Digest as Md5Digest, Md5}; use bytes::Bytes; -use hyper::{Body, Request, Response}; +use hyper::{Request, Response}; use serde::Serialize; use garage_rpc::netapp::bytes_buf::BytesBuf; @@ -22,7 +22,8 @@ use garage_model::s3::mpu_table::*; use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; -use crate::helpers::parse_bucket_key; +use crate::helpers::*; +use crate::s3::api_server::{ReqBody, ResBody}; use crate::s3::error::*; use crate::s3::multipart; use crate::s3::put::get_headers; @@ -31,10 +32,10 @@ use crate::s3::xml::{self as s3_xml, xmlns_tag}; pub async fn handle_copy( garage: Arc<Garage>, api_key: &Key, - req: &Request<Body>, + req: &Request<ReqBody>, dest_bucket_id: Uuid, dest_key: &str, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let copy_precondition = CopyPreconditionHeaders::parse(req)?; let source_object = get_copy_source(&garage, api_key, req).await?; @@ -176,18 +177,18 @@ pub async fn handle_copy( "x-amz-copy-source-version-id", hex::encode(source_version.uuid), ) - .body(Body::from(xml))?) + .body(string_body(xml))?) } pub async fn handle_upload_part_copy( garage: Arc<Garage>, api_key: &Key, - req: &Request<Body>, + req: &Request<ReqBody>, dest_bucket_id: Uuid, dest_key: &str, part_number: u64, upload_id: &str, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let copy_precondition = CopyPreconditionHeaders::parse(req)?; let dest_upload_id = multipart::decode_upload_id(upload_id)?; @@ -432,13 +433,13 @@ pub async fn handle_upload_part_copy( "x-amz-copy-source-version-id", hex::encode(source_object_version.uuid), ) - .body(Body::from(resp_xml))?) + .body(string_body(resp_xml))?) } async fn get_copy_source( garage: &Garage, api_key: &Key, - req: &Request<Body>, + req: &Request<ReqBody>, ) -> Result<Object, Error> { let copy_source = req.headers().get("x-amz-copy-source").unwrap().to_str()?; let copy_source = percent_encoding::percent_decode_str(copy_source).decode_utf8()?; @@ -501,7 +502,7 @@ struct CopyPreconditionHeaders { } impl CopyPreconditionHeaders { - fn parse(req: &Request<Body>) -> Result<Self, Error> { + fn parse(req: &Request<ReqBody>) -> Result<Self, Error> { Ok(Self { copy_source_if_match: req .headers() diff --git a/src/api/s3/cors.rs b/src/api/s3/cors.rs index d2bcf125..4b8754a9 100644 --- a/src/api/s3/cors.rs +++ b/src/api/s3/cors.rs @@ -5,10 +5,17 @@ use http::header::{ ACCESS_CONTROL_ALLOW_HEADERS, ACCESS_CONTROL_ALLOW_METHODS, ACCESS_CONTROL_ALLOW_ORIGIN, ACCESS_CONTROL_EXPOSE_HEADERS, ACCESS_CONTROL_REQUEST_HEADERS, ACCESS_CONTROL_REQUEST_METHOD, }; -use hyper::{body::HttpBody, header::HeaderName, Body, Method, Request, Response, StatusCode}; +use hyper::{ + body::Body, body::Incoming as IncomingBody, header::HeaderName, Method, Request, Response, + StatusCode, +}; + +use http_body_util::BodyExt; use serde::{Deserialize, Serialize}; +use crate::helpers::*; +use crate::s3::api_server::{ReqBody, ResBody}; use crate::s3::error::*; use crate::s3::xml::{to_xml_with_header, xmlns_tag, IntValue, Value}; use crate::signature::verify_signed_content; @@ -17,7 +24,7 @@ use garage_model::bucket_table::{Bucket, CorsRule as GarageCorsRule}; use garage_model::garage::Garage; use garage_util::data::*; -pub async fn handle_get_cors(bucket: &Bucket) -> Result<Response<Body>, Error> { +pub async fn handle_get_cors(bucket: &Bucket) -> Result<Response<ResBody>, Error> { let param = bucket .params() .ok_or_internal_error("Bucket should not be deleted at this point")?; @@ -34,18 +41,18 @@ pub async fn handle_get_cors(bucket: &Bucket) -> Result<Response<Body>, Error> { Ok(Response::builder() .status(StatusCode::OK) .header(http::header::CONTENT_TYPE, "application/xml") - .body(Body::from(xml))?) + .body(string_body(xml))?) } else { Ok(Response::builder() .status(StatusCode::NO_CONTENT) - .body(Body::empty())?) + .body(empty_body())?) } } pub async fn handle_delete_cors( garage: Arc<Garage>, mut bucket: Bucket, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let param = bucket .params_mut() .ok_or_internal_error("Bucket should not be deleted at this point")?; @@ -55,16 +62,16 @@ pub async fn handle_delete_cors( Ok(Response::builder() .status(StatusCode::NO_CONTENT) - .body(Body::empty())?) + .body(empty_body())?) } pub async fn handle_put_cors( garage: Arc<Garage>, mut bucket: Bucket, - req: Request<Body>, + req: Request<ReqBody>, content_sha256: Option<Hash>, -) -> Result<Response<Body>, Error> { - let body = req.into_body().collect().await?.to_bytes(); +) -> Result<Response<ResBody>, Error> { + let body = BodyExt::collect(req.into_body()).await?.to_bytes(); if let Some(content_sha256) = content_sha256 { verify_signed_content(content_sha256, &body[..])?; @@ -84,14 +91,14 @@ pub async fn handle_put_cors( Ok(Response::builder() .status(StatusCode::OK) - .body(Body::empty())?) + .body(empty_body())?) } pub async fn handle_options_s3api( garage: Arc<Garage>, - req: &Request<Body>, + req: &Request<IncomingBody>, bucket_name: Option<String>, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { // FIXME: CORS rules of buckets with local aliases are // not taken into account. @@ -121,7 +128,7 @@ pub async fn handle_options_s3api( .header(ACCESS_CONTROL_ALLOW_ORIGIN, "*") .header(ACCESS_CONTROL_ALLOW_METHODS, "*") .status(StatusCode::OK) - .body(Body::empty())?) + .body(empty_body())?) } } else { // If there is no bucket name in the request, @@ -131,14 +138,14 @@ pub async fn handle_options_s3api( .header(ACCESS_CONTROL_ALLOW_ORIGIN, "*") .header(ACCESS_CONTROL_ALLOW_METHODS, "GET") .status(StatusCode::OK) - .body(Body::empty())?) + .body(empty_body())?) } } pub fn handle_options_for_bucket( - req: &Request<Body>, + req: &Request<IncomingBody>, bucket: &Bucket, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let origin = req .headers() .get("Origin") @@ -161,7 +168,7 @@ pub fn handle_options_for_bucket( if let Some(rule) = matching_rule { let mut resp = Response::builder() .status(StatusCode::OK) - .body(Body::empty())?; + .body(empty_body())?; add_cors_headers(&mut resp, rule).ok_or_internal_error("Invalid CORS configuration")?; return Ok(resp); } @@ -172,7 +179,7 @@ pub fn handle_options_for_bucket( pub fn find_matching_cors_rule<'a>( bucket: &'a Bucket, - req: &Request<Body>, + req: &Request<impl Body>, ) -> Result<Option<&'a GarageCorsRule>, Error> { if let Some(cors_config) = bucket.params().unwrap().cors_config.get() { if let Some(origin) = req.headers().get("Origin") { @@ -209,7 +216,7 @@ where } pub fn add_cors_headers( - resp: &mut Response<Body>, + resp: &mut Response<ResBody>, rule: &GarageCorsRule, ) -> Result<(), http::header::InvalidHeaderValue> { let h = resp.headers_mut(); diff --git a/src/api/s3/delete.rs b/src/api/s3/delete.rs index 685ce004..3fb39147 100644 --- a/src/api/s3/delete.rs +++ b/src/api/s3/delete.rs @@ -1,12 +1,15 @@ use std::sync::Arc; -use hyper::{body::HttpBody, Body, Request, Response, StatusCode}; +use http_body_util::BodyExt; +use hyper::{Request, Response, StatusCode}; use garage_util::data::*; use garage_model::garage::Garage; use garage_model::s3::object_table::*; +use crate::helpers::*; +use crate::s3::api_server::{ReqBody, ResBody}; use crate::s3::error::*; use crate::s3::put::next_timestamp; use crate::s3::xml as s3_xml; @@ -59,11 +62,11 @@ pub async fn handle_delete( garage: Arc<Garage>, bucket_id: Uuid, key: &str, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { match handle_delete_internal(&garage, bucket_id, key).await { Ok(_) | Err(Error::NoSuchKey) => Ok(Response::builder() .status(StatusCode::NO_CONTENT) - .body(Body::from(vec![])) + .body(empty_body()) .unwrap()), Err(e) => Err(e), } @@ -72,10 +75,10 @@ pub async fn handle_delete( pub async fn handle_delete_objects( garage: Arc<Garage>, bucket_id: Uuid, - req: Request<Body>, + req: Request<ReqBody>, content_sha256: Option<Hash>, -) -> Result<Response<Body>, Error> { - let body = req.into_body().collect().await?.to_bytes(); +) -> Result<Response<ResBody>, Error> { + let body = BodyExt::collect(req.into_body()).await?.to_bytes(); if let Some(content_sha256) = content_sha256 { verify_signed_content(content_sha256, &body[..])?; @@ -118,7 +121,7 @@ pub async fn handle_delete_objects( Ok(Response::builder() .header("Content-Type", "application/xml") - .body(Body::from(xml))?) + .body(string_body(xml))?) } struct DeleteRequest { diff --git a/src/api/s3/error.rs b/src/api/s3/error.rs index c50cff9f..8afe4954 100644 --- a/src/api/s3/error.rs +++ b/src/api/s3/error.rs @@ -2,13 +2,14 @@ use std::convert::TryInto; use err_derive::Error; use hyper::header::HeaderValue; -use hyper::{Body, HeaderMap, StatusCode}; +use hyper::{HeaderMap, StatusCode}; use garage_model::helper::error::Error as HelperError; use crate::common_error::CommonError; pub use crate::common_error::{CommonErrorDerivative, OkOrBadRequest, OkOrInternalError}; use crate::generic_server::ApiError; +use crate::helpers::*; use crate::s3::xml as s3_xml; use crate::signature::error::Error as SignatureError; @@ -189,22 +190,23 @@ impl ApiError for Error { } } - fn http_body(&self, garage_region: &str, path: &str) -> Body { + fn http_body(&self, garage_region: &str, path: &str) -> BytesBody { let error = s3_xml::Error { code: s3_xml::Value(self.aws_code().to_string()), message: s3_xml::Value(format!("{}", self)), resource: Some(s3_xml::Value(path.to_string())), region: Some(s3_xml::Value(garage_region.to_string())), }; - Body::from(s3_xml::to_xml_with_header(&error).unwrap_or_else(|_| { + let error_str = s3_xml::to_xml_with_header(&error).unwrap_or_else(|_| { r#" <?xml version="1.0" encoding="UTF-8"?> <Error> - <Code>InternalError</Code> - <Message>XML encoding of error failed</Message> + <Code>InternalError</Code> + <Message>XML encoding of error failed</Message> </Error> - "# + "# .into() - })) + }); + BytesBody::from(bytes::Bytes::from(error_str.into_bytes())) } } diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs index 5e682726..71f0b158 100644 --- a/src/api/s3/get.rs +++ b/src/api/s3/get.rs @@ -8,7 +8,7 @@ use http::header::{ ACCEPT_RANGES, CONTENT_LENGTH, CONTENT_RANGE, CONTENT_TYPE, ETAG, IF_MODIFIED_SINCE, IF_NONE_MATCH, LAST_MODIFIED, RANGE, }; -use hyper::{Body, Request, Response, StatusCode}; +use hyper::{body::Body, Request, Response, StatusCode}; use tokio::sync::mpsc; use garage_rpc::rpc_helper::{netapp::stream::ByteStream, OrderTag}; @@ -20,6 +20,8 @@ use garage_model::garage::Garage; use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; +use crate::helpers::*; +use crate::s3::api_server::ResBody; use crate::s3::error::*; const X_AMZ_MP_PARTS_COUNT: &str = "x-amz-mp-parts-count"; @@ -52,8 +54,8 @@ fn object_headers( fn try_answer_cached( version: &ObjectVersion, version_meta: &ObjectVersionMeta, - req: &Request<Body>, -) -> Option<Response<Body>> { + req: &Request<impl Body>, +) -> Option<Response<ResBody>> { // <trinity> It is possible, and is even usually the case, [that both If-None-Match and // If-Modified-Since] are present in a request. In this situation If-None-Match takes // precedence and If-Modified-Since is ignored (as per 6.Precedence from rfc7232). The rational @@ -80,7 +82,7 @@ fn try_answer_cached( Some( Response::builder() .status(StatusCode::NOT_MODIFIED) - .body(Body::empty()) + .body(empty_body()) .unwrap(), ) } else { @@ -91,11 +93,11 @@ fn try_answer_cached( /// Handle HEAD request pub async fn handle_head( garage: Arc<Garage>, - req: &Request<Body>, + req: &Request<impl Body>, bucket_id: Uuid, key: &str, part_number: Option<u64>, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let object = garage .object_table .get(&bucket_id, &key.to_string()) @@ -138,7 +140,7 @@ pub async fn handle_head( ) .header(X_AMZ_MP_PARTS_COUNT, "1") .status(StatusCode::PARTIAL_CONTENT) - .body(Body::empty())?) + .body(empty_body())?) } ObjectVersionData::FirstBlock(_, _) => { let version = garage @@ -163,7 +165,7 @@ pub async fn handle_head( ) .header(X_AMZ_MP_PARTS_COUNT, format!("{}", version.n_parts()?)) .status(StatusCode::PARTIAL_CONTENT) - .body(Body::empty())?) + .body(empty_body())?) } _ => unreachable!(), } @@ -171,18 +173,18 @@ pub async fn handle_head( Ok(object_headers(object_version, version_meta) .header(CONTENT_LENGTH, format!("{}", version_meta.size)) .status(StatusCode::OK) - .body(Body::empty())?) + .body(empty_body())?) } } /// Handle GET request pub async fn handle_get( garage: Arc<Garage>, - req: &Request<Body>, + req: &Request<impl Body>, bucket_id: Uuid, key: &str, part_number: Option<u64>, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let object = garage .object_table .get(&bucket_id, &key.to_string()) @@ -240,8 +242,7 @@ pub async fn handle_get( match &last_v_data { ObjectVersionData::DeleteMarker => unreachable!(), ObjectVersionData::Inline(_, bytes) => { - let body: Body = Body::from(bytes.to_vec()); - Ok(resp_builder.body(body)?) + Ok(resp_builder.body(bytes_body(bytes.to_vec().into()))?) } ObjectVersionData::FirstBlock(_, first_block_hash) => { let (tx, rx) = mpsc::channel(2); @@ -293,10 +294,14 @@ pub async fn handle_get( } }); - let body_stream = tokio_stream::wrappers::ReceiverStream::new(rx).flatten(); - - let body = hyper::body::Body::wrap_stream(body_stream); - Ok(resp_builder.body(body)?) + let body_stream = tokio_stream::wrappers::ReceiverStream::new(rx) + .flatten() + .map(|x| { + x.map(hyper::body::Frame::data) + .map_err(|e| Error::from(garage_util::error::Error::from(e))) + }); + let body = http_body_util::StreamBody::new(body_stream); + Ok(resp_builder.body(ResBody::new(body))?) } } } @@ -308,7 +313,7 @@ async fn handle_get_range( version_meta: &ObjectVersionMeta, begin: u64, end: u64, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let resp_builder = object_headers(version, version_meta) .header(CONTENT_LENGTH, format!("{}", end - begin)) .header( @@ -321,7 +326,7 @@ async fn handle_get_range( ObjectVersionData::DeleteMarker => unreachable!(), ObjectVersionData::Inline(_meta, bytes) => { if end as usize <= bytes.len() { - let body: Body = Body::from(bytes[begin as usize..end as usize].to_vec()); + let body = bytes_body(bytes[begin as usize..end as usize].to_vec().into()); Ok(resp_builder.body(body)?) } else { Err(Error::internal_error( @@ -348,7 +353,7 @@ async fn handle_get_part( version_data: &ObjectVersionData, version_meta: &ObjectVersionMeta, part_number: u64, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let resp_builder = object_headers(object_version, version_meta).status(StatusCode::PARTIAL_CONTENT); @@ -364,7 +369,7 @@ async fn handle_get_part( format!("bytes {}-{}/{}", 0, bytes.len() - 1, bytes.len()), ) .header(X_AMZ_MP_PARTS_COUNT, "1") - .body(Body::from(bytes.to_vec()))?) + .body(bytes_body(bytes.to_vec().into()))?) } ObjectVersionData::FirstBlock(_, _) => { let version = garage @@ -392,7 +397,7 @@ async fn handle_get_part( } fn parse_range_header( - req: &Request<Body>, + req: &Request<impl Body>, total_size: u64, ) -> Result<Option<http_range::HttpRange>, Error> { let range = match req.headers().get(RANGE) { @@ -434,7 +439,7 @@ fn body_from_blocks_range( all_blocks: &[(VersionBlockKey, VersionBlock)], begin: u64, end: u64, -) -> Body { +) -> ResBody { // We will store here the list of blocks that have an intersection with the requested // range, as well as their "true offset", which is their actual offset in the complete // file (whereas block.offset designates the offset of the block WITHIN THE PART @@ -456,59 +461,74 @@ fn body_from_blocks_range( } let order_stream = OrderTag::stream(); - let body_stream = futures::stream::iter(blocks) - .enumerate() - .map(move |(i, (block, block_offset))| { - let garage = garage.clone(); - async move { - garage - .block_manager - .rpc_get_block_streaming(&block.hash, Some(order_stream.order(i as u64))) - .await - .unwrap_or_else(|e| error_stream(i, e)) - .scan(block_offset, move |chunk_offset, chunk| { - let r = match chunk { - Ok(chunk_bytes) => { - let chunk_len = chunk_bytes.len() as u64; - let r = if *chunk_offset >= end { - // The current chunk is after the part we want to read. - // Returning None here will stop the scan, the rest of the - // stream will be ignored - None - } else if *chunk_offset + chunk_len <= begin { - // The current chunk is before the part we want to read. - // We return a None that will be removed by the filter_map - // below. - Some(None) - } else { - // The chunk has an intersection with the requested range - let start_in_chunk = if *chunk_offset > begin { - 0 - } else { - begin - *chunk_offset - }; - let end_in_chunk = if *chunk_offset + chunk_len < end { - chunk_len + let mut body_stream = + futures::stream::iter(blocks) + .enumerate() + .map(move |(i, (block, block_offset))| { + let garage = garage.clone(); + async move { + garage + .block_manager + .rpc_get_block_streaming(&block.hash, Some(order_stream.order(i as u64))) + .await + .unwrap_or_else(|e| error_stream(i, e)) + .scan(block_offset, move |chunk_offset, chunk| { + let r = match chunk { + Ok(chunk_bytes) => { + let chunk_len = chunk_bytes.len() as u64; + let r = if *chunk_offset >= end { + // The current chunk is after the part we want to read. + // Returning None here will stop the scan, the rest of the + // stream will be ignored + None + } else if *chunk_offset + chunk_len <= begin { + // The current chunk is before the part we want to read. + // We return a None that will be removed by the filter_map + // below. + Some(None) } else { - end - *chunk_offset + // The chunk has an intersection with the requested range + let start_in_chunk = if *chunk_offset > begin { + 0 + } else { + begin - *chunk_offset + }; + let end_in_chunk = if *chunk_offset + chunk_len < end { + chunk_len + } else { + end - *chunk_offset + }; + Some(Some(Ok(chunk_bytes.slice( + start_in_chunk as usize..end_in_chunk as usize, + )))) }; - Some(Some(Ok(chunk_bytes - .slice(start_in_chunk as usize..end_in_chunk as usize)))) - }; - *chunk_offset += chunk_bytes.len() as u64; - r - } - Err(e) => Some(Some(Err(e))), - }; - futures::future::ready(r) - }) - .filter_map(futures::future::ready) - } - }) - .buffered(2) - .flatten(); + *chunk_offset += chunk_bytes.len() as u64; + r + } + Err(e) => Some(Some(Err(e))), + }; + futures::future::ready(r) + }) + .filter_map(futures::future::ready) + } + }); - hyper::body::Body::wrap_stream(body_stream) + let (tx, rx) = mpsc::channel(2); + tokio::spawn(async move { + while let Some(item) = body_stream.next().await { + if tx.send(item.await).await.is_err() { + break; // connection closed by client + } + } + }); + + let body_stream = tokio_stream::wrappers::ReceiverStream::new(rx) + .flatten() + .map(|x| { + x.map(hyper::body::Frame::data) + .map_err(|e| Error::from(garage_util::error::Error::from(e))) + }); + ResBody::new(http_body_util::StreamBody::new(body_stream)) } fn error_stream(i: usize, e: garage_util::error::Error) -> ByteStream { diff --git a/src/api/s3/lifecycle.rs b/src/api/s3/lifecycle.rs index ae8fbc37..35757e8c 100644 --- a/src/api/s3/lifecycle.rs +++ b/src/api/s3/lifecycle.rs @@ -1,10 +1,13 @@ use quick_xml::de::from_reader; use std::sync::Arc; -use hyper::{body::HttpBody, Body, Request, Response, StatusCode}; +use http_body_util::BodyExt; +use hyper::{Request, Response, StatusCode}; use serde::{Deserialize, Serialize}; +use crate::helpers::*; +use crate::s3::api_server::{ReqBody, ResBody}; use crate::s3::error::*; use crate::s3::xml::{to_xml_with_header, xmlns_tag, IntValue, Value}; use crate::signature::verify_signed_content; @@ -16,7 +19,7 @@ use garage_model::bucket_table::{ use garage_model::garage::Garage; use garage_util::data::*; -pub async fn handle_get_lifecycle(bucket: &Bucket) -> Result<Response<Body>, Error> { +pub async fn handle_get_lifecycle(bucket: &Bucket) -> Result<Response<ResBody>, Error> { let param = bucket .params() .ok_or_internal_error("Bucket should not be deleted at this point")?; @@ -27,18 +30,18 @@ pub async fn handle_get_lifecycle(bucket: &Bucket) -> Result<Response<Body>, Err Ok(Response::builder() .status(StatusCode::OK) .header(http::header::CONTENT_TYPE, "application/xml") - .body(Body::from(xml))?) + .body(string_body(xml))?) } else { Ok(Response::builder() .status(StatusCode::NO_CONTENT) - .body(Body::empty())?) + .body(empty_body())?) } } pub async fn handle_delete_lifecycle( garage: Arc<Garage>, mut bucket: Bucket, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let param = bucket .params_mut() .ok_or_internal_error("Bucket should not be deleted at this point")?; @@ -48,16 +51,16 @@ pub async fn handle_delete_lifecycle( Ok(Response::builder() .status(StatusCode::NO_CONTENT) - .body(Body::empty())?) + .body(empty_body())?) } pub async fn handle_put_lifecycle( garage: Arc<Garage>, mut bucket: Bucket, - req: Request<Body>, + req: Request<ReqBody>, content_sha256: Option<Hash>, -) -> Result<Response<Body>, Error> { - let body = req.into_body().collect().await?.to_bytes(); +) -> Result<Response<ResBody>, Error> { + let body = BodyExt::collect(req.into_body()).await?.to_bytes(); if let Some(content_sha256) = content_sha256 { verify_signed_content(content_sha256, &body[..])?; @@ -77,7 +80,7 @@ pub async fn handle_put_lifecycle( Ok(Response::builder() .status(StatusCode::OK) - .body(Body::empty())?) + .body(empty_body())?) } // ---- SERIALIZATION AND DESERIALIZATION TO/FROM S3 XML ---- diff --git a/src/api/s3/list.rs b/src/api/s3/list.rs index 1b9e8cd5..b832a4f4 100644 --- a/src/api/s3/list.rs +++ b/src/api/s3/list.rs @@ -3,7 +3,7 @@ use std::iter::{Iterator, Peekable}; use std::sync::Arc; use base64::prelude::*; -use hyper::{Body, Response}; +use hyper::Response; use garage_util::data::*; use garage_util::error::Error as GarageError; @@ -16,7 +16,8 @@ use garage_model::s3::object_table::*; use garage_table::EnumerationOrder; use crate::encoding::*; -use crate::helpers::key_after_prefix; +use crate::helpers::*; +use crate::s3::api_server::ResBody; use crate::s3::error::*; use crate::s3::multipart as s3_multipart; use crate::s3::xml as s3_xml; @@ -63,7 +64,7 @@ pub struct ListPartsQuery { pub async fn handle_list( garage: Arc<Garage>, query: &ListObjectsQuery, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let io = |bucket, key, count| { let t = &garage.object_table; async move { @@ -162,13 +163,13 @@ pub async fn handle_list( let xml = s3_xml::to_xml_with_header(&result)?; Ok(Response::builder() .header("Content-Type", "application/xml") - .body(Body::from(xml.into_bytes()))?) + .body(string_body(xml))?) } pub async fn handle_list_multipart_upload( garage: Arc<Garage>, query: &ListMultipartUploadsQuery, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let io = |bucket, key, count| { let t = &garage.object_table; async move { @@ -264,13 +265,13 @@ pub async fn handle_list_multipart_upload( Ok(Response::builder() .header("Content-Type", "application/xml") - .body(Body::from(xml.into_bytes()))?) + .body(string_body(xml))?) } pub async fn handle_list_parts( garage: Arc<Garage>, query: &ListPartsQuery, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { debug!("ListParts {:?}", query); let upload_id = s3_multipart::decode_upload_id(&query.upload_id)?; @@ -319,7 +320,7 @@ pub async fn handle_list_parts( Ok(Response::builder() .header("Content-Type", "application/xml") - .body(Body::from(xml.into_bytes()))?) + .body(string_body(xml))?) } /* diff --git a/src/api/s3/multipart.rs b/src/api/s3/multipart.rs index 96c4d044..4aa27eaf 100644 --- a/src/api/s3/multipart.rs +++ b/src/api/s3/multipart.rs @@ -2,8 +2,8 @@ use std::collections::HashMap; use std::sync::Arc; use futures::{prelude::*, TryStreamExt}; -use hyper::body::Body; -use hyper::{body::HttpBody, Request, Response}; +use http_body_util::BodyStream; +use hyper::{Request, Response}; use md5::{Digest as Md5Digest, Md5}; use garage_table::*; @@ -17,6 +17,8 @@ use garage_model::s3::mpu_table::*; use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; +use crate::helpers::*; +use crate::s3::api_server::{ReqBody, ResBody}; use crate::s3::error::*; use crate::s3::put::*; use crate::s3::xml as s3_xml; @@ -26,11 +28,11 @@ use crate::signature::verify_signed_content; pub async fn handle_create_multipart_upload( garage: Arc<Garage>, - req: &Request<Body>, + req: &Request<ReqBody>, bucket_name: &str, bucket_id: Uuid, key: &String, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let existing_object = garage.object_table.get(&bucket_id, &key).await?; let upload_id = gen_uuid(); @@ -65,18 +67,18 @@ pub async fn handle_create_multipart_upload( }; let xml = s3_xml::to_xml_with_header(&result)?; - Ok(Response::new(Body::from(xml.into_bytes()))) + Ok(Response::new(string_body(xml))) } pub async fn handle_put_part( garage: Arc<Garage>, - req: Request<Body>, + req: Request<ReqBody>, bucket_id: Uuid, key: &str, part_number: u64, upload_id: &str, content_sha256: Option<Hash>, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let upload_id = decode_upload_id(upload_id)?; let content_md5 = match req.headers().get("content-md5") { @@ -87,8 +89,10 @@ pub async fn handle_put_part( // Read first chuck, and at the same time try to get object to see if it exists let key = key.to_string(); - let body = TryStreamExt::map_err(req.into_body(), Error::from); - let mut chunker = StreamChunker::new(body, garage.config.block_size); + let body_stream = BodyStream::new(req.into_body()) + .map(|x| x.map(|f| f.into_data().unwrap())) //TODO remove unwrap + .map_err(Error::from); + let mut chunker = StreamChunker::new(body_stream, garage.config.block_size); let ((_, _, mut mpu), first_block) = futures::try_join!( get_upload(&garage, &bucket_id, &key, &upload_id), @@ -172,7 +176,7 @@ pub async fn handle_put_part( let response = Response::builder() .header("ETag", format!("\"{}\"", data_md5sum_hex)) - .body(Body::empty()) + .body(empty_body()) .unwrap(); Ok(response) } @@ -210,14 +214,16 @@ impl Drop for InterruptedCleanup { pub async fn handle_complete_multipart_upload( garage: Arc<Garage>, - req: Request<Body>, + req: Request<ReqBody>, bucket_name: &str, bucket: &Bucket, key: &str, upload_id: &str, content_sha256: Option<Hash>, -) -> Result<Response<Body>, Error> { - let body = HttpBody::collect(req.into_body()).await?.to_bytes(); +) -> Result<Response<ResBody>, Error> { + let body = http_body_util::BodyExt::collect(req.into_body()) + .await? + .to_bytes(); if let Some(content_sha256) = content_sha256 { verify_signed_content(content_sha256, &body[..])?; @@ -365,7 +371,7 @@ pub async fn handle_complete_multipart_upload( }; let xml = s3_xml::to_xml_with_header(&result)?; - Ok(Response::new(Body::from(xml.into_bytes()))) + Ok(Response::new(string_body(xml))) } pub async fn handle_abort_multipart_upload( @@ -373,7 +379,7 @@ pub async fn handle_abort_multipart_upload( bucket_id: Uuid, key: &str, upload_id: &str, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let upload_id = decode_upload_id(upload_id)?; let (_, mut object_version, _) = @@ -383,7 +389,7 @@ pub async fn handle_abort_multipart_upload( let final_object = Object::new(bucket_id, key.to_string(), vec![object_version]); garage.object_table.insert(&final_object).await?; - Ok(Response::new(Body::from(vec![]))) + Ok(Response::new(empty_body())) } // ======== helpers ============ diff --git a/src/api/s3/post_object.rs b/src/api/s3/post_object.rs index f9eccb7f..e9732dc4 100644 --- a/src/api/s3/post_object.rs +++ b/src/api/s3/post_object.rs @@ -1,5 +1,5 @@ use std::collections::HashMap; -use std::convert::TryInto; +use std::convert::{Infallible, TryInto}; use std::ops::RangeInclusive; use std::sync::Arc; use std::task::{Context, Poll}; @@ -7,14 +7,17 @@ use std::task::{Context, Poll}; use base64::prelude::*; use bytes::Bytes; use chrono::{DateTime, Duration, Utc}; -use futures::{Stream, StreamExt}; +use futures::{Stream, StreamExt, TryStreamExt}; +use http_body_util::BodyStream; use hyper::header::{self, HeaderMap, HeaderName, HeaderValue}; -use hyper::{Body, Request, Response, StatusCode}; +use hyper::{body::Incoming as IncomingBody, Request, Response, StatusCode}; use multer::{Constraints, Multipart, SizeLimit}; use serde::Deserialize; use garage_model::garage::Garage; +use crate::helpers::*; +use crate::s3::api_server::ResBody; use crate::s3::cors::*; use crate::s3::error::*; use crate::s3::put::{get_headers, save_stream}; @@ -23,9 +26,9 @@ use crate::signature::payload::{parse_date, verify_v4}; pub async fn handle_post_object( garage: Arc<Garage>, - req: Request<Body>, + req: Request<IncomingBody>, bucket_name: String, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let boundary = req .headers() .get(header::CONTENT_TYPE) @@ -42,7 +45,10 @@ pub async fn handle_post_object( ); let (head, body) = req.into_parts(); - let mut multipart = Multipart::with_constraints(body, boundary, constraints); + let body_stream = BodyStream::new(body) + .map(|x| x.map(|f| f.into_data().unwrap())) //TODO remove unwrap + .map_err(Error::from); + let mut multipart = Multipart::with_constraints(body_stream, boundary, constraints); let mut params = HeaderMap::new(); let field = loop { @@ -259,7 +265,7 @@ pub async fn handle_post_object( .status(StatusCode::SEE_OTHER) .header(header::LOCATION, target.clone()) .header(header::ETAG, etag) - .body(target.into())? + .body(string_body(target))? } else { let path = head .uri @@ -290,7 +296,7 @@ pub async fn handle_post_object( .header(header::LOCATION, location.clone()) .header(header::ETAG, etag.clone()); match action { - "200" => builder.status(StatusCode::OK).body(Body::empty())?, + "200" => builder.status(StatusCode::OK).body(empty_body())?, "201" => { let xml = s3_xml::PostObject { xmlns: (), @@ -302,14 +308,16 @@ pub async fn handle_post_object( let body = s3_xml::to_xml_with_header(&xml)?; builder .status(StatusCode::CREATED) - .body(Body::from(body.into_bytes()))? + .body(string_body(body))? } - _ => builder.status(StatusCode::NO_CONTENT).body(Body::empty())?, + _ => builder.status(StatusCode::NO_CONTENT).body(empty_body())?, } }; - let matching_cors_rule = - find_matching_cors_rule(&bucket, &Request::from_parts(head, Body::empty()))?; + let matching_cors_rule = find_matching_cors_rule( + &bucket, + &Request::from_parts(head, empty_body::<Infallible>()), + )?; if let Some(rule) = matching_cors_rule { add_cors_headers(&mut resp, rule) .ok_or_internal_error("Invalid bucket CORS configuration")?; diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs index 606facc4..3d43eee8 100644 --- a/src/api/s3/put.rs +++ b/src/api/s3/put.rs @@ -4,7 +4,8 @@ use std::sync::Arc; use base64::prelude::*; use futures::prelude::*; use futures::try_join; -use hyper::body::{Body, Bytes}; +use http_body_util::BodyStream; +use hyper::body::Bytes; use hyper::header::{HeaderMap, HeaderValue}; use hyper::{Request, Response}; use md5::{digest::generic_array::*, Digest as Md5Digest, Md5}; @@ -30,15 +31,17 @@ use garage_model::s3::block_ref_table::*; use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; +use crate::helpers::*; +use crate::s3::api_server::{ReqBody, ResBody}; use crate::s3::error::*; pub async fn handle_put( garage: Arc<Garage>, - req: Request<Body>, + req: Request<ReqBody>, bucket: &Bucket, key: &String, content_sha256: Option<Hash>, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { // Retrieve interesting headers from request let headers = get_headers(req.headers())?; debug!("Object headers: {:?}", headers); @@ -48,13 +51,14 @@ pub async fn handle_put( None => None, }; - let (_head, body) = req.into_parts(); - let body = body.map_err(Error::from); + let body_stream = BodyStream::new(req.into_body()) + .map(|x| x.map(|f| f.into_data().unwrap())) //TODO remove unwrap + .map_err(Error::from); save_stream( garage, headers, - body, + body_stream, bucket, key, content_md5, @@ -434,11 +438,11 @@ impl<S: Stream<Item = Result<Bytes, Error>> + Unpin> StreamChunker<S> { } } -pub fn put_response(version_uuid: Uuid, md5sum_hex: String) -> Response<Body> { +pub fn put_response(version_uuid: Uuid, md5sum_hex: String) -> Response<ResBody> { Response::builder() .header("x-amz-version-id", hex::encode(version_uuid)) .header("ETag", format!("\"{}\"", md5sum_hex)) - .body(Body::from(vec![])) + .body(empty_body()) .unwrap() } diff --git a/src/api/s3/website.rs b/src/api/s3/website.rs index f754ff1b..1c1dbf20 100644 --- a/src/api/s3/website.rs +++ b/src/api/s3/website.rs @@ -1,9 +1,12 @@ use quick_xml::de::from_reader; use std::sync::Arc; -use hyper::{body::HttpBody, Body, Request, Response, StatusCode}; +use http_body_util::BodyExt; +use hyper::{Request, Response, StatusCode}; use serde::{Deserialize, Serialize}; +use crate::helpers::*; +use crate::s3::api_server::{ReqBody, ResBody}; use crate::s3::error::*; use crate::s3::xml::{to_xml_with_header, xmlns_tag, IntValue, Value}; use crate::signature::verify_signed_content; @@ -12,7 +15,7 @@ use garage_model::bucket_table::*; use garage_model::garage::Garage; use garage_util::data::*; -pub async fn handle_get_website(bucket: &Bucket) -> Result<Response<Body>, Error> { +pub async fn handle_get_website(bucket: &Bucket) -> Result<Response<ResBody>, Error> { let param = bucket .params() .ok_or_internal_error("Bucket should not be deleted at this point")?; @@ -33,18 +36,18 @@ pub async fn handle_get_website(bucket: &Bucket) -> Result<Response<Body>, Error Ok(Response::builder() .status(StatusCode::OK) .header(http::header::CONTENT_TYPE, "application/xml") - .body(Body::from(xml))?) + .body(string_body(xml))?) } else { Ok(Response::builder() .status(StatusCode::NO_CONTENT) - .body(Body::empty())?) + .body(empty_body())?) } } pub async fn handle_delete_website( garage: Arc<Garage>, mut bucket: Bucket, -) -> Result<Response<Body>, Error> { +) -> Result<Response<ResBody>, Error> { let param = bucket .params_mut() .ok_or_internal_error("Bucket should not be deleted at this point")?; @@ -54,16 +57,16 @@ pub async fn handle_delete_website( Ok(Response::builder() .status(StatusCode::NO_CONTENT) - .body(Body::empty())?) + .body(empty_body())?) } pub async fn handle_put_website( garage: Arc<Garage>, mut bucket: Bucket, - req: Request<Body>, + req: Request<ReqBody>, content_sha256: Option<Hash>, -) -> Result<Response<Body>, Error> { - let body = req.into_body().collect().await?.to_bytes(); +) -> Result<Response<ResBody>, Error> { + let body = BodyExt::collect(req.into_body()).await?.to_bytes(); if let Some(content_sha256) = content_sha256 { verify_signed_content(content_sha256, &body[..])?; @@ -83,7 +86,7 @@ pub async fn handle_put_website( Ok(Response::builder() .status(StatusCode::OK) - .body(Body::empty())?) + .body(empty_body())?) } #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] diff --git a/src/api/signature/payload.rs b/src/api/signature/payload.rs index b50fb3bb..652c606e 100644 --- a/src/api/signature/payload.rs +++ b/src/api/signature/payload.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use chrono::{DateTime, Duration, NaiveDateTime, Utc}; use hmac::Mac; -use hyper::{Body, Method, Request}; +use hyper::{body::Incoming as IncomingBody, Method, Request}; use sha2::{Digest, Sha256}; use garage_table::*; @@ -20,7 +20,7 @@ use crate::signature::error::*; pub async fn check_payload_signature( garage: &Garage, service: &'static str, - request: &Request<Body>, + request: &Request<IncomingBody>, ) -> Result<(Option<Key>, Option<Hash>), Error> { let mut headers = HashMap::new(); for (key, val) in request.headers() { diff --git a/src/api/signature/streaming.rs b/src/api/signature/streaming.rs index c8358c4f..8fb0c4ef 100644 --- a/src/api/signature/streaming.rs +++ b/src/api/signature/streaming.rs @@ -5,22 +5,26 @@ use futures::prelude::*; use futures::task; use garage_model::key_table::Key; use hmac::Mac; -use hyper::body::Bytes; -use hyper::{Body, Request}; +use http_body_util::{BodyStream, StreamBody}; +use hyper::body::{Bytes, Incoming as IncomingBody}; +use hyper::Request; use garage_util::data::Hash; use super::{compute_scope, sha256sum, HmacSha256, LONG_DATETIME}; +use crate::helpers::*; use crate::signature::error::*; +pub type ReqBody = BoxBody<Error>; + pub fn parse_streaming_body( api_key: &Key, - req: Request<Body>, + req: Request<IncomingBody>, content_sha256: &mut Option<Hash>, region: &str, service: &str, -) -> Result<Request<Body>, Error> { +) -> Result<Request<ReqBody>, Error> { match req.headers().get("x-amz-content-sha256") { Some(header) if header == "STREAMING-AWS4-HMAC-SHA256-PAYLOAD" => { let signature = content_sha256 @@ -47,19 +51,17 @@ pub fn parse_streaming_body( .ok_or_internal_error("Unable to build signing HMAC")?; Ok(req.map(move |body| { - Body::wrap_stream( - SignedPayloadStream::new( - body.map_err(Error::from), - signing_hmac, - date, - &scope, - signature, - ) - .map_err(Error::from), - ) + let body_stream = BodyStream::new(body) + .map(|x| x.map(|f| f.into_data().unwrap())) //TODO remove unwrap + .map_err(Error::from); + let signed_payload_stream = + SignedPayloadStream::new(body_stream, signing_hmac, date, &scope, signature) + .map(|x| x.map(hyper::body::Frame::data)) + .map_err(Error::from); + ReqBody::new(StreamBody::new(signed_payload_stream)) })) } - _ => Ok(req), + _ => Ok(req.map(|body| ReqBody::new(http_body_util::BodyExt::map_err(body, Error::from)))), } } diff --git a/src/web/Cargo.toml b/src/web/Cargo.toml index 8eb35c25..3add5200 100644 --- a/src/web/Cargo.toml +++ b/src/web/Cargo.toml @@ -26,8 +26,9 @@ percent-encoding.workspace = true futures.workspace = true http.workspace = true +http-body-util.workspace = true hyper.workspace = true -hyperlocal.workspace = true +hyper-util.workspace = true tokio.workspace = true diff --git a/src/web/web_server.rs b/src/web/web_server.rs index 73780efb..ce438469 100644 --- a/src/web/web_server.rs +++ b/src/web/web_server.rs @@ -4,16 +4,17 @@ use std::{convert::Infallible, sync::Arc}; use futures::future::Future; +use hyper::server::conn::http1; use hyper::{ + body::Incoming as IncomingBody, header::{HeaderValue, HOST}, - server::conn::AddrStream, - service::{make_service_fn, service_fn}, - Body, Method, Request, Response, Server, StatusCode, + service::service_fn, + Method, Request, Response, StatusCode, }; +use hyper_util::rt::TokioIo; -use hyperlocal::UnixServerExt; - -use tokio::net::UnixStream; +use tokio::io::{AsyncRead, AsyncWrite}; +use tokio::net::{TcpListener, UnixListener}; use opentelemetry::{ global, @@ -24,7 +25,7 @@ use opentelemetry::{ use crate::error::*; -use garage_api::helpers::{authority_to_host, host_to_bucket}; +use garage_api::helpers::*; use garage_api::s3::cors::{add_cors_headers, find_matching_cors_rule, handle_options_for_bucket}; use garage_api::s3::error::{ CommonErrorDerivative, Error as ApiError, OkOrBadRequest, OkOrInternalError, @@ -76,7 +77,7 @@ impl WebServer { /// Run a web server pub async fn run( garage: Arc<Garage>, - addr: UnixOrTCPSocketAddress, + bind_addr: UnixOrTCPSocketAddress, root_domain: String, shutdown_signal: impl Future<Output = ()>, ) -> Result<(), GarageError> { @@ -87,65 +88,73 @@ impl WebServer { root_domain, }); - let tcp_service = make_service_fn(|conn: &AddrStream| { - let web_server = web_server.clone(); - - let client_addr = conn.remote_addr(); - async move { - Ok::<_, Error>(service_fn(move |req: Request<Body>| { - let web_server = web_server.clone(); - - web_server.handle_request(req, client_addr.to_string()) - })) - } - }); - - let unix_service = make_service_fn(|_: &UnixStream| { - let web_server = web_server.clone(); + info!("Web server listening on {}", bind_addr); - let path = addr.to_string(); - async move { - Ok::<_, Error>(service_fn(move |req: Request<Body>| { - let web_server = web_server.clone(); + tokio::pin!(shutdown_signal); - web_server.handle_request(req, path.clone()) - })) - } - }); + match bind_addr { + UnixOrTCPSocketAddress::TCPSocket(addr) => { + let listener = TcpListener::bind(addr).await?; - info!("Web server listening on {}", addr); + loop { + let (stream, client_addr) = tokio::select! { + acc = listener.accept() => acc?, + _ = &mut shutdown_signal => break, + }; - match addr { - UnixOrTCPSocketAddress::TCPSocket(addr) => { - Server::bind(&addr) - .serve(tcp_service) - .with_graceful_shutdown(shutdown_signal) - .await? + web_server.launch_handler(stream, client_addr.to_string()); + } } UnixOrTCPSocketAddress::UnixSocket(ref path) => { if path.exists() { fs::remove_file(path)? } - let bound = Server::bind_unix(path)?; + let listener = UnixListener::bind(path)?; fs::set_permissions(path, Permissions::from_mode(0o222))?; - bound - .serve(unix_service) - .with_graceful_shutdown(shutdown_signal) - .await?; + loop { + let (stream, _) = tokio::select! { + acc = listener.accept() => acc?, + _ = &mut shutdown_signal => break, + }; + + web_server.launch_handler(stream, path.display().to_string()); + } } }; Ok(()) } + fn launch_handler<S>(self: &Arc<Self>, stream: S, client_addr: String) + where + S: AsyncRead + AsyncWrite + Send + Sync + 'static, + { + let this = self.clone(); + let io = TokioIo::new(stream); + + let serve = move |req: Request<IncomingBody>| { + this.clone().handle_request(req, client_addr.to_string()) + }; + + tokio::task::spawn(async move { + let io = Box::pin(io); + if let Err(e) = http1::Builder::new() + .serve_connection(io, service_fn(serve)) + .await + { + debug!("Error handling HTTP connection: {}", e); + } + }); + } + async fn handle_request( self: Arc<Self>, - req: Request<Body>, + req: Request<IncomingBody>, addr: String, - ) -> Result<Response<Body>, Infallible> { + ) -> Result<Response<BoxBody<Error>>, Infallible> { if let Ok(forwarded_for_ip_addr) = forwarded_headers::handle_forwarded_for_headers(req.headers()) { @@ -187,7 +196,8 @@ impl WebServer { match res { Ok(res) => { debug!("{} {} {}", req.method(), res.status(), req.uri()); - Ok(res) + Ok(res + .map(|body| BoxBody::new(http_body_util::BodyExt::map_err(body, Error::from)))) } Err(error) => { info!( @@ -220,7 +230,10 @@ impl WebServer { Ok(exists) } - async fn serve_file(self: &Arc<Self>, req: &Request<Body>) -> Result<Response<Body>, Error> { + async fn serve_file( + self: &Arc<Self>, + req: &Request<IncomingBody>, + ) -> Result<Response<BoxBody<ApiError>>, Error> { // Get http authority string (eg. [::1]:3902 or garage.tld:80) let authority = req .headers() @@ -268,8 +281,8 @@ impl WebServer { let ret_doc = match *req.method() { Method::OPTIONS => handle_options_for_bucket(req, &bucket), - Method::HEAD => handle_head(self.garage.clone(), req, bucket_id, &key, None).await, - Method::GET => handle_get(self.garage.clone(), req, bucket_id, &key, None).await, + Method::HEAD => handle_head(self.garage.clone(), &req, bucket_id, &key, None).await, + Method::GET => handle_get(self.garage.clone(), &req, bucket_id, &key, None).await, _ => Err(ApiError::bad_request("HTTP method not supported")), }; @@ -281,7 +294,7 @@ impl WebServer { Ok(Response::builder() .status(StatusCode::FOUND) .header("Location", url) - .body(Body::empty()) + .body(empty_body()) .unwrap()) } _ => ret_doc, @@ -310,7 +323,7 @@ impl WebServer { // Create a fake HTTP request with path = the error document let req2 = Request::builder() .uri(format!("http://{}/{}", host, &error_document)) - .body(Body::empty()) + .body(empty_body::<Infallible>()) .unwrap(); match handle_get(self.garage.clone(), &req2, bucket_id, &error_document, None).await @@ -358,7 +371,7 @@ impl WebServer { } } -fn error_to_res(e: Error) -> Response<Body> { +fn error_to_res(e: Error) -> Response<BoxBody<Error>> { // If we are here, it is either that: // - there was an error before trying to get the requested URL // from the bucket (e.g. bucket not found) @@ -366,7 +379,7 @@ fn error_to_res(e: Error) -> Response<Body> { // was a HEAD request or we couldn't get the error document) // We do NOT enter this code path when returning the bucket's // error document (this is handled in serve_file) - let body = Body::from(format!("{}\n", e)); + let body = string_body(format!("{}\n", e)); let mut http_error = Response::new(body); *http_error.status_mut() = e.http_status_code(); e.add_headers(http_error.headers_mut()); |