aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock643
-rw-r--r--src/garage/Cargo.toml4
-rw-r--r--src/garage/admin_rpc.rs80
-rw-r--r--src/garage/cli.rs168
-rw-r--r--src/garage/main.rs71
-rw-r--r--src/garage/server.rs87
-rw-r--r--src/model/Cargo.toml3
-rw-r--r--src/model/block.rs109
-rw-r--r--src/model/garage.rs43
-rw-r--r--src/rpc/Cargo.toml12
-rw-r--r--src/rpc/lib.rs8
-rw-r--r--src/rpc/membership.rs722
-rw-r--r--src/rpc/ring.rs11
-rw-r--r--src/rpc/rpc_client.rs369
-rw-r--r--src/rpc/rpc_helper.rs206
-rw-r--r--src/rpc/rpc_server.rs247
-rw-r--r--src/rpc/system.rs363
-rw-r--r--src/rpc/tls_util.rs140
-rw-r--r--src/table/Cargo.toml2
-rw-r--r--src/table/data.rs2
-rw-r--r--src/table/gc.rs80
-rw-r--r--src/table/replication/fullcopy.rs13
-rw-r--r--src/table/replication/parameters.rs6
-rw-r--r--src/table/replication/sharded.rs7
-rw-r--r--src/table/sync.rs97
-rw-r--r--src/table/table.rs110
-rw-r--r--src/util/Cargo.toml3
-rw-r--r--src/util/config.rs63
-rw-r--r--src/util/error.rs11
29 files changed, 1471 insertions, 2209 deletions
diff --git a/Cargo.lock b/Cargo.lock
index c2bec83e..4ea61847 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4,18 +4,18 @@ version = 3
[[package]]
name = "aho-corasick"
-version = "0.7.15"
+version = "0.7.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7404febffaa47dac81aa44dba71523c9d069b1bdc50a77db41195149e17f68e5"
+checksum = "1e37cfd5e7657ada45f742d6e99ca5788580b5c529dc78faf11ece6dc702656f"
dependencies = [
"memchr",
]
[[package]]
name = "arc-swap"
-version = "1.2.0"
+version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d4d7d63395147b81a9e570bcc6243aaf71c017bd666d4909cfef0085bdda8d73"
+checksum = "e6df5aef5c5830360ce5218cecb8f018af3438af5686ae945094affc86fdec63"
[[package]]
name = "arrayvec"
@@ -24,6 +24,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b"
[[package]]
+name = "async-trait"
+version = "0.1.51"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "44318e776df68115a881de9a8fd1b9e53368d7a4a5ce4cc48517da3393233a5e"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
+[[package]]
name = "atty"
version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -48,15 +59,15 @@ checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd"
[[package]]
name = "bitflags"
-version = "1.2.1"
+version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693"
+checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "blake2"
-version = "0.9.1"
+version = "0.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "10a5720225ef5daecf08657f23791354e1685a8c91a4c60c7f3d3b2892f978f4"
+checksum = "0a4e37d16930f5459780f5621038b6382b9bb37c19016f39fb6b5808d831f174"
dependencies = [
"crypto-mac 0.8.0",
"digest",
@@ -74,9 +85,9 @@ dependencies = [
[[package]]
name = "bumpalo"
-version = "3.6.1"
+version = "3.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "63396b8a4b9de3f4fdfb320ab6080762242f66a8ef174c49d8e19b674db4cdbe"
+checksum = "d9df67f7bf9ef8498769f994239c45613ef0c5899415fb58e9add412d2c1a538"
[[package]]
name = "byteorder"
@@ -86,15 +97,21 @@ checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610"
[[package]]
name = "bytes"
-version = "1.0.1"
+version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b700ce4376041dcd0a327fd0097c41095743c4c8af8887265942faf1100bd040"
+checksum = "e0dcbc35f504eb6fc275a6d20e4ebcda18cf50d40ba6fabff8c711fa16cb3b16"
+
+[[package]]
+name = "bytes"
+version = "1.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8"
[[package]]
name = "cc"
-version = "1.0.67"
+version = "1.0.71"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e3c69b077ad434294d3ce9f1f6143a2a4b89a8a2d54ef813d85003a4fd1137fd"
+checksum = "79c2681d6594606957bbb8631c4b90a7fcaaa72cdb714743a437b156d6a7eedd"
[[package]]
name = "cfg-if"
@@ -127,10 +144,13 @@ dependencies = [
]
[[package]]
-name = "cpuid-bool"
-version = "0.1.2"
+name = "cpufeatures"
+version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8aebca1129a03dc6dc2b127edd729435bbc4a37e1d5f4d7513165089ceb02634"
+checksum = "95059428f66df56b63431fdb4e1947ed2190586af5c5a8a8b71122bdf5a7f469"
+dependencies = [
+ "libc",
+]
[[package]]
name = "crc32fast"
@@ -143,9 +163,9 @@ dependencies = [
[[package]]
name = "crossbeam-epoch"
-version = "0.9.3"
+version = "0.9.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2584f639eb95fea8c798496315b297cf81b9b58b6d30ab066a75455333cf4b12"
+checksum = "4ec02e091aa634e2c3ada4a392989e7c3116673ef0ac5b72232439094d73b7fd"
dependencies = [
"cfg-if",
"crossbeam-utils",
@@ -156,11 +176,10 @@ dependencies = [
[[package]]
name = "crossbeam-utils"
-version = "0.8.3"
+version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e7e9d99fa91428effe99c5c6d4634cdeba32b8cf784fc428a2a687f61a952c49"
+checksum = "d82cfc11ce7f2c3faef78d8a684447b40d503d9681acebed6cb728d45940c4db"
dependencies = [
- "autocfg",
"cfg-if",
"lazy_static",
]
@@ -177,9 +196,9 @@ dependencies = [
[[package]]
name = "crypto-mac"
-version = "0.10.0"
+version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4857fd85a0c34b3c3297875b747c1e02e06b6a0ea32dd892d8192b9ce0813ea6"
+checksum = "bff07008ec701e8028e2ceb8f83f0e4274ee62bd2dbdc4fefff2e9a91824081a"
dependencies = [
"generic-array",
"subtle",
@@ -209,6 +228,20 @@ dependencies = [
[[package]]
name = "err-derive"
+version = "0.2.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "22deed3a8124cff5fa835713fa105621e43bbdc46690c3a6b68328a012d350d4"
+dependencies = [
+ "proc-macro-error",
+ "proc-macro2",
+ "quote",
+ "rustversion",
+ "syn",
+ "synstructure",
+]
+
+[[package]]
+name = "err-derive"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dcc7f65832b62ed38939f98966824eb6294911c3629b0e9a262bfb80836d9686"
@@ -249,9 +282,9 @@ dependencies = [
[[package]]
name = "futures"
-version = "0.3.13"
+version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7f55667319111d593ba876406af7c409c0ebb44dc4be6132a783ccf163ea14c1"
+checksum = "a12aa0eb539080d55c3f2d45a67c3b58b6b0773c1a3ca2dfec66d58c97fd66ca"
dependencies = [
"futures-channel",
"futures-core",
@@ -264,9 +297,9 @@ dependencies = [
[[package]]
name = "futures-channel"
-version = "0.3.13"
+version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8c2dd2df839b57db9ab69c2c9d8f3e8c81984781937fe2807dc6dcf3b2ad2939"
+checksum = "5da6ba8c3bb3c165d3c7319fc1cc8304facf1fb8db99c5de877183c08a273888"
dependencies = [
"futures-core",
"futures-sink",
@@ -274,15 +307,15 @@ dependencies = [
[[package]]
name = "futures-core"
-version = "0.3.13"
+version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "15496a72fabf0e62bdc3df11a59a3787429221dd0710ba8ef163d6f7a9112c94"
+checksum = "88d1c26957f23603395cd326b0ffe64124b818f4449552f960d815cfba83a53d"
[[package]]
name = "futures-executor"
-version = "0.3.13"
+version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "891a4b7b96d84d5940084b2a37632dd65deeae662c114ceaa2c879629c9c0ad1"
+checksum = "45025be030969d763025784f7f355043dc6bc74093e4ecc5000ca4dc50d8745c"
dependencies = [
"futures-core",
"futures-task",
@@ -291,16 +324,17 @@ dependencies = [
[[package]]
name = "futures-io"
-version = "0.3.13"
+version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d71c2c65c57704c32f5241c1223167c2c3294fd34ac020c807ddbe6db287ba59"
+checksum = "522de2a0fe3e380f1bc577ba0474108faf3f6b18321dbf60b3b9c39a75073377"
[[package]]
name = "futures-macro"
-version = "0.3.13"
+version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ea405816a5139fb39af82c2beb921d52143f556038378d6db21183a5c37fbfb7"
+checksum = "18e4a4b95cea4b4ccbcf1c5675ca7c4ee4e9e75eb79944d07defde18068f79bb"
dependencies = [
+ "autocfg",
"proc-macro-hack",
"proc-macro2",
"quote",
@@ -309,22 +343,23 @@ dependencies = [
[[package]]
name = "futures-sink"
-version = "0.3.13"
+version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "85754d98985841b7d4f5e8e6fbfa4a4ac847916893ec511a2917ccd8525b8bb3"
+checksum = "36ea153c13024fe480590b3e3d4cad89a0cfacecc24577b68f86c6ced9c2bc11"
[[package]]
name = "futures-task"
-version = "0.3.13"
+version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "fa189ef211c15ee602667a6fcfe1c1fd9e07d42250d2156382820fba33c9df80"
+checksum = "1d3d00f4eddb73e498a54394f228cd55853bdf059259e8e7bc6e69d408892e99"
[[package]]
name = "futures-util"
-version = "0.3.13"
+version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1812c7ab8aedf8d6f2701a43e1243acdbcc2b36ab26e2ad421eb99ac963d96d1"
+checksum = "36568465210a3a6ee45e1f165136d68671471a501e632e9a98d96872222b5481"
dependencies = [
+ "autocfg",
"futures-channel",
"futures-core",
"futures-io",
@@ -352,7 +387,8 @@ dependencies = [
name = "garage"
version = "0.3.0"
dependencies = [
- "bytes",
+ "async-trait",
+ "bytes 1.1.0",
"futures",
"futures-util",
"garage_api",
@@ -363,10 +399,12 @@ dependencies = [
"garage_web",
"git-version",
"hex",
+ "kuska-sodiumoxide",
"log",
+ "netapp",
"pretty_env_logger",
"rand",
- "rmp-serde",
+ "rmp-serde 0.15.5",
"serde",
"sled",
"structopt",
@@ -379,10 +417,10 @@ name = "garage_api"
version = "0.3.0"
dependencies = [
"base64",
- "bytes",
+ "bytes 1.1.0",
"chrono",
- "crypto-mac 0.10.0",
- "err-derive",
+ "crypto-mac 0.10.1",
+ "err-derive 0.3.0",
"futures",
"futures-util",
"garage_model",
@@ -392,7 +430,7 @@ dependencies = [
"hmac",
"http",
"http-range",
- "httpdate",
+ "httpdate 0.3.2",
"hyper",
"log",
"md-5",
@@ -410,6 +448,7 @@ name = "garage_model"
version = "0.3.0"
dependencies = [
"arc-swap",
+ "async-trait",
"futures",
"futures-util",
"garage_rpc 0.3.0",
@@ -417,8 +456,9 @@ dependencies = [
"garage_util 0.3.0",
"hex",
"log",
+ "netapp",
"rand",
- "rmp-serde",
+ "rmp-serde 0.15.5",
"serde",
"serde_bytes",
"sled",
@@ -432,7 +472,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "48c182633cebe4abed9594afb14770fc45402513765d38a4b19659ae0ccb2a2f"
dependencies = [
"arc-swap",
- "bytes",
+ "bytes 1.1.0",
"futures",
"futures-util",
"garage_util 0.2.1",
@@ -442,7 +482,7 @@ dependencies = [
"hyper",
"hyper-rustls",
"log",
- "rmp-serde",
+ "rmp-serde 0.15.5",
"rustls",
"serde",
"serde_json",
@@ -457,32 +497,32 @@ name = "garage_rpc"
version = "0.3.0"
dependencies = [
"arc-swap",
- "bytes",
+ "async-trait",
+ "bytes 1.1.0",
"futures",
"futures-util",
"garage_rpc 0.2.1",
"garage_util 0.3.0",
"gethostname",
"hex",
- "http",
"hyper",
- "hyper-rustls",
+ "kuska-sodiumoxide",
"log",
- "rmp-serde",
- "rustls",
+ "netapp",
+ "rand",
+ "rmp-serde 0.15.5",
"serde",
"serde_json",
"tokio",
- "tokio-rustls",
"tokio-stream",
- "webpki",
]
[[package]]
name = "garage_table"
version = "0.3.0"
dependencies = [
- "bytes",
+ "async-trait",
+ "bytes 1.1.0",
"futures",
"futures-util",
"garage_rpc 0.3.0",
@@ -490,7 +530,7 @@ dependencies = [
"hexdump",
"log",
"rand",
- "rmp-serde",
+ "rmp-serde 0.15.5",
"serde",
"serde_bytes",
"sled",
@@ -505,14 +545,14 @@ checksum = "aef76d3779e406a16fdcaffe8d86b8ae2943a549d2b33f2c20930838764464c0"
dependencies = [
"blake2",
"chrono",
- "err-derive",
+ "err-derive 0.3.0",
"futures",
"hex",
"http",
"hyper",
"log",
"rand",
- "rmp-serde",
+ "rmp-serde 0.15.5",
"rustls",
"serde",
"serde_json",
@@ -530,22 +570,21 @@ version = "0.3.0"
dependencies = [
"blake2",
"chrono",
- "err-derive",
+ "err-derive 0.3.0",
"futures",
"hex",
"http",
"hyper",
"log",
+ "netapp",
"rand",
- "rmp-serde",
- "rustls",
+ "rmp-serde 0.15.5",
"serde",
"serde_json",
"sha2",
"sled",
"tokio",
"toml",
- "webpki",
"xxhash-rust",
]
@@ -553,7 +592,7 @@ dependencies = [
name = "garage_web"
version = "0.3.0"
dependencies = [
- "err-derive",
+ "err-derive 0.3.0",
"futures",
"garage_api",
"garage_model",
@@ -588,9 +627,9 @@ dependencies = [
[[package]]
name = "getrandom"
-version = "0.2.2"
+version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c9495705279e7140bf035dde1f6e750c162df8b625267cd52cc44e0b156732c8"
+checksum = "7fcd999463524c52659517fe2cea98493cfe485d10565e7b0fb07dbba7ad2753"
dependencies = [
"cfg-if",
"libc",
@@ -599,9 +638,9 @@ dependencies = [
[[package]]
name = "git-version"
-version = "0.3.4"
+version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "94918e83f1e01dedc2e361d00ce9487b14c58c7f40bab148026fa39d42cb41e2"
+checksum = "f6b0decc02f4636b9ccad390dcbe77b722a77efedfa393caf8379a51d5c61899"
dependencies = [
"git-version-macro",
"proc-macro-hack",
@@ -609,9 +648,9 @@ dependencies = [
[[package]]
name = "git-version-macro"
-version = "0.3.4"
+version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "34a97a52fdee1870a34fa6e4b77570cba531b27d1838874fef4429a791a3d657"
+checksum = "fe69f1cbdb6e28af2bac214e943b99ce8a0a06b447d15d3e61161b0423139f3f"
dependencies = [
"proc-macro-hack",
"proc-macro2",
@@ -621,11 +660,11 @@ dependencies = [
[[package]]
name = "h2"
-version = "0.3.1"
+version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d832b01df74254fe364568d6ddc294443f61cbec82816b60904303af87efae78"
+checksum = "6c06815895acec637cd6ed6e9662c935b866d20a106f8361892893a7d9234964"
dependencies = [
- "bytes",
+ "bytes 1.1.0",
"fnv",
"futures-core",
"futures-sink",
@@ -640,24 +679,24 @@ dependencies = [
[[package]]
name = "hashbrown"
-version = "0.9.1"
+version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d7afe4a420e3fe79967a00898cc1f4db7c8a49a9333a29f8a4bd76a253d5cd04"
+checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e"
[[package]]
name = "heck"
-version = "0.3.2"
+version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "87cbf45460356b7deeb5e3415b5563308c0a9b057c85e12b06ad551f98d0a6ac"
+checksum = "6d621efb26863f0e9924c6ac577e8275e5e6b77455db64ffa6c65c904e9e132c"
dependencies = [
"unicode-segmentation",
]
[[package]]
name = "hermit-abi"
-version = "0.1.18"
+version = "0.1.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "322f4de77956e22ed0e5032c359a0f1273f1f7f0d79bfa3b8ffbc730d7fbcc5c"
+checksum = "62b467343b94ba476dcb2500d242dadbb39557df889310ac77c5d99100aaac33"
dependencies = [
"libc",
]
@@ -684,29 +723,30 @@ version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c1441c6b1e930e2817404b5046f1f989899143a12bf92de603b69f4e0aee1e15"
dependencies = [
- "crypto-mac 0.10.0",
+ "crypto-mac 0.10.1",
"digest",
]
[[package]]
name = "http"
-version = "0.2.3"
+version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7245cd7449cc792608c3c8a9eaf69bd4eabbabf802713748fd739c98b82f0747"
+checksum = "1323096b05d41827dadeaee54c9981958c0f94e670bc94ed80037d1a7b8b186b"
dependencies = [
- "bytes",
+ "bytes 1.1.0",
"fnv",
"itoa",
]
[[package]]
name = "http-body"
-version = "0.4.0"
+version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2861bd27ee074e5ee891e8b539837a9430012e249d7f0ca2d795650f579c1994"
+checksum = "399c583b2979440c60be0821a6199eca73bc3c8dcd9d070d75ac726e2c6186e5"
dependencies = [
- "bytes",
+ "bytes 1.1.0",
"http",
+ "pin-project-lite",
]
[[package]]
@@ -717,9 +757,9 @@ checksum = "eee9694f83d9b7c09682fdb32213682939507884e5bcf227be9aff5d644b90dc"
[[package]]
name = "httparse"
-version = "1.3.5"
+version = "1.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "615caabe2c3160b313d52ccc905335f4ed5f10881dd63dc5699d47e90be85691"
+checksum = "acd94fdbe1d4ff688b67b04eee2e17bd50995534a61539e45adfefb45e5e5503"
[[package]]
name = "httpdate"
@@ -728,6 +768,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "494b4d60369511e7dea41cf646832512a94e542f68bb9c49e54518e0f468eb47"
[[package]]
+name = "httpdate"
+version = "1.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6456b8a6c8f33fee7d958fcd1b60d55b11940a79e63ae87013e6d22e26034440"
+
+[[package]]
name = "humantime"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -738,11 +784,11 @@ dependencies = [
[[package]]
name = "hyper"
-version = "0.14.4"
+version = "0.14.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e8e946c2b1349055e0b72ae281b238baf1a3ea7307c7e9f9d64673bdd9c26ac7"
+checksum = "15d1cfb9e4f68655fa04c01f59edb405b6074a0f7118ea881e5026e4a1cd8593"
dependencies = [
- "bytes",
+ "bytes 1.1.0",
"futures-channel",
"futures-core",
"futures-util",
@@ -750,9 +796,9 @@ dependencies = [
"http",
"http-body",
"httparse",
- "httpdate",
+ "httpdate 1.0.1",
"itoa",
- "pin-project",
+ "pin-project-lite",
"socket2",
"tokio",
"tower-service",
@@ -777,9 +823,9 @@ dependencies = [
[[package]]
name = "idna"
-version = "0.2.2"
+version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "89829a5d69c23d348314a7ac337fe39173b61149a9864deabd260983aed48c21"
+checksum = "418a0a6fab821475f634efe3ccc45c013f742efe03d853e8d3355d5cb850ecf8"
dependencies = [
"matches",
"unicode-bidi",
@@ -788,9 +834,9 @@ dependencies = [
[[package]]
name = "indexmap"
-version = "1.6.2"
+version = "1.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "824845a0bf897a9042383849b02c1bc219c2383772efcd5c6f9766fa4b81aef3"
+checksum = "bc633605454125dec4b66843673f01c7df2b89479b32e0ed634e43a91cff62a5"
dependencies = [
"autocfg",
"hashbrown",
@@ -798,9 +844,9 @@ dependencies = [
[[package]]
name = "instant"
-version = "0.1.9"
+version = "0.1.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "61124eeebbd69b8190558df225adf7e4caafce0d743919e5d6b19652314ec5ec"
+checksum = "716d3d89f35ac6a34fd0eed635395f4c3b76fa889338a4632e5231a8684216bd"
dependencies = [
"cfg-if",
]
@@ -813,20 +859,43 @@ checksum = "c4a9b56eb56058f43dc66e58f40a214b2ccbc9f3df51861b63d51dec7b65bc3f"
[[package]]
name = "itoa"
-version = "0.4.7"
+version = "0.4.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "dd25036021b0de88a0aff6b850051563c6516d0bf53f8638938edbb9de732736"
+checksum = "b71991ff56294aa922b450139ee08b3bfc70982c6b2c7562771375cf73542dd4"
[[package]]
name = "js-sys"
-version = "0.3.49"
+version = "0.3.55"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "dc15e39392125075f60c95ba416f5381ff6c3a948ff02ab12464715adf56c821"
+checksum = "7cc9ffccd38c451a86bf13657df244e9c3f37493cce8e5e21e940963777acc84"
dependencies = [
"wasm-bindgen",
]
[[package]]
+name = "kuska-handshake"
+version = "0.2.0"
+source = "git+https://github.com/Alexis211/handshake?branch=tokio1.0#61bf144643b177797b2d16b9b2ffcfb648face00"
+dependencies = [
+ "futures",
+ "hex",
+ "kuska-sodiumoxide",
+ "log",
+ "thiserror",
+]
+
+[[package]]
+name = "kuska-sodiumoxide"
+version = "0.2.5-0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ae0f8eafdd240b722243787b51fdaf8df6693fb8621d0f7061cdba574214cf88"
+dependencies = [
+ "libc",
+ "libsodium-sys",
+ "serde",
+]
+
+[[package]]
name = "lazy_static"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -834,15 +903,27 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]]
name = "libc"
-version = "0.2.90"
+version = "0.2.103"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "dd8f7255a17a627354f321ef0055d63b898c6fb27eff628af4d1b66b7331edf6"
+
+[[package]]
+name = "libsodium-sys"
+version = "0.2.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ba4aede83fc3617411dc6993bc8c70919750c1c257c6ca6a502aed6e0e2394ae"
+checksum = "6b779387cd56adfbc02ea4a668e704f729be8d6a6abd2c27ca5ee537849a92fd"
+dependencies = [
+ "cc",
+ "libc",
+ "pkg-config",
+ "walkdir",
+]
[[package]]
name = "lock_api"
-version = "0.4.2"
+version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "dd96ffd135b2fd7b973ac026d28085defbe8983df057ced3eb4f2130b0831312"
+checksum = "712a4d093c9976e24e7dbca41db895dabcbac38eb5f4045393d17a95bdfb1109"
dependencies = [
"scopeguard",
]
@@ -858,9 +939,9 @@ dependencies = [
[[package]]
name = "matches"
-version = "0.1.8"
+version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7ffc5c5338469d4d3ea17d269fa8ea3512ad247247c30bd2df69e68309ed0a08"
+checksum = "a3e378b66a060d48947b590737b30a1be76706c8dd7b8ba0f2fe3989c68a853f"
[[package]]
name = "md-5"
@@ -875,24 +956,24 @@ dependencies = [
[[package]]
name = "memchr"
-version = "2.3.4"
+version = "2.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0ee1c47aaa256ecabcaea351eae4a9b01ef39ed810004e298d2511ed284b1525"
+checksum = "308cc39be01b73d0d18f82a0e7b2a3df85245f84af96fdddc5d202d27e47b86a"
[[package]]
name = "memoffset"
-version = "0.6.1"
+version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "157b4208e3059a8f9e78d559edc658e13df41410cb3ae03979c83130067fdd87"
+checksum = "59accc507f1338036a0477ef61afdae33cde60840f4dfe481319ce3ad116ddf9"
dependencies = [
"autocfg",
]
[[package]]
name = "mio"
-version = "0.7.10"
+version = "0.7.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2182a122f3b7f3f5329cb1972cee089ba2459a0a80a56935e6e674f096f8d839"
+checksum = "8c2bdb6314ec10835cd3293dd268473a835c02b7b352e788be788b3c6ca6bb16"
dependencies = [
"libc",
"log",
@@ -903,15 +984,35 @@ dependencies = [
[[package]]
name = "miow"
-version = "0.3.6"
+version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5a33c1b55807fbed163481b5ba66db4b2fa6cde694a5027be10fb724206c5897"
+checksum = "b9f1c5b025cda876f66ef43a113f91ebc9f4ccef34843000e0adf6ebbab84e21"
dependencies = [
- "socket2",
"winapi",
]
[[package]]
+name = "netapp"
+version = "0.3.0"
+source = "git+https://git.deuxfleurs.fr/lx/netapp#cfa64bc745969cfc3684a70b45d71128f8335460"
+dependencies = [
+ "arc-swap",
+ "async-trait",
+ "bytes 0.6.0",
+ "err-derive 0.2.4",
+ "futures",
+ "hex",
+ "kuska-handshake",
+ "kuska-sodiumoxide",
+ "log",
+ "rmp-serde 0.14.4",
+ "serde",
+ "tokio",
+ "tokio-stream",
+ "tokio-util",
+]
+
+[[package]]
name = "ntapi"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -951,9 +1052,9 @@ dependencies = [
[[package]]
name = "once_cell"
-version = "1.7.2"
+version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "af8b08b04175473088b46763e51ee54da5f9a164bc162f615b91bc179dbf15a3"
+checksum = "692fcb63b64b1758029e0a96ee63e049ce8c5948587f2f7208df04625e5f6b56"
[[package]]
name = "opaque-debug"
@@ -963,9 +1064,9 @@ checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5"
[[package]]
name = "parking_lot"
-version = "0.11.1"
+version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6d7744ac029df22dca6284efe4e898991d28e3085c706c972bcd7da4a27a15eb"
+checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99"
dependencies = [
"instant",
"lock_api",
@@ -974,9 +1075,9 @@ dependencies = [
[[package]]
name = "parking_lot_core"
-version = "0.8.3"
+version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "fa7a782938e745763fe6907fc6ba86946d72f49fe7e21de074e08128a99fb018"
+checksum = "d76e8e1493bcac0d2766c42737f34458f1c8c50c0d23bcb24ea953affb273216"
dependencies = [
"cfg-if",
"instant",
@@ -993,30 +1094,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e"
[[package]]
-name = "pin-project"
-version = "1.0.5"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "96fa8ebb90271c4477f144354485b8068bd8f6b78b428b01ba892ca26caf0b63"
-dependencies = [
- "pin-project-internal",
-]
-
-[[package]]
-name = "pin-project-internal"
-version = "1.0.5"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "758669ae3558c6f74bd2a18b41f7ac0b5a195aea6639d6a9b5e5d1ad5ba24c0b"
-dependencies = [
- "proc-macro2",
- "quote",
- "syn",
-]
-
-[[package]]
name = "pin-project-lite"
-version = "0.2.6"
+version = "0.2.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "dc0e1f259c92177c30a4c9d177246edd0a3568b25756a977d0632cf8fa37e905"
+checksum = "8d31d11c69a6b52a174b42bdc0c30e5e11670f90788b2c471c31c1d17d449443"
[[package]]
name = "pin-utils"
@@ -1025,10 +1106,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
+name = "pkg-config"
+version = "0.3.20"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7c9b1041b4387893b91ee6746cddfc28516aff326a3519fb2adf820932c5e6cb"
+
+[[package]]
name = "ppv-lite86"
-version = "0.2.10"
+version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ac74c624d6b2d21f425f752262f42188365d7b8ff1aff74c82e45136510a4857"
+checksum = "c3ca011bd0129ff4ae15cd04c4eef202cadf6c51c21e47aba319b4e0501db741"
[[package]]
name = "pretty_env_logger"
@@ -1078,9 +1165,9 @@ checksum = "bc881b2c22681370c6a780e47af9840ef841837bc98118431d4e1868bd0c1086"
[[package]]
name = "proc-macro2"
-version = "1.0.24"
+version = "1.0.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1e0704ee1a7e00d7bb417d0770ea303c1bccbabf0ef1667dae92b5967f5f8a71"
+checksum = "edc3358ebc67bc8b7fa0c007f945b0b18226f78437d61bec735a9eb96b61ee70"
dependencies = [
"unicode-xid",
]
@@ -1103,18 +1190,18 @@ dependencies = [
[[package]]
name = "quote"
-version = "1.0.9"
+version = "1.0.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c3d0b9745dc2debf507c8422de05d7226cc1f0644216dfdfead988f9b1ab32a7"
+checksum = "38bc8cc6a5f2e3655e0899c1b848643b2562f853f114bfec7be120678e3ace05"
dependencies = [
"proc-macro2",
]
[[package]]
name = "rand"
-version = "0.8.3"
+version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0ef9e7e66b4468674bfcb0c81af8b7fa0bb154fa9f28eb840da5c447baeb8d7e"
+checksum = "2e7573632e6454cf6b99d7aac4ccca54be06da05aca2ef7423d22d27d4d4bcd8"
dependencies = [
"libc",
"rand_chacha",
@@ -1124,9 +1211,9 @@ dependencies = [
[[package]]
name = "rand_chacha"
-version = "0.3.0"
+version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e12735cf05c9e10bf21534da50a147b924d555dc7a547c42e6bb2d5b6017ae0d"
+checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88"
dependencies = [
"ppv-lite86",
"rand_core",
@@ -1134,36 +1221,36 @@ dependencies = [
[[package]]
name = "rand_core"
-version = "0.6.2"
+version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "34cf66eb183df1c5876e2dcf6b13d57340741e8dc255b48e40a26de954d06ae7"
+checksum = "d34f1408f55294453790c48b2f1ebbb1c5b4b7563eb1f418bcfcfdbb06ebb4e7"
dependencies = [
"getrandom",
]
[[package]]
name = "rand_hc"
-version = "0.3.0"
+version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3190ef7066a446f2e7f42e239d161e905420ccab01eb967c9eb27d21b2322a73"
+checksum = "d51e9f596de227fda2ea6c84607f5558e196eeaf43c986b724ba4fb8fdf497e7"
dependencies = [
"rand_core",
]
[[package]]
name = "redox_syscall"
-version = "0.2.5"
+version = "0.2.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "94341e4e44e24f6b591b59e47a8a027df12e008d73fd5672dbea9cc22f4507d9"
+checksum = "8383f39639269cde97d255a32bdb68c047337295414940c68bdd30c2e13203ff"
dependencies = [
"bitflags",
]
[[package]]
name = "regex"
-version = "1.4.5"
+version = "1.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "957056ecddbeba1b26965114e191d2e8589ce74db242b6ea25fc4062427a5c19"
+checksum = "d07a8629359eb56f1e2fb1652bb04212c072a87ba68546a04065d525673ac461"
dependencies = [
"aho-corasick",
"memchr",
@@ -1172,9 +1259,9 @@ dependencies = [
[[package]]
name = "regex-syntax"
-version = "0.6.23"
+version = "0.6.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "24d5f089152e60f62d28b835fbff2cd2e8dc0baf1ac13343bef92ab7eed84548"
+checksum = "f497285884f3fcff424ffc933e56d7cbca511def0c9831a7f9b5f6153e3cc89b"
[[package]]
name = "ring"
@@ -1203,9 +1290,20 @@ dependencies = [
[[package]]
name = "rmp-serde"
-version = "0.15.4"
+version = "0.14.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4ce7d70c926fe472aed493b902010bccc17fa9f7284145cb8772fd22fdb052d8"
+dependencies = [
+ "byteorder",
+ "rmp",
+ "serde",
+]
+
+[[package]]
+name = "rmp-serde"
+version = "0.15.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "839395ef53057db96b84c9238ab29e1a13f2e5c8ec9f66bef853ab4197303924"
+checksum = "723ecff9ad04f4ad92fe1c8ca6c20d2196d9286e9c60727c4cb5511629260e9d"
dependencies = [
"byteorder",
"rmp",
@@ -1214,18 +1312,18 @@ dependencies = [
[[package]]
name = "roxmltree"
-version = "0.14.0"
+version = "0.14.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "bf58a7d05b28e14b1e8902fa04c4d5d6109f5450ef71a5e6597f66e53f541504"
+checksum = "921904a62e410e37e215c40381b7117f830d9d89ba60ab5236170541dd25646b"
dependencies = [
"xmlparser",
]
[[package]]
name = "rustls"
-version = "0.19.0"
+version = "0.19.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "064fd21ff87c6e87ed4506e68beb42459caa4a0e2eb144932e6776768556980b"
+checksum = "35edb675feee39aec9c99fa5ff985081995a06d594114ae14cbe797ad7b7a6d7"
dependencies = [
"base64",
"log",
@@ -1236,9 +1334,9 @@ dependencies = [
[[package]]
name = "rustversion"
-version = "1.0.4"
+version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "cb5d2a036dc6d2d8fd16fde3498b04306e29bd193bf306a57427019b823d5acd"
+checksum = "61b3909d758bb75c79f23d4736fac9433868679d3ad2ea7a61e3c25cfda9a088"
[[package]]
name = "ryu"
@@ -1247,6 +1345,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "71d301d4193d031abdd79ff7e3dd721168a9572ef3fe51a1517aba235bd8f86e"
[[package]]
+name = "same-file"
+version = "1.0.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502"
+dependencies = [
+ "winapi-util",
+]
+
+[[package]]
name = "scopeguard"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1254,9 +1361,9 @@ checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
[[package]]
name = "sct"
-version = "0.6.0"
+version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e3042af939fca8c3453b7af0f1c66e533a15a86169e39de2657310ade8f98d3c"
+checksum = "b362b83898e0e69f38515b82ee15aa80636befe47c3b6d3d89a911e78fc228ce"
dependencies = [
"ring",
"untrusted",
@@ -1264,9 +1371,9 @@ dependencies = [
[[package]]
name = "serde"
-version = "1.0.124"
+version = "1.0.130"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "bd761ff957cb2a45fbb9ab3da6512de9de55872866160b23c25f1a841e99d29f"
+checksum = "f12d06de37cf59146fbdecab66aa99f9fe4f78722e3607577a5375d66bd0c913"
dependencies = [
"serde_derive",
]
@@ -1282,9 +1389,9 @@ dependencies = [
[[package]]
name = "serde_derive"
-version = "1.0.124"
+version = "1.0.130"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1800f7693e94e186f5e25a28291ae1570da908aff7d97a095dec1e56ff99069b"
+checksum = "d7bc1a1ab1961464eae040d96713baa5a724a8152c1222492465b54322ec508b"
dependencies = [
"proc-macro2",
"quote",
@@ -1293,9 +1400,9 @@ dependencies = [
[[package]]
name = "serde_json"
-version = "1.0.64"
+version = "1.0.68"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "799e97dc9fdae36a5c8b8f2cae9ce2ee9fdce2058c57a93e6099d919fd982f79"
+checksum = "0f690853975602e1bfe1ccbf50504d67174e3bcf340f23b5ea9992e0587a52d8"
dependencies = [
"itoa",
"ryu",
@@ -1304,37 +1411,37 @@ dependencies = [
[[package]]
name = "sha2"
-version = "0.9.3"
+version = "0.9.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "fa827a14b29ab7f44778d14a88d3cb76e949c45083f7dbfa507d0cb699dc12de"
+checksum = "b69f9a4c9740d74c5baa3fd2e547f9525fa8088a8a958e0ca2409a514e33f5fa"
dependencies = [
"block-buffer",
"cfg-if",
- "cpuid-bool",
+ "cpufeatures",
"digest",
"opaque-debug",
]
[[package]]
name = "signal-hook-registry"
-version = "1.3.0"
+version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "16f1d0fef1604ba8f7a073c7e701f213e056707210e9020af4528e0101ce11a6"
+checksum = "e51e73328dc4ac0c7ccbda3a494dfa03df1de2f46018127f60c693f2648455b0"
dependencies = [
"libc",
]
[[package]]
name = "slab"
-version = "0.4.2"
+version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8"
+checksum = "9def91fd1e018fe007022791f865d0ccc9b3a0d5001e01aabb8b40e46000afb5"
[[package]]
name = "sled"
-version = "0.34.6"
+version = "0.34.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1d0132f3e393bcb7390c60bb45769498cf4550bcb7a21d7f95c02b69f6362cdc"
+checksum = "7f96b4737c2ce5987354855aed3797279def4ebf734436c6aa4552cf8e169935"
dependencies = [
"crc32fast",
"crossbeam-epoch",
@@ -1348,17 +1455,16 @@ dependencies = [
[[package]]
name = "smallvec"
-version = "1.6.1"
+version = "1.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "fe0f37c9e8f3c5a4a66ad655a93c74daac4ad00c441533bf5c6e7990bb42604e"
+checksum = "1ecab6c735a6bb4139c0caafd0cc3635748bbb3acf4550e8138122099251f309"
[[package]]
name = "socket2"
-version = "0.3.19"
+version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "122e570113d28d773067fab24266b66753f6ea915758651696b6e35e49f88d6e"
+checksum = "5dc90fe6c7be1a323296982db1836d1ea9e47b6839496dde9a541bc496df3516"
dependencies = [
- "cfg-if",
"libc",
"winapi",
]
@@ -1371,9 +1477,9 @@ checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
[[package]]
name = "structopt"
-version = "0.3.21"
+version = "0.3.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5277acd7ee46e63e5168a80734c9f6ee81b1367a7d8772a2d765df2a3705d28c"
+checksum = "bf9d950ef167e25e0bdb073cf1d68e9ad2795ac826f2f3f59647817cf23c0bfa"
dependencies = [
"clap",
"lazy_static",
@@ -1382,9 +1488,9 @@ dependencies = [
[[package]]
name = "structopt-derive"
-version = "0.4.14"
+version = "0.4.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5ba9cdfda491b814720b6b06e0cac513d922fc407582032e8706e9f137976f90"
+checksum = "134d838a2c9943ac3125cf6df165eda53493451b719f3255b2a26b85f772d0ba"
dependencies = [
"heck",
"proc-macro-error",
@@ -1395,15 +1501,15 @@ dependencies = [
[[package]]
name = "subtle"
-version = "2.4.0"
+version = "2.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1e81da0851ada1f3e9d4312c704aa4f8806f0f9d69faaf8df2f3464b4a9437c2"
+checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601"
[[package]]
name = "syn"
-version = "1.0.64"
+version = "1.0.80"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3fd9d1e9976102a03c542daa2eff1b43f9d72306342f3f8b3ed5fb8908195d6f"
+checksum = "d010a1623fbd906d51d650a9916aaefc05ffa0e4053ff7fe601167f3e715d194"
dependencies = [
"proc-macro2",
"quote",
@@ -1412,9 +1518,9 @@ dependencies = [
[[package]]
name = "synstructure"
-version = "0.12.4"
+version = "0.12.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b834f2d66f734cb897113e34aaff2f1ab4719ca946f9a7358dba8f8064148701"
+checksum = "f36bdaa60a83aca3921b5259d5400cbf5e90fc51931376a9bd4a0eb79aa7210f"
dependencies = [
"proc-macro2",
"quote",
@@ -1441,6 +1547,26 @@ dependencies = [
]
[[package]]
+name = "thiserror"
+version = "1.0.30"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "854babe52e4df1653706b98fcfc05843010039b406875930a70e4d9644e5c417"
+dependencies = [
+ "thiserror-impl",
+]
+
+[[package]]
+name = "thiserror-impl"
+version = "1.0.30"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "aa32fd3f627f367fe16f893e2597ae3c05020f8bba2666a4e6ea73d377e5714b"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
+[[package]]
name = "time"
version = "0.1.43"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1452,9 +1578,9 @@ dependencies = [
[[package]]
name = "tinyvec"
-version = "1.1.1"
+version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "317cca572a0e89c3ce0ca1f1bdc9369547fe318a683418e42ac8f59d14701023"
+checksum = "f83b2a3d4d9091d0abd7eba4dc2710b1718583bd4d8992e2190720ea38f391f7"
dependencies = [
"tinyvec_macros",
]
@@ -1467,12 +1593,12 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c"
[[package]]
name = "tokio"
-version = "1.3.0"
+version = "1.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8d56477f6ed99e10225f38f9f75f872f29b8b8bd8c0b946f63345bb144e9eeda"
+checksum = "c2c2416fdedca8443ae44b4527de1ea633af61d8f7169ffa6e72c5b53d24efcc"
dependencies = [
"autocfg",
- "bytes",
+ "bytes 1.1.0",
"libc",
"memchr",
"mio",
@@ -1486,9 +1612,9 @@ dependencies = [
[[package]]
name = "tokio-macros"
-version = "1.1.0"
+version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "caf7b11a536f46a809a8a9f0bb4237020f70ecbf115b842360afb127ea2fda57"
+checksum = "b2dd85aeaba7b68df939bd357c6afb36c87951be9e80bf9c859f2fc3e9fca0fd"
dependencies = [
"proc-macro2",
"quote",
@@ -1508,9 +1634,9 @@ dependencies = [
[[package]]
name = "tokio-stream"
-version = "0.1.4"
+version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c535f53c0cfa1acace62995a8994fc9cc1f12d202420da96ff306ee24d576469"
+checksum = "7b2f3f698253f03119ac0102beaa64f67a67e08074d03a22d18784104543727f"
dependencies = [
"futures-core",
"pin-project-lite",
@@ -1519,12 +1645,13 @@ dependencies = [
[[package]]
name = "tokio-util"
-version = "0.6.4"
+version = "0.6.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ec31e5cc6b46e653cf57762f36f71d5e6386391d88a72fd6db4508f8f676fb29"
+checksum = "08d3725d3efa29485e87311c5b699de63cde14b00ed4d256b8318aa30ca452cd"
dependencies = [
- "bytes",
+ "bytes 1.1.0",
"futures-core",
+ "futures-io",
"futures-sink",
"log",
"pin-project-lite",
@@ -1548,9 +1675,9 @@ checksum = "360dfd1d6d30e05fda32ace2c8c70e9c0a9da713275777f5a4dbb8a1893930c6"
[[package]]
name = "tracing"
-version = "0.1.25"
+version = "0.1.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "01ebdc2bb4498ab1ab5f5b73c5803825e60199229ccba0698170e3be0e7f959f"
+checksum = "375a639232caf30edfc78e8d89b2d4c375515393e7af7e16f01cd96917fb2105"
dependencies = [
"cfg-if",
"pin-project-lite",
@@ -1559,9 +1686,9 @@ dependencies = [
[[package]]
name = "tracing-core"
-version = "0.1.17"
+version = "0.1.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f50de3927f93d202783f4513cda820ab47ef17f624b03c096e86ef00c67e6b5f"
+checksum = "1f4ed65637b8390770814083d20756f87bfa2c21bf2f110babdc5438351746e4"
dependencies = [
"lazy_static",
]
@@ -1574,45 +1701,42 @@ checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642"
[[package]]
name = "typenum"
-version = "1.13.0"
+version = "1.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "879f6906492a7cd215bfa4cf595b600146ccfac0c79bcbd1f3000162af5e8b06"
+checksum = "b63708a265f51345575b27fe43f9500ad611579e764c79edbc2037b1121959ec"
[[package]]
name = "unicode-bidi"
-version = "0.3.4"
+version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "49f2bd0c6468a8230e1db229cff8029217cf623c767ea5d60bfbd42729ea54d5"
-dependencies = [
- "matches",
-]
+checksum = "1a01404663e3db436ed2746d9fefef640d868edae3cceb81c3b8d5732fda678f"
[[package]]
name = "unicode-normalization"
-version = "0.1.17"
+version = "0.1.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "07fbfce1c8a97d547e8b5334978438d9d6ec8c20e38f56d4a4374d181493eaef"
+checksum = "d54590932941a9e9266f0832deed84ebe1bf2e4c9e4a3554d393d18f5e854bf9"
dependencies = [
"tinyvec",
]
[[package]]
name = "unicode-segmentation"
-version = "1.7.1"
+version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "bb0d2e7be6ae3a5fa87eed5fb451aff96f2573d2694942e40543ae0bbe19c796"
+checksum = "8895849a949e7845e06bd6dc1aa51731a103c42707010a5b591c0038fb73385b"
[[package]]
name = "unicode-width"
-version = "0.1.8"
+version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9337591893a19b88d8d87f2cec1e73fad5cdfd10e5a6f349f498ad6ea2ffb1e3"
+checksum = "3ed742d4ea2bd1176e236172c8429aaf54486e7ac098db29ffe6529e0ce50973"
[[package]]
name = "unicode-xid"
-version = "0.2.1"
+version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f7fe0bb3479651439c9112f72b6c505038574c9fbb575ed1bf3b797fa39dd564"
+checksum = "8ccb82d61f80a663efe1f787a51b16b5a51e3314d6ac365b08639f52387b33f3"
[[package]]
name = "untrusted"
@@ -1622,9 +1746,9 @@ checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a"
[[package]]
name = "url"
-version = "2.2.1"
+version = "2.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9ccd964113622c8e9322cfac19eb1004a07e636c545f325da085d5cdde6f1f8b"
+checksum = "a507c383b2d33b5fc35d1861e77e6b383d158b2da5e14fe51b83dfedf6fd578c"
dependencies = [
"form_urlencoded",
"idna",
@@ -1639,6 +1763,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5fecdca9a5291cc2b8dcf7dc02453fee791a280f3743cb0905f8822ae463b3fe"
[[package]]
+name = "walkdir"
+version = "2.3.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "808cf2735cd4b6866113f648b791c6adc5714537bc222d9347bb203386ffda56"
+dependencies = [
+ "same-file",
+ "winapi",
+ "winapi-util",
+]
+
+[[package]]
name = "want"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1656,9 +1791,9 @@ checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6"
[[package]]
name = "wasm-bindgen"
-version = "0.2.72"
+version = "0.2.78"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8fe8f61dba8e5d645a4d8132dc7a0a66861ed5e1045d2c0ed940fab33bac0fbe"
+checksum = "632f73e236b219150ea279196e54e610f5dbafa5d61786303d4da54f84e47fce"
dependencies = [
"cfg-if",
"wasm-bindgen-macro",
@@ -1666,9 +1801,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-backend"
-version = "0.2.72"
+version = "0.2.78"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "046ceba58ff062da072c7cb4ba5b22a37f00a302483f7e2a6cdc18fedbdc1fd3"
+checksum = "a317bf8f9fba2476b4b2c85ef4c4af8ff39c3c7f0cdfeed4f82c34a880aa837b"
dependencies = [
"bumpalo",
"lazy_static",
@@ -1681,9 +1816,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-macro"
-version = "0.2.72"
+version = "0.2.78"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0ef9aa01d36cda046f797c57959ff5f3c615c9cc63997a8d545831ec7976819b"
+checksum = "d56146e7c495528bf6587663bea13a8eb588d39b36b679d83972e1a2dbbdacf9"
dependencies = [
"quote",
"wasm-bindgen-macro-support",
@@ -1691,9 +1826,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-macro-support"
-version = "0.2.72"
+version = "0.2.78"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "96eb45c1b2ee33545a813a92dbb53856418bf7eb54ab34f7f7ff1448a5b3735d"
+checksum = "7803e0eea25835f8abdc585cd3021b3deb11543c6fe226dcd30b228857c5c5ab"
dependencies = [
"proc-macro2",
"quote",
@@ -1704,15 +1839,15 @@ dependencies = [
[[package]]
name = "wasm-bindgen-shared"
-version = "0.2.72"
+version = "0.2.78"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b7148f4696fb4960a346eaa60bbfb42a1ac4ebba21f750f75fc1375b098d5ffa"
+checksum = "0237232789cf037d5480773fe568aac745bfe2afbc11a863e97901780a6b47cc"
[[package]]
name = "web-sys"
-version = "0.3.49"
+version = "0.3.55"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "59fe19d70f5dacc03f6e46777213facae5ac3801575d56ca6cbd4c93dcd12310"
+checksum = "38eb105f1c59d9eaa6b5cdc92b859d85b926e82cb2e0945cd0c9259faa6fe9fb"
dependencies = [
"js-sys",
"wasm-bindgen",
diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml
index 09ed3e1e..032d1cf6 100644
--- a/src/garage/Cargo.toml
+++ b/src/garage/Cargo.toml
@@ -27,6 +27,8 @@ hex = "0.4"
log = "0.4"
pretty_env_logger = "0.4"
rand = "0.8"
+async-trait = "0.1.7"
+sodiumoxide = { version = "0.2.5-0", package = "kuska-sodiumoxide" }
sled = "0.34"
@@ -38,3 +40,5 @@ toml = "0.5"
futures = "0.3"
futures-util = "0.3"
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
+
+netapp = { version = "0.3.0", git = "https://git.deuxfleurs.fr/lx/netapp" }
diff --git a/src/garage/admin_rpc.rs b/src/garage/admin_rpc.rs
index fe5a9d88..b9e57c40 100644
--- a/src/garage/admin_rpc.rs
+++ b/src/garage/admin_rpc.rs
@@ -2,6 +2,7 @@ use std::collections::HashMap;
use std::fmt::Write;
use std::sync::Arc;
+use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use garage_util::error::Error;
@@ -10,8 +11,7 @@ use garage_table::crdt::Crdt;
use garage_table::replication::*;
use garage_table::*;
-use garage_rpc::rpc_client::*;
-use garage_rpc::rpc_server::*;
+use garage_rpc::*;
use garage_model::bucket_table::*;
use garage_model::garage::Garage;
@@ -19,10 +19,8 @@ use garage_model::key_table::*;
use crate::cli::*;
use crate::repair::Repair;
-use crate::*;
-pub const ADMIN_RPC_TIMEOUT: Duration = Duration::from_secs(30);
-pub const ADMIN_RPC_PATH: &str = "_admin";
+pub const ADMIN_RPC_PATH: &str = "garage/admin_rpc.rs/Rpc";
#[derive(Debug, Serialize, Deserialize)]
pub enum AdminRpc {
@@ -33,41 +31,31 @@ pub enum AdminRpc {
// Replies
Ok(String),
+ Error(String),
BucketList(Vec<String>),
BucketInfo(Bucket),
KeyList(Vec<(String, String)>),
KeyInfo(Key),
}
-impl RpcMessage for AdminRpc {}
+impl Message for AdminRpc {
+ type Response = AdminRpc;
+}
pub struct AdminRpcHandler {
garage: Arc<Garage>,
- rpc_client: Arc<RpcClient<AdminRpc>>,
+ endpoint: Arc<Endpoint<AdminRpc, Self>>,
}
impl AdminRpcHandler {
pub fn new(garage: Arc<Garage>) -> Arc<Self> {
- let rpc_client = garage.system.clone().rpc_client::<AdminRpc>(ADMIN_RPC_PATH);
- Arc::new(Self { garage, rpc_client })
- }
-
- pub fn register_handler(self: Arc<Self>, rpc_server: &mut RpcServer) {
- rpc_server.add_handler::<AdminRpc, _, _>(ADMIN_RPC_PATH.to_string(), move |msg, _addr| {
- let self2 = self.clone();
- async move {
- match msg {
- AdminRpc::BucketOperation(bo) => self2.handle_bucket_cmd(bo).await,
- AdminRpc::KeyOperation(ko) => self2.handle_key_cmd(ko).await,
- AdminRpc::LaunchRepair(opt) => self2.handle_launch_repair(opt).await,
- AdminRpc::Stats(opt) => self2.handle_stats(opt).await,
- _ => Err(Error::BadRpc("Invalid RPC".to_string())),
- }
- }
- });
+ let endpoint = garage.system.netapp.endpoint(ADMIN_RPC_PATH.into());
+ let admin = Arc::new(Self { garage, endpoint });
+ admin.endpoint.set_handler(admin.clone());
+ admin
}
- async fn handle_bucket_cmd(&self, cmd: BucketOperation) -> Result<AdminRpc, Error> {
+ async fn handle_bucket_cmd(&self, cmd: &BucketOperation) -> Result<AdminRpc, Error> {
match cmd {
BucketOperation::List => {
let bucket_names = self
@@ -187,7 +175,7 @@ impl AdminRpcHandler {
}
}
- async fn handle_key_cmd(&self, cmd: KeyOperation) -> Result<AdminRpc, Error> {
+ async fn handle_key_cmd(&self, cmd: &KeyOperation) -> Result<AdminRpc, Error> {
match cmd {
KeyOperation::List => {
let key_ids = self
@@ -210,13 +198,13 @@ impl AdminRpcHandler {
Ok(AdminRpc::KeyInfo(key))
}
KeyOperation::New(query) => {
- let key = Key::new(query.name);
+ let key = Key::new(query.name.clone());
self.garage.key_table.insert(&key).await?;
Ok(AdminRpc::KeyInfo(key))
}
KeyOperation::Rename(query) => {
let mut key = self.get_existing_key(&query.key_pattern).await?;
- key.name.update(query.new_name);
+ key.name.update(query.new_name.clone());
self.garage.key_table.insert(&key).await?;
Ok(AdminRpc::KeyInfo(key))
}
@@ -353,17 +341,18 @@ impl AdminRpcHandler {
let mut failures = vec![];
let ring = self.garage.system.ring.borrow().clone();
for node in ring.config.members.keys() {
+ let node = NodeID::from_slice(node.as_slice()).unwrap();
if self
- .rpc_client
+ .endpoint
.call(
- *node,
- AdminRpc::LaunchRepair(opt_to_send.clone()),
- ADMIN_RPC_TIMEOUT,
+ &node,
+ &AdminRpc::LaunchRepair(opt_to_send.clone()),
+ PRIO_NORMAL,
)
.await
.is_err()
{
- failures.push(*node);
+ failures.push(node);
}
}
if failures.is_empty() {
@@ -397,14 +386,16 @@ impl AdminRpcHandler {
let ring = self.garage.system.ring.borrow().clone();
for node in ring.config.members.keys() {
+ let node = NodeID::from_slice(node.as_slice()).unwrap();
+
let mut opt = opt.clone();
opt.all_nodes = false;
writeln!(&mut ret, "\n======================").unwrap();
writeln!(&mut ret, "Stats for node {:?}:", node).unwrap();
match self
- .rpc_client
- .call(*node, AdminRpc::Stats(opt), ADMIN_RPC_TIMEOUT)
+ .endpoint
+ .call(&node, &AdminRpc::Stats(opt), PRIO_NORMAL)
.await
{
Ok(AdminRpc::Ok(s)) => writeln!(&mut ret, "{}", s).unwrap(),
@@ -495,4 +486,23 @@ impl AdminRpcHandler {
.unwrap();
writeln!(to, " GC todo queue length: {}", t.data.gc_todo_len()).unwrap();
}
+
+ async fn handle_rpc(self: &Arc<Self>, msg: &AdminRpc) -> Result<AdminRpc, Error> {
+ match msg {
+ AdminRpc::BucketOperation(bo) => self.handle_bucket_cmd(bo).await,
+ AdminRpc::KeyOperation(ko) => self.handle_key_cmd(ko).await,
+ AdminRpc::LaunchRepair(opt) => self.handle_launch_repair(opt.clone()).await,
+ AdminRpc::Stats(opt) => self.handle_stats(opt.clone()).await,
+ _ => Err(Error::BadRpc("Invalid RPC".to_string())),
+ }
+ }
+}
+
+#[async_trait]
+impl EndpointHandler<AdminRpc> for AdminRpcHandler {
+ async fn handle(self: &Arc<Self>, message: &AdminRpc, _from: NodeID) -> AdminRpc {
+ self.handle_rpc(message)
+ .await
+ .unwrap_or_else(|e| AdminRpc::Error(format!("{}", e)))
+ }
}
diff --git a/src/garage/cli.rs b/src/garage/cli.rs
index f9e67fac..91ec5ab2 100644
--- a/src/garage/cli.rs
+++ b/src/garage/cli.rs
@@ -1,6 +1,5 @@
-use std::cmp::max;
-use std::collections::HashSet;
-use std::net::SocketAddr;
+//use std::cmp::max;
+//use std::collections::HashSet;
use std::path::PathBuf;
use serde::{Deserialize, Serialize};
@@ -8,11 +7,11 @@ use structopt::StructOpt;
use garage_util::data::Uuid;
use garage_util::error::Error;
-use garage_util::time::*;
+//use garage_util::time::*;
-use garage_rpc::membership::*;
use garage_rpc::ring::*;
-use garage_rpc::rpc_client::*;
+use garage_rpc::system::*;
+use garage_rpc::*;
use garage_model::bucket_table::*;
use garage_model::key_table::*;
@@ -298,54 +297,65 @@ pub struct StatsOpt {
pub async fn cli_cmd(
cmd: Command,
- membership_rpc_cli: RpcAddrClient<Message>,
- admin_rpc_cli: RpcAddrClient<AdminRpc>,
- rpc_host: SocketAddr,
+ system_rpc_endpoint: &Endpoint<SystemRpc, ()>,
+ admin_rpc_endpoint: &Endpoint<AdminRpc, ()>,
+ rpc_host: NodeID,
) -> Result<(), Error> {
match cmd {
- Command::Status => cmd_status(membership_rpc_cli, rpc_host).await,
+ Command::Status => cmd_status(system_rpc_endpoint, rpc_host).await,
Command::Node(NodeOperation::Configure(configure_opt)) => {
- cmd_configure(membership_rpc_cli, rpc_host, configure_opt).await
+ cmd_configure(system_rpc_endpoint, rpc_host, configure_opt).await
}
Command::Node(NodeOperation::Remove(remove_opt)) => {
- cmd_remove(membership_rpc_cli, rpc_host, remove_opt).await
+ cmd_remove(system_rpc_endpoint, rpc_host, remove_opt).await
}
Command::Bucket(bo) => {
- cmd_admin(admin_rpc_cli, rpc_host, AdminRpc::BucketOperation(bo)).await
+ cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::BucketOperation(bo)).await
}
- Command::Key(ko) => cmd_admin(admin_rpc_cli, rpc_host, AdminRpc::KeyOperation(ko)).await,
- Command::Repair(ro) => cmd_admin(admin_rpc_cli, rpc_host, AdminRpc::LaunchRepair(ro)).await,
- Command::Stats(so) => cmd_admin(admin_rpc_cli, rpc_host, AdminRpc::Stats(so)).await,
+ Command::Key(ko) => {
+ cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::KeyOperation(ko)).await
+ }
+ Command::Repair(ro) => {
+ cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::LaunchRepair(ro)).await
+ }
+ Command::Stats(so) => cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::Stats(so)).await,
_ => unreachable!(),
}
}
-pub async fn cmd_status(
- rpc_cli: RpcAddrClient<Message>,
- rpc_host: SocketAddr,
-) -> Result<(), Error> {
+pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) -> Result<(), Error> {
let status = match rpc_cli
- .call(&rpc_host, &Message::PullStatus, ADMIN_RPC_TIMEOUT)
- .await??
+ .call(&rpc_host, &SystemRpc::GetKnownNodes, PRIO_NORMAL)
+ .await?
{
- Message::AdvertiseNodesUp(nodes) => nodes,
+ SystemRpc::ReturnKnownNodes(nodes) => nodes,
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
};
let config = match rpc_cli
- .call(&rpc_host, &Message::PullConfig, ADMIN_RPC_TIMEOUT)
- .await??
+ .call(&rpc_host, &SystemRpc::PullConfig, PRIO_NORMAL)
+ .await?
{
- Message::AdvertiseConfig(cfg) => cfg,
+ SystemRpc::AdvertiseConfig(cfg) => cfg,
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
};
+ println!("STATUS:");
+ for node in status {
+ println!("{:?}", node);
+ }
+ println!("CONFIG: (v{})", config.version);
+ for (id, node) in config.members {
+ println!("{} {:?}", hex::encode(id.as_slice()), node);
+ }
+
+ /* TODO
let (hostname_len, addr_len, tag_len, zone_len) = status
.iter()
- .map(|adv| (adv, config.members.get(&adv.id)))
- .map(|(adv, cfg)| {
+ .map(|(id, addr, _)| (addr, config.members.get(&adv.id)))
+ .map(|(addr, cfg)| {
(
- adv.state_info.hostname.len(),
- adv.addr.to_string().len(),
+ 8,
+ addr.to_string().len(),
cfg.map(|c| c.tag.len()).unwrap_or(0),
cfg.map(|c| c.zone.len()).unwrap_or(0),
)
@@ -355,13 +365,13 @@ pub async fn cmd_status(
});
println!("Healthy nodes:");
- for adv in status.iter().filter(|x| x.is_up) {
+ for (id, addr, _) in status.iter().filter(|(id, addr, is_up)| is_up) {
if let Some(cfg) = config.members.get(&adv.id) {
println!(
"{id:?}\t{host}{h_pad}\t{addr}{a_pad}\t[{tag}]{t_pad}\t{zone}{z_pad}\t{capacity}",
- id = adv.id,
- host = adv.state_info.hostname,
- addr = adv.addr,
+ id = id,
+ host = "",
+ addr = addr,
tag = cfg.tag,
zone = cfg.zone,
capacity = cfg.capacity_string(),
@@ -373,36 +383,36 @@ pub async fn cmd_status(
} else {
println!(
"{id:?}\t{h}{h_pad}\t{addr}{a_pad}\tUNCONFIGURED/REMOVED",
- id = adv.id,
- h = adv.state_info.hostname,
- addr = adv.addr,
- h_pad = " ".repeat(hostname_len - adv.state_info.hostname.len()),
- a_pad = " ".repeat(addr_len - adv.addr.to_string().len()),
+ id = id,
+ h = "",
+ addr = addr,
+ h_pad = " ".repeat(hostname_len - "".len()),
+ a_pad = " ".repeat(addr_len - addr.to_string().len()),
);
}
}
- let status_keys = status.iter().map(|x| x.id).collect::<HashSet<_>>();
- let failure_case_1 = status.iter().any(|x| !x.is_up);
+ let status_keys = status.iter().map(|(id, _, _)| id).collect::<HashSet<_>>();
+ let failure_case_1 = status.iter().any(|(_, _, is_up)| !is_up);
let failure_case_2 = config
.members
.iter()
.any(|(id, _)| !status_keys.contains(id));
if failure_case_1 || failure_case_2 {
println!("\nFailed nodes:");
- for adv in status.iter().filter(|x| !x.is_up) {
- if let Some(cfg) = config.members.get(&adv.id) {
+ for (id, addr) in status.iter().filter(|(_, _, is_up)| !is_up) {
+ if let Some(cfg) = config.members.get(&id) {
println!(
"{id:?}\t{host}{h_pad}\t{addr}{a_pad}\t[{tag}]{t_pad}\t{zone}{z_pad}\t{capacity}\tlast seen: {last_seen}s ago",
- id=adv.id,
- host=adv.state_info.hostname,
- addr=adv.addr,
+ id=id,
+ host="",
+ addr=addr,
tag=cfg.tag,
zone=cfg.zone,
capacity=cfg.capacity_string(),
- last_seen=(now_msec() - adv.last_seen) / 1000,
- h_pad=" ".repeat(hostname_len - adv.state_info.hostname.len()),
- a_pad=" ".repeat(addr_len - adv.addr.to_string().len()),
+ last_seen=(now_msec() - 0) / 1000,
+ h_pad=" ".repeat(hostname_len - "".len()),
+ a_pad=" ".repeat(addr_len - addr.to_string().len()),
t_pad=" ".repeat(tag_len - cfg.tag.len()),
z_pad=" ".repeat(zone_len - cfg.zone.len()),
);
@@ -411,12 +421,12 @@ pub async fn cmd_status(
let (tag_len, zone_len) = config
.members
.iter()
- .filter(|(&id, _)| !status.iter().any(|x| x.id == id))
+ .filter(|(&id, _)| !status.iter().any(|(xid, _, _)| xid == id))
.map(|(_, cfg)| (cfg.tag.len(), cfg.zone.len()))
.fold((0, 0), |(t, z), (mt, mz)| (max(t, mt), max(z, mz)));
for (id, cfg) in config.members.iter() {
- if !status.iter().any(|x| x.id == *id) {
+ if !status.iter().any(|(xid, _, _)| xid == *id) {
println!(
"{id:?}\t{tag}{t_pad}\t{zone}{z_pad}\t{capacity}\tnever seen",
id = id,
@@ -429,6 +439,7 @@ pub async fn cmd_status(
}
}
}
+ */
Ok(())
}
@@ -455,25 +466,30 @@ pub fn find_matching_node(
}
pub async fn cmd_configure(
- rpc_cli: RpcAddrClient<Message>,
- rpc_host: SocketAddr,
+ rpc_cli: &Endpoint<SystemRpc, ()>,
+ rpc_host: NodeID,
args: ConfigureNodeOpt,
) -> Result<(), Error> {
let status = match rpc_cli
- .call(&rpc_host, &Message::PullStatus, ADMIN_RPC_TIMEOUT)
- .await??
+ .call(&rpc_host, &SystemRpc::GetKnownNodes, PRIO_NORMAL)
+ .await?
{
- Message::AdvertiseNodesUp(nodes) => nodes,
+ SystemRpc::ReturnKnownNodes(nodes) => nodes,
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
};
- let added_node = find_matching_node(status.iter().map(|x| x.id), &args.node_id)?;
+ let added_node = find_matching_node(
+ status
+ .iter()
+ .map(|(id, _, _)| Uuid::try_from(id.as_ref()).unwrap()),
+ &args.node_id,
+ )?;
let mut config = match rpc_cli
- .call(&rpc_host, &Message::PullConfig, ADMIN_RPC_TIMEOUT)
- .await??
+ .call(&rpc_host, &SystemRpc::PullConfig, PRIO_NORMAL)
+ .await?
{
- Message::AdvertiseConfig(cfg) => cfg,
+ SystemRpc::AdvertiseConfig(cfg) => cfg,
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
};
@@ -527,25 +543,21 @@ pub async fn cmd_configure(
config.version += 1;
rpc_cli
- .call(
- &rpc_host,
- &Message::AdvertiseConfig(config),
- ADMIN_RPC_TIMEOUT,
- )
- .await??;
+ .call(&rpc_host, &SystemRpc::AdvertiseConfig(config), PRIO_NORMAL)
+ .await?;
Ok(())
}
pub async fn cmd_remove(
- rpc_cli: RpcAddrClient<Message>,
- rpc_host: SocketAddr,
+ rpc_cli: &Endpoint<SystemRpc, ()>,
+ rpc_host: NodeID,
args: RemoveNodeOpt,
) -> Result<(), Error> {
let mut config = match rpc_cli
- .call(&rpc_host, &Message::PullConfig, ADMIN_RPC_TIMEOUT)
- .await??
+ .call(&rpc_host, &SystemRpc::PullConfig, PRIO_NORMAL)
+ .await?
{
- Message::AdvertiseConfig(cfg) => cfg,
+ SystemRpc::AdvertiseConfig(cfg) => cfg,
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
};
@@ -562,21 +574,17 @@ pub async fn cmd_remove(
config.version += 1;
rpc_cli
- .call(
- &rpc_host,
- &Message::AdvertiseConfig(config),
- ADMIN_RPC_TIMEOUT,
- )
- .await??;
+ .call(&rpc_host, &SystemRpc::AdvertiseConfig(config), PRIO_NORMAL)
+ .await?;
Ok(())
}
pub async fn cmd_admin(
- rpc_cli: RpcAddrClient<AdminRpc>,
- rpc_host: SocketAddr,
+ rpc_cli: &Endpoint<AdminRpc, ()>,
+ rpc_host: NodeID,
args: AdminRpc,
) -> Result<(), Error> {
- match rpc_cli.call(&rpc_host, args, ADMIN_RPC_TIMEOUT).await?? {
+ match rpc_cli.call(&rpc_host, &args, PRIO_NORMAL).await? {
AdminRpc::Ok(msg) => {
println!("{}", msg);
}
diff --git a/src/garage/main.rs b/src/garage/main.rs
index 66828cba..7fe791b8 100644
--- a/src/garage/main.rs
+++ b/src/garage/main.rs
@@ -10,16 +10,16 @@ mod repair;
mod server;
use std::net::SocketAddr;
-use std::sync::Arc;
-use std::time::Duration;
use structopt::StructOpt;
-use garage_util::config::TlsConfig;
+use netapp::util::parse_peer_addr;
+use netapp::NetworkKey;
+
use garage_util::error::Error;
-use garage_rpc::membership::*;
-use garage_rpc::rpc_client::*;
+use garage_rpc::system::*;
+use garage_rpc::*;
use admin_rpc::*;
use cli::*;
@@ -27,16 +27,14 @@ use cli::*;
#[derive(StructOpt, Debug)]
#[structopt(name = "garage")]
struct Opt {
- /// RPC connect to this host to execute client operations
- #[structopt(short = "h", long = "rpc-host", default_value = "127.0.0.1:3901", parse(try_from_str = parse_address))]
- pub rpc_host: SocketAddr,
+ /// Host to connect to for admin operations, in the format:
+ /// <public-key>@<ip>:<port>
+ #[structopt(short = "h", long = "rpc-host")]
+ pub rpc_host: Option<String>,
- #[structopt(long = "ca-cert")]
- pub ca_cert: Option<String>,
- #[structopt(long = "client-cert")]
- pub client_cert: Option<String>,
- #[structopt(long = "client-key")]
- pub client_key: Option<String>,
+ /// RPC secret network key for admin operations
+ #[structopt(short = "s", long = "rpc-secret")]
+ pub rpc_secret: Option<String>,
#[structopt(subcommand)]
cmd: Command,
@@ -66,33 +64,20 @@ async fn main() {
}
async fn cli_command(opt: Opt) -> Result<(), Error> {
- let tls_config = match (opt.ca_cert, opt.client_cert, opt.client_key) {
- (Some(ca_cert), Some(client_cert), Some(client_key)) => Some(TlsConfig {
- ca_cert,
- node_cert: client_cert,
- node_key: client_key,
- }),
- (None, None, None) => None,
- _ => {
- warn!("Missing one of: --ca-cert, --node-cert, --node-key. Not using TLS.");
- None
- }
- };
-
- let rpc_http_cli =
- Arc::new(RpcHttpClient::new(8, &tls_config).expect("Could not create RPC client"));
- let membership_rpc_cli =
- RpcAddrClient::new(rpc_http_cli.clone(), MEMBERSHIP_RPC_PATH.to_string());
- let admin_rpc_cli = RpcAddrClient::new(rpc_http_cli.clone(), ADMIN_RPC_PATH.to_string());
-
- cli_cmd(opt.cmd, membership_rpc_cli, admin_rpc_cli, opt.rpc_host).await
-}
-
-fn parse_address(address: &str) -> Result<SocketAddr, String> {
- use std::net::ToSocketAddrs;
- address
- .to_socket_addrs()
- .map_err(|_| format!("Could not resolve {}", address))?
- .next()
- .ok_or_else(|| format!("Could not resolve {}", address))
+ let net_key_hex_str = &opt.rpc_secret.expect("No RPC secret provided");
+ let network_key = NetworkKey::from_slice(
+ &hex::decode(net_key_hex_str).expect("Invalid RPC secret key (bad hex)")[..],
+ )
+ .expect("Invalid RPC secret provided (wrong length)");
+ let (_pk, sk) = sodiumoxide::crypto::sign::ed25519::gen_keypair();
+
+ let netapp = NetApp::new(network_key, sk);
+ let (id, addr) =
+ parse_peer_addr(&opt.rpc_host.expect("No RPC host provided")).expect("Invalid RPC host");
+ netapp.clone().try_connect(addr, id).await?;
+
+ let system_rpc_endpoint = netapp.endpoint::<SystemRpc, ()>(SYSTEM_RPC_PATH.into());
+ let admin_rpc_endpoint = netapp.endpoint::<AdminRpc, ()>(ADMIN_RPC_PATH.into());
+
+ cli_cmd(opt.cmd, &system_rpc_endpoint, &admin_rpc_endpoint, id).await
}
diff --git a/src/garage/server.rs b/src/garage/server.rs
index 36f7de5c..0edf3e2d 100644
--- a/src/garage/server.rs
+++ b/src/garage/server.rs
@@ -1,7 +1,5 @@
use std::path::PathBuf;
-use std::sync::Arc;
-use futures_util::future::*;
use tokio::sync::watch;
use garage_util::background::*;
@@ -10,21 +8,10 @@ use garage_util::error::Error;
use garage_api::run_api_server;
use garage_model::garage::Garage;
-use garage_rpc::rpc_server::RpcServer;
use garage_web::run_web_server;
use crate::admin_rpc::*;
-async fn shutdown_signal(send_cancel: watch::Sender<bool>) -> Result<(), Error> {
- // Wait for the CTRL+C signal
- tokio::signal::ctrl_c()
- .await
- .expect("failed to install CTRL+C signal handler");
- info!("Received CTRL+C, shutting down.");
- send_cancel.send(true)?;
- Ok(())
-}
-
async fn wait_from(mut chan: watch::Receiver<bool>) {
while !*chan.borrow() {
if chan.changed().await.is_err() {
@@ -47,52 +34,46 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
.open()
.expect("Unable to open sled DB");
- info!("Initialize RPC server...");
- let mut rpc_server = RpcServer::new(config.rpc_bind_addr, config.rpc_tls.clone());
-
info!("Initializing background runner...");
- let (send_cancel, watch_cancel) = watch::channel(false);
+ let watch_cancel = netapp::util::watch_ctrl_c();
let (background, await_background_done) = BackgroundRunner::new(16, watch_cancel.clone());
info!("Initializing Garage main data store...");
- let garage = Garage::new(config.clone(), db, background, &mut rpc_server);
- let bootstrap = garage.system.clone().bootstrap(
- config.bootstrap_peers,
- config.consul_host,
- config.consul_service_name,
- );
+ let garage = Garage::new(config.clone(), db, background);
+
+ let run_system = tokio::spawn(garage.system.clone().run(watch_cancel.clone()));
info!("Crate admin RPC handler...");
- AdminRpcHandler::new(garage.clone()).register_handler(&mut rpc_server);
-
- info!("Initializing RPC and API servers...");
- let run_rpc_server = Arc::new(rpc_server).run(wait_from(watch_cancel.clone()));
- let api_server = run_api_server(garage.clone(), wait_from(watch_cancel.clone()));
- let web_server = run_web_server(garage, wait_from(watch_cancel.clone()));
-
- futures::try_join!(
- bootstrap.map(|()| {
- info!("Bootstrap done");
- Ok(())
- }),
- run_rpc_server.map(|rv| {
- info!("RPC server exited");
- rv
- }),
- api_server.map(|rv| {
- info!("API server exited");
- rv
- }),
- web_server.map(|rv| {
- info!("Web server exited");
- rv
- }),
- await_background_done.map(|rv| {
- info!("Background runner exited: {:?}", rv);
- Ok(())
- }),
- shutdown_signal(send_cancel),
- )?;
+ AdminRpcHandler::new(garage.clone());
+
+ info!("Initializing API server...");
+ let api_server = tokio::spawn(run_api_server(
+ garage.clone(),
+ wait_from(watch_cancel.clone()),
+ ));
+
+ info!("Initializing web server...");
+ let web_server = tokio::spawn(run_web_server(
+ garage.clone(),
+ wait_from(watch_cancel.clone()),
+ ));
+
+ // Stuff runs
+
+ // When a cancel signal is sent, stuff stops
+ if let Err(e) = api_server.await? {
+ warn!("API server exited with error: {}", e);
+ }
+ if let Err(e) = web_server.await? {
+ warn!("Web server exited with error: {}", e);
+ }
+
+ // Remove RPC handlers for system to break reference cycles
+ garage.system.netapp.drop_all_handlers();
+
+ // Await for last parts to end
+ run_system.await?;
+ await_background_done.await?;
info!("Cleaning up...");
diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml
index 4d5d7f9d..a9ae5edf 100644
--- a/src/model/Cargo.toml
+++ b/src/model/Cargo.toml
@@ -17,6 +17,7 @@ garage_rpc = { version = "0.3.0", path = "../rpc" }
garage_table = { version = "0.3.0", path = "../table" }
garage_util = { version = "0.3.0", path = "../util" }
+async-trait = "0.1.7"
arc-swap = "1.0"
hex = "0.4"
log = "0.4"
@@ -31,3 +32,5 @@ serde_bytes = "0.11"
futures = "0.3"
futures-util = "0.3"
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
+
+netapp = { version = "0.3.0", git = "https://git.deuxfleurs.fr/lx/netapp" }
diff --git a/src/model/block.rs b/src/model/block.rs
index 348f0711..5574b7f6 100644
--- a/src/model/block.rs
+++ b/src/model/block.rs
@@ -3,6 +3,7 @@ use std::sync::Arc;
use std::time::Duration;
use arc_swap::ArcSwapOption;
+use async_trait::async_trait;
use futures::future::*;
use futures::select;
use serde::{Deserialize, Serialize};
@@ -14,9 +15,8 @@ use garage_util::data::*;
use garage_util::error::Error;
use garage_util::time::*;
-use garage_rpc::membership::System;
-use garage_rpc::rpc_client::*;
-use garage_rpc::rpc_server::*;
+use garage_rpc::system::System;
+use garage_rpc::*;
use garage_table::replication::{TableReplication, TableShardedReplication};
@@ -36,8 +36,9 @@ const RESYNC_RETRY_TIMEOUT: Duration = Duration::from_secs(10);
/// RPC messages used to share blocks of data between nodes
#[derive(Debug, Serialize, Deserialize)]
-pub enum Message {
+pub enum BlockRpc {
Ok,
+ Error(String),
/// Message to ask for a block of data, by hash
GetBlock(Hash),
/// Message to send a block of data, either because requested, of for first delivery of new
@@ -60,7 +61,9 @@ pub struct PutBlockMessage {
pub data: Vec<u8>,
}
-impl RpcMessage for Message {}
+impl Message for BlockRpc {
+ type Response = BlockRpc;
+}
/// The block manager, handling block exchange between nodes, and block storage on local node
pub struct BlockManager {
@@ -77,7 +80,7 @@ pub struct BlockManager {
resync_notify: Notify,
system: Arc<System>,
- rpc_client: Arc<RpcClient<Message>>,
+ endpoint: Arc<Endpoint<BlockRpc, Self>>,
pub(crate) garage: ArcSwapOption<Garage>,
}
@@ -87,7 +90,6 @@ impl BlockManager {
data_dir: PathBuf,
replication: TableShardedReplication,
system: Arc<System>,
- rpc_server: &mut RpcServer,
) -> Arc<Self> {
let rc = db
.open_tree("block_local_rc")
@@ -97,8 +99,7 @@ impl BlockManager {
.open_tree("block_local_resync_queue")
.expect("Unable to open block_local_resync_queue tree");
- let rpc_path = "block_manager";
- let rpc_client = system.rpc_client::<Message>(rpc_path);
+ let endpoint = system.netapp.endpoint(format!("garage_model/block.rs/Rpc"));
let block_manager = Arc::new(Self {
replication,
@@ -108,35 +109,19 @@ impl BlockManager {
resync_queue,
resync_notify: Notify::new(),
system,
- rpc_client,
+ endpoint,
garage: ArcSwapOption::from(None),
});
- block_manager
- .clone()
- .register_handler(rpc_server, rpc_path.into());
- block_manager
- }
-
- fn register_handler(self: Arc<Self>, rpc_server: &mut RpcServer, path: String) {
- let self2 = self.clone();
- rpc_server.add_handler::<Message, _, _>(path, move |msg, _addr| {
- let self2 = self2.clone();
- async move { self2.handle(&msg).await }
- });
+ block_manager.endpoint.set_handler(block_manager.clone());
- let self2 = self.clone();
- self.rpc_client
- .set_local_handler(self.system.id, move |msg| {
- let self2 = self2.clone();
- async move { self2.handle(&msg).await }
- });
+ block_manager
}
- async fn handle(self: Arc<Self>, msg: &Message) -> Result<Message, Error> {
+ async fn handle_rpc(self: Arc<Self>, msg: &BlockRpc) -> Result<BlockRpc, Error> {
match msg {
- Message::PutBlock(m) => self.write_block(&m.hash, &m.data).await,
- Message::GetBlock(h) => self.read_block(h).await,
- Message::NeedBlockQuery(h) => self.need_block(h).await.map(Message::NeedBlockReply),
+ BlockRpc::PutBlock(m) => self.write_block(&m.hash, &m.data).await,
+ BlockRpc::GetBlock(h) => self.read_block(h).await,
+ BlockRpc::NeedBlockQuery(h) => self.need_block(h).await.map(BlockRpc::NeedBlockReply),
_ => Err(Error::BadRpc("Unexpected RPC message".to_string())),
}
}
@@ -157,7 +142,7 @@ impl BlockManager {
}
/// Write a block to disk
- async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result<Message, Error> {
+ async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result<BlockRpc, Error> {
let _lock = self.data_dir_lock.lock().await;
let mut path = self.block_dir(hash);
@@ -165,18 +150,18 @@ impl BlockManager {
path.push(hex::encode(hash));
if fs::metadata(&path).await.is_ok() {
- return Ok(Message::Ok);
+ return Ok(BlockRpc::Ok);
}
let mut f = fs::File::create(path).await?;
f.write_all(data).await?;
drop(f);
- Ok(Message::Ok)
+ Ok(BlockRpc::Ok)
}
/// Read block from disk, verifying it's integrity
- async fn read_block(&self, hash: &Hash) -> Result<Message, Error> {
+ async fn read_block(&self, hash: &Hash) -> Result<BlockRpc, Error> {
let path = self.block_path(hash);
let mut f = match fs::File::open(&path).await {
@@ -204,7 +189,7 @@ impl BlockManager {
return Err(Error::CorruptData(*hash));
}
- Ok(Message::PutBlock(PutBlockMessage { hash: *hash, data }))
+ Ok(BlockRpc::PutBlock(PutBlockMessage { hash: *hash, data }))
}
/// Check if this node should have a block, but don't actually have it
@@ -346,17 +331,22 @@ impl BlockManager {
}
who.retain(|id| *id != self.system.id);
- let msg = Arc::new(Message::NeedBlockQuery(*hash));
+ let msg = Arc::new(BlockRpc::NeedBlockQuery(*hash));
let who_needs_fut = who.iter().map(|to| {
- self.rpc_client
- .call_arc(*to, msg.clone(), NEED_BLOCK_QUERY_TIMEOUT)
+ self.system.rpc.call_arc(
+ &self.endpoint,
+ *to,
+ msg.clone(),
+ RequestStrategy::with_priority(PRIO_NORMAL)
+ .with_timeout(NEED_BLOCK_QUERY_TIMEOUT),
+ )
});
let who_needs_resps = join_all(who_needs_fut).await;
let mut need_nodes = vec![];
for (node, needed) in who.iter().zip(who_needs_resps.into_iter()) {
match needed? {
- Message::NeedBlockReply(needed) => {
+ BlockRpc::NeedBlockReply(needed) => {
if needed {
need_nodes.push(*node);
}
@@ -377,11 +367,14 @@ impl BlockManager {
);
let put_block_message = self.read_block(hash).await?;
- self.rpc_client
+ self.system
+ .rpc
.try_call_many(
+ &self.endpoint,
&need_nodes[..],
put_block_message,
- RequestStrategy::with_quorum(need_nodes.len())
+ RequestStrategy::with_priority(PRIO_NORMAL)
+ .with_quorum(need_nodes.len())
.with_timeout(BLOCK_RW_TIMEOUT),
)
.await?;
@@ -413,18 +406,21 @@ impl BlockManager {
pub async fn rpc_get_block(&self, hash: &Hash) -> Result<Vec<u8>, Error> {
let who = self.replication.read_nodes(&hash);
let resps = self
- .rpc_client
+ .system
+ .rpc
.try_call_many(
+ &self.endpoint,
&who[..],
- Message::GetBlock(*hash),
- RequestStrategy::with_quorum(1)
+ BlockRpc::GetBlock(*hash),
+ RequestStrategy::with_priority(PRIO_NORMAL)
+ .with_quorum(1)
.with_timeout(BLOCK_RW_TIMEOUT)
.interrupt_after_quorum(true),
)
.await?;
for resp in resps {
- if let Message::PutBlock(msg) = resp {
+ if let BlockRpc::PutBlock(msg) = resp {
return Ok(msg.data);
}
}
@@ -437,11 +433,14 @@ impl BlockManager {
/// Send block to nodes that should have it
pub async fn rpc_put_block(&self, hash: Hash, data: Vec<u8>) -> Result<(), Error> {
let who = self.replication.write_nodes(&hash);
- self.rpc_client
+ self.system
+ .rpc
.try_call_many(
+ &self.endpoint,
&who[..],
- Message::PutBlock(PutBlockMessage { hash, data }),
- RequestStrategy::with_quorum(self.replication.write_quorum())
+ BlockRpc::PutBlock(PutBlockMessage { hash, data }),
+ RequestStrategy::with_priority(PRIO_NORMAL)
+ .with_quorum(self.replication.write_quorum())
.with_timeout(BLOCK_RW_TIMEOUT),
)
.await?;
@@ -531,6 +530,16 @@ impl BlockManager {
}
}
+#[async_trait]
+impl EndpointHandler<BlockRpc> for BlockManager {
+ async fn handle(self: &Arc<Self>, message: &BlockRpc, _from: NodeID) -> BlockRpc {
+ self.clone()
+ .handle_rpc(message)
+ .await
+ .unwrap_or_else(|e| BlockRpc::Error(format!("{}", e)))
+ }
+}
+
fn u64_from_be_bytes<T: AsRef<[u8]>>(bytes: T) -> u64 {
assert!(bytes.as_ref().len() == 8);
let mut x8 = [0u8; 8];
diff --git a/src/model/garage.rs b/src/model/garage.rs
index c3594934..d4ea6f55 100644
--- a/src/model/garage.rs
+++ b/src/model/garage.rs
@@ -1,11 +1,11 @@
use std::sync::Arc;
+use netapp::NetworkKey;
+
use garage_util::background::*;
use garage_util::config::*;
-use garage_rpc::membership::System;
-use garage_rpc::rpc_client::RpcHttpClient;
-use garage_rpc::rpc_server::RpcServer;
+use garage_rpc::system::System;
use garage_table::replication::ReplicationMode;
use garage_table::replication::TableFullReplication;
@@ -45,26 +45,25 @@ pub struct Garage {
impl Garage {
/// Create and run garage
- pub fn new(
- config: Config,
- db: sled::Db,
- background: Arc<BackgroundRunner>,
- rpc_server: &mut RpcServer,
- ) -> Arc<Self> {
+ pub fn new(config: Config, db: sled::Db, background: Arc<BackgroundRunner>) -> Arc<Self> {
+ let network_key = NetworkKey::from_slice(
+ &hex::decode(&config.rpc_secret).expect("Invalid RPC secret key")[..],
+ )
+ .expect("Invalid RPC secret key");
+
let replication_mode = ReplicationMode::parse(&config.replication_mode)
.expect("Invalid replication_mode in config file.");
info!("Initialize membership management system...");
- let rpc_http_client = Arc::new(
- RpcHttpClient::new(config.max_concurrent_rpc_requests, &config.rpc_tls)
- .expect("Could not create RPC client"),
- );
let system = System::new(
+ network_key,
config.metadata_dir.clone(),
- rpc_http_client,
background.clone(),
- rpc_server,
replication_mode.replication_factor(),
+ config.rpc_bind_addr,
+ config.bootstrap_peers.clone(),
+ config.consul_host.clone(),
+ config.consul_service_name.clone(),
);
let data_rep_param = TableShardedReplication {
@@ -87,13 +86,8 @@ impl Garage {
};
info!("Initialize block manager...");
- let block_manager = BlockManager::new(
- &db,
- config.data_dir.clone(),
- data_rep_param,
- system.clone(),
- rpc_server,
- );
+ let block_manager =
+ BlockManager::new(&db, config.data_dir.clone(), data_rep_param, system.clone());
info!("Initialize block_ref_table...");
let block_ref_table = Table::new(
@@ -104,7 +98,6 @@ impl Garage {
system.clone(),
&db,
"block_ref".to_string(),
- rpc_server,
);
info!("Initialize version_table...");
@@ -117,7 +110,6 @@ impl Garage {
system.clone(),
&db,
"version".to_string(),
- rpc_server,
);
info!("Initialize object_table...");
@@ -130,7 +122,6 @@ impl Garage {
system.clone(),
&db,
"object".to_string(),
- rpc_server,
);
info!("Initialize bucket_table...");
@@ -140,7 +131,6 @@ impl Garage {
system.clone(),
&db,
"bucket".to_string(),
- rpc_server,
);
info!("Initialize key_table_table...");
@@ -150,7 +140,6 @@ impl Garage {
system.clone(),
&db,
"key".to_string(),
- rpc_server,
);
info!("Initialize Garage...");
diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml
index f1204cdf..1100c737 100644
--- a/src/rpc/Cargo.toml
+++ b/src/rpc/Cargo.toml
@@ -22,7 +22,10 @@ bytes = "1.0"
gethostname = "0.2"
hex = "0.4"
log = "0.4"
+rand = "0.8"
+sodiumoxide = { version = "0.2.5-0", package = "kuska-sodiumoxide" }
+async-trait = "0.1.7"
rmp-serde = "0.15"
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
serde_json = "1.0"
@@ -32,11 +35,6 @@ futures-util = "0.3"
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
tokio-stream = { version = "0.1", features = ["net"] }
-http = "0.2"
-hyper = { version = "0.14", features = ["full"] }
-hyper-rustls = { version = "0.22", default-features = false }
-rustls = "0.19"
-tokio-rustls = "0.22"
-webpki = "0.21"
-
+netapp = { version = "0.3.0", git = "https://git.deuxfleurs.fr/lx/netapp" }
+hyper = "0.14"
diff --git a/src/rpc/lib.rs b/src/rpc/lib.rs
index 96561d0e..ea3f1139 100644
--- a/src/rpc/lib.rs
+++ b/src/rpc/lib.rs
@@ -4,10 +4,10 @@
extern crate log;
mod consul;
-pub(crate) mod tls_util;
-pub mod membership;
pub mod ring;
+pub mod system;
-pub mod rpc_client;
-pub mod rpc_server;
+pub mod rpc_helper;
+
+pub use rpc_helper::*;
diff --git a/src/rpc/membership.rs b/src/rpc/membership.rs
deleted file mode 100644
index a77eeed3..00000000
--- a/src/rpc/membership.rs
+++ /dev/null
@@ -1,722 +0,0 @@
-//! Module containing structs related to membership management
-use std::collections::HashMap;
-use std::fmt::Write as FmtWrite;
-use std::io::{Read, Write};
-use std::net::{IpAddr, SocketAddr};
-use std::path::{Path, PathBuf};
-use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::Arc;
-use std::time::Duration;
-
-use futures::future::join_all;
-use futures::select;
-use futures_util::future::*;
-use serde::{Deserialize, Serialize};
-use tokio::sync::watch;
-use tokio::sync::Mutex;
-
-use garage_util::background::BackgroundRunner;
-use garage_util::data::*;
-use garage_util::error::Error;
-use garage_util::persister::Persister;
-use garage_util::time::*;
-
-use crate::consul::get_consul_nodes;
-use crate::ring::*;
-use crate::rpc_client::*;
-use crate::rpc_server::*;
-
-const PING_INTERVAL: Duration = Duration::from_secs(10);
-const DISCOVERY_INTERVAL: Duration = Duration::from_secs(60);
-const PING_TIMEOUT: Duration = Duration::from_secs(2);
-const MAX_FAILURES_BEFORE_CONSIDERED_DOWN: usize = 5;
-
-/// RPC endpoint used for calls related to membership
-pub const MEMBERSHIP_RPC_PATH: &str = "_membership";
-
-/// RPC messages related to membership
-#[derive(Debug, Serialize, Deserialize)]
-pub enum Message {
- /// Response to successfull advertisements
- Ok,
- /// Message sent to detect other nodes status
- Ping(PingMessage),
- /// Ask other node for the nodes it knows. Answered with AdvertiseNodesUp
- PullStatus,
- /// Ask other node its config. Answered with AdvertiseConfig
- PullConfig,
- /// Advertisement of nodes the host knows up. Sent spontanously or in response to PullStatus
- AdvertiseNodesUp(Vec<AdvertisedNode>),
- /// Advertisement of nodes config. Sent spontanously or in response to PullConfig
- AdvertiseConfig(NetworkConfig),
-}
-
-impl RpcMessage for Message {}
-
-/// A ping, containing informations about status and config
-#[derive(Debug, Serialize, Deserialize)]
-pub struct PingMessage {
- id: Uuid,
- rpc_port: u16,
-
- status_hash: Hash,
- config_version: u64,
-
- state_info: StateInfo,
-}
-
-/// A node advertisement
-#[derive(Clone, Debug, Serialize, Deserialize)]
-pub struct AdvertisedNode {
- /// Id of the node this advertisement relates to
- pub id: Uuid,
- /// IP and port of the node
- pub addr: SocketAddr,
-
- /// Is the node considered up
- pub is_up: bool,
- /// When was the node last seen up, in milliseconds since UNIX epoch
- pub last_seen: u64,
-
- pub state_info: StateInfo,
-}
-
-/// This node's membership manager
-pub struct System {
- /// The id of this node
- pub id: Uuid,
-
- persist_config: Persister<NetworkConfig>,
- persist_status: Persister<Vec<AdvertisedNode>>,
- rpc_local_port: u16,
-
- state_info: StateInfo,
-
- rpc_http_client: Arc<RpcHttpClient>,
- rpc_client: Arc<RpcClient<Message>>,
-
- replication_factor: usize,
- pub(crate) status: watch::Receiver<Arc<Status>>,
- /// The ring
- pub ring: watch::Receiver<Arc<Ring>>,
-
- update_lock: Mutex<Updaters>,
-
- /// The job runner of this node
- pub background: Arc<BackgroundRunner>,
-}
-
-struct Updaters {
- update_status: watch::Sender<Arc<Status>>,
- update_ring: watch::Sender<Arc<Ring>>,
-}
-
-/// The status of each nodes, viewed by this node
-#[derive(Debug, Clone)]
-pub struct Status {
- /// Mapping of each node id to its known status
- pub nodes: HashMap<Uuid, Arc<StatusEntry>>,
- /// Hash of `nodes`, used to detect when nodes have different views of the cluster
- pub hash: Hash,
-}
-
-/// The status of a single node
-#[derive(Debug)]
-pub struct StatusEntry {
- /// The IP and port used to connect to this node
- pub addr: SocketAddr,
- /// Last time this node was seen
- pub last_seen: u64,
- /// Number of consecutive pings sent without reply to this node
- pub num_failures: AtomicUsize,
- pub state_info: StateInfo,
-}
-
-impl StatusEntry {
- /// is the node associated to this entry considered up
- pub fn is_up(&self) -> bool {
- self.num_failures.load(Ordering::SeqCst) < MAX_FAILURES_BEFORE_CONSIDERED_DOWN
- }
-}
-
-#[derive(Debug, Clone, Serialize, Deserialize)]
-pub struct StateInfo {
- /// Hostname of the node
- pub hostname: String,
- /// Replication factor configured on the node
- pub replication_factor: Option<usize>, // TODO Option is just for retrocompatibility. It should become a simple usize at some point
-}
-
-impl Status {
- fn handle_ping(&mut self, ip: IpAddr, info: &PingMessage) -> bool {
- let addr = SocketAddr::new(ip, info.rpc_port);
- let old_status = self.nodes.insert(
- info.id,
- Arc::new(StatusEntry {
- addr,
- last_seen: now_msec(),
- num_failures: AtomicUsize::from(0),
- state_info: info.state_info.clone(),
- }),
- );
- match old_status {
- None => {
- info!("Newly pingable node: {}", hex::encode(&info.id));
- true
- }
- Some(x) => x.addr != addr,
- }
- }
-
- fn recalculate_hash(&mut self) {
- let mut nodes = self.nodes.iter().collect::<Vec<_>>();
- nodes.sort_unstable_by_key(|(id, _status)| *id);
-
- let mut nodes_txt = String::new();
- debug!("Current set of pingable nodes: --");
- for (id, status) in nodes {
- debug!("{} {}", hex::encode(&id), status.addr);
- writeln!(&mut nodes_txt, "{} {}", hex::encode(&id), status.addr).unwrap();
- }
- debug!("END --");
- self.hash = blake2sum(nodes_txt.as_bytes());
- }
-
- fn to_serializable_membership(&self, system: &System) -> Vec<AdvertisedNode> {
- let mut mem = vec![];
- for (node, status) in self.nodes.iter() {
- let state_info = if *node == system.id {
- system.state_info.clone()
- } else {
- status.state_info.clone()
- };
- mem.push(AdvertisedNode {
- id: *node,
- addr: status.addr,
- is_up: status.is_up(),
- last_seen: status.last_seen,
- state_info,
- });
- }
- mem
- }
-}
-
-fn gen_node_id(metadata_dir: &Path) -> Result<Uuid, Error> {
- let mut id_file = metadata_dir.to_path_buf();
- id_file.push("node_id");
- if id_file.as_path().exists() {
- let mut f = std::fs::File::open(id_file.as_path())?;
- let mut d = vec![];
- f.read_to_end(&mut d)?;
- if d.len() != 32 {
- return Err(Error::Message("Corrupt node_id file".to_string()));
- }
-
- let mut id = [0u8; 32];
- id.copy_from_slice(&d[..]);
- Ok(id.into())
- } else {
- let id = gen_uuid();
-
- let mut f = std::fs::File::create(id_file.as_path())?;
- f.write_all(id.as_slice())?;
- Ok(id)
- }
-}
-
-impl System {
- /// Create this node's membership manager
- pub fn new(
- metadata_dir: PathBuf,
- rpc_http_client: Arc<RpcHttpClient>,
- background: Arc<BackgroundRunner>,
- rpc_server: &mut RpcServer,
- replication_factor: usize,
- ) -> Arc<Self> {
- let id = gen_node_id(&metadata_dir).expect("Unable to read or generate node ID");
- info!("Node ID: {}", hex::encode(&id));
-
- let persist_config = Persister::new(&metadata_dir, "network_config");
- let persist_status = Persister::new(&metadata_dir, "peer_info");
-
- let net_config = match persist_config.load() {
- Ok(x) => x,
- Err(e) => {
- match Persister::<garage_rpc_021::ring::NetworkConfig>::new(
- &metadata_dir,
- "network_config",
- )
- .load()
- {
- Ok(old_config) => NetworkConfig::migrate_from_021(old_config),
- Err(e2) => {
- info!(
- "No valid previous network configuration stored ({}, {}), starting fresh.",
- e, e2
- );
- NetworkConfig::new()
- }
- }
- }
- };
-
- let mut status = Status {
- nodes: HashMap::new(),
- hash: Hash::default(),
- };
- status.recalculate_hash();
- let (update_status, status) = watch::channel(Arc::new(status));
-
- let state_info = StateInfo {
- hostname: gethostname::gethostname()
- .into_string()
- .unwrap_or_else(|_| "<invalid utf-8>".to_string()),
- replication_factor: Some(replication_factor),
- };
-
- let ring = Ring::new(net_config, replication_factor);
- let (update_ring, ring) = watch::channel(Arc::new(ring));
-
- let rpc_path = MEMBERSHIP_RPC_PATH.to_string();
- let rpc_client = RpcClient::new(
- RpcAddrClient::<Message>::new(rpc_http_client.clone(), rpc_path.clone()),
- background.clone(),
- status.clone(),
- );
-
- let sys = Arc::new(System {
- id,
- persist_config,
- persist_status,
- rpc_local_port: rpc_server.bind_addr.port(),
- state_info,
- rpc_http_client,
- rpc_client,
- replication_factor,
- status,
- ring,
- update_lock: Mutex::new(Updaters {
- update_status,
- update_ring,
- }),
- background,
- });
- sys.clone().register_handler(rpc_server, rpc_path);
- sys
- }
-
- fn register_handler(self: Arc<Self>, rpc_server: &mut RpcServer, path: String) {
- rpc_server.add_handler::<Message, _, _>(path, move |msg, addr| {
- let self2 = self.clone();
- async move {
- match msg {
- Message::Ping(ping) => self2.handle_ping(&addr, &ping).await,
-
- Message::PullStatus => Ok(self2.handle_pull_status()),
- Message::PullConfig => Ok(self2.handle_pull_config()),
- Message::AdvertiseNodesUp(adv) => self2.handle_advertise_nodes_up(&adv).await,
- Message::AdvertiseConfig(adv) => self2.handle_advertise_config(&adv).await,
-
- _ => Err(Error::BadRpc("Unexpected RPC message".to_string())),
- }
- }
- });
- }
-
- /// Get an RPC client
- pub fn rpc_client<M: RpcMessage + 'static>(self: &Arc<Self>, path: &str) -> Arc<RpcClient<M>> {
- RpcClient::new(
- RpcAddrClient::new(self.rpc_http_client.clone(), path.to_string()),
- self.background.clone(),
- self.status.clone(),
- )
- }
-
- /// Save network configuration to disc
- async fn save_network_config(self: Arc<Self>) -> Result<(), Error> {
- let ring = self.ring.borrow().clone();
- self.persist_config
- .save_async(&ring.config)
- .await
- .expect("Cannot save current cluster configuration");
- Ok(())
- }
-
- fn make_ping(&self) -> Message {
- let status = self.status.borrow().clone();
- let ring = self.ring.borrow().clone();
- Message::Ping(PingMessage {
- id: self.id,
- rpc_port: self.rpc_local_port,
- status_hash: status.hash,
- config_version: ring.config.version,
- state_info: self.state_info.clone(),
- })
- }
-
- async fn broadcast(self: Arc<Self>, msg: Message, timeout: Duration) {
- let status = self.status.borrow().clone();
- let to = status
- .nodes
- .keys()
- .filter(|x| **x != self.id)
- .cloned()
- .collect::<Vec<_>>();
- self.rpc_client.call_many(&to[..], msg, timeout).await;
- }
-
- /// Perform bootstraping, starting the ping loop
- pub async fn bootstrap(
- self: Arc<Self>,
- peers: Vec<SocketAddr>,
- consul_host: Option<String>,
- consul_service_name: Option<String>,
- ) {
- let self2 = self.clone();
- self.background
- .spawn_worker("discovery loop".to_string(), |stop_signal| {
- self2.discovery_loop(peers, consul_host, consul_service_name, stop_signal)
- });
-
- let self2 = self.clone();
- self.background
- .spawn_worker("ping loop".to_string(), |stop_signal| {
- self2.ping_loop(stop_signal)
- });
- }
-
- async fn ping_nodes(self: Arc<Self>, peers: Vec<(SocketAddr, Option<Uuid>)>) {
- let ping_msg = self.make_ping();
- let ping_resps = join_all(peers.iter().map(|(addr, id_option)| {
- let sys = self.clone();
- let ping_msg_ref = &ping_msg;
- async move {
- (
- id_option,
- addr,
- sys.rpc_client
- .by_addr()
- .call(&addr, ping_msg_ref, PING_TIMEOUT)
- .await,
- )
- }
- }))
- .await;
-
- let update_locked = self.update_lock.lock().await;
- let mut status: Status = self.status.borrow().as_ref().clone();
- let ring = self.ring.borrow().clone();
-
- let mut has_changes = false;
- let mut to_advertise = vec![];
-
- for (id_option, addr, ping_resp) in ping_resps {
- if let Ok(Ok(Message::Ping(info))) = ping_resp {
- let is_new = status.handle_ping(addr.ip(), &info);
- if is_new {
- has_changes = true;
- to_advertise.push(AdvertisedNode {
- id: info.id,
- addr: *addr,
- is_up: true,
- last_seen: now_msec(),
- state_info: info.state_info.clone(),
- });
- }
- if is_new || status.hash != info.status_hash {
- self.background
- .spawn_cancellable(self.clone().pull_status(info.id).map(Ok));
- }
- if is_new || ring.config.version < info.config_version {
- self.background
- .spawn_cancellable(self.clone().pull_config(info.id).map(Ok));
- }
- } else if let Some(id) = id_option {
- if let Some(st) = status.nodes.get_mut(id) {
- // we need to increment failure counter as call was done using by_addr so the
- // counter was not auto-incremented
- st.num_failures.fetch_add(1, Ordering::SeqCst);
- if !st.is_up() {
- warn!("Node {:?} seems to be down.", id);
- if !ring.config.members.contains_key(id) {
- info!("Removing node {:?} from status (not in config and not responding to pings anymore)", id);
- status.nodes.remove(&id);
- has_changes = true;
- }
- }
- }
- }
- }
- if has_changes {
- status.recalculate_hash();
- }
- self.update_status(&update_locked, status).await;
- drop(update_locked);
-
- if !to_advertise.is_empty() {
- self.broadcast(Message::AdvertiseNodesUp(to_advertise), PING_TIMEOUT)
- .await;
- }
- }
-
- async fn handle_ping(
- self: Arc<Self>,
- from: &SocketAddr,
- ping: &PingMessage,
- ) -> Result<Message, Error> {
- let update_locked = self.update_lock.lock().await;
- let mut status: Status = self.status.borrow().as_ref().clone();
-
- let is_new = status.handle_ping(from.ip(), ping);
- if is_new {
- status.recalculate_hash();
- }
- let status_hash = status.hash;
- let config_version = self.ring.borrow().config.version;
-
- self.update_status(&update_locked, status).await;
- drop(update_locked);
-
- if is_new || status_hash != ping.status_hash {
- self.background
- .spawn_cancellable(self.clone().pull_status(ping.id).map(Ok));
- }
- if is_new || config_version < ping.config_version {
- self.background
- .spawn_cancellable(self.clone().pull_config(ping.id).map(Ok));
- }
-
- Ok(self.make_ping())
- }
-
- fn handle_pull_status(&self) -> Message {
- Message::AdvertiseNodesUp(self.status.borrow().to_serializable_membership(self))
- }
-
- fn handle_pull_config(&self) -> Message {
- let ring = self.ring.borrow().clone();
- Message::AdvertiseConfig(ring.config.clone())
- }
-
- async fn handle_advertise_nodes_up(
- self: Arc<Self>,
- adv: &[AdvertisedNode],
- ) -> Result<Message, Error> {
- let mut to_ping = vec![];
-
- let update_lock = self.update_lock.lock().await;
- let mut status: Status = self.status.borrow().as_ref().clone();
- let mut has_changed = false;
- let mut max_replication_factor = 0;
-
- for node in adv.iter() {
- if node.id == self.id {
- // learn our own ip address
- let self_addr = SocketAddr::new(node.addr.ip(), self.rpc_local_port);
- let old_self = status.nodes.insert(
- node.id,
- Arc::new(StatusEntry {
- addr: self_addr,
- last_seen: now_msec(),
- num_failures: AtomicUsize::from(0),
- state_info: self.state_info.clone(),
- }),
- );
- has_changed = match old_self {
- None => true,
- Some(x) => x.addr != self_addr,
- };
- } else {
- let ping_them = match status.nodes.get(&node.id) {
- // Case 1: new node
- None => true,
- // Case 2: the node might have changed address
- Some(our_node) => node.is_up && !our_node.is_up() && our_node.addr != node.addr,
- };
- max_replication_factor = std::cmp::max(
- max_replication_factor,
- node.state_info.replication_factor.unwrap_or_default(),
- );
- if ping_them {
- to_ping.push((node.addr, Some(node.id)));
- }
- }
- }
-
- if self.replication_factor < max_replication_factor {
- error!("Some node have a higher replication factor ({}) than this one ({}). This is not supported and might lead to bugs",
- max_replication_factor,
- self.replication_factor);
- std::process::exit(1);
- }
- if has_changed {
- status.recalculate_hash();
- }
- self.update_status(&update_lock, status).await;
- drop(update_lock);
-
- if !to_ping.is_empty() {
- self.background
- .spawn_cancellable(self.clone().ping_nodes(to_ping).map(Ok));
- }
-
- Ok(Message::Ok)
- }
-
- async fn handle_advertise_config(
- self: Arc<Self>,
- adv: &NetworkConfig,
- ) -> Result<Message, Error> {
- let update_lock = self.update_lock.lock().await;
- let ring: Arc<Ring> = self.ring.borrow().clone();
-
- if adv.version > ring.config.version {
- let ring = Ring::new(adv.clone(), self.replication_factor);
- update_lock.update_ring.send(Arc::new(ring))?;
- drop(update_lock);
-
- self.background.spawn_cancellable(
- self.clone()
- .broadcast(Message::AdvertiseConfig(adv.clone()), PING_TIMEOUT)
- .map(Ok),
- );
- self.background.spawn(self.clone().save_network_config());
- }
-
- Ok(Message::Ok)
- }
-
- async fn ping_loop(self: Arc<Self>, mut stop_signal: watch::Receiver<bool>) {
- while !*stop_signal.borrow() {
- let restart_at = tokio::time::sleep(PING_INTERVAL);
-
- let status = self.status.borrow().clone();
- let ping_addrs = status
- .nodes
- .iter()
- .filter(|(id, _)| **id != self.id)
- .map(|(id, status)| (status.addr, Some(*id)))
- .collect::<Vec<_>>();
-
- self.clone().ping_nodes(ping_addrs).await;
-
- select! {
- _ = restart_at.fuse() => {},
- _ = stop_signal.changed().fuse() => {},
- }
- }
- }
-
- async fn discovery_loop(
- self: Arc<Self>,
- bootstrap_peers: Vec<SocketAddr>,
- consul_host: Option<String>,
- consul_service_name: Option<String>,
- mut stop_signal: watch::Receiver<bool>,
- ) {
- let consul_config = match (consul_host, consul_service_name) {
- (Some(ch), Some(csn)) => Some((ch, csn)),
- _ => None,
- };
-
- while !*stop_signal.borrow() {
- let not_configured = self.ring.borrow().config.members.is_empty();
- let no_peers = self.status.borrow().nodes.len() < 3;
- let bad_peers = self
- .status
- .borrow()
- .nodes
- .iter()
- .filter(|(_, v)| v.is_up())
- .count() != self.ring.borrow().config.members.len();
-
- if not_configured || no_peers || bad_peers {
- info!("Doing a bootstrap/discovery step (not_configured: {}, no_peers: {}, bad_peers: {})", not_configured, no_peers, bad_peers);
-
- let mut ping_list = bootstrap_peers
- .iter()
- .map(|ip| (*ip, None))
- .collect::<Vec<_>>();
-
- if let Ok(peers) = self.persist_status.load_async().await {
- ping_list.extend(peers.iter().map(|x| (x.addr, Some(x.id))));
- }
-
- if let Some((consul_host, consul_service_name)) = &consul_config {
- match get_consul_nodes(consul_host, consul_service_name).await {
- Ok(node_list) => {
- ping_list.extend(node_list.iter().map(|a| (*a, None)));
- }
- Err(e) => {
- warn!("Could not retrieve node list from Consul: {}", e);
- }
- }
- }
-
- self.clone().ping_nodes(ping_list).await;
- }
-
- let restart_at = tokio::time::sleep(DISCOVERY_INTERVAL);
- select! {
- _ = restart_at.fuse() => {},
- _ = stop_signal.changed().fuse() => {},
- }
- }
- }
-
- // for some reason fixing this is causing compilation error, see https://github.com/rust-lang/rust-clippy/issues/7052
- #[allow(clippy::manual_async_fn)]
- fn pull_status(
- self: Arc<Self>,
- peer: Uuid,
- ) -> impl futures::future::Future<Output = ()> + Send + 'static {
- async move {
- let resp = self
- .rpc_client
- .call(peer, Message::PullStatus, PING_TIMEOUT)
- .await;
- if let Ok(Message::AdvertiseNodesUp(nodes)) = resp {
- let _: Result<_, _> = self.handle_advertise_nodes_up(&nodes).await;
- }
- }
- }
-
- async fn pull_config(self: Arc<Self>, peer: Uuid) {
- let resp = self
- .rpc_client
- .call(peer, Message::PullConfig, PING_TIMEOUT)
- .await;
- if let Ok(Message::AdvertiseConfig(config)) = resp {
- let _: Result<_, _> = self.handle_advertise_config(&config).await;
- }
- }
-
- async fn update_status(self: &Arc<Self>, updaters: &Updaters, status: Status) {
- if status.hash != self.status.borrow().hash {
- let mut list = status.to_serializable_membership(&self);
-
- // Combine with old peer list to make sure no peer is lost
- if let Ok(old_list) = self.persist_status.load_async().await {
- for pp in old_list {
- if !list.iter().any(|np| pp.id == np.id) {
- list.push(pp);
- }
- }
- }
-
- if !list.is_empty() {
- info!("Persisting new peer list ({} peers)", list.len());
- self.persist_status
- .save_async(&list)
- .await
- .expect("Unable to persist peer list");
- }
- }
-
- updaters
- .update_status
- .send(Arc::new(status))
- .expect("Could not update internal membership status");
- }
-}
diff --git a/src/rpc/ring.rs b/src/rpc/ring.rs
index 90db8fd2..7cbab762 100644
--- a/src/rpc/ring.rs
+++ b/src/rpc/ring.rs
@@ -3,6 +3,8 @@
use std::collections::{HashMap, HashSet};
use std::convert::TryInto;
+use netapp::NodeID;
+
use serde::{Deserialize, Serialize};
use garage_util::data::*;
@@ -98,7 +100,7 @@ pub struct Ring {
pub config: NetworkConfig,
// Internal order of nodes used to make a more compact representation of the ring
- nodes: Vec<Uuid>,
+ nodes: Vec<NodeID>,
// The list of entries in the ring
ring: Vec<RingEntry>,
@@ -260,6 +262,11 @@ impl Ring {
})
.collect::<Vec<_>>();
+ let nodes = nodes
+ .iter()
+ .map(|id| NodeID::from_slice(id.as_slice()).unwrap())
+ .collect::<Vec<_>>();
+
Self {
replication_factor,
config,
@@ -291,7 +298,7 @@ impl Ring {
}
/// Walk the ring to find the n servers in which data should be replicated
- pub fn get_nodes(&self, position: &Hash, n: usize) -> Vec<Uuid> {
+ pub fn get_nodes(&self, position: &Hash, n: usize) -> Vec<NodeID> {
if self.ring.len() != 1 << PARTITION_BITS {
warn!("Ring not yet ready, read/writes will be lost!");
return vec![];
diff --git a/src/rpc/rpc_client.rs b/src/rpc/rpc_client.rs
deleted file mode 100644
index 806c7e69..00000000
--- a/src/rpc/rpc_client.rs
+++ /dev/null
@@ -1,369 +0,0 @@
-//! Contain structs related to making RPCs
-use std::borrow::Borrow;
-use std::marker::PhantomData;
-use std::net::SocketAddr;
-use std::pin::Pin;
-use std::sync::atomic::Ordering;
-use std::sync::Arc;
-use std::time::Duration;
-
-use arc_swap::ArcSwapOption;
-use futures::future::Future;
-use futures::stream::futures_unordered::FuturesUnordered;
-use futures::stream::StreamExt;
-use futures_util::future::FutureExt;
-use hyper::client::{Client, HttpConnector};
-use hyper::{Body, Method, Request};
-use tokio::sync::{watch, Semaphore};
-
-use garage_util::background::BackgroundRunner;
-use garage_util::config::TlsConfig;
-use garage_util::data::*;
-use garage_util::error::{Error, RpcError};
-
-use crate::membership::Status;
-use crate::rpc_server::RpcMessage;
-use crate::tls_util;
-
-const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
-
-/// Strategy to apply when making RPC
-#[derive(Copy, Clone)]
-pub struct RequestStrategy {
- /// Max time to wait for reponse
- pub rs_timeout: Duration,
- /// Min number of response to consider the request successful
- pub rs_quorum: usize,
- /// Should requests be dropped after enough response are received
- pub rs_interrupt_after_quorum: bool,
-}
-
-impl RequestStrategy {
- /// Create a RequestStrategy with default timeout and not interrupting when quorum reached
- pub fn with_quorum(quorum: usize) -> Self {
- RequestStrategy {
- rs_timeout: DEFAULT_TIMEOUT,
- rs_quorum: quorum,
- rs_interrupt_after_quorum: false,
- }
- }
- /// Set timeout of the strategy
- pub fn with_timeout(mut self, timeout: Duration) -> Self {
- self.rs_timeout = timeout;
- self
- }
- /// Set if requests can be dropped after quorum has been reached
- /// In general true for read requests, and false for write
- pub fn interrupt_after_quorum(mut self, interrupt: bool) -> Self {
- self.rs_interrupt_after_quorum = interrupt;
- self
- }
-}
-
-/// Shortcut for a boxed async function taking a message, and resolving to another message or an
-/// error
-pub type LocalHandlerFn<M> =
- Box<dyn Fn(Arc<M>) -> Pin<Box<dyn Future<Output = Result<M, Error>> + Send>> + Send + Sync>;
-
-/// Client used to send RPC
-pub struct RpcClient<M: RpcMessage> {
- status: watch::Receiver<Arc<Status>>,
- background: Arc<BackgroundRunner>,
-
- local_handler: ArcSwapOption<(Uuid, LocalHandlerFn<M>)>,
-
- rpc_addr_client: RpcAddrClient<M>,
-}
-
-impl<M: RpcMessage + 'static> RpcClient<M> {
- /// Create a new RpcClient from an address, a job runner, and the status of all RPC servers
- pub fn new(
- rac: RpcAddrClient<M>,
- background: Arc<BackgroundRunner>,
- status: watch::Receiver<Arc<Status>>,
- ) -> Arc<Self> {
- Arc::new(Self {
- rpc_addr_client: rac,
- background,
- status,
- local_handler: ArcSwapOption::new(None),
- })
- }
-
- /// Set the local handler, to process RPC to this node without network usage
- pub fn set_local_handler<F, Fut>(&self, my_id: Uuid, handler: F)
- where
- F: Fn(Arc<M>) -> Fut + Send + Sync + 'static,
- Fut: Future<Output = Result<M, Error>> + Send + 'static,
- {
- let handler_arc = Arc::new(handler);
- let handler: LocalHandlerFn<M> = Box::new(move |msg| {
- let handler_arc2 = handler_arc.clone();
- Box::pin(async move { handler_arc2(msg).await })
- });
- self.local_handler.swap(Some(Arc::new((my_id, handler))));
- }
-
- /// Get a RPC client to make calls using node's SocketAddr instead of its ID
- pub fn by_addr(&self) -> &RpcAddrClient<M> {
- &self.rpc_addr_client
- }
-
- /// Make a RPC call
- pub async fn call(&self, to: Uuid, msg: M, timeout: Duration) -> Result<M, Error> {
- self.call_arc(to, Arc::new(msg), timeout).await
- }
-
- /// Make a RPC call from a message stored in an Arc
- pub async fn call_arc(&self, to: Uuid, msg: Arc<M>, timeout: Duration) -> Result<M, Error> {
- if let Some(lh) = self.local_handler.load_full() {
- let (my_id, local_handler) = lh.as_ref();
- if to.borrow() == my_id {
- return local_handler(msg).await;
- }
- }
- let status = self.status.borrow().clone();
- let node_status = match status.nodes.get(&to) {
- Some(node_status) => {
- if node_status.is_up() {
- node_status
- } else {
- return Err(Error::from(RpcError::NodeDown(to)));
- }
- }
- None => {
- return Err(Error::Message(format!(
- "Peer ID not found: {:?}",
- to.borrow()
- )))
- }
- };
- match self
- .rpc_addr_client
- .call(&node_status.addr, msg, timeout)
- .await
- {
- Err(rpc_error) => {
- node_status.num_failures.fetch_add(1, Ordering::SeqCst);
- Err(Error::from(rpc_error))
- }
- Ok(x) => x,
- }
- }
-
- /// Make a RPC call to multiple servers, returning a Vec containing each result
- pub async fn call_many(&self, to: &[Uuid], msg: M, timeout: Duration) -> Vec<Result<M, Error>> {
- let msg = Arc::new(msg);
- let mut resp_stream = to
- .iter()
- .map(|to| self.call_arc(*to, msg.clone(), timeout))
- .collect::<FuturesUnordered<_>>();
-
- let mut results = vec![];
- while let Some(resp) = resp_stream.next().await {
- results.push(resp);
- }
- results
- }
-
- /// Make a RPC call to multiple servers, returning either a Vec of responses, or an error if
- /// strategy could not be respected due to too many errors
- pub async fn try_call_many(
- self: &Arc<Self>,
- to: &[Uuid],
- msg: M,
- strategy: RequestStrategy,
- ) -> Result<Vec<M>, Error> {
- let timeout = strategy.rs_timeout;
-
- let msg = Arc::new(msg);
- let mut resp_stream = to
- .to_vec()
- .into_iter()
- .map(|to| {
- let self2 = self.clone();
- let msg = msg.clone();
- async move { self2.call_arc(to, msg, timeout).await }
- })
- .collect::<FuturesUnordered<_>>();
-
- let mut results = vec![];
- let mut errors = vec![];
-
- while let Some(resp) = resp_stream.next().await {
- match resp {
- Ok(msg) => {
- results.push(msg);
- if results.len() >= strategy.rs_quorum {
- break;
- }
- }
- Err(e) => {
- errors.push(e);
- }
- }
- }
-
- if results.len() >= strategy.rs_quorum {
- // Continue requests in background.
- // Continue the remaining requests immediately using tokio::spawn
- // but enqueue a task in the background runner
- // to ensure that the process won't exit until the requests are done
- // (if we had just enqueued the resp_stream.collect directly in the background runner,
- // the requests might have been put on hold in the background runner's queue,
- // in which case they might timeout or otherwise fail)
- if !strategy.rs_interrupt_after_quorum {
- let wait_finished_fut = tokio::spawn(async move {
- resp_stream.collect::<Vec<_>>().await;
- });
- self.background.spawn(wait_finished_fut.map(|_| Ok(())));
- }
-
- Ok(results)
- } else {
- let errors = errors.iter().map(|e| format!("{}", e)).collect::<Vec<_>>();
- Err(Error::from(RpcError::TooManyErrors(errors)))
- }
- }
-}
-
-/// Thin wrapper arround an `RpcHttpClient` specifying the path of the request
-pub struct RpcAddrClient<M: RpcMessage> {
- phantom: PhantomData<M>,
-
- http_client: Arc<RpcHttpClient>,
- path: String,
-}
-
-impl<M: RpcMessage> RpcAddrClient<M> {
- /// Create an RpcAddrClient from an HTTP client and the endpoint to reach for RPCs
- pub fn new(http_client: Arc<RpcHttpClient>, path: String) -> Self {
- Self {
- phantom: PhantomData::default(),
- http_client,
- path,
- }
- }
-
- /// Make a RPC
- pub async fn call<MB>(
- &self,
- to_addr: &SocketAddr,
- msg: MB,
- timeout: Duration,
- ) -> Result<Result<M, Error>, RpcError>
- where
- MB: Borrow<M>,
- {
- self.http_client
- .call(&self.path, to_addr, msg, timeout)
- .await
- }
-}
-
-/// HTTP client used to make RPCs
-pub struct RpcHttpClient {
- request_limiter: Semaphore,
- method: ClientMethod,
-}
-
-enum ClientMethod {
- Http(Client<HttpConnector, hyper::Body>),
- Https(Client<tls_util::HttpsConnectorFixedDnsname<HttpConnector>, hyper::Body>),
-}
-
-impl RpcHttpClient {
- /// Create a new RpcHttpClient
- pub fn new(
- max_concurrent_requests: usize,
- tls_config: &Option<TlsConfig>,
- ) -> Result<Self, Error> {
- let method = if let Some(cf) = tls_config {
- let ca_certs = tls_util::load_certs(&cf.ca_cert).map_err(|e| {
- Error::Message(format!("Failed to open CA certificate file: {:?}", e))
- })?;
- let node_certs = tls_util::load_certs(&cf.node_cert)
- .map_err(|e| Error::Message(format!("Failed to open certificate file: {:?}", e)))?;
- let node_key = tls_util::load_private_key(&cf.node_key)
- .map_err(|e| Error::Message(format!("Failed to open private key file: {:?}", e)))?;
-
- let mut config = rustls::ClientConfig::new();
-
- for crt in ca_certs.iter() {
- config.root_store.add(crt)?;
- }
-
- config.set_single_client_cert([&node_certs[..], &ca_certs[..]].concat(), node_key)?;
-
- let connector =
- tls_util::HttpsConnectorFixedDnsname::<HttpConnector>::new(config, "garage");
-
- ClientMethod::Https(Client::builder().build(connector))
- } else {
- ClientMethod::Http(Client::new())
- };
- Ok(RpcHttpClient {
- method,
- request_limiter: Semaphore::new(max_concurrent_requests),
- })
- }
-
- /// Make a RPC
- async fn call<M, MB>(
- &self,
- path: &str,
- to_addr: &SocketAddr,
- msg: MB,
- timeout: Duration,
- ) -> Result<Result<M, Error>, RpcError>
- where
- MB: Borrow<M>,
- M: RpcMessage,
- {
- let uri = match self.method {
- ClientMethod::Http(_) => format!("http://{}/{}", to_addr, path),
- ClientMethod::Https(_) => format!("https://{}/{}", to_addr, path),
- };
-
- let req = Request::builder()
- .method(Method::POST)
- .uri(uri)
- .body(Body::from(rmp_to_vec_all_named(msg.borrow())?))?;
-
- let resp_fut = match &self.method {
- ClientMethod::Http(client) => client.request(req).fuse(),
- ClientMethod::Https(client) => client.request(req).fuse(),
- };
-
- trace!("({}) Acquiring request_limiter slot...", path);
- let slot = self.request_limiter.acquire().await;
- trace!("({}) Got slot, doing request to {}...", path, to_addr);
- let resp = tokio::time::timeout(timeout, resp_fut)
- .await
- .map_err(|e| {
- debug!(
- "RPC timeout to {}: {}",
- to_addr,
- debug_serialize(msg.borrow())
- );
- e
- })?
- .map_err(|e| {
- warn!(
- "RPC HTTP client error when connecting to {}: {}",
- to_addr, e
- );
- e
- })?;
-
- let status = resp.status();
- trace!("({}) Request returned, got status {}", path, status);
- let body = hyper::body::to_bytes(resp.into_body()).await?;
- drop(slot);
-
- match rmp_serde::decode::from_read::<_, Result<M, String>>(&body[..])? {
- Err(e) => Ok(Err(Error::RemoteError(e, status))),
- Ok(x) => Ok(Ok(x)),
- }
- }
-}
diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs
new file mode 100644
index 00000000..c9458ee6
--- /dev/null
+++ b/src/rpc/rpc_helper.rs
@@ -0,0 +1,206 @@
+//! Contain structs related to making RPCs
+use std::sync::Arc;
+use std::time::Duration;
+
+use futures::future::join_all;
+use futures::stream::futures_unordered::FuturesUnordered;
+use futures::stream::StreamExt;
+use futures_util::future::FutureExt;
+use tokio::select;
+
+pub use netapp::endpoint::{Endpoint, EndpointHandler, Message};
+use netapp::peering::fullmesh::FullMeshPeeringStrategy;
+pub use netapp::proto::*;
+pub use netapp::{NetApp, NodeID};
+
+use garage_util::background::BackgroundRunner;
+use garage_util::error::{Error, RpcError};
+
+const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
+
+/// Strategy to apply when making RPC
+#[derive(Copy, Clone)]
+pub struct RequestStrategy {
+ /// Max time to wait for reponse
+ pub rs_timeout: Duration,
+ /// Min number of response to consider the request successful
+ pub rs_quorum: Option<usize>,
+ /// Should requests be dropped after enough response are received
+ pub rs_interrupt_after_quorum: bool,
+ /// Request priority
+ pub rs_priority: RequestPriority,
+}
+
+impl RequestStrategy {
+ /// Create a RequestStrategy with default timeout and not interrupting when quorum reached
+ pub fn with_priority(prio: RequestPriority) -> Self {
+ RequestStrategy {
+ rs_timeout: DEFAULT_TIMEOUT,
+ rs_quorum: None,
+ rs_interrupt_after_quorum: false,
+ rs_priority: prio,
+ }
+ }
+ /// Set quorum to be reached for request
+ pub fn with_quorum(mut self, quorum: usize) -> Self {
+ self.rs_quorum = Some(quorum);
+ self
+ }
+ /// Set timeout of the strategy
+ pub fn with_timeout(mut self, timeout: Duration) -> Self {
+ self.rs_timeout = timeout;
+ self
+ }
+ /// Set if requests can be dropped after quorum has been reached
+ /// In general true for read requests, and false for write
+ pub fn interrupt_after_quorum(mut self, interrupt: bool) -> Self {
+ self.rs_interrupt_after_quorum = interrupt;
+ self
+ }
+}
+
+#[derive(Clone)]
+pub struct RpcHelper {
+ pub(crate) fullmesh: Arc<FullMeshPeeringStrategy>,
+ pub(crate) background: Arc<BackgroundRunner>,
+}
+
+impl RpcHelper {
+ pub async fn call<M, H>(
+ &self,
+ endpoint: &Endpoint<M, H>,
+ to: NodeID,
+ msg: M,
+ strat: RequestStrategy,
+ ) -> Result<M::Response, Error>
+ where
+ M: Message,
+ H: EndpointHandler<M>,
+ {
+ self.call_arc(endpoint, to, Arc::new(msg), strat).await
+ }
+
+ pub async fn call_arc<M, H>(
+ &self,
+ endpoint: &Endpoint<M, H>,
+ to: NodeID,
+ msg: Arc<M>,
+ strat: RequestStrategy,
+ ) -> Result<M::Response, Error>
+ where
+ M: Message,
+ H: EndpointHandler<M>,
+ {
+ select! {
+ res = endpoint.call(&to, &msg, strat.rs_priority) => Ok(res?),
+ _ = tokio::time::sleep(strat.rs_timeout) => Err(Error::Rpc(RpcError::Timeout)),
+ }
+ }
+
+ pub async fn call_many<M, H>(
+ &self,
+ endpoint: &Endpoint<M, H>,
+ to: &[NodeID],
+ msg: M,
+ strat: RequestStrategy,
+ ) -> Vec<(NodeID, Result<M::Response, Error>)>
+ where
+ M: Message,
+ H: EndpointHandler<M>,
+ {
+ let msg = Arc::new(msg);
+ let resps = join_all(
+ to.iter()
+ .map(|to| self.call_arc(endpoint, *to, msg.clone(), strat)),
+ )
+ .await;
+ to.iter()
+ .cloned()
+ .zip(resps.into_iter())
+ .collect::<Vec<_>>()
+ }
+
+ pub async fn broadcast<M, H>(
+ &self,
+ endpoint: &Endpoint<M, H>,
+ msg: M,
+ strat: RequestStrategy,
+ ) -> Vec<(NodeID, Result<M::Response, Error>)>
+ where
+ M: Message,
+ H: EndpointHandler<M>,
+ {
+ let to = self
+ .fullmesh
+ .get_peer_list()
+ .iter()
+ .map(|p| p.id)
+ .collect::<Vec<_>>();
+ self.call_many(endpoint, &to[..], msg, strat).await
+ }
+
+ /// Make a RPC call to multiple servers, returning either a Vec of responses, or an error if
+ /// strategy could not be respected due to too many errors
+ pub async fn try_call_many<M, H>(
+ &self,
+ endpoint: &Arc<Endpoint<M, H>>,
+ to: &[NodeID],
+ msg: M,
+ strategy: RequestStrategy,
+ ) -> Result<Vec<M::Response>, Error>
+ where
+ M: Message + 'static,
+ H: EndpointHandler<M> + 'static,
+ {
+ let msg = Arc::new(msg);
+ let mut resp_stream = to
+ .to_vec()
+ .into_iter()
+ .map(|to| {
+ let self2 = self.clone();
+ let msg = msg.clone();
+ let endpoint2 = endpoint.clone();
+ async move { self2.call_arc(&endpoint2, to, msg, strategy).await }
+ })
+ .collect::<FuturesUnordered<_>>();
+
+ let mut results = vec![];
+ let mut errors = vec![];
+ let quorum = strategy.rs_quorum.unwrap_or(to.len());
+
+ while let Some(resp) = resp_stream.next().await {
+ match resp {
+ Ok(msg) => {
+ results.push(msg);
+ if results.len() >= quorum {
+ break;
+ }
+ }
+ Err(e) => {
+ errors.push(e);
+ }
+ }
+ }
+
+ if results.len() >= quorum {
+ // Continue requests in background.
+ // Continue the remaining requests immediately using tokio::spawn
+ // but enqueue a task in the background runner
+ // to ensure that the process won't exit until the requests are done
+ // (if we had just enqueued the resp_stream.collect directly in the background runner,
+ // the requests might have been put on hold in the background runner's queue,
+ // in which case they might timeout or otherwise fail)
+ if !strategy.rs_interrupt_after_quorum {
+ let wait_finished_fut = tokio::spawn(async move {
+ resp_stream.collect::<Vec<_>>().await;
+ });
+ self.background.spawn(wait_finished_fut.map(|_| Ok(())));
+ }
+
+ Ok(results)
+ } else {
+ let errors = errors.iter().map(|e| format!("{}", e)).collect::<Vec<_>>();
+ Err(Error::from(RpcError::TooManyErrors(errors)))
+ }
+ }
+}
diff --git a/src/rpc/rpc_server.rs b/src/rpc/rpc_server.rs
deleted file mode 100644
index 81361ab9..00000000
--- a/src/rpc/rpc_server.rs
+++ /dev/null
@@ -1,247 +0,0 @@
-//! Contains structs related to receiving RPCs
-use std::collections::HashMap;
-use std::net::SocketAddr;
-use std::pin::Pin;
-use std::sync::Arc;
-use std::time::Instant;
-
-use futures::future::Future;
-use futures_util::future::*;
-use futures_util::stream::*;
-use hyper::server::conn::AddrStream;
-use hyper::service::{make_service_fn, service_fn};
-use hyper::{Body, Method, Request, Response, Server, StatusCode};
-use serde::{Deserialize, Serialize};
-use tokio::net::{TcpListener, TcpStream};
-use tokio_rustls::server::TlsStream;
-use tokio_rustls::TlsAcceptor;
-use tokio_stream::wrappers::TcpListenerStream;
-
-use garage_util::config::TlsConfig;
-use garage_util::data::*;
-use garage_util::error::Error;
-
-use crate::tls_util;
-
-/// Trait for messages that can be sent as RPC
-pub trait RpcMessage: Serialize + for<'de> Deserialize<'de> + Send + Sync {}
-
-type ResponseFuture = Pin<Box<dyn Future<Output = Result<Response<Body>, Error>> + Send>>;
-type Handler = Box<dyn Fn(Request<Body>, SocketAddr) -> ResponseFuture + Send + Sync>;
-
-/// Structure handling RPCs
-pub struct RpcServer {
- /// The address the RpcServer will bind
- pub bind_addr: SocketAddr,
- /// The tls configuration used for RPC
- pub tls_config: Option<TlsConfig>,
-
- handlers: HashMap<String, Handler>,
-}
-
-async fn handle_func<M, F, Fut>(
- handler: Arc<F>,
- req: Request<Body>,
- sockaddr: SocketAddr,
- name: Arc<String>,
-) -> Result<Response<Body>, Error>
-where
- M: RpcMessage + 'static,
- F: Fn(M, SocketAddr) -> Fut + Send + Sync + 'static,
- Fut: Future<Output = Result<M, Error>> + Send + 'static,
-{
- let begin_time = Instant::now();
- let whole_body = hyper::body::to_bytes(req.into_body()).await?;
- let msg = rmp_serde::decode::from_read::<_, M>(&whole_body[..])?;
-
- trace!(
- "Request message: {}",
- serde_json::to_string(&msg)
- .unwrap_or_else(|_| "<json error>".into())
- .chars()
- .take(100)
- .collect::<String>()
- );
-
- match handler(msg, sockaddr).await {
- Ok(resp) => {
- let resp_bytes = rmp_to_vec_all_named::<Result<M, String>>(&Ok(resp))?;
- let rpc_duration = (Instant::now() - begin_time).as_millis();
- if rpc_duration > 100 {
- debug!("RPC {} ok, took long: {} ms", name, rpc_duration,);
- }
- Ok(Response::new(Body::from(resp_bytes)))
- }
- Err(e) => {
- let err_str = format!("{}", e);
- let rep_bytes = rmp_to_vec_all_named::<Result<M, String>>(&Err(err_str))?;
- let mut err_response = Response::new(Body::from(rep_bytes));
- *err_response.status_mut() = match e {
- Error::BadRpc(_) => StatusCode::BAD_REQUEST,
- _ => StatusCode::INTERNAL_SERVER_ERROR,
- };
- warn!(
- "RPC error ({}): {} ({} ms)",
- name,
- e,
- (Instant::now() - begin_time).as_millis(),
- );
- Ok(err_response)
- }
- }
-}
-
-impl RpcServer {
- /// Create a new RpcServer
- pub fn new(bind_addr: SocketAddr, tls_config: Option<TlsConfig>) -> Self {
- Self {
- bind_addr,
- tls_config,
- handlers: HashMap::new(),
- }
- }
-
- /// Add handler handling request made to `name`
- pub fn add_handler<M, F, Fut>(&mut self, name: String, handler: F)
- where
- M: RpcMessage + 'static,
- F: Fn(M, SocketAddr) -> Fut + Send + Sync + 'static,
- Fut: Future<Output = Result<M, Error>> + Send + 'static,
- {
- let name2 = Arc::new(name.clone());
- let handler_arc = Arc::new(handler);
- let handler = Box::new(move |req: Request<Body>, sockaddr: SocketAddr| {
- let handler2 = handler_arc.clone();
- let b: ResponseFuture = Box::pin(handle_func(handler2, req, sockaddr, name2.clone()));
- b
- });
- self.handlers.insert(name, handler);
- }
-
- async fn handler(
- self: Arc<Self>,
- req: Request<Body>,
- addr: SocketAddr,
- ) -> Result<Response<Body>, Error> {
- if req.method() != Method::POST {
- let mut bad_request = Response::default();
- *bad_request.status_mut() = StatusCode::BAD_REQUEST;
- return Ok(bad_request);
- }
-
- let path = &req.uri().path()[1..].to_string();
-
- let handler = match self.handlers.get(path) {
- Some(h) => h,
- None => {
- let mut not_found = Response::default();
- *not_found.status_mut() = StatusCode::NOT_FOUND;
- return Ok(not_found);
- }
- };
-
- trace!("({}) Handling request", path);
-
- let resp_waiter = tokio::spawn(handler(req, addr));
- match resp_waiter.await {
- Err(err) => {
- warn!("Handler await error: {}", err);
- let mut ise = Response::default();
- *ise.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
- Ok(ise)
- }
- Ok(Err(err)) => {
- trace!("({}) Request handler failed: {}", path, err);
- let mut bad_request = Response::new(Body::from(format!("{}", err)));
- *bad_request.status_mut() = StatusCode::BAD_REQUEST;
- Ok(bad_request)
- }
- Ok(Ok(resp)) => {
- trace!("({}) Request handler succeeded", path);
- Ok(resp)
- }
- }
- }
-
- /// Run the RpcServer
- pub async fn run(
- self: Arc<Self>,
- shutdown_signal: impl Future<Output = ()>,
- ) -> Result<(), Error> {
- if let Some(tls_config) = self.tls_config.as_ref() {
- let ca_certs = tls_util::load_certs(&tls_config.ca_cert)?;
- let node_certs = tls_util::load_certs(&tls_config.node_cert)?;
- let node_key = tls_util::load_private_key(&tls_config.node_key)?;
-
- let mut ca_store = rustls::RootCertStore::empty();
- for crt in ca_certs.iter() {
- ca_store.add(crt)?;
- }
-
- let mut config =
- rustls::ServerConfig::new(rustls::AllowAnyAuthenticatedClient::new(ca_store));
- config.set_single_cert([&node_certs[..], &ca_certs[..]].concat(), node_key)?;
- let tls_acceptor = Arc::new(TlsAcceptor::from(Arc::new(config)));
-
- let listener = TcpListener::bind(&self.bind_addr).await?;
- let incoming = TcpListenerStream::new(listener).filter_map(|socket| async {
- match socket {
- Ok(stream) => match tls_acceptor.clone().accept(stream).await {
- Ok(x) => Some(Ok::<_, hyper::Error>(x)),
- Err(_e) => None,
- },
- Err(_) => None,
- }
- });
- let incoming = hyper::server::accept::from_stream(incoming);
-
- let self_arc = self.clone();
- let service = make_service_fn(|conn: &TlsStream<TcpStream>| {
- let client_addr = conn
- .get_ref()
- .0
- .peer_addr()
- .unwrap_or_else(|_| ([0, 0, 0, 0], 0).into());
- let self_arc = self_arc.clone();
- async move {
- Ok::<_, Error>(service_fn(move |req: Request<Body>| {
- self_arc.clone().handler(req, client_addr).map_err(|e| {
- warn!("RPC handler error: {}", e);
- e
- })
- }))
- }
- });
-
- let server = Server::builder(incoming).serve(service);
-
- let graceful = server.with_graceful_shutdown(shutdown_signal);
- info!("RPC server listening on http://{}", self.bind_addr);
-
- graceful.await?;
- } else {
- let self_arc = self.clone();
- let service = make_service_fn(move |conn: &AddrStream| {
- let client_addr = conn.remote_addr();
- let self_arc = self_arc.clone();
- async move {
- Ok::<_, Error>(service_fn(move |req: Request<Body>| {
- self_arc.clone().handler(req, client_addr).map_err(|e| {
- warn!("RPC handler error: {}", e);
- e
- })
- }))
- }
- });
-
- let server = Server::bind(&self.bind_addr).serve(service);
-
- let graceful = server.with_graceful_shutdown(shutdown_signal);
- info!("RPC server listening on http://{}", self.bind_addr);
-
- graceful.await?;
- }
-
- Ok(())
- }
-}
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
new file mode 100644
index 00000000..7ccec945
--- /dev/null
+++ b/src/rpc/system.rs
@@ -0,0 +1,363 @@
+//! Module containing structs related to membership management
+use std::io::{Read, Write};
+use std::net::SocketAddr;
+use std::path::{Path, PathBuf};
+use std::sync::Arc;
+use std::time::Duration;
+
+use arc_swap::ArcSwap;
+use async_trait::async_trait;
+use futures::{join, select};
+use futures_util::future::*;
+use serde::{Deserialize, Serialize};
+use sodiumoxide::crypto::sign::ed25519;
+use tokio::sync::watch;
+use tokio::sync::Mutex;
+
+use netapp::endpoint::{Endpoint, EndpointHandler, Message};
+use netapp::peering::fullmesh::FullMeshPeeringStrategy;
+use netapp::proto::*;
+use netapp::{NetApp, NetworkKey, NodeID, NodeKey};
+
+use garage_util::background::BackgroundRunner;
+use garage_util::error::Error;
+use garage_util::persister::Persister;
+//use garage_util::time::*;
+
+//use crate::consul::get_consul_nodes;
+use crate::ring::*;
+use crate::rpc_helper::{RequestStrategy, RpcHelper};
+
+const DISCOVERY_INTERVAL: Duration = Duration::from_secs(60);
+const PING_TIMEOUT: Duration = Duration::from_secs(2);
+
+/// RPC endpoint used for calls related to membership
+pub const SYSTEM_RPC_PATH: &str = "garage_rpc/membership.rs/SystemRpc";
+
+/// RPC messages related to membership
+#[derive(Debug, Serialize, Deserialize, Clone)]
+pub enum SystemRpc {
+ /// Response to successfull advertisements
+ Ok,
+ /// Error response
+ Error(String),
+ /// Ask other node its config. Answered with AdvertiseConfig
+ PullConfig,
+ /// Advertise Garage status. Answered with another AdvertiseStatus.
+ /// Exchanged with every node on a regular basis.
+ AdvertiseStatus(StateInfo),
+ /// Advertisement of nodes config. Sent spontanously or in response to PullConfig
+ AdvertiseConfig(NetworkConfig),
+ /// Get known nodes states
+ GetKnownNodes,
+ /// Return known nodes
+ ReturnKnownNodes(Vec<(NodeID, SocketAddr, bool)>),
+}
+
+impl Message for SystemRpc {
+ type Response = SystemRpc;
+}
+
+/// This node's membership manager
+pub struct System {
+ /// The id of this node
+ pub id: NodeID,
+
+ persist_config: Persister<NetworkConfig>,
+
+ state_info: ArcSwap<StateInfo>,
+
+ pub netapp: Arc<NetApp>,
+ fullmesh: Arc<FullMeshPeeringStrategy>,
+ pub rpc: RpcHelper,
+
+ system_endpoint: Arc<Endpoint<SystemRpc, System>>,
+
+ rpc_listen_addr: SocketAddr,
+ bootstrap_peers: Vec<(NodeID, SocketAddr)>,
+ consul_host: Option<String>,
+ consul_service_name: Option<String>,
+ replication_factor: usize,
+
+ /// The ring
+ pub ring: watch::Receiver<Arc<Ring>>,
+ update_ring: Mutex<watch::Sender<Arc<Ring>>>,
+
+ /// The job runner of this node
+ pub background: Arc<BackgroundRunner>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct StateInfo {
+ /// Hostname of the node
+ pub hostname: String,
+ /// Replication factor configured on the node
+ pub replication_factor: usize,
+ /// Configuration version
+ pub config_version: u64,
+}
+
+fn gen_node_key(metadata_dir: &Path) -> Result<NodeKey, Error> {
+ let mut id_file = metadata_dir.to_path_buf();
+ id_file.push("node_id");
+ if id_file.as_path().exists() {
+ let mut f = std::fs::File::open(id_file.as_path())?;
+ let mut d = vec![];
+ f.read_to_end(&mut d)?;
+ if d.len() != 64 {
+ return Err(Error::Message("Corrupt node_id file".to_string()));
+ }
+
+ let mut key = [0u8; 64];
+ key.copy_from_slice(&d[..]);
+ Ok(NodeKey::from_slice(&key[..]).unwrap())
+ } else {
+ let (key, _) = ed25519::gen_keypair();
+
+ let mut f = std::fs::File::create(id_file.as_path())?;
+ f.write_all(&key[..])?;
+ Ok(NodeKey::from_slice(&key[..]).unwrap())
+ }
+}
+
+impl System {
+ /// Create this node's membership manager
+ pub fn new(
+ network_key: NetworkKey,
+ metadata_dir: PathBuf,
+ background: Arc<BackgroundRunner>,
+ replication_factor: usize,
+ rpc_listen_addr: SocketAddr,
+ bootstrap_peers: Vec<(NodeID, SocketAddr)>,
+ consul_host: Option<String>,
+ consul_service_name: Option<String>,
+ ) -> Arc<Self> {
+ let node_key = gen_node_key(&metadata_dir).expect("Unable to read or generate node ID");
+ info!("Node public key: {}", hex::encode(&node_key.public_key()));
+
+ let persist_config = Persister::new(&metadata_dir, "network_config");
+
+ let net_config = match persist_config.load() {
+ Ok(x) => x,
+ Err(e) => {
+ match Persister::<garage_rpc_021::ring::NetworkConfig>::new(
+ &metadata_dir,
+ "network_config",
+ )
+ .load()
+ {
+ Ok(old_config) => NetworkConfig::migrate_from_021(old_config),
+ Err(e2) => {
+ info!(
+ "No valid previous network configuration stored ({}, {}), starting fresh.",
+ e, e2
+ );
+ NetworkConfig::new()
+ }
+ }
+ }
+ };
+
+ let state_info = StateInfo {
+ hostname: gethostname::gethostname()
+ .into_string()
+ .unwrap_or_else(|_| "<invalid utf-8>".to_string()),
+ replication_factor: replication_factor,
+ config_version: net_config.version,
+ };
+
+ let ring = Ring::new(net_config, replication_factor);
+ let (update_ring, ring) = watch::channel(Arc::new(ring));
+
+ let netapp = NetApp::new(network_key, node_key);
+ let fullmesh = FullMeshPeeringStrategy::new(netapp.clone(), bootstrap_peers.clone());
+
+ let system_endpoint = netapp.endpoint(SYSTEM_RPC_PATH.into());
+
+ let sys = Arc::new(System {
+ id: netapp.id.clone(),
+ persist_config,
+ state_info: ArcSwap::new(Arc::new(state_info)),
+ netapp: netapp.clone(),
+ fullmesh: fullmesh.clone(),
+ rpc: RpcHelper {
+ fullmesh: fullmesh.clone(),
+ background: background.clone(),
+ },
+ system_endpoint,
+ replication_factor,
+ rpc_listen_addr,
+ bootstrap_peers,
+ consul_host,
+ consul_service_name,
+ ring,
+ update_ring: Mutex::new(update_ring),
+ background: background.clone(),
+ });
+ sys.system_endpoint.set_handler(sys.clone());
+ sys
+ }
+
+ /// Perform bootstraping, starting the ping loop
+ pub async fn run(self: Arc<Self>, must_exit: watch::Receiver<bool>) {
+ join!(
+ self.netapp
+ .clone()
+ .listen(self.rpc_listen_addr, None, must_exit.clone()),
+ self.fullmesh.clone().run(must_exit.clone()),
+ self.discovery_loop(must_exit.clone()),
+ );
+ }
+
+ // ---- INTERNALS ----
+
+ /// Save network configuration to disc
+ async fn save_network_config(self: Arc<Self>) -> Result<(), Error> {
+ let ring: Arc<Ring> = self.ring.borrow().clone();
+ self.persist_config
+ .save_async(&ring.config)
+ .await
+ .expect("Cannot save current cluster configuration");
+ Ok(())
+ }
+
+ fn update_state_info(&self) {
+ let mut new_si: StateInfo = self.state_info.load().as_ref().clone();
+
+ let ring = self.ring.borrow();
+ new_si.config_version = ring.config.version;
+ self.state_info.swap(Arc::new(new_si));
+ }
+
+ fn handle_pull_config(&self) -> SystemRpc {
+ let ring = self.ring.borrow().clone();
+ SystemRpc::AdvertiseConfig(ring.config.clone())
+ }
+
+ async fn handle_advertise_config(
+ self: Arc<Self>,
+ adv: &NetworkConfig,
+ ) -> Result<SystemRpc, Error> {
+ let update_ring = self.update_ring.lock().await;
+ let ring: Arc<Ring> = self.ring.borrow().clone();
+
+ if adv.version > ring.config.version {
+ let ring = Ring::new(adv.clone(), self.replication_factor);
+ update_ring.send(Arc::new(ring))?;
+ drop(update_ring);
+
+ let self2 = self.clone();
+ let adv2 = adv.clone();
+ self.background.spawn_cancellable(async move {
+ self2
+ .rpc
+ .broadcast(
+ &self2.system_endpoint,
+ SystemRpc::AdvertiseConfig(adv2),
+ RequestStrategy::with_priority(PRIO_NORMAL),
+ )
+ .await;
+ Ok(())
+ });
+ self.background.spawn(self.clone().save_network_config());
+ }
+
+ Ok(SystemRpc::Ok)
+ }
+
+ async fn discovery_loop(&self, mut stop_signal: watch::Receiver<bool>) {
+ /* TODO
+ let consul_config = match (&self.consul_host, &self.consul_service_name) {
+ (Some(ch), Some(csn)) => Some((ch.clone(), csn.clone())),
+ _ => None,
+ };
+ */
+
+ while !*stop_signal.borrow() {
+ let not_configured = self.ring.borrow().config.members.is_empty();
+ let no_peers = self.fullmesh.get_peer_list().len() < self.replication_factor;
+ let bad_peers = self
+ .fullmesh
+ .get_peer_list()
+ .iter()
+ .filter(|p| p.is_up())
+ .count() != self.ring.borrow().config.members.len();
+
+ if not_configured || no_peers || bad_peers {
+ info!("Doing a bootstrap/discovery step (not_configured: {}, no_peers: {}, bad_peers: {})", not_configured, no_peers, bad_peers);
+
+ let ping_list = self.bootstrap_peers.clone();
+
+ /*
+ *TODO bring this back: persisted list of peers
+ if let Ok(peers) = self.persist_status.load_async().await {
+ ping_list.extend(peers.iter().map(|x| (x.addr, Some(x.id))));
+ }
+ */
+
+ /*
+ * TODO bring this back: get peers from consul
+ if let Some((consul_host, consul_service_name)) = &consul_config {
+ match get_consul_nodes(consul_host, consul_service_name).await {
+ Ok(node_list) => {
+ ping_list.extend(node_list.iter().map(|a| (*a, None)));
+ }
+ Err(e) => {
+ warn!("Could not retrieve node list from Consul: {}", e);
+ }
+ }
+ }
+ */
+
+ for (node_id, node_addr) in ping_list {
+ tokio::spawn(self.netapp.clone().try_connect(node_addr, node_id));
+ }
+ }
+
+ let restart_at = tokio::time::sleep(DISCOVERY_INTERVAL);
+ select! {
+ _ = restart_at.fuse() => {},
+ _ = stop_signal.changed().fuse() => {},
+ }
+ }
+ }
+
+ async fn pull_config(self: Arc<Self>, peer: NodeID) {
+ let resp = self
+ .rpc
+ .call(
+ &self.system_endpoint,
+ peer,
+ SystemRpc::PullConfig,
+ RequestStrategy::with_priority(PRIO_HIGH).with_timeout(PING_TIMEOUT),
+ )
+ .await;
+ if let Ok(SystemRpc::AdvertiseConfig(config)) = resp {
+ let _: Result<_, _> = self.handle_advertise_config(&config).await;
+ }
+ }
+}
+
+#[async_trait]
+impl EndpointHandler<SystemRpc> for System {
+ async fn handle(self: &Arc<Self>, msg: &SystemRpc, _from: NodeID) -> SystemRpc {
+ let resp = match msg {
+ SystemRpc::PullConfig => Ok(self.handle_pull_config()),
+ SystemRpc::AdvertiseConfig(adv) => self.clone().handle_advertise_config(&adv).await,
+ SystemRpc::GetKnownNodes => {
+ let known_nodes = self
+ .fullmesh
+ .get_peer_list()
+ .iter()
+ .map(|n| (n.id, n.addr, n.is_up()))
+ .collect::<Vec<_>>();
+ Ok(SystemRpc::ReturnKnownNodes(known_nodes))
+ }
+ _ => Err(Error::BadRpc("Unexpected RPC message".to_string())),
+ };
+ match resp {
+ Ok(r) => r,
+ Err(e) => SystemRpc::Error(format!("{}", e)),
+ }
+ }
+}
diff --git a/src/rpc/tls_util.rs b/src/rpc/tls_util.rs
deleted file mode 100644
index 8189f93b..00000000
--- a/src/rpc/tls_util.rs
+++ /dev/null
@@ -1,140 +0,0 @@
-use core::future::Future;
-use core::task::{Context, Poll};
-use std::pin::Pin;
-use std::sync::Arc;
-use std::{fs, io};
-
-use futures_util::future::*;
-use hyper::client::connect::Connection;
-use hyper::client::HttpConnector;
-use hyper::service::Service;
-use hyper::Uri;
-use hyper_rustls::MaybeHttpsStream;
-use rustls::internal::pemfile;
-use tokio::io::{AsyncRead, AsyncWrite};
-use tokio_rustls::TlsConnector;
-use webpki::DNSNameRef;
-
-use garage_util::error::Error;
-
-pub fn load_certs(filename: &str) -> Result<Vec<rustls::Certificate>, Error> {
- let certfile = fs::File::open(&filename)?;
- let mut reader = io::BufReader::new(certfile);
-
- let certs = pemfile::certs(&mut reader).map_err(|_| {
- Error::Message(format!(
- "Could not deecode certificates from file: {}",
- filename
- ))
- })?;
-
- if certs.is_empty() {
- return Err(Error::Message(format!(
- "Invalid certificate file: {}",
- filename
- )));
- }
- Ok(certs)
-}
-
-pub fn load_private_key(filename: &str) -> Result<rustls::PrivateKey, Error> {
- let keydata = fs::read_to_string(filename)?;
-
- let mut buf1 = keydata.as_bytes();
- let rsa_keys = pemfile::rsa_private_keys(&mut buf1).unwrap_or_default();
-
- let mut buf2 = keydata.as_bytes();
- let pkcs8_keys = pemfile::pkcs8_private_keys(&mut buf2).unwrap_or_default();
-
- let mut keys = rsa_keys;
- keys.extend(pkcs8_keys.into_iter());
-
- if keys.len() != 1 {
- return Err(Error::Message(format!(
- "Invalid private key file: {} ({} private keys)",
- filename,
- keys.len()
- )));
- }
- Ok(keys[0].clone())
-}
-
-// ---- AWFUL COPYPASTA FROM HYPER-RUSTLS connector.rs
-// ---- ALWAYS USE `garage` AS HOSTNAME FOR TLS VERIFICATION
-
-#[derive(Clone)]
-pub struct HttpsConnectorFixedDnsname<T> {
- http: T,
- tls_config: Arc<rustls::ClientConfig>,
- fixed_dnsname: &'static str,
-}
-
-type BoxError = Box<dyn std::error::Error + Send + Sync>;
-
-impl HttpsConnectorFixedDnsname<HttpConnector> {
- pub fn new(mut tls_config: rustls::ClientConfig, fixed_dnsname: &'static str) -> Self {
- let mut http = HttpConnector::new();
- http.enforce_http(false);
- tls_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()];
- Self {
- http,
- tls_config: Arc::new(tls_config),
- fixed_dnsname,
- }
- }
-}
-
-impl<T> Service<Uri> for HttpsConnectorFixedDnsname<T>
-where
- T: Service<Uri>,
- T::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin + 'static,
- T::Future: Send + 'static,
- T::Error: Into<BoxError>,
-{
- type Response = MaybeHttpsStream<T::Response>;
- type Error = BoxError;
-
- #[allow(clippy::type_complexity)]
- type Future =
- Pin<Box<dyn Future<Output = Result<MaybeHttpsStream<T::Response>, BoxError>> + Send>>;
-
- fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- match self.http.poll_ready(cx) {
- Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
- Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())),
- Poll::Pending => Poll::Pending,
- }
- }
-
- fn call(&mut self, dst: Uri) -> Self::Future {
- let is_https = dst.scheme_str() == Some("https");
-
- if !is_https {
- let connecting_future = self.http.call(dst);
-
- let f = async move {
- let tcp = connecting_future.await.map_err(Into::into)?;
-
- Ok(MaybeHttpsStream::Http(tcp))
- };
- f.boxed()
- } else {
- let cfg = self.tls_config.clone();
- let connecting_future = self.http.call(dst);
-
- let dnsname =
- DNSNameRef::try_from_ascii_str(self.fixed_dnsname).expect("Invalid fixed dnsname");
-
- let f = async move {
- let tcp = connecting_future.await.map_err(Into::into)?;
- let connector = TlsConnector::from(cfg);
- let tls = connector
- .connect(dnsname, tcp)
- .await
- .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
- Ok(MaybeHttpsStream::Https(tls))
- };
- f.boxed()
- }
- }
-}
diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml
index ccbd1748..996066dc 100644
--- a/src/table/Cargo.toml
+++ b/src/table/Cargo.toml
@@ -16,6 +16,7 @@ path = "lib.rs"
garage_rpc = { version = "0.3.0", path = "../rpc" }
garage_util = { version = "0.3.0", path = "../util" }
+async-trait = "0.1.7"
bytes = "1.0"
hexdump = "0.1"
log = "0.4"
@@ -30,4 +31,3 @@ serde_bytes = "0.11"
futures = "0.3"
futures-util = "0.3"
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
-
diff --git a/src/table/data.rs b/src/table/data.rs
index e7e85e65..ffd494d5 100644
--- a/src/table/data.rs
+++ b/src/table/data.rs
@@ -9,7 +9,7 @@ use tokio::sync::Notify;
use garage_util::data::*;
use garage_util::error::*;
-use garage_rpc::membership::System;
+use garage_rpc::system::System;
use crate::crdt::Crdt;
use crate::replication::*;
diff --git a/src/table/gc.rs b/src/table/gc.rs
index 73e08827..c03648ef 100644
--- a/src/table/gc.rs
+++ b/src/table/gc.rs
@@ -2,6 +2,7 @@ use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
+use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
@@ -13,9 +14,8 @@ use tokio::sync::watch;
use garage_util::data::*;
use garage_util::error::Error;
-use garage_rpc::membership::System;
-use garage_rpc::rpc_client::*;
-use garage_rpc::rpc_server::*;
+use garage_rpc::system::System;
+use garage_rpc::*;
use crate::data::*;
use crate::replication::*;
@@ -24,11 +24,11 @@ use crate::schema::*;
const TABLE_GC_BATCH_SIZE: usize = 1024;
const TABLE_GC_RPC_TIMEOUT: Duration = Duration::from_secs(30);
-pub struct TableGc<F: TableSchema, R: TableReplication> {
+pub struct TableGc<F: TableSchema + 'static, R: TableReplication + 'static> {
system: Arc<System>,
data: Arc<TableData<F, R>>,
- rpc_client: Arc<RpcClient<GcRpc>>,
+ endpoint: Arc<Endpoint<GcRpc, Self>>,
}
#[derive(Serialize, Deserialize)]
@@ -36,30 +36,30 @@ enum GcRpc {
Update(Vec<ByteBuf>),
DeleteIfEqualHash(Vec<(ByteBuf, Hash)>),
Ok,
+ Error(String),
}
-impl RpcMessage for GcRpc {}
+impl Message for GcRpc {
+ type Response = GcRpc;
+}
impl<F, R> TableGc<F, R>
where
F: TableSchema + 'static,
R: TableReplication + 'static,
{
- pub(crate) fn launch(
- system: Arc<System>,
- data: Arc<TableData<F, R>>,
- rpc_server: &mut RpcServer,
- ) -> Arc<Self> {
- let rpc_path = format!("table_{}/gc", data.name);
- let rpc_client = system.rpc_client::<GcRpc>(&rpc_path);
+ pub(crate) fn launch(system: Arc<System>, data: Arc<TableData<F, R>>) -> Arc<Self> {
+ let endpoint = system
+ .netapp
+ .endpoint(format!("garage_table/gc.rs/Rpc:{}", data.name));
let gc = Arc::new(Self {
system: system.clone(),
data: data.clone(),
- rpc_client,
+ endpoint,
});
- gc.register_handler(rpc_server, rpc_path);
+ gc.endpoint.set_handler(gc.clone());
let gc1 = gc.clone();
system.background.spawn_worker(
@@ -168,7 +168,7 @@ where
async fn try_send_and_delete(
&self,
- nodes: Vec<Uuid>,
+ nodes: Vec<NodeID>,
items: Vec<(ByteBuf, Hash, ByteBuf)>,
) -> Result<(), Error> {
let n_items = items.len();
@@ -180,11 +180,15 @@ where
deletes.push((k, vhash));
}
- self.rpc_client
+ self.system
+ .rpc
.try_call_many(
+ &self.endpoint,
&nodes[..],
GcRpc::Update(updates),
- RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_GC_RPC_TIMEOUT),
+ RequestStrategy::with_priority(PRIO_BACKGROUND)
+ .with_quorum(nodes.len())
+ .with_timeout(TABLE_GC_RPC_TIMEOUT),
)
.await?;
@@ -193,11 +197,15 @@ where
self.data.name, n_items
);
- self.rpc_client
+ self.system
+ .rpc
.try_call_many(
+ &self.endpoint,
&nodes[..],
GcRpc::DeleteIfEqualHash(deletes.clone()),
- RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_GC_RPC_TIMEOUT),
+ RequestStrategy::with_priority(PRIO_BACKGROUND)
+ .with_quorum(nodes.len())
+ .with_timeout(TABLE_GC_RPC_TIMEOUT),
)
.await?;
@@ -217,24 +225,7 @@ where
Ok(())
}
- // ---- RPC HANDLER ----
-
- fn register_handler(self: &Arc<Self>, rpc_server: &mut RpcServer, path: String) {
- let self2 = self.clone();
- rpc_server.add_handler::<GcRpc, _, _>(path, move |msg, _addr| {
- let self2 = self2.clone();
- async move { self2.handle_rpc(&msg).await }
- });
-
- let self2 = self.clone();
- self.rpc_client
- .set_local_handler(self.system.id, move |msg| {
- let self2 = self2.clone();
- async move { self2.handle_rpc(&msg).await }
- });
- }
-
- async fn handle_rpc(self: &Arc<Self>, message: &GcRpc) -> Result<GcRpc, Error> {
+ async fn handle_rpc(&self, message: &GcRpc) -> Result<GcRpc, Error> {
match message {
GcRpc::Update(items) => {
self.data.update_many(items)?;
@@ -251,3 +242,16 @@ where
}
}
}
+
+#[async_trait]
+impl<F, R> EndpointHandler<GcRpc> for TableGc<F, R>
+where
+ F: TableSchema + 'static,
+ R: TableReplication + 'static,
+{
+ async fn handle(self: &Arc<Self>, message: &GcRpc, _from: NodeID) -> GcRpc {
+ self.handle_rpc(message)
+ .await
+ .unwrap_or_else(|e| GcRpc::Error(format!("{}", e)))
+ }
+}
diff --git a/src/table/replication/fullcopy.rs b/src/table/replication/fullcopy.rs
index 3ce7c0bf..b41c5360 100644
--- a/src/table/replication/fullcopy.rs
+++ b/src/table/replication/fullcopy.rs
@@ -1,7 +1,8 @@
use std::sync::Arc;
-use garage_rpc::membership::System;
use garage_rpc::ring::*;
+use garage_rpc::system::System;
+use garage_rpc::NodeID;
use garage_util::data::*;
use crate::replication::*;
@@ -19,16 +20,20 @@ pub struct TableFullReplication {
}
impl TableReplication for TableFullReplication {
- fn read_nodes(&self, _hash: &Hash) -> Vec<Uuid> {
+ fn read_nodes(&self, _hash: &Hash) -> Vec<NodeID> {
vec![self.system.id]
}
fn read_quorum(&self) -> usize {
1
}
- fn write_nodes(&self, _hash: &Hash) -> Vec<Uuid> {
+ fn write_nodes(&self, _hash: &Hash) -> Vec<NodeID> {
let ring = self.system.ring.borrow();
- ring.config.members.keys().cloned().collect::<Vec<_>>()
+ ring.config
+ .members
+ .keys()
+ .map(|id| NodeID::from_slice(id.as_slice()).unwrap())
+ .collect::<Vec<_>>()
}
fn write_quorum(&self) -> usize {
let nmembers = self.system.ring.borrow().config.members.len();
diff --git a/src/table/replication/parameters.rs b/src/table/replication/parameters.rs
index 64996828..7fdfce67 100644
--- a/src/table/replication/parameters.rs
+++ b/src/table/replication/parameters.rs
@@ -1,5 +1,5 @@
use garage_rpc::ring::*;
-
+use garage_rpc::NodeID;
use garage_util::data::*;
/// Trait to describe how a table shall be replicated
@@ -8,12 +8,12 @@ pub trait TableReplication: Send + Sync {
// To understand various replication methods
/// Which nodes to send read requests to
- fn read_nodes(&self, hash: &Hash) -> Vec<Uuid>;
+ fn read_nodes(&self, hash: &Hash) -> Vec<NodeID>;
/// Responses needed to consider a read succesfull
fn read_quorum(&self) -> usize;
/// Which nodes to send writes to
- fn write_nodes(&self, hash: &Hash) -> Vec<Uuid>;
+ fn write_nodes(&self, hash: &Hash) -> Vec<NodeID>;
/// Responses needed to consider a write succesfull
fn write_quorum(&self) -> usize;
fn max_write_errors(&self) -> usize;
diff --git a/src/table/replication/sharded.rs b/src/table/replication/sharded.rs
index 8081b892..ffe686a5 100644
--- a/src/table/replication/sharded.rs
+++ b/src/table/replication/sharded.rs
@@ -1,7 +1,8 @@
use std::sync::Arc;
-use garage_rpc::membership::System;
use garage_rpc::ring::*;
+use garage_rpc::system::System;
+use garage_rpc::NodeID;
use garage_util::data::*;
use crate::replication::*;
@@ -25,7 +26,7 @@ pub struct TableShardedReplication {
}
impl TableReplication for TableShardedReplication {
- fn read_nodes(&self, hash: &Hash) -> Vec<Uuid> {
+ fn read_nodes(&self, hash: &Hash) -> Vec<NodeID> {
let ring = self.system.ring.borrow();
ring.get_nodes(&hash, self.replication_factor)
}
@@ -33,7 +34,7 @@ impl TableReplication for TableShardedReplication {
self.read_quorum
}
- fn write_nodes(&self, hash: &Hash) -> Vec<Uuid> {
+ fn write_nodes(&self, hash: &Hash) -> Vec<NodeID> {
let ring = self.system.ring.borrow();
ring.get_nodes(&hash, self.replication_factor)
}
diff --git a/src/table/sync.rs b/src/table/sync.rs
index a3afbbba..c5db0987 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -2,6 +2,7 @@ use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
+use async_trait::async_trait;
use futures::select;
use futures_util::future::*;
use futures_util::stream::*;
@@ -13,10 +14,9 @@ use tokio::sync::{mpsc, watch};
use garage_util::data::*;
use garage_util::error::Error;
-use garage_rpc::membership::System;
use garage_rpc::ring::*;
-use garage_rpc::rpc_client::*;
-use garage_rpc::rpc_server::*;
+use garage_rpc::system::System;
+use garage_rpc::*;
use crate::data::*;
use crate::merkle::*;
@@ -28,13 +28,13 @@ const TABLE_SYNC_RPC_TIMEOUT: Duration = Duration::from_secs(30);
// Do anti-entropy every 10 minutes
const ANTI_ENTROPY_INTERVAL: Duration = Duration::from_secs(10 * 60);
-pub struct TableSyncer<F: TableSchema, R: TableReplication> {
+pub struct TableSyncer<F: TableSchema + 'static, R: TableReplication + 'static> {
system: Arc<System>,
data: Arc<TableData<F, R>>,
merkle: Arc<MerkleUpdater<F, R>>,
todo: Mutex<SyncTodo>,
- rpc_client: Arc<RpcClient<SyncRpc>>,
+ endpoint: Arc<Endpoint<SyncRpc, Self>>,
}
#[derive(Serialize, Deserialize)]
@@ -45,9 +45,12 @@ pub(crate) enum SyncRpc {
Node(MerkleNodeKey, MerkleNode),
Items(Vec<Arc<ByteBuf>>),
Ok,
+ Error(String),
}
-impl RpcMessage for SyncRpc {}
+impl Message for SyncRpc {
+ type Response = SyncRpc;
+}
struct SyncTodo {
todo: Vec<TodoPartition>,
@@ -72,10 +75,10 @@ where
system: Arc<System>,
data: Arc<TableData<F, R>>,
merkle: Arc<MerkleUpdater<F, R>>,
- rpc_server: &mut RpcServer,
) -> Arc<Self> {
- let rpc_path = format!("table_{}/sync", data.name);
- let rpc_client = system.rpc_client::<SyncRpc>(&rpc_path);
+ let endpoint = system
+ .netapp
+ .endpoint(format!("garage_table/sync.rs/Rpc:{}", data.name));
let todo = SyncTodo { todo: vec![] };
@@ -84,10 +87,10 @@ where
data: data.clone(),
merkle,
todo: Mutex::new(todo),
- rpc_client,
+ endpoint,
});
- syncer.register_handler(rpc_server, rpc_path);
+ syncer.endpoint.set_handler(syncer.clone());
let (busy_tx, busy_rx) = mpsc::unbounded_channel();
@@ -112,21 +115,6 @@ where
syncer
}
- fn register_handler(self: &Arc<Self>, rpc_server: &mut RpcServer, path: String) {
- let self2 = self.clone();
- rpc_server.add_handler::<SyncRpc, _, _>(path, move |msg, _addr| {
- let self2 = self2.clone();
- async move { self2.handle_rpc(&msg).await }
- });
-
- let self2 = self.clone();
- self.rpc_client
- .set_local_handler(self.system.id, move |msg| {
- let self2 = self2.clone();
- async move { self2.handle_rpc(&msg).await }
- });
- }
-
async fn watcher_task(
self: Arc<Self>,
mut must_exit: watch::Receiver<bool>,
@@ -317,15 +305,19 @@ where
async fn offload_items(
self: &Arc<Self>,
items: &[(Vec<u8>, Arc<ByteBuf>)],
- nodes: &[Uuid],
+ nodes: &[NodeID],
) -> Result<(), Error> {
let values = items.iter().map(|(_k, v)| v.clone()).collect::<Vec<_>>();
- self.rpc_client
+ self.system
+ .rpc
.try_call_many(
+ &self.endpoint,
nodes,
SyncRpc::Items(values),
- RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_SYNC_RPC_TIMEOUT),
+ RequestStrategy::with_priority(PRIO_BACKGROUND)
+ .with_quorum(nodes.len())
+ .with_timeout(TABLE_SYNC_RPC_TIMEOUT),
)
.await?;
@@ -362,7 +354,7 @@ where
async fn do_sync_with(
self: Arc<Self>,
partition: TodoPartition,
- who: Uuid,
+ who: NodeID,
must_exit: watch::Receiver<bool>,
) -> Result<(), Error> {
let (root_ck_key, root_ck) = self.get_root_ck(partition.partition)?;
@@ -378,11 +370,14 @@ where
// Check if they have the same root checksum
// If so, do nothing.
let root_resp = self
- .rpc_client
+ .system
+ .rpc
.call(
+ &self.endpoint,
who,
SyncRpc::RootCkHash(partition.partition, root_ck_hash),
- TABLE_SYNC_RPC_TIMEOUT,
+ RequestStrategy::with_priority(PRIO_BACKGROUND)
+ .with_timeout(TABLE_SYNC_RPC_TIMEOUT),
)
.await?;
@@ -430,8 +425,15 @@ where
// Get Merkle node for this tree position at remote node
// and compare it with local node
let remote_node = match self
- .rpc_client
- .call(who, SyncRpc::GetNode(key.clone()), TABLE_SYNC_RPC_TIMEOUT)
+ .system
+ .rpc
+ .call(
+ &self.endpoint,
+ who,
+ SyncRpc::GetNode(key.clone()),
+ RequestStrategy::with_priority(PRIO_BACKGROUND)
+ .with_timeout(TABLE_SYNC_RPC_TIMEOUT),
+ )
.await?
{
SyncRpc::Node(_, node) => node,
@@ -478,7 +480,7 @@ where
Ok(())
}
- async fn send_items(&self, who: Uuid, item_value_list: Vec<Vec<u8>>) -> Result<(), Error> {
+ async fn send_items(&self, who: NodeID, item_value_list: Vec<Vec<u8>>) -> Result<(), Error> {
info!(
"({}) Sending {} items to {:?}",
self.data.name,
@@ -492,8 +494,15 @@ where
.collect::<Vec<_>>();
let rpc_resp = self
- .rpc_client
- .call(who, SyncRpc::Items(values), TABLE_SYNC_RPC_TIMEOUT)
+ .system
+ .rpc
+ .call(
+ &self.endpoint,
+ who,
+ SyncRpc::Items(values),
+ RequestStrategy::with_priority(PRIO_BACKGROUND)
+ .with_timeout(TABLE_SYNC_RPC_TIMEOUT),
+ )
.await?;
if let SyncRpc::Ok = rpc_resp {
Ok(())
@@ -506,7 +515,6 @@ where
}
// ======= SYNCHRONIZATION PROCEDURE -- RECEIVER SIDE ======
-
async fn handle_rpc(self: &Arc<Self>, message: &SyncRpc) -> Result<SyncRpc, Error> {
match message {
SyncRpc::RootCkHash(range, h) => {
@@ -527,6 +535,19 @@ where
}
}
+#[async_trait]
+impl<F, R> EndpointHandler<SyncRpc> for TableSyncer<F, R>
+where
+ F: TableSchema + 'static,
+ R: TableReplication + 'static,
+{
+ async fn handle(self: &Arc<Self>, message: &SyncRpc, _from: NodeID) -> SyncRpc {
+ self.handle_rpc(message)
+ .await
+ .unwrap_or_else(|e| SyncRpc::Error(format!("{}", e)))
+ }
+}
+
impl SyncTodo {
fn add_full_sync<F: TableSchema, R: TableReplication>(
&mut self,
diff --git a/src/table/table.rs b/src/table/table.rs
index eb9bd25c..ad263343 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -2,6 +2,7 @@ use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use std::time::Duration;
+use async_trait::async_trait;
use futures::stream::*;
use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
@@ -9,9 +10,8 @@ use serde_bytes::ByteBuf;
use garage_util::data::*;
use garage_util::error::Error;
-use garage_rpc::membership::System;
-use garage_rpc::rpc_client::*;
-use garage_rpc::rpc_server::*;
+use garage_rpc::system::System;
+use garage_rpc::*;
use crate::crdt::Crdt;
use crate::data::*;
@@ -23,17 +23,18 @@ use crate::sync::*;
const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10);
-pub struct Table<F: TableSchema, R: TableReplication> {
+pub struct Table<F: TableSchema + 'static, R: TableReplication + 'static> {
pub system: Arc<System>,
pub data: Arc<TableData<F, R>>,
pub merkle_updater: Arc<MerkleUpdater<F, R>>,
pub syncer: Arc<TableSyncer<F, R>>,
- rpc_client: Arc<RpcClient<TableRpc<F>>>,
+ endpoint: Arc<Endpoint<TableRpc<F>, Self>>,
}
#[derive(Serialize, Deserialize)]
pub(crate) enum TableRpc<F: TableSchema> {
Ok,
+ Error(String),
ReadEntry(F::P, F::S),
ReadEntryResponse(Option<ByteBuf>),
@@ -44,7 +45,9 @@ pub(crate) enum TableRpc<F: TableSchema> {
Update(Vec<Arc<ByteBuf>>),
}
-impl<F: TableSchema> RpcMessage for TableRpc<F> {}
+impl<F: TableSchema> Message for TableRpc<F> {
+ type Response = TableRpc<F>;
+}
impl<F, R> Table<F, R>
where
@@ -59,32 +62,27 @@ where
system: Arc<System>,
db: &sled::Db,
name: String,
- rpc_server: &mut RpcServer,
) -> Arc<Self> {
- let rpc_path = format!("table_{}", name);
- let rpc_client = system.rpc_client::<TableRpc<F>>(&rpc_path);
+ let endpoint = system
+ .netapp
+ .endpoint(format!("garage_table/table.rs/Rpc:{}", name));
let data = TableData::new(system.clone(), name, instance, replication, db);
let merkle_updater = MerkleUpdater::launch(&system.background, data.clone());
- let syncer = TableSyncer::launch(
- system.clone(),
- data.clone(),
- merkle_updater.clone(),
- rpc_server,
- );
- TableGc::launch(system.clone(), data.clone(), rpc_server);
+ let syncer = TableSyncer::launch(system.clone(), data.clone(), merkle_updater.clone());
+ TableGc::launch(system.clone(), data.clone());
let table = Arc::new(Self {
system,
data,
merkle_updater,
syncer,
- rpc_client,
+ endpoint,
});
- table.clone().register_handler(rpc_server, rpc_path);
+ table.endpoint.set_handler(table.clone());
table
}
@@ -97,11 +95,14 @@ where
let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?));
let rpc = TableRpc::<F>::Update(vec![e_enc]);
- self.rpc_client
+ self.system
+ .rpc
.try_call_many(
+ &self.endpoint,
&who[..],
rpc,
- RequestStrategy::with_quorum(self.data.replication.write_quorum())
+ RequestStrategy::with_priority(PRIO_NORMAL)
+ .with_quorum(self.data.replication.write_quorum())
.with_timeout(TABLE_RPC_TIMEOUT),
)
.await?;
@@ -123,7 +124,16 @@ where
let call_futures = call_list.drain().map(|(node, entries)| async move {
let rpc = TableRpc::<F>::Update(entries);
- let resp = self.rpc_client.call(node, rpc, TABLE_RPC_TIMEOUT).await?;
+ let resp = self
+ .system
+ .rpc
+ .call(
+ &self.endpoint,
+ node,
+ rpc,
+ RequestStrategy::with_priority(PRIO_NORMAL).with_timeout(TABLE_RPC_TIMEOUT),
+ )
+ .await?;
Ok::<_, Error>((node, resp))
});
let mut resps = call_futures.collect::<FuturesUnordered<_>>();
@@ -152,11 +162,14 @@ where
let rpc = TableRpc::<F>::ReadEntry(partition_key.clone(), sort_key.clone());
let resps = self
- .rpc_client
+ .system
+ .rpc
.try_call_many(
+ &self.endpoint,
&who[..],
rpc,
- RequestStrategy::with_quorum(self.data.replication.read_quorum())
+ RequestStrategy::with_priority(PRIO_NORMAL)
+ .with_quorum(self.data.replication.read_quorum())
.with_timeout(TABLE_RPC_TIMEOUT)
.interrupt_after_quorum(true),
)
@@ -208,11 +221,14 @@ where
let rpc = TableRpc::<F>::ReadRange(partition_key.clone(), begin_sort_key, filter, limit);
let resps = self
- .rpc_client
+ .system
+ .rpc
.try_call_many(
+ &self.endpoint,
&who[..],
rpc,
- RequestStrategy::with_quorum(self.data.replication.read_quorum())
+ RequestStrategy::with_priority(PRIO_NORMAL)
+ .with_quorum(self.data.replication.read_quorum())
.with_timeout(TABLE_RPC_TIMEOUT)
.interrupt_after_quorum(true),
)
@@ -261,36 +277,25 @@ where
// =============== UTILITY FUNCTION FOR CLIENT OPERATIONS ===============
- async fn repair_on_read(&self, who: &[Uuid], what: F::E) -> Result<(), Error> {
+ async fn repair_on_read(&self, who: &[NodeID], what: F::E) -> Result<(), Error> {
let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(&what)?));
- self.rpc_client
+ self.system
+ .rpc
.try_call_many(
+ &self.endpoint,
who,
TableRpc::<F>::Update(vec![what_enc]),
- RequestStrategy::with_quorum(who.len()).with_timeout(TABLE_RPC_TIMEOUT),
+ RequestStrategy::with_priority(PRIO_NORMAL)
+ .with_quorum(who.len())
+ .with_timeout(TABLE_RPC_TIMEOUT),
)
.await?;
Ok(())
}
- // =============== HANDLERS FOR RPC OPERATIONS (SERVER SIDE) ==============
-
- fn register_handler(self: Arc<Self>, rpc_server: &mut RpcServer, path: String) {
- let self2 = self.clone();
- rpc_server.add_handler::<TableRpc<F>, _, _>(path, move |msg, _addr| {
- let self2 = self2.clone();
- async move { self2.handle(&msg).await }
- });
-
- let self2 = self.clone();
- self.rpc_client
- .set_local_handler(self.system.id, move |msg| {
- let self2 = self2.clone();
- async move { self2.handle(&msg).await }
- });
- }
-
- async fn handle(self: &Arc<Self>, msg: &TableRpc<F>) -> Result<TableRpc<F>, Error> {
+ // ====== RPC HANDLER =====
+ //
+ async fn handle_rpc(self: &Arc<Self>, msg: &TableRpc<F>) -> Result<TableRpc<F>, Error> {
match msg {
TableRpc::ReadEntry(key, sort_key) => {
let value = self.data.read_entry(key, sort_key)?;
@@ -308,3 +313,16 @@ where
}
}
}
+
+#[async_trait]
+impl<F, R> EndpointHandler<TableRpc<F>> for Table<F, R>
+where
+ F: TableSchema + 'static,
+ R: TableReplication + 'static,
+{
+ async fn handle(self: &Arc<Self>, msg: &TableRpc<F>, _from: NodeID) -> TableRpc<F> {
+ self.handle_rpc(msg)
+ .await
+ .unwrap_or_else(|e| TableRpc::<F>::Error(format!("{}", e)))
+ }
+}
diff --git a/src/util/Cargo.toml b/src/util/Cargo.toml
index 91e0b2b9..0f041074 100644
--- a/src/util/Cargo.toml
+++ b/src/util/Cargo.toml
@@ -32,7 +32,6 @@ toml = "0.5"
futures = "0.3"
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
+netapp = { version = "0.3.0", git = "https://git.deuxfleurs.fr/lx/netapp" }
http = "0.2"
hyper = "0.14"
-rustls = "0.19"
-webpki = "0.21"
diff --git a/src/util/config.rs b/src/util/config.rs
index 46b918a9..ee153dfa 100644
--- a/src/util/config.rs
+++ b/src/util/config.rs
@@ -3,8 +3,11 @@ use std::io::Read;
use std::net::SocketAddr;
use std::path::PathBuf;
+use serde::de::Error as SerdeError;
use serde::{de, Deserialize};
+use netapp::NodeID;
+
use crate::error::Error;
/// Represent the whole configuration
@@ -26,20 +29,20 @@ pub struct Config {
// (we can add more aliases for this later)
pub replication_mode: String,
+ /// RPC secret key: 32 bytes hex encoded
+ pub rpc_secret: String,
+
/// Address to bind for RPC
pub rpc_bind_addr: SocketAddr,
/// Bootstrap peers RPC address
#[serde(deserialize_with = "deserialize_vec_addr")]
- pub bootstrap_peers: Vec<SocketAddr>,
+ pub bootstrap_peers: Vec<(NodeID, SocketAddr)>,
/// Consule host to connect to to discover more peers
pub consul_host: Option<String>,
/// Consul service name to use
pub consul_service_name: Option<String>,
- /// Configuration for RPC TLS
- pub rpc_tls: Option<TlsConfig>,
-
/// Max number of concurrent RPC request
#[serde(default = "default_max_concurrent_rpc_requests")]
pub max_concurrent_rpc_requests: usize,
@@ -59,17 +62,6 @@ pub struct Config {
pub s3_web: WebConfig,
}
-/// Configuration for RPC TLS
-#[derive(Deserialize, Debug, Clone)]
-pub struct TlsConfig {
- /// Path to certificate autority used for all nodes
- pub ca_cert: String,
- /// Path to public certificate for this node
- pub node_cert: String,
- /// Path to private key for this node
- pub node_key: String,
-}
-
/// Configuration for S3 api
#[derive(Deserialize, Debug, Clone)]
pub struct ApiConfig {
@@ -115,27 +107,32 @@ pub fn read_config(config_file: PathBuf) -> Result<Config, Error> {
Ok(toml::from_str(&config)?)
}
-fn deserialize_vec_addr<'de, D>(deserializer: D) -> Result<Vec<SocketAddr>, D::Error>
+fn deserialize_vec_addr<'de, D>(deserializer: D) -> Result<Vec<(NodeID, SocketAddr)>, D::Error>
where
D: de::Deserializer<'de>,
{
use std::net::ToSocketAddrs;
- Ok(<Vec<&str>>::deserialize(deserializer)?
- .iter()
- .filter_map(|&name| {
- name.to_socket_addrs()
- .map(|iter| (name, iter))
- .map_err(|_| warn!("Error resolving \"{}\"", name))
- .ok()
- })
- .map(|(name, iter)| {
- let v = iter.collect::<Vec<_>>();
- if v.is_empty() {
- warn!("Error resolving \"{}\"", name)
- }
- v
- })
- .flatten()
- .collect())
+ let mut ret = vec![];
+
+ for peer in <Vec<&str>>::deserialize(deserializer)? {
+ let delim = peer
+ .find('@')
+ .ok_or_else(|| D::Error::custom("Invalid bootstrap peer: public key not specified"))?;
+ let (key, host) = peer.split_at(delim);
+ let pubkey = NodeID::from_slice(&hex::decode(&key).map_err(D::Error::custom)?)
+ .ok_or_else(|| D::Error::custom("Invalid bootstrap peer public key"))?;
+ let hosts = host[1..]
+ .to_socket_addrs()
+ .map_err(D::Error::custom)?
+ .collect::<Vec<_>>();
+ if hosts.is_empty() {
+ return Err(D::Error::custom(format!("Error resolving {}", &host[1..])));
+ }
+ for host in hosts {
+ ret.push((pubkey.clone(), host));
+ }
+ }
+
+ Ok(ret)
}
diff --git a/src/util/error.rs b/src/util/error.rs
index c3d84e63..804a0d4d 100644
--- a/src/util/error.rs
+++ b/src/util/error.rs
@@ -11,8 +11,8 @@ pub enum RpcError {
#[error(display = "Node is down: {:?}.", _0)]
NodeDown(Uuid),
- #[error(display = "Timeout: {}", _0)]
- Timeout(#[error(source)] tokio::time::error::Elapsed),
+ #[error(display = "Timeout")]
+ Timeout,
#[error(display = "HTTP error: {}", _0)]
Http(#[error(source)] http::Error),
@@ -45,11 +45,8 @@ pub enum Error {
#[error(display = "Invalid HTTP header value: {}", _0)]
HttpHeader(#[error(source)] http::header::ToStrError),
- #[error(display = "TLS error: {}", _0)]
- Tls(#[error(source)] rustls::TLSError),
-
- #[error(display = "PKI error: {}", _0)]
- Pki(#[error(source)] webpki::Error),
+ #[error(display = "Netapp error: {}", _0)]
+ Netapp(#[error(source)] netapp::error::Error),
#[error(display = "Sled error: {}", _0)]
Sled(#[error(source)] sled::Error),