aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.gitignore1
-rw-r--r--Cargo.lock1409
-rw-r--r--Cargo.toml27
-rw-r--r--Makefile3
-rw-r--r--examples/basalt.rs76
-rw-r--r--examples/fullmesh.rs68
-rw-r--r--rustfmt.toml1
-rw-r--r--src/conn.rs280
-rw-r--r--src/error.rs57
-rw-r--r--src/lib.rs9
-rw-r--r--src/message.rs18
-rw-r--r--src/netapp.rs214
-rw-r--r--src/peering/basalt.rs475
-rw-r--r--src/peering/fullmesh.rs437
-rw-r--r--src/peering/mod.rs2
-rw-r--r--src/proto.rs251
-rw-r--r--src/util.rs14
17 files changed, 3342 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..2f7896d
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1 @@
+target/
diff --git a/Cargo.lock b/Cargo.lock
new file mode 100644
index 0000000..3dac6d0
--- /dev/null
+++ b/Cargo.lock
@@ -0,0 +1,1409 @@
+# This file is automatically @generated by Cargo.
+# It is not intended for manual editing.
+[[package]]
+name = "adler32"
+version = "1.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "aae1277d39aeec15cb388266ecc24b11c80469deae6067e17a1a7aa9e5c1f234"
+
+[[package]]
+name = "ahash"
+version = "0.4.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f6789e291be47ace86a60303502173d84af8327e3627ecf334356ee0f87a164c"
+
+[[package]]
+name = "aho-corasick"
+version = "0.7.15"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7404febffaa47dac81aa44dba71523c9d069b1bdc50a77db41195149e17f68e5"
+dependencies = [
+ "memchr",
+]
+
+[[package]]
+name = "arc-swap"
+version = "1.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0688b520bcc7498f6ca8fa006e8031d353e3fd4f51bd4a50fb03cc4230b28bd2"
+
+[[package]]
+name = "async-attributes"
+version = "1.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "efd3d156917d94862e779f356c5acae312b08fd3121e792c857d7928c8088423"
+dependencies = [
+ "quote",
+ "syn",
+]
+
+[[package]]
+name = "async-channel"
+version = "1.5.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "59740d83946db6a5af71ae25ddf9562c2b176b2ca42cf99a455f09f4a220d6b9"
+dependencies = [
+ "concurrent-queue",
+ "event-listener",
+ "futures-core",
+]
+
+[[package]]
+name = "async-executor"
+version = "1.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "eb877970c7b440ead138f6321a3b5395d6061183af779340b65e20c0fede9146"
+dependencies = [
+ "async-task",
+ "concurrent-queue",
+ "fastrand",
+ "futures-lite",
+ "once_cell",
+ "vec-arena",
+]
+
+[[package]]
+name = "async-global-executor"
+version = "1.4.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "73079b49cd26b8fd5a15f68fc7707fc78698dc2a3d61430f2a7a9430230dfa04"
+dependencies = [
+ "async-executor",
+ "async-io",
+ "futures-lite",
+ "num_cpus",
+ "once_cell",
+]
+
+[[package]]
+name = "async-io"
+version = "1.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "40a0b2bb8ae20fede194e779150fe283f65a4a08461b496de546ec366b174ad9"
+dependencies = [
+ "concurrent-queue",
+ "fastrand",
+ "futures-lite",
+ "libc",
+ "log",
+ "nb-connect",
+ "once_cell",
+ "parking",
+ "polling",
+ "vec-arena",
+ "waker-fn",
+ "winapi 0.3.9",
+]
+
+[[package]]
+name = "async-mutex"
+version = "1.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "479db852db25d9dbf6204e6cb6253698f175c15726470f78af0d918e99d6156e"
+dependencies = [
+ "event-listener",
+]
+
+[[package]]
+name = "async-std"
+version = "1.7.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a7e82538bc65a25dbdff70e4c5439d52f068048ab97cdea0acd73f131594caa1"
+dependencies = [
+ "async-attributes",
+ "async-global-executor",
+ "async-io",
+ "async-mutex",
+ "blocking",
+ "crossbeam-utils",
+ "futures-channel",
+ "futures-core",
+ "futures-io",
+ "futures-lite",
+ "gloo-timers",
+ "kv-log-macro",
+ "log",
+ "memchr",
+ "num_cpus",
+ "once_cell",
+ "pin-project-lite",
+ "pin-utils",
+ "slab",
+ "wasm-bindgen-futures",
+]
+
+[[package]]
+name = "async-task"
+version = "4.0.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e91831deabf0d6d7ec49552e489aed63b7456a7a3c46cff62adad428110b0af0"
+
+[[package]]
+name = "async-trait"
+version = "0.1.41"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b246867b8b3b6ae56035f1eb1ed557c1d8eae97f0d53696138a50fa0e3a3b8c0"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
+[[package]]
+name = "atomic-waker"
+version = "1.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "065374052e7df7ee4047b1160cca5e1467a12351a40b3da123c870ba0b8eda2a"
+
+[[package]]
+name = "atty"
+version = "0.2.14"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8"
+dependencies = [
+ "hermit-abi",
+ "libc",
+ "winapi 0.3.9",
+]
+
+[[package]]
+name = "autocfg"
+version = "1.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a"
+
+[[package]]
+name = "base64"
+version = "0.12.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3441f0f7b02788e948e47f457ca01f1d7e6d92c693bc132c22b087d3141c03ff"
+
+[[package]]
+name = "bitflags"
+version = "1.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693"
+
+[[package]]
+name = "blocking"
+version = "1.0.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c5e170dbede1f740736619b776d7251cb1b9095c435c34d8ca9f57fcd2f335e9"
+dependencies = [
+ "async-channel",
+ "async-task",
+ "atomic-waker",
+ "fastrand",
+ "futures-lite",
+ "once_cell",
+]
+
+[[package]]
+name = "bumpalo"
+version = "3.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2e8c087f005730276d1096a652e92a8bacee2e2472bcc9715a74d2bec38b5820"
+
+[[package]]
+name = "byteorder"
+version = "1.3.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "08c48aae112d48ed9f069b33538ea9e3e90aa263cfa3d1c24309612b1f7472de"
+
+[[package]]
+name = "bytes"
+version = "0.5.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0e4cec68f03f32e44924783795810fa50a7035d8c8ebe78580ad7e6c703fba38"
+
+[[package]]
+name = "bytes"
+version = "0.6.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e0dcbc35f504eb6fc275a6d20e4ebcda18cf50d40ba6fabff8c711fa16cb3b16"
+
+[[package]]
+name = "cache-padded"
+version = "1.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "631ae5198c9be5e753e5cc215e1bd73c2b466a3565173db433f52bb9d3e66dba"
+
+[[package]]
+name = "cc"
+version = "1.0.63"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ad9c6140b5a2c7db40ea56eb1821245e5362b44385c05b76288b1a599934ac87"
+
+[[package]]
+name = "cfg-if"
+version = "0.1.10"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822"
+
+[[package]]
+name = "cfg-if"
+version = "1.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
+
+[[package]]
+name = "clap"
+version = "2.33.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "37e58ac78573c40708d45522f0d80fa2f01cc4f9b4e2bf749807255454312002"
+dependencies = [
+ "bitflags",
+ "textwrap",
+ "unicode-width",
+]
+
+[[package]]
+name = "cloudabi"
+version = "0.0.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ddfc5b9aa5d4507acaf872de71051dfd0e309860e88966e1051e462a077aac4f"
+dependencies = [
+ "bitflags",
+]
+
+[[package]]
+name = "concurrent-queue"
+version = "1.2.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "30ed07550be01594c6026cff2a1d7fe9c8f683caa798e12b68694ac9e88286a3"
+dependencies = [
+ "cache-padded",
+]
+
+[[package]]
+name = "const_fn"
+version = "0.4.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c478836e029dcef17fb47c89023448c64f781a046e0300e257ad8225ae59afab"
+
+[[package]]
+name = "crc32fast"
+version = "1.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "81156fece84ab6a9f2afdb109ce3ae577e42b1228441eded99bd77f627953b1a"
+dependencies = [
+ "cfg-if 1.0.0",
+]
+
+[[package]]
+name = "crossbeam-utils"
+version = "0.8.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ec91540d98355f690a86367e566ecad2e9e579f230230eb7c21398372be73ea5"
+dependencies = [
+ "autocfg",
+ "cfg-if 1.0.0",
+ "const_fn",
+ "lazy_static",
+]
+
+[[package]]
+name = "env_logger"
+version = "0.7.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "44533bbbb3bb3c1fa17d9f2e4e38bbbaf8396ba82193c4cb1b6445d711445d36"
+dependencies = [
+ "atty",
+ "humantime",
+ "log",
+ "regex",
+ "termcolor",
+]
+
+[[package]]
+name = "err-derive"
+version = "0.2.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "22deed3a8124cff5fa835713fa105621e43bbdc46690c3a6b68328a012d350d4"
+dependencies = [
+ "proc-macro-error",
+ "proc-macro2",
+ "quote",
+ "rustversion",
+ "syn",
+ "synstructure",
+]
+
+[[package]]
+name = "event-listener"
+version = "2.5.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f7531096570974c3a9dcf9e4b8e1cede1ec26cf5046219fb3b9d897503b9be59"
+
+[[package]]
+name = "fastrand"
+version = "1.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ca5faf057445ce5c9d4329e382b2ce7ca38550ef3b73a5348362d5f24e0c7fe3"
+dependencies = [
+ "instant",
+]
+
+[[package]]
+name = "filetime"
+version = "0.2.13"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0c122a393ea57648015bf06fbd3d372378992e86b9ff5a7a497b076a28c79efe"
+dependencies = [
+ "cfg-if 1.0.0",
+ "libc",
+ "redox_syscall",
+ "winapi 0.3.9",
+]
+
+[[package]]
+name = "fnv"
+version = "1.0.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
+
+[[package]]
+name = "fuchsia-cprng"
+version = "0.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a06f77d526c1a601b7c4cdd98f54b5eaabffc14d5f2f0296febdc7f357c6d3ba"
+
+[[package]]
+name = "fuchsia-zircon"
+version = "0.3.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2e9763c69ebaae630ba35f74888db465e49e259ba1bc0eda7d06f4a067615d82"
+dependencies = [
+ "bitflags",
+ "fuchsia-zircon-sys",
+]
+
+[[package]]
+name = "fuchsia-zircon-sys"
+version = "0.3.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7"
+
+[[package]]
+name = "futures"
+version = "0.3.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9b3b0c040a1fe6529d30b3c5944b280c7f0dcb2930d2c3062bca967b602583d0"
+dependencies = [
+ "futures-channel",
+ "futures-core",
+ "futures-executor",
+ "futures-io",
+ "futures-sink",
+ "futures-task",
+ "futures-util",
+]
+
+[[package]]
+name = "futures-channel"
+version = "0.3.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4b7109687aa4e177ef6fe84553af6280ef2778bdb7783ba44c9dc3399110fe64"
+dependencies = [
+ "futures-core",
+ "futures-sink",
+]
+
+[[package]]
+name = "futures-core"
+version = "0.3.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "847ce131b72ffb13b6109a221da9ad97a64cbe48feb1028356b836b47b8f1748"
+
+[[package]]
+name = "futures-executor"
+version = "0.3.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4caa2b2b68b880003057c1dd49f1ed937e38f22fcf6c212188a121f08cf40a65"
+dependencies = [
+ "futures-core",
+ "futures-task",
+ "futures-util",
+]
+
+[[package]]
+name = "futures-io"
+version = "0.3.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "611834ce18aaa1bd13c4b374f5d653e1027cf99b6b502584ff8c9a64413b30bb"
+
+[[package]]
+name = "futures-lite"
+version = "1.11.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5e6c079abfac3ab269e2927ec048dabc89d009ebfdda6b8ee86624f30c689658"
+dependencies = [
+ "fastrand",
+ "futures-core",
+ "futures-io",
+ "memchr",
+ "parking",
+ "pin-project-lite",
+ "waker-fn",
+]
+
+[[package]]
+name = "futures-macro"
+version = "0.3.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "77408a692f1f97bcc61dc001d752e00643408fbc922e4d634c655df50d595556"
+dependencies = [
+ "proc-macro-hack",
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
+[[package]]
+name = "futures-sink"
+version = "0.3.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f878195a49cee50e006b02b93cf7e0a95a38ac7b776b4c4d9cc1207cd20fcb3d"
+
+[[package]]
+name = "futures-task"
+version = "0.3.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7c554eb5bf48b2426c4771ab68c6b14468b6e76cc90996f528c3338d761a4d0d"
+dependencies = [
+ "once_cell",
+]
+
+[[package]]
+name = "futures-util"
+version = "0.3.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d304cff4a7b99cfb7986f7d43fbe93d175e72e704a8860787cc95e9ffd85cbd2"
+dependencies = [
+ "futures-channel",
+ "futures-core",
+ "futures-io",
+ "futures-macro",
+ "futures-sink",
+ "futures-task",
+ "memchr",
+ "pin-project",
+ "pin-utils",
+ "proc-macro-hack",
+ "proc-macro-nested",
+ "slab",
+]
+
+[[package]]
+name = "gloo-timers"
+version = "0.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "47204a46aaff920a1ea58b11d03dec6f704287d27561724a4631e450654a891f"
+dependencies = [
+ "futures-channel",
+ "futures-core",
+ "js-sys",
+ "wasm-bindgen",
+ "web-sys",
+]
+
+[[package]]
+name = "hashbrown"
+version = "0.9.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d7afe4a420e3fe79967a00898cc1f4db7c8a49a9333a29f8a4bd76a253d5cd04"
+dependencies = [
+ "ahash",
+]
+
+[[package]]
+name = "heck"
+version = "0.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "20564e78d53d2bb135c343b3f47714a56af2061f1c928fdb541dc7b9fdd94205"
+dependencies = [
+ "unicode-segmentation",
+]
+
+[[package]]
+name = "hermit-abi"
+version = "0.1.17"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5aca5565f760fb5b220e499d72710ed156fdb74e631659e99377d9ebfbd13ae8"
+dependencies = [
+ "libc",
+]
+
+[[package]]
+name = "hex"
+version = "0.4.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "644f9158b2f133fd50f5fb3242878846d9eb792e445c893805ff0e3824006e35"
+
+[[package]]
+name = "humantime"
+version = "1.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "df004cfca50ef23c36850aaaa59ad52cc70d0e90243c3c7737a4dd32dc7a3c4f"
+dependencies = [
+ "quick-error",
+]
+
+[[package]]
+name = "instant"
+version = "0.1.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cb1fc4429a33e1f80d41dc9fea4d108a88bec1de8053878898ae448a0b52f613"
+dependencies = [
+ "cfg-if 1.0.0",
+]
+
+[[package]]
+name = "iovec"
+version = "0.1.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b2b3ea6ff95e175473f8ffe6a7eb7c00d054240321b84c57051175fe3c1e075e"
+dependencies = [
+ "libc",
+]
+
+[[package]]
+name = "js-sys"
+version = "0.3.45"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ca059e81d9486668f12d455a4ea6daa600bd408134cd17e3d3fb5a32d1f016f8"
+dependencies = [
+ "wasm-bindgen",
+]
+
+[[package]]
+name = "kernel32-sys"
+version = "0.2.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d"
+dependencies = [
+ "winapi 0.2.8",
+ "winapi-build",
+]
+
+[[package]]
+name = "kuska-handshake"
+version = "0.1.1"
+dependencies = [
+ "async-std",
+ "futures",
+ "hex",
+ "log",
+ "sodiumoxide",
+ "thiserror",
+ "tokio",
+]
+
+[[package]]
+name = "kv-log-macro"
+version = "1.0.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0de8b303297635ad57c9f5059fd9cee7a47f8e8daa09df0fcd07dd39fb22977f"
+dependencies = [
+ "log",
+]
+
+[[package]]
+name = "lazy_static"
+version = "1.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
+
+[[package]]
+name = "libc"
+version = "0.2.80"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4d58d1b70b004888f764dfbf6a26a3b0342a1632d33968e4a179d8011c760614"
+
+[[package]]
+name = "libflate"
+version = "0.1.27"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d9135df43b1f5d0e333385cb6e7897ecd1a43d7d11b91ac003f4d2c2d2401fdd"
+dependencies = [
+ "adler32",
+ "crc32fast",
+ "rle-decode-fast",
+ "take_mut",
+]
+
+[[package]]
+name = "libsodium-sys"
+version = "0.2.4"
+source = "git+https://github.com/Dhole/sodiumoxidez?branch=extra#53c0fb16069309c35010eb568d9ed05f5bd52ce8"
+dependencies = [
+ "cc",
+ "libc",
+ "libflate",
+ "pkg-config",
+ "tar",
+ "vcpkg",
+]
+
+[[package]]
+name = "log"
+version = "0.4.11"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4fabed175da42fed1fa0746b0ea71f412aa9d35e76e95e59b192c64b9dc2bf8b"
+dependencies = [
+ "cfg-if 0.1.10",
+]
+
+[[package]]
+name = "lru"
+version = "0.6.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "be716eb6878ca2263eb5d00a781aa13264a794f519fe6af4fbb2668b2d5441c0"
+dependencies = [
+ "hashbrown",
+]
+
+[[package]]
+name = "memchr"
+version = "2.3.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0ee1c47aaa256ecabcaea351eae4a9b01ef39ed810004e298d2511ed284b1525"
+
+[[package]]
+name = "mio"
+version = "0.6.22"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "fce347092656428bc8eaf6201042cb551b8d67855af7374542a92a0fbfcac430"
+dependencies = [
+ "cfg-if 0.1.10",
+ "fuchsia-zircon",
+ "fuchsia-zircon-sys",
+ "iovec",
+ "kernel32-sys",
+ "libc",
+ "log",
+ "miow 0.2.1",
+ "net2",
+ "slab",
+ "winapi 0.2.8",
+]
+
+[[package]]
+name = "mio-named-pipes"
+version = "0.1.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0840c1c50fd55e521b247f949c241c9997709f23bd7f023b9762cd561e935656"
+dependencies = [
+ "log",
+ "mio",
+ "miow 0.3.6",
+ "winapi 0.3.9",
+]
+
+[[package]]
+name = "mio-uds"
+version = "0.6.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "afcb699eb26d4332647cc848492bbc15eafb26f08d0304550d5aa1f612e066f0"
+dependencies = [
+ "iovec",
+ "libc",
+ "mio",
+]
+
+[[package]]
+name = "miow"
+version = "0.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8c1f2f3b1cf331de6896aabf6e9d55dca90356cc9960cca7eaaf408a355ae919"
+dependencies = [
+ "kernel32-sys",
+ "net2",
+ "winapi 0.2.8",
+ "ws2_32-sys",
+]
+
+[[package]]
+name = "miow"
+version = "0.3.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5a33c1b55807fbed163481b5ba66db4b2fa6cde694a5027be10fb724206c5897"
+dependencies = [
+ "socket2",
+ "winapi 0.3.9",
+]
+
+[[package]]
+name = "nb-connect"
+version = "1.0.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8123a81538e457d44b933a02faf885d3fe8408806b23fa700e8f01c6c3a98998"
+dependencies = [
+ "libc",
+ "winapi 0.3.9",
+]
+
+[[package]]
+name = "net2"
+version = "0.2.35"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3ebc3ec692ed7c9a255596c67808dee269f64655d8baf7b4f0638e51ba1d6853"
+dependencies = [
+ "cfg-if 0.1.10",
+ "libc",
+ "winapi 0.3.9",
+]
+
+[[package]]
+name = "netapp"
+version = "0.1.0"
+dependencies = [
+ "arc-swap",
+ "async-std",
+ "async-trait",
+ "base64",
+ "bytes 0.6.0",
+ "env_logger",
+ "err-derive",
+ "hex",
+ "kuska-handshake",
+ "log",
+ "lru",
+ "pretty_env_logger",
+ "rand",
+ "rmp-serde",
+ "serde",
+ "sodiumoxide",
+ "structopt",
+ "tokio",
+]
+
+[[package]]
+name = "num-traits"
+version = "0.2.14"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9a64b1ec5cda2586e284722486d802acf1f7dbdc623e2bfc57e65ca1cd099290"
+dependencies = [
+ "autocfg",
+]
+
+[[package]]
+name = "num_cpus"
+version = "1.13.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "05499f3756671c15885fee9034446956fff3f243d6077b91e5767df161f766b3"
+dependencies = [
+ "hermit-abi",
+ "libc",
+]
+
+[[package]]
+name = "once_cell"
+version = "1.5.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "13bd41f508810a131401606d54ac32a467c97172d74ba7662562ebba5ad07fa0"
+
+[[package]]
+name = "parking"
+version = "2.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "427c3892f9e783d91cc128285287e70a59e206ca452770ece88a76f7a3eddd72"
+
+[[package]]
+name = "pin-project"
+version = "1.0.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9ccc2237c2c489783abd8c4c80e5450fc0e98644555b1364da68cc29aa151ca7"
+dependencies = [
+ "pin-project-internal",
+]
+
+[[package]]
+name = "pin-project-internal"
+version = "1.0.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f8e8d2bf0b23038a4424865103a4df472855692821aab4e4f5c3312d461d9e5f"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
+[[package]]
+name = "pin-project-lite"
+version = "0.1.11"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c917123afa01924fc84bb20c4c03f004d9c38e5127e3c039bbf7f4b9c76a2f6b"
+
+[[package]]
+name = "pin-utils"
+version = "0.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
+
+[[package]]
+name = "pkg-config"
+version = "0.3.19"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3831453b3449ceb48b6d9c7ad7c96d5ea673e9b470a1dc578c2ce6521230884c"
+
+[[package]]
+name = "polling"
+version = "2.0.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a2a7bc6b2a29e632e45451c941832803a18cce6781db04de8a04696cdca8bde4"
+dependencies = [
+ "cfg-if 0.1.10",
+ "libc",
+ "log",
+ "wepoll-sys",
+ "winapi 0.3.9",
+]
+
+[[package]]
+name = "pretty_env_logger"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "926d36b9553851b8b0005f1275891b392ee4d2d833852c417ed025477350fb9d"
+dependencies = [
+ "env_logger",
+ "log",
+]
+
+[[package]]
+name = "proc-macro-error"
+version = "1.0.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c"
+dependencies = [
+ "proc-macro-error-attr",
+ "proc-macro2",
+ "quote",
+ "syn",
+ "version_check",
+]
+
+[[package]]
+name = "proc-macro-error-attr"
+version = "1.0.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "version_check",
+]
+
+[[package]]
+name = "proc-macro-hack"
+version = "0.5.19"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5"
+
+[[package]]
+name = "proc-macro-nested"
+version = "0.1.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "eba180dafb9038b050a4c280019bbedf9f2467b61e5d892dcad585bb57aadc5a"
+
+[[package]]
+name = "proc-macro2"
+version = "1.0.24"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1e0704ee1a7e00d7bb417d0770ea303c1bccbabf0ef1667dae92b5967f5f8a71"
+dependencies = [
+ "unicode-xid",
+]
+
+[[package]]
+name = "quick-error"
+version = "1.2.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0"
+
+[[package]]
+name = "quote"
+version = "1.0.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "aa563d17ecb180e500da1cfd2b028310ac758de548efdd203e18f283af693f37"
+dependencies = [
+ "proc-macro2",
+]
+
+[[package]]
+name = "rand"
+version = "0.5.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c618c47cd3ebd209790115ab837de41425723956ad3ce2e6a7f09890947cacb9"
+dependencies = [
+ "cloudabi",
+ "fuchsia-cprng",
+ "libc",
+ "rand_core 0.3.1",
+ "winapi 0.3.9",
+]
+
+[[package]]
+name = "rand_core"
+version = "0.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7a6fdeb83b075e8266dcc8762c22776f6877a63111121f5f8c7411e5be7eed4b"
+dependencies = [
+ "rand_core 0.4.2",
+]
+
+[[package]]
+name = "rand_core"
+version = "0.4.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9c33a3c44ca05fa6f1807d8e6743f3824e8509beca625669633be0acbdf509dc"
+
+[[package]]
+name = "redox_syscall"
+version = "0.1.57"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "41cc0f7e4d5d4544e8861606a285bb08d3e70712ccc7d2b84d7c0ccfaf4b05ce"
+
+[[package]]
+name = "regex"
+version = "1.4.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "38cf2c13ed4745de91a5eb834e11c00bcc3709e773173b2ce4c56c9fbde04b9c"
+dependencies = [
+ "aho-corasick",
+ "memchr",
+ "regex-syntax",
+ "thread_local",
+]
+
+[[package]]
+name = "regex-syntax"
+version = "0.6.21"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3b181ba2dcf07aaccad5448e8ead58db5b742cf85dfe035e2227f137a539a189"
+
+[[package]]
+name = "rle-decode-fast"
+version = "1.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cabe4fa914dec5870285fa7f71f602645da47c486e68486d2b4ceb4a343e90ac"
+
+[[package]]
+name = "rmp"
+version = "0.8.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0f10b46df14cf1ee1ac7baa4d2fbc2c52c0622a4b82fa8740e37bc452ac0184f"
+dependencies = [
+ "byteorder",
+ "num-traits",
+]
+
+[[package]]
+name = "rmp-serde"
+version = "0.14.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4ce7d70c926fe472aed493b902010bccc17fa9f7284145cb8772fd22fdb052d8"
+dependencies = [
+ "byteorder",
+ "rmp",
+ "serde",
+]
+
+[[package]]
+name = "rustversion"
+version = "1.0.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cb5d2a036dc6d2d8fd16fde3498b04306e29bd193bf306a57427019b823d5acd"
+
+[[package]]
+name = "serde"
+version = "1.0.117"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b88fa983de7720629c9387e9f517353ed404164b1e482c970a90c1a4aaf7dc1a"
+dependencies = [
+ "serde_derive",
+]
+
+[[package]]
+name = "serde_derive"
+version = "1.0.117"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cbd1ae72adb44aab48f325a02444a5fc079349a8d804c1fc922aed3f7454c74e"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
+[[package]]
+name = "signal-hook-registry"
+version = "1.2.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ce32ea0c6c56d5eacaeb814fbed9960547021d3edd010ded1425f180536b20ab"
+dependencies = [
+ "libc",
+]
+
+[[package]]
+name = "slab"
+version = "0.4.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8"
+
+[[package]]
+name = "socket2"
+version = "0.3.16"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7fd8b795c389288baa5f355489c65e71fd48a02104600d15c4cfbc561e9e429d"
+dependencies = [
+ "cfg-if 0.1.10",
+ "libc",
+ "redox_syscall",
+ "winapi 0.3.9",
+]
+
+[[package]]
+name = "sodiumoxide"
+version = "0.2.4"
+source = "git+https://github.com/Dhole/sodiumoxidez?branch=extra#53c0fb16069309c35010eb568d9ed05f5bd52ce8"
+dependencies = [
+ "libc",
+ "libsodium-sys",
+ "serde",
+]
+
+[[package]]
+name = "structopt"
+version = "0.3.20"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "126d630294ec449fae0b16f964e35bf3c74f940da9dca17ee9b905f7b3112eb8"
+dependencies = [
+ "clap",
+ "lazy_static",
+ "structopt-derive",
+]
+
+[[package]]
+name = "structopt-derive"
+version = "0.4.13"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "65e51c492f9e23a220534971ff5afc14037289de430e3c83f9daf6a1b6ae91e8"
+dependencies = [
+ "heck",
+ "proc-macro-error",
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
+[[package]]
+name = "syn"
+version = "1.0.48"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cc371affeffc477f42a221a1e4297aedcea33d47d19b61455588bd9d8f6b19ac"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "unicode-xid",
+]
+
+[[package]]
+name = "synstructure"
+version = "0.12.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b834f2d66f734cb897113e34aaff2f1ab4719ca946f9a7358dba8f8064148701"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+ "unicode-xid",
+]
+
+[[package]]
+name = "take_mut"
+version = "0.2.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f764005d11ee5f36500a149ace24e00e3da98b0158b3e2d53a7495660d3f4d60"
+
+[[package]]
+name = "tar"
+version = "0.4.30"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "489997b7557e9a43e192c527face4feacc78bfbe6eed67fd55c4c9e381cba290"
+dependencies = [
+ "filetime",
+ "libc",
+ "redox_syscall",
+ "xattr",
+]
+
+[[package]]
+name = "termcolor"
+version = "1.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bf11676eb135389f21fcda654382c4859bbfc1d2f36e4425a2f829bb41b1e20e"
+dependencies = [
+ "winapi-util",
+]
+
+[[package]]
+name = "textwrap"
+version = "0.11.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d326610f408c7a4eb6f51c37c330e496b08506c9457c9d34287ecc38809fb060"
+dependencies = [
+ "unicode-width",
+]
+
+[[package]]
+name = "thiserror"
+version = "1.0.22"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0e9ae34b84616eedaaf1e9dd6026dbe00dcafa92aa0c8077cb69df1fcfe5e53e"
+dependencies = [
+ "thiserror-impl",
+]
+
+[[package]]
+name = "thiserror-impl"
+version = "1.0.22"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9ba20f23e85b10754cd195504aebf6a27e2e6cbe28c17778a0c930724628dd56"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
+[[package]]
+name = "thread_local"
+version = "1.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d40c6d1b69745a6ec6fb1ca717914848da4b44ae29d9b3080cbee91d72a69b14"
+dependencies = [
+ "lazy_static",
+]
+
+[[package]]
+name = "tokio"
+version = "0.2.23"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a6d7ad61edd59bfcc7e80dababf0f4aed2e6d5e0ba1659356ae889752dfc12ff"
+dependencies = [
+ "bytes 0.5.6",
+ "fnv",
+ "futures-core",
+ "iovec",
+ "lazy_static",
+ "libc",
+ "memchr",
+ "mio",
+ "mio-named-pipes",
+ "mio-uds",
+ "num_cpus",
+ "pin-project-lite",
+ "signal-hook-registry",
+ "slab",
+ "tokio-macros",
+ "winapi 0.3.9",
+]
+
+[[package]]
+name = "tokio-macros"
+version = "0.2.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e44da00bfc73a25f814cd8d7e57a68a5c31b74b3152a0a1d1f590c97ed06265a"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
+[[package]]
+name = "unicode-segmentation"
+version = "1.7.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "db8716a166f290ff49dabc18b44aa407cb7c6dbe1aa0971b44b8a24b0ca35aae"
+
+[[package]]
+name = "unicode-width"
+version = "0.1.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9337591893a19b88d8d87f2cec1e73fad5cdfd10e5a6f349f498ad6ea2ffb1e3"
+
+[[package]]
+name = "unicode-xid"
+version = "0.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f7fe0bb3479651439c9112f72b6c505038574c9fbb575ed1bf3b797fa39dd564"
+
+[[package]]
+name = "vcpkg"
+version = "0.2.10"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6454029bf181f092ad1b853286f23e2c507d8e8194d01d92da4a55c274a5508c"
+
+[[package]]
+name = "vec-arena"
+version = "1.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "eafc1b9b2dfc6f5529177b62cf806484db55b32dc7c9658a118e11bbeb33061d"
+
+[[package]]
+name = "version_check"
+version = "0.9.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b5a972e5669d67ba988ce3dc826706fb0a8b01471c088cb0b6110b805cc36aed"
+
+[[package]]
+name = "waker-fn"
+version = "1.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca"
+
+[[package]]
+name = "wasm-bindgen"
+version = "0.2.68"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1ac64ead5ea5f05873d7c12b545865ca2b8d28adfc50a49b84770a3a97265d42"
+dependencies = [
+ "cfg-if 0.1.10",
+ "wasm-bindgen-macro",
+]
+
+[[package]]
+name = "wasm-bindgen-backend"
+version = "0.2.68"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f22b422e2a757c35a73774860af8e112bff612ce6cb604224e8e47641a9e4f68"
+dependencies = [
+ "bumpalo",
+ "lazy_static",
+ "log",
+ "proc-macro2",
+ "quote",
+ "syn",
+ "wasm-bindgen-shared",
+]
+
+[[package]]
+name = "wasm-bindgen-futures"
+version = "0.4.18"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b7866cab0aa01de1edf8b5d7936938a7e397ee50ce24119aef3e1eaa3b6171da"
+dependencies = [
+ "cfg-if 0.1.10",
+ "js-sys",
+ "wasm-bindgen",
+ "web-sys",
+]
+
+[[package]]
+name = "wasm-bindgen-macro"
+version = "0.2.68"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6b13312a745c08c469f0b292dd2fcd6411dba5f7160f593da6ef69b64e407038"
+dependencies = [
+ "quote",
+ "wasm-bindgen-macro-support",
+]
+
+[[package]]
+name = "wasm-bindgen-macro-support"
+version = "0.2.68"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f249f06ef7ee334cc3b8ff031bfc11ec99d00f34d86da7498396dc1e3b1498fe"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+ "wasm-bindgen-backend",
+ "wasm-bindgen-shared",
+]
+
+[[package]]
+name = "wasm-bindgen-shared"
+version = "0.2.68"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1d649a3145108d7d3fbcde896a468d1bd636791823c9921135218ad89be08307"
+
+[[package]]
+name = "web-sys"
+version = "0.3.45"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4bf6ef87ad7ae8008e15a355ce696bed26012b7caa21605188cfd8214ab51e2d"
+dependencies = [
+ "js-sys",
+ "wasm-bindgen",
+]
+
+[[package]]
+name = "wepoll-sys"
+version = "3.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0fcb14dea929042224824779fbc82d9fab8d2e6d3cbc0ac404de8edf489e77ff"
+dependencies = [
+ "cc",
+]
+
+[[package]]
+name = "winapi"
+version = "0.2.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "167dc9d6949a9b857f3451275e911c3f44255842c1f7a76f33c55103a909087a"
+
+[[package]]
+name = "winapi"
+version = "0.3.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419"
+dependencies = [
+ "winapi-i686-pc-windows-gnu",
+ "winapi-x86_64-pc-windows-gnu",
+]
+
+[[package]]
+name = "winapi-build"
+version = "0.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2d315eee3b34aca4797b2da6b13ed88266e6d612562a0c46390af8299fc699bc"
+
+[[package]]
+name = "winapi-i686-pc-windows-gnu"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
+
+[[package]]
+name = "winapi-util"
+version = "0.1.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178"
+dependencies = [
+ "winapi 0.3.9",
+]
+
+[[package]]
+name = "winapi-x86_64-pc-windows-gnu"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
+
+[[package]]
+name = "ws2_32-sys"
+version = "0.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d59cefebd0c892fa2dd6de581e937301d8552cb44489cdff035c6187cb63fa5e"
+dependencies = [
+ "winapi 0.2.8",
+ "winapi-build",
+]
+
+[[package]]
+name = "xattr"
+version = "0.2.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "244c3741f4240ef46274860397c7c74e50eb23624996930e484c16679633a54c"
+dependencies = [
+ "libc",
+]
diff --git a/Cargo.toml b/Cargo.toml
new file mode 100644
index 0000000..9c0eadb
--- /dev/null
+++ b/Cargo.toml
@@ -0,0 +1,27 @@
+[package]
+name = "netapp"
+version = "0.1.0"
+authors = ["Alex Auvolat <alex.auvolat@inria.fr>"]
+edition = "2018"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+async-std = { version = "1.5.0", features=["unstable","attributes"] }
+tokio = { version = "0.2", features = ["full"] }
+kuska-handshake = { path = "../../handshake", features = ["default", "tokio_compat"] }
+hex = "0.4.2"
+log = "0.4.8"
+pretty_env_logger = "0.4"
+sodiumoxide = { git = "https://github.com/Dhole/sodiumoxidez", branch = "extra" }
+env_logger = "0.7.1"
+base64 = "0.12.1"
+rmp-serde = "0.14.3"
+serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
+arc-swap = "1.0"
+structopt = { version = "0.3", default-features = false }
+async-trait = "0.1.7"
+err-derive = "0.2.3"
+bytes = "0.6.0"
+lru = "0.6"
+rand = "0.5.5"
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..bf2a928
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,3 @@
+all:
+ cargo build
+ RUST_LOG=netapp=debug cargo run --example fullmesh -- -n 3242ce79e05e8b6a0e43441fbd140a906e13f335f298ae3a52f29784abbab500 -p 6c304114a0e1018bbe60502a34d33f4f439f370856c3333dda2726da01eb93a4894b7ef7249a71f11d342b69702f1beb7c93ec95fbcf122ad1eca583bb0629e7
diff --git a/examples/basalt.rs b/examples/basalt.rs
new file mode 100644
index 0000000..e486e08
--- /dev/null
+++ b/examples/basalt.rs
@@ -0,0 +1,76 @@
+use std::net::SocketAddr;
+use std::time::Duration;
+
+use log::info;
+
+use structopt::StructOpt;
+
+use sodiumoxide::crypto::auth;
+use sodiumoxide::crypto::sign::ed25519;
+
+use netapp::netapp::*;
+use netapp::peering::basalt::*;
+
+#[derive(StructOpt, Debug)]
+#[structopt(name = "netapp")]
+pub struct Opt {
+ #[structopt(long = "network-key", short = "n")]
+ network_key: Option<String>,
+
+ #[structopt(long = "private-key", short = "p")]
+ private_key: Option<String>,
+
+ #[structopt(long = "bootstrap-peer", short = "b")]
+ bootstrap_peers: Vec<String>,
+
+ #[structopt(long = "listen-addr", short = "l", default_value = "127.0.0.1:1980")]
+ listen_addr: String,
+}
+
+#[tokio::main]
+async fn main() {
+ pretty_env_logger::init();
+
+ let opt = Opt::from_args();
+
+ let netid = match &opt.network_key {
+ Some(k) => auth::Key::from_slice(&hex::decode(k).unwrap()).unwrap(),
+ None => auth::gen_key(),
+ };
+ info!("Network key: {}", hex::encode(&netid));
+
+ let privkey = match &opt.private_key {
+ Some(k) => ed25519::SecretKey::from_slice(&hex::decode(k).unwrap()).unwrap(),
+ None => {
+ let (_pk, sk) = ed25519::gen_keypair();
+ sk
+ }
+ };
+
+ info!("Node private key: {}", hex::encode(&privkey));
+ info!("Node public key: {}", hex::encode(&privkey.public_key()));
+
+ let listen_addr = opt.listen_addr.parse().unwrap();
+ let netapp = NetApp::new(listen_addr, netid, privkey);
+
+ let mut bootstrap_peers = vec![];
+ for peer in opt.bootstrap_peers.iter() {
+ if let Some(delim) = peer.find('@') {
+ let (key, ip) = peer.split_at(delim);
+ let pubkey = ed25519::PublicKey::from_slice(&hex::decode(&key).unwrap()).unwrap();
+ let ip = ip[1..].parse::<SocketAddr>().unwrap();
+ bootstrap_peers.push((pubkey, ip));
+ }
+ }
+
+ let basalt_params = BasaltParams{
+ view_size: 100,
+ cache_size: 1000,
+ exchange_interval: Duration::from_secs(1),
+ reset_interval: Duration::from_secs(10),
+ reset_count: 20,
+ };
+ let peering = Basalt::new(netapp.clone(), bootstrap_peers, basalt_params);
+
+ tokio::join!(netapp.listen(), peering.run(),);
+}
diff --git a/examples/fullmesh.rs b/examples/fullmesh.rs
new file mode 100644
index 0000000..8e2ae07
--- /dev/null
+++ b/examples/fullmesh.rs
@@ -0,0 +1,68 @@
+use std::net::SocketAddr;
+
+use log::info;
+
+use structopt::StructOpt;
+
+use sodiumoxide::crypto::auth;
+use sodiumoxide::crypto::sign::ed25519;
+
+use netapp::netapp::*;
+use netapp::peering::fullmesh::*;
+
+#[derive(StructOpt, Debug)]
+#[structopt(name = "netapp")]
+pub struct Opt {
+ #[structopt(long = "network-key", short = "n")]
+ network_key: Option<String>,
+
+ #[structopt(long = "private-key", short = "p")]
+ private_key: Option<String>,
+
+ #[structopt(long = "bootstrap-peer", short = "b")]
+ bootstrap_peers: Vec<String>,
+
+ #[structopt(long = "listen-addr", short = "l", default_value = "127.0.0.1:1980")]
+ listen_addr: String,
+}
+
+#[tokio::main]
+async fn main() {
+ pretty_env_logger::init();
+
+ let opt = Opt::from_args();
+
+ let netid = match &opt.network_key {
+ Some(k) => auth::Key::from_slice(&hex::decode(k).unwrap()).unwrap(),
+ None => auth::gen_key(),
+ };
+ info!("Network key: {}", hex::encode(&netid));
+
+ let privkey = match &opt.private_key {
+ Some(k) => ed25519::SecretKey::from_slice(&hex::decode(k).unwrap()).unwrap(),
+ None => {
+ let (_pk, sk) = ed25519::gen_keypair();
+ sk
+ }
+ };
+
+ info!("Node private key: {}", hex::encode(&privkey));
+ info!("Node public key: {}", hex::encode(&privkey.public_key()));
+
+ let listen_addr = opt.listen_addr.parse().unwrap();
+ let netapp = NetApp::new(listen_addr, netid, privkey);
+
+ let mut bootstrap_peers = vec![];
+ for peer in opt.bootstrap_peers.iter() {
+ if let Some(delim) = peer.find('@') {
+ let (key, ip) = peer.split_at(delim);
+ let pubkey = ed25519::PublicKey::from_slice(&hex::decode(&key).unwrap()).unwrap();
+ let ip = ip[1..].parse::<SocketAddr>().unwrap();
+ bootstrap_peers.push((pubkey, ip));
+ }
+ }
+
+ let peering = FullMeshPeeringStrategy::new(netapp.clone(), bootstrap_peers);
+
+ tokio::join!(netapp.listen(), peering.run(),);
+}
diff --git a/rustfmt.toml b/rustfmt.toml
new file mode 100644
index 0000000..218e203
--- /dev/null
+++ b/rustfmt.toml
@@ -0,0 +1 @@
+hard_tabs = true
diff --git a/src/conn.rs b/src/conn.rs
new file mode 100644
index 0000000..9b60d2a
--- /dev/null
+++ b/src/conn.rs
@@ -0,0 +1,280 @@
+use std::collections::HashMap;
+use std::net::SocketAddr;
+use std::sync::atomic::{self, AtomicU16};
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use bytes::Bytes;
+use log::{debug, trace};
+
+use sodiumoxide::crypto::sign::ed25519;
+use tokio::io::split;
+use tokio::net::TcpStream;
+use tokio::sync::{mpsc, oneshot, watch};
+
+use kuska_handshake::async_std::{
+ handshake_client, handshake_server, BoxStream, TokioCompatExt, TokioCompatExtRead,
+ TokioCompatExtWrite,
+};
+
+use crate::error::*;
+use crate::message::*;
+use crate::netapp::*;
+use crate::proto::*;
+use crate::util::*;
+
+pub struct ServerConn {
+ netapp: Arc<NetApp>,
+ pub remote_addr: SocketAddr,
+ pub peer_pk: ed25519::PublicKey,
+ resp_send: mpsc::UnboundedSender<(RequestID, RequestPriority, Vec<u8>)>,
+ close_send: watch::Sender<bool>,
+}
+
+impl ServerConn {
+ pub(crate) async fn run(netapp: Arc<NetApp>, socket: TcpStream) -> Result<(), Error> {
+ let mut asyncstd_socket = TokioCompatExt::wrap(socket);
+ let handshake = handshake_server(
+ &mut asyncstd_socket,
+ netapp.netid.clone(),
+ netapp.pubkey.clone(),
+ netapp.privkey.clone(),
+ )
+ .await?;
+ let peer_pk = handshake.peer_pk.clone();
+
+ let tokio_socket = asyncstd_socket.into_inner();
+ let remote_addr = tokio_socket.peer_addr().unwrap();
+
+ debug!(
+ "Handshake complete (server) with {}@{}",
+ hex::encode(&peer_pk),
+ remote_addr
+ );
+
+ let (read, write) = split(tokio_socket);
+
+ let read = TokioCompatExtRead::wrap(read);
+ let write = TokioCompatExtWrite::wrap(write);
+
+ let (box_stream_read, box_stream_write) =
+ BoxStream::from_handshake(read, write, handshake, 0x8000).split_read_write();
+
+ let (resp_send, resp_recv) = mpsc::unbounded_channel();
+
+ let (close_send, close_recv) = watch::channel(false);
+
+ let conn = Arc::new(ServerConn {
+ netapp: netapp.clone(),
+ remote_addr,
+ peer_pk: peer_pk.clone(),
+ resp_send,
+ close_send,
+ });
+
+ netapp.connected_as_server(peer_pk.clone(), conn.clone());
+
+ let conn2 = conn.clone();
+ let conn3 = conn.clone();
+ tokio::try_join!(
+ conn2.recv_loop(box_stream_read, close_recv.clone()),
+ conn3.send_loop(resp_recv, box_stream_write, close_recv.clone()),
+ )
+ .map(|_| ())
+ .log_err("ServerConn recv_loop/send_loop");
+
+ netapp.disconnected_as_server(&peer_pk, conn);
+
+ Ok(())
+ }
+
+ pub fn close(&self) {
+ self.close_send.broadcast(true).unwrap();
+ }
+}
+
+impl SendLoop for ServerConn {}
+
+#[async_trait]
+impl RecvLoop for ServerConn {
+ async fn recv_handler(self: Arc<Self>, id: u16, bytes: Vec<u8>) {
+ let bytes: Bytes = bytes.into();
+
+ let prio = bytes[0];
+
+ let mut kind_bytes = [0u8; 4];
+ kind_bytes.copy_from_slice(&bytes[1..5]);
+ let kind = u32::from_be_bytes(kind_bytes);
+
+ if let Some(handler) = self.netapp.msg_handlers.load().get(&kind) {
+ let resp = handler(self.peer_pk.clone(), bytes.slice(5..)).await;
+ self.resp_send
+ .send((id, prio, resp))
+ .log_err("ServerConn recv_handler send resp");
+ }
+ }
+}
+pub struct ClientConn {
+ pub netapp: Arc<NetApp>,
+ pub remote_addr: SocketAddr,
+ pub peer_pk: ed25519::PublicKey,
+ query_send: mpsc::UnboundedSender<(RequestID, RequestPriority, Vec<u8>)>,
+ next_query_number: AtomicU16,
+ resp_send: mpsc::UnboundedSender<(RequestID, Vec<u8>)>,
+ resp_notify_send: mpsc::UnboundedSender<(RequestID, oneshot::Sender<Vec<u8>>)>,
+ close_send: watch::Sender<bool>,
+}
+
+impl ClientConn {
+ pub(crate) async fn init(
+ netapp: Arc<NetApp>,
+ socket: TcpStream,
+ remote_pk: ed25519::PublicKey,
+ ) -> Result<(), Error> {
+ let mut asyncstd_socket = TokioCompatExt::wrap(socket);
+
+ let handshake = handshake_client(
+ &mut asyncstd_socket,
+ netapp.netid.clone(),
+ netapp.pubkey.clone(),
+ netapp.privkey.clone(),
+ remote_pk.clone(),
+ )
+ .await?;
+
+ let tokio_socket = asyncstd_socket.into_inner();
+ let remote_addr = tokio_socket.peer_addr().unwrap();
+
+ debug!(
+ "Handshake complete (client) with {}@{}",
+ hex::encode(&remote_pk),
+ remote_addr
+ );
+
+ let (read, write) = split(tokio_socket);
+
+ let read = TokioCompatExtRead::wrap(read);
+ let write = TokioCompatExtWrite::wrap(write);
+
+ let (box_stream_read, box_stream_write) =
+ BoxStream::from_handshake(read, write, handshake, 0x8000).split_read_write();
+
+ let (query_send, query_recv) = mpsc::unbounded_channel();
+ let (resp_send, resp_recv) = mpsc::unbounded_channel();
+ let (resp_notify_send, resp_notify_recv) = mpsc::unbounded_channel();
+
+ let (close_send, close_recv) = watch::channel(false);
+
+ let conn = Arc::new(ClientConn {
+ netapp: netapp.clone(),
+ remote_addr,
+ peer_pk: remote_pk.clone(),
+ next_query_number: AtomicU16::from(0u16),
+ query_send,
+ resp_send,
+ resp_notify_send,
+ close_send,
+ });
+
+ netapp.connected_as_client(remote_pk.clone(), conn.clone());
+
+ tokio::spawn(async move {
+ let conn2 = conn.clone();
+ let conn3 = conn.clone();
+ let conn4 = conn.clone();
+ tokio::try_join!(
+ conn2.send_loop(query_recv, box_stream_write, close_recv.clone()),
+ conn3.recv_loop(box_stream_read, close_recv.clone()),
+ conn4.dispatch_resp(resp_recv, resp_notify_recv, close_recv.clone()),
+ )
+ .map(|_| ())
+ .log_err("ClientConn send_loop/recv_loop/dispatch_loop");
+
+ netapp.disconnected_as_client(&remote_pk, conn);
+ });
+
+ Ok(())
+ }
+
+ pub fn close(&self) {
+ self.close_send.broadcast(true).unwrap();
+ }
+
+ async fn dispatch_resp(
+ self: Arc<Self>,
+ mut resp_recv: mpsc::UnboundedReceiver<(RequestID, Vec<u8>)>,
+ mut resp_notify_recv: mpsc::UnboundedReceiver<(RequestID, oneshot::Sender<Vec<u8>>)>,
+ mut must_exit: watch::Receiver<bool>,
+ ) -> Result<(), Error> {
+ let mut resps: HashMap<RequestID, Vec<u8>> = HashMap::new();
+ let mut resp_notify: HashMap<RequestID, oneshot::Sender<Vec<u8>>> = HashMap::new();
+ while !*must_exit.borrow() {
+ tokio::select! {
+ resp = resp_recv.recv() => {
+ if let Some((id, resp)) = resp {
+ trace!("dispatch_resp: got resp to {}, {} bytes", id, resp.len());
+ if let Some(ch) = resp_notify.remove(&id) {
+ ch.send(resp).map_err(|_| Error::Message("Could not dispatch reply".to_string()))?;
+ } else {
+ resps.insert(id, resp);
+ }
+ }
+ }
+ resp_ch = resp_notify_recv.recv() => {
+ if let Some((id, resp_ch)) = resp_ch {
+ trace!("dispatch_resp: got resp_ch {}", id);
+ if let Some(rs) = resps.remove(&id) {
+ resp_ch.send(rs).map_err(|_| Error::Message("Could not dispatch reply".to_string()))?;
+ } else {
+ resp_notify.insert(id, resp_ch);
+ }
+ }
+ }
+ exit = must_exit.recv() => {
+ if exit == Some(true) {
+ break;
+ }
+ }
+ }
+ }
+ Ok(())
+ }
+
+ pub async fn request<T>(
+ self: Arc<Self>,
+ rq: T,
+ prio: RequestPriority,
+ ) -> Result<<T as Message>::Response, Error>
+ where
+ T: Message,
+ {
+ let id = self
+ .next_query_number
+ .fetch_add(1u16, atomic::Ordering::Relaxed);
+ let mut bytes = vec![prio];
+ bytes.extend_from_slice(&u32::to_be_bytes(T::KIND)[..]);
+ bytes.extend_from_slice(&rmp_to_vec_all_named(&rq)?[..]);
+
+ let (resp_send, resp_recv) = oneshot::channel();
+ self.resp_notify_send.send((id, resp_send))?;
+
+ trace!("request: query_send {}, {} bytes", id, bytes.len());
+ self.query_send.send((id, prio, bytes))?;
+
+ let resp = resp_recv.await?;
+
+ rmp_serde::decode::from_read_ref::<_, Result<<T as Message>::Response, String>>(&resp[..])?
+ .map_err(Error::Remote)
+ }
+}
+
+impl SendLoop for ClientConn {}
+
+#[async_trait]
+impl RecvLoop for ClientConn {
+ async fn recv_handler(self: Arc<Self>, id: RequestID, msg: Vec<u8>) {
+ self.resp_send
+ .send((id, msg))
+ .log_err("ClientConn::recv_handler");
+ }
+}
diff --git a/src/error.rs b/src/error.rs
new file mode 100644
index 0000000..b54423a
--- /dev/null
+++ b/src/error.rs
@@ -0,0 +1,57 @@
+use err_derive::Error;
+use std::io;
+
+use log::error;
+
+#[derive(Debug, Error)]
+pub enum Error {
+ #[error(display = "IO error: {}", _0)]
+ Io(#[error(source)] io::Error),
+
+ #[error(display = "Messagepack encode error: {}", _0)]
+ RMPEncode(#[error(source)] rmp_serde::encode::Error),
+ #[error(display = "Messagepack decode error: {}", _0)]
+ RMPDecode(#[error(source)] rmp_serde::decode::Error),
+
+ #[error(display = "Tokio join error: {}", _0)]
+ TokioJoin(#[error(source)] tokio::task::JoinError),
+
+ #[error(display = "oneshot receive error: {}", _0)]
+ OneshotRecv(#[error(source)] tokio::sync::oneshot::error::RecvError),
+
+ #[error(display = "Handshake error: {}", _0)]
+ Handshake(#[error(source)] kuska_handshake::async_std::Error),
+
+ #[error(display = "{}", _0)]
+ Message(String),
+
+ #[error(display = "Remote error: {}", _0)]
+ Remote(String),
+}
+
+impl<T> From<tokio::sync::watch::error::SendError<T>> for Error {
+ fn from(_e: tokio::sync::watch::error::SendError<T>) -> Error {
+ Error::Message(format!("Watch send error"))
+ }
+}
+
+impl<T> From<tokio::sync::mpsc::error::SendError<T>> for Error {
+ fn from(_e: tokio::sync::mpsc::error::SendError<T>) -> Error {
+ Error::Message(format!("MPSC send error"))
+ }
+}
+
+pub trait LogError {
+ fn log_err(self, msg: &'static str);
+}
+
+impl<E> LogError for Result<(), E>
+where
+ E: Into<Error>,
+{
+ fn log_err(self, msg: &'static str) {
+ if let Err(e) = self {
+ error!("Error: {}: {}", msg, Into::<Error>::into(e));
+ };
+ }
+}
diff --git a/src/lib.rs b/src/lib.rs
new file mode 100644
index 0000000..2db8a22
--- /dev/null
+++ b/src/lib.rs
@@ -0,0 +1,9 @@
+#![feature(map_first_last)]
+
+pub mod conn;
+pub mod error;
+pub mod message;
+pub mod netapp;
+pub mod peering;
+pub mod proto;
+pub mod util;
diff --git a/src/message.rs b/src/message.rs
new file mode 100644
index 0000000..bcc5aac
--- /dev/null
+++ b/src/message.rs
@@ -0,0 +1,18 @@
+use serde::{Deserialize, Serialize};
+
+pub type MessageKind = u32;
+
+pub trait Message: Serialize + for<'de> Deserialize<'de> + Send + Sync {
+ const KIND: MessageKind;
+ type Response: Serialize + for<'de> Deserialize<'de> + Send + Sync;
+}
+
+#[derive(Serialize, Deserialize)]
+pub(crate) struct HelloMessage {
+ pub server_port: u16,
+}
+
+impl Message for HelloMessage {
+ const KIND: MessageKind = 0x42000001;
+ type Response = ();
+}
diff --git a/src/netapp.rs b/src/netapp.rs
new file mode 100644
index 0000000..6f174b4
--- /dev/null
+++ b/src/netapp.rs
@@ -0,0 +1,214 @@
+use std::collections::HashMap;
+use std::net::SocketAddr;
+use std::pin::Pin;
+use std::sync::{Arc, RwLock};
+
+use std::future::Future;
+
+use log::{debug, info};
+
+use arc_swap::{ArcSwap, ArcSwapOption};
+use bytes::Bytes;
+
+use sodiumoxide::crypto::auth;
+use sodiumoxide::crypto::sign::ed25519;
+use tokio::net::{TcpListener, TcpStream};
+
+use crate::conn::*;
+use crate::error::*;
+use crate::message::*;
+use crate::proto::*;
+use crate::util::*;
+
+pub struct NetApp {
+ pub listen_addr: SocketAddr,
+ pub netid: auth::Key,
+ pub pubkey: ed25519::PublicKey,
+ pub privkey: ed25519::SecretKey,
+ pub server_conns: RwLock<HashMap<ed25519::PublicKey, Arc<ServerConn>>>,
+ pub client_conns: RwLock<HashMap<ed25519::PublicKey, Arc<ClientConn>>>,
+ pub(crate) msg_handlers: ArcSwap<
+ HashMap<
+ MessageKind,
+ Arc<
+ dyn Fn(
+ ed25519::PublicKey,
+ Bytes,
+ ) -> Pin<Box<dyn Future<Output = Vec<u8>> + Sync + Send>>
+ + Sync
+ + Send,
+ >,
+ >,
+ >,
+ pub(crate) on_connected:
+ ArcSwapOption<Box<dyn Fn(ed25519::PublicKey, SocketAddr, bool) + Send + Sync>>,
+ pub(crate) on_disconnected: ArcSwapOption<Box<dyn Fn(ed25519::PublicKey, bool) + Send + Sync>>,
+}
+
+async fn handler_aux<M, F, R>(handler: Arc<F>, remote: ed25519::PublicKey, bytes: Bytes) -> Vec<u8>
+where
+ M: Message + 'static,
+ F: Fn(ed25519::PublicKey, M) -> R + Send + Sync + 'static,
+ R: Future<Output = Result<<M as Message>::Response, Error>> + Send + Sync,
+{
+ debug!(
+ "Handling message of kind {:08x} from {}",
+ M::KIND,
+ hex::encode(remote)
+ );
+ let res = match rmp_serde::decode::from_read_ref::<_, M>(&bytes[..]) {
+ Ok(msg) => handler(remote.clone(), msg).await,
+ Err(e) => Err(e.into()),
+ };
+ let res = res.map_err(|e| format!("{}", e));
+ rmp_to_vec_all_named(&res).unwrap_or(vec![])
+}
+
+impl NetApp {
+ pub fn new(
+ listen_addr: SocketAddr,
+ netid: auth::Key,
+ privkey: ed25519::SecretKey,
+ ) -> Arc<Self> {
+ let pubkey = privkey.public_key();
+ let netapp = Arc::new(Self {
+ listen_addr,
+ netid,
+ pubkey,
+ privkey,
+ server_conns: RwLock::new(HashMap::new()),
+ client_conns: RwLock::new(HashMap::new()),
+ msg_handlers: ArcSwap::new(Arc::new(HashMap::new())),
+ on_connected: ArcSwapOption::new(None),
+ on_disconnected: ArcSwapOption::new(None),
+ });
+
+ let netapp2 = netapp.clone();
+ netapp.add_msg_handler::<HelloMessage, _, _>(
+ move |from: ed25519::PublicKey, msg: HelloMessage| {
+ netapp2.handle_hello_message(from, msg);
+ async { Ok(()) }
+ },
+ );
+
+ netapp
+ }
+
+ pub fn add_msg_handler<M, F, R>(&self, handler: F)
+ where
+ M: Message + 'static,
+ F: Fn(ed25519::PublicKey, M) -> R + Send + Sync + 'static,
+ R: Future<Output = Result<<M as Message>::Response, Error>> + Send + Sync + 'static,
+ {
+ let handler = Arc::new(handler);
+ let fun = Arc::new(move |remote: ed25519::PublicKey, bytes: Bytes| {
+ let fun: Pin<Box<dyn Future<Output = Vec<u8>> + Sync + Send>> =
+ Box::pin(handler_aux(handler.clone(), remote, bytes));
+ fun
+ });
+ let mut handlers = self.msg_handlers.load().as_ref().clone();
+ handlers.insert(M::KIND, fun);
+ self.msg_handlers.store(Arc::new(handlers));
+ }
+
+ pub async fn listen(self: Arc<Self>) {
+ let mut listener = TcpListener::bind(self.listen_addr).await.unwrap();
+ info!("Listening on {}", self.listen_addr);
+
+ loop {
+ // The second item contains the IP and port of the new connection.
+ let (socket, _) = listener.accept().await.unwrap();
+ info!(
+ "Incoming connection from {}, negotiating handshake...",
+ socket.peer_addr().unwrap()
+ );
+ let self2 = self.clone();
+ tokio::spawn(async move {
+ ServerConn::run(self2, socket)
+ .await
+ .log_err("ServerConn::run");
+ });
+ }
+ }
+
+ pub async fn try_connect(
+ self: Arc<Self>,
+ ip: SocketAddr,
+ pk: ed25519::PublicKey,
+ ) -> Result<(), Error> {
+ if self.client_conns.read().unwrap().contains_key(&pk) {
+ return Ok(());
+ }
+ let socket = TcpStream::connect(ip).await?;
+ info!("Connected to {}, negotiating handshake...", ip);
+ ClientConn::init(self, socket, pk.clone()).await?;
+ Ok(())
+ }
+
+ pub fn disconnect(self: Arc<Self>, id: &ed25519::PublicKey) {
+ let conn = self.client_conns.read().unwrap().get(id).cloned();
+ if let Some(c) = conn {
+ c.close();
+ }
+ }
+
+ pub(crate) fn connected_as_server(&self, id: ed25519::PublicKey, conn: Arc<ServerConn>) {
+ let mut conn_list = self.server_conns.write().unwrap();
+ conn_list.insert(id.clone(), conn);
+ }
+
+ fn handle_hello_message(&self, id: ed25519::PublicKey, msg: HelloMessage) {
+ if let Some(h) = self.on_connected.load().as_ref() {
+ if let Some(c) = self.server_conns.read().unwrap().get(&id) {
+ let remote_addr = SocketAddr::new(c.remote_addr.ip(), msg.server_port);
+ h(id, remote_addr, true);
+ }
+ }
+ }
+
+ pub(crate) fn disconnected_as_server(&self, id: &ed25519::PublicKey, conn: Arc<ServerConn>) {
+ let mut conn_list = self.server_conns.write().unwrap();
+ if let Some(c) = conn_list.get(id) {
+ if Arc::ptr_eq(c, &conn) {
+ conn_list.remove(id);
+ }
+
+ if let Some(h) = self.on_disconnected.load().as_ref() {
+ h(conn.peer_pk, true);
+ }
+ }
+ }
+
+ pub(crate) fn connected_as_client(&self, id: ed25519::PublicKey, conn: Arc<ClientConn>) {
+ {
+ let mut conn_list = self.client_conns.write().unwrap();
+ if let Some(old_c) = conn_list.insert(id.clone(), conn.clone()) {
+ tokio::spawn(async move { old_c.close() });
+ }
+ }
+
+ if let Some(h) = self.on_connected.load().as_ref() {
+ h(conn.peer_pk, conn.remote_addr, false);
+ }
+
+ tokio::spawn(async move {
+ let server_port = conn.netapp.listen_addr.port();
+ conn.request(HelloMessage { server_port }, prio::NORMAL)
+ .await
+ .log_err("Sending hello message");
+ });
+ }
+
+ pub(crate) fn disconnected_as_client(&self, id: &ed25519::PublicKey, conn: Arc<ClientConn>) {
+ let mut conn_list = self.client_conns.write().unwrap();
+ if let Some(c) = conn_list.get(id) {
+ if Arc::ptr_eq(c, &conn) {
+ conn_list.remove(id);
+ }
+
+ if let Some(h) = self.on_disconnected.load().as_ref() {
+ h(conn.peer_pk, false);
+ }
+ }
+ }
+}
diff --git a/src/peering/basalt.rs b/src/peering/basalt.rs
new file mode 100644
index 0000000..be807a8
--- /dev/null
+++ b/src/peering/basalt.rs
@@ -0,0 +1,475 @@
+use std::collections::HashSet;
+use std::net::SocketAddr;
+use std::sync::{Arc, RwLock};
+use std::time::Duration;
+
+use log::{debug, warn};
+use lru::LruCache;
+use rand::{thread_rng, Rng};
+use serde::{Deserialize, Serialize};
+
+use sodiumoxide::crypto::hash;
+use sodiumoxide::crypto::sign::ed25519;
+
+use crate::conn::*;
+use crate::message::*;
+use crate::netapp::*;
+use crate::proto::*;
+
+// -- Protocol messages --
+
+#[derive(Serialize, Deserialize)]
+struct PullMessage {}
+
+impl Message for PullMessage {
+ const KIND: MessageKind = 0x42001100;
+ type Response = PushMessage;
+}
+
+#[derive(Serialize, Deserialize)]
+struct PushMessage {
+ peers: Vec<Peer>,
+}
+
+impl Message for PushMessage {
+ const KIND: MessageKind = 0x42001101;
+ type Response = ();
+}
+
+// -- Algorithm data structures --
+
+type Seed = [u8; 32];
+
+#[derive(Hash, Clone, Copy, Debug, PartialOrd, PartialEq, Eq, Serialize, Deserialize)]
+struct Peer {
+ id: ed25519::PublicKey,
+ addr: SocketAddr,
+}
+
+type Cost = [u8; 40];
+const MAX_COST: Cost = [0xffu8; 40];
+
+impl Peer {
+ fn cost(&self, seed: &Seed) -> Cost {
+ let mut hasher = hash::State::new();
+ hasher.update(&seed[..]);
+
+ let mut cost = [0u8; 40];
+ match self.addr {
+ SocketAddr::V4(v4addr) => {
+ let v4ip = v4addr.ip().octets();
+
+ for i in 0..4 {
+ let mut h = hasher.clone();
+ h.update(&v4ip[..i + 1]);
+ cost[i * 8..(i + 1) * 8].copy_from_slice(&h.finalize()[..8]);
+ }
+ }
+ SocketAddr::V6(v6addr) => {
+ let v6ip = v6addr.ip().octets();
+
+ for i in 0..4 {
+ let mut h = hasher.clone();
+ h.update(&v6ip[..i + 2]);
+ cost[i * 8..(i + 1) * 8].copy_from_slice(&h.finalize()[..8]);
+ }
+ }
+ }
+
+ {
+ let mut h5 = hasher.clone();
+ h5.update(&format!("{}", self.addr).into_bytes()[..]);
+ cost[32..40].copy_from_slice(&h5.finalize()[..8]);
+ }
+
+ cost
+ }
+}
+
+struct BasaltSlot {
+ seed: Seed,
+ peer: Option<Peer>,
+}
+
+impl BasaltSlot {
+ fn cost(&self) -> Cost {
+ self.peer.map(|p| p.cost(&self.seed)).unwrap_or(MAX_COST)
+ }
+}
+
+struct BasaltView {
+ i_reset: usize,
+ slots: Vec<BasaltSlot>,
+}
+
+impl BasaltView {
+ fn new(size: usize) -> Self {
+ let slots = (0..size)
+ .map(|_| BasaltSlot {
+ seed: rand_seed(),
+ peer: None,
+ })
+ .collect::<Vec<_>>();
+ Self { i_reset: 0, slots }
+ }
+
+ fn current_peers(&self) -> HashSet<Peer> {
+ self.slots
+ .iter()
+ .filter(|s| s.peer.is_some())
+ .map(|s| s.peer.unwrap().clone())
+ .collect::<HashSet<_>>()
+ }
+ fn current_peers_vec(&self) -> Vec<Peer> {
+ self.current_peers().drain().collect::<Vec<_>>()
+ }
+
+ fn sample(&self, count: usize) -> Vec<Peer> {
+ let possibles = self
+ .slots
+ .iter()
+ .enumerate()
+ .filter(|(_i, s)| s.peer.is_some())
+ .map(|(i, _s)| i)
+ .collect::<Vec<_>>();
+ if possibles.len() == 0 {
+ vec![]
+ } else {
+ let mut ret = vec![];
+ let mut rng = thread_rng();
+ for _i in 0..count {
+ let idx = rng.gen_range(0, possibles.len());
+ ret.push(self.slots[possibles[idx]].peer.unwrap());
+ }
+ ret
+ }
+ }
+
+ fn update_slot(&mut self, i: usize, peers: &[Peer]) {
+ let mut slot_cost = self.slots[i].cost();
+
+ for peer in peers.iter() {
+ let peer_cost = peer.cost(&self.slots[i].seed);
+ if self.slots[i].peer.is_none() || peer_cost < slot_cost {
+ self.slots[i].peer = Some(*peer);
+ slot_cost = peer_cost;
+ }
+ }
+ }
+ fn update_all_slots(&mut self, peers: &[Peer]) {
+ for i in 0..self.slots.len() {
+ self.update_slot(i, peers);
+ }
+ }
+
+ fn disconnected(&mut self, id: ed25519::PublicKey) {
+ let mut cleared_slots = vec![];
+ for i in 0..self.slots.len() {
+ if let Some(p) = self.slots[i].peer {
+ if p.id == id {
+ self.slots[i].peer = None;
+ cleared_slots.push(i);
+ }
+ }
+ }
+
+ let remaining_peers = self.current_peers_vec();
+
+ for i in cleared_slots {
+ self.update_slot(i, &remaining_peers[..]);
+ }
+ }
+
+ fn should_try_list(&self, peers: &[Peer]) -> Vec<Peer> {
+ // Select peers that have lower cost than any of our slots
+ let mut ret = HashSet::new();
+
+ for i in 0..self.slots.len() {
+ if self.slots[i].peer.is_none() {
+ return peers.to_vec();
+ }
+ let mut min_cost = self.slots[i].cost();
+ let mut min_peer = None;
+ for peer in peers.iter() {
+ if ret.contains(peer) {
+ continue;
+ }
+ let peer_cost = peer.cost(&self.slots[i].seed);
+ if peer_cost < min_cost {
+ min_cost = peer_cost;
+ min_peer = Some(*peer);
+ }
+ }
+ if let Some(p) = min_peer {
+ ret.insert(p);
+ if ret.len() == peers.len() {
+ break;
+ }
+ }
+ }
+
+ ret.drain().collect::<Vec<_>>()
+ }
+
+ fn reset_some_slots(&mut self, count: usize) {
+ for _i in 0..count {
+ self.slots[self.i_reset].seed = rand_seed();
+ self.i_reset = (self.i_reset + 1) % self.slots.len();
+ }
+ }
+}
+
+pub struct BasaltParams {
+ pub view_size: usize,
+ pub cache_size: usize,
+ pub exchange_interval: Duration,
+ pub reset_interval: Duration,
+ pub reset_count: usize,
+}
+
+pub struct Basalt {
+ netapp: Arc<NetApp>,
+
+ param: BasaltParams,
+ bootstrap_peers: Vec<Peer>,
+
+ view: RwLock<BasaltView>,
+ current_attempts: RwLock<HashSet<Peer>>,
+ backlog: RwLock<LruCache<Peer, ()>>,
+}
+
+impl Basalt {
+ pub fn new(
+ netapp: Arc<NetApp>,
+ bootstrap_list: Vec<(ed25519::PublicKey, SocketAddr)>,
+ param: BasaltParams,
+ ) -> Arc<Self> {
+ let bootstrap_peers = bootstrap_list
+ .iter()
+ .map(|(id, addr)| Peer {
+ id: *id,
+ addr: *addr,
+ })
+ .collect::<Vec<_>>();
+
+ let view = BasaltView::new(param.view_size);
+ let backlog = LruCache::new(param.cache_size);
+
+ let basalt = Arc::new(Self {
+ netapp: netapp.clone(),
+ param,
+ bootstrap_peers,
+ view: RwLock::new(view),
+ current_attempts: RwLock::new(HashSet::new()),
+ backlog: RwLock::new(backlog),
+ });
+
+ let basalt2 = basalt.clone();
+ netapp.on_connected.store(Some(Arc::new(Box::new(
+ move |pk: ed25519::PublicKey, addr: SocketAddr, is_incoming: bool| {
+ basalt2.on_connected(pk, addr, is_incoming);
+ },
+ ))));
+
+ let basalt2 = basalt.clone();
+ netapp.on_disconnected.store(Some(Arc::new(Box::new(
+ move |pk: ed25519::PublicKey, is_incoming: bool| {
+ basalt2.on_disconnected(pk, is_incoming);
+ },
+ ))));
+
+ let basalt2 = basalt.clone();
+ netapp.add_msg_handler::<PullMessage, _, _>(
+ move |_from: ed25519::PublicKey, _pullmsg: PullMessage| {
+ let push_msg = basalt2.make_push_message();
+ async move { Ok(push_msg) }
+ },
+ );
+
+ let basalt2 = basalt.clone();
+ netapp.add_msg_handler::<PushMessage, _, _>(
+ move |_from: ed25519::PublicKey, push_msg: PushMessage| {
+ basalt2.handle_peer_list(&push_msg.peers[..]);
+ async move { Ok(()) }
+ },
+ );
+
+ basalt
+ }
+
+ pub fn sample(&self, count: usize) -> Vec<ed25519::PublicKey> {
+ self.view
+ .read()
+ .unwrap()
+ .sample(count)
+ .iter()
+ .map(|p| p.id)
+ .collect::<Vec<_>>()
+ }
+
+ pub async fn run(self: Arc<Self>) {
+ for peer in self.bootstrap_peers.iter() {
+ tokio::spawn(self.clone().try_connect(*peer));
+ }
+
+ let pushpull_loop = self.clone().run_pushpull_loop();
+ let reset_loop = self.run_reset_loop();
+ tokio::join!(pushpull_loop, reset_loop);
+ }
+
+ async fn run_pushpull_loop(self: Arc<Self>) {
+ loop {
+ tokio::time::delay_for(self.param.exchange_interval).await;
+
+ let peers = self.view.read().unwrap().sample(2);
+ if peers.len() == 2 {
+ let (c1, c2) = {
+ let client_conns = self.netapp.client_conns.read().unwrap();
+ (
+ client_conns.get(&peers[0].id).cloned(),
+ client_conns.get(&peers[1].id).cloned(),
+ )
+ };
+ if let Some(c) = c1 {
+ tokio::spawn(self.clone().do_pull(c));
+ }
+ if let Some(c) = c2 {
+ tokio::spawn(self.clone().do_push(c));
+ }
+ }
+ }
+ }
+
+ async fn do_pull(self: Arc<Self>, peer: Arc<ClientConn>) {
+ match peer.request(PullMessage {}, prio::NORMAL).await {
+ Ok(resp) => {
+ self.handle_peer_list(&resp.peers[..]);
+ }
+ Err(e) => {
+ warn!("Error during pull exchange: {}", e);
+ }
+ };
+ }
+
+ async fn do_push(self: Arc<Self>, peer: Arc<ClientConn>) {
+ let push_msg = self.make_push_message();
+ if let Err(e) = peer.request(push_msg, prio::NORMAL).await {
+ warn!("Error during push exchange: {}", e);
+ }
+ }
+
+ fn make_push_message(&self) -> PushMessage {
+ let current_peers = self.view.read().unwrap().current_peers_vec();
+ PushMessage {
+ peers: current_peers,
+ }
+ }
+
+ async fn run_reset_loop(self: Arc<Self>) {
+ loop {
+ tokio::time::delay_for(self.param.reset_interval).await;
+
+ {
+ let mut view = self.view.write().unwrap();
+ let prev_peers = view.current_peers();
+ let prev_peers_vec = prev_peers.iter().cloned().collect::<Vec<_>>();
+
+ view.reset_some_slots(self.param.reset_count);
+ view.update_all_slots(&prev_peers_vec[..]);
+
+ let new_peers = view.current_peers();
+ drop(view);
+
+ self.close_all_diff(&prev_peers, &new_peers);
+ }
+
+ let mut to_retry_maybe = self.bootstrap_peers.clone();
+ for (peer, _) in self.backlog.read().unwrap().iter() {
+ if !self.bootstrap_peers.contains(peer) {
+ to_retry_maybe.push(*peer);
+ }
+ }
+ self.handle_peer_list(&to_retry_maybe[..]);
+ }
+ }
+
+ fn handle_peer_list(self: &Arc<Self>, peers: &[Peer]) {
+ let to_connect = self.view.read().unwrap().should_try_list(peers);
+
+ for peer in to_connect.iter() {
+ tokio::spawn(self.clone().try_connect(*peer));
+ }
+ }
+
+ async fn try_connect(self: Arc<Self>, peer: Peer) {
+ {
+ let view = self.view.read().unwrap();
+ let mut attempts = self.current_attempts.write().unwrap();
+
+ if view.slots.iter().any(|x| x.peer == Some(peer)) {
+ return;
+ }
+ if attempts.contains(&peer) {
+ return;
+ }
+
+ attempts.insert(peer);
+ }
+ let res = self.netapp.clone().try_connect(peer.addr, peer.id).await;
+ debug!("Connection attempt to {}: {:?}", peer.addr, res);
+
+ self.current_attempts.write().unwrap().remove(&peer);
+
+ if res.is_err() {
+ self.backlog.write().unwrap().pop(&peer);
+ }
+ }
+
+ fn on_connected(self: &Arc<Self>, pk: ed25519::PublicKey, addr: SocketAddr, is_incoming: bool) {
+ if is_incoming {
+ self.handle_peer_list(&[Peer{id: pk, addr}][..]);
+ } else {
+ let peer = Peer { id: pk, addr };
+
+ let mut backlog = self.backlog.write().unwrap();
+ if backlog.get(&peer).is_none() {
+ backlog.put(peer, ());
+ }
+ drop(backlog);
+
+ let mut view = self.view.write().unwrap();
+ let prev_peers = view.current_peers();
+
+ view.update_all_slots(&[peer][..]);
+
+ let new_peers = view.current_peers();
+ drop(view);
+
+ self.close_all_diff(&prev_peers, &new_peers);
+ }
+ }
+
+ fn on_disconnected(&self, pk: ed25519::PublicKey, is_incoming: bool) {
+ if !is_incoming {
+ self.view.write().unwrap().disconnected(pk);
+ }
+ }
+
+ fn close_all_diff(&self, prev_peers: &HashSet<Peer>, new_peers: &HashSet<Peer>) {
+ let client_conns = self.netapp.client_conns.read().unwrap();
+ for peer in prev_peers.iter() {
+ if !new_peers.contains(peer) {
+ if let Some(c) = client_conns.get(&peer.id) {
+ debug!("Closing connection to {} ({})", hex::encode(peer.id), peer.addr);
+ c.close();
+ }
+ }
+ }
+ }
+}
+
+fn rand_seed() -> Seed {
+ let mut seed = [0u8; 32];
+ sodiumoxide::randombytes::randombytes_into(&mut seed[..]);
+ seed
+}
diff --git a/src/peering/fullmesh.rs b/src/peering/fullmesh.rs
new file mode 100644
index 0000000..e04beb6
--- /dev/null
+++ b/src/peering/fullmesh.rs
@@ -0,0 +1,437 @@
+use std::collections::{HashMap, VecDeque};
+use std::net::SocketAddr;
+use std::sync::atomic::{self, AtomicU64};
+use std::sync::{Arc, RwLock};
+use std::time::{Duration, Instant};
+
+use log::{debug, info, trace, warn};
+use serde::{Deserialize, Serialize};
+
+use sodiumoxide::crypto::hash;
+use sodiumoxide::crypto::sign::ed25519;
+
+use crate::conn::*;
+use crate::message::*;
+use crate::netapp::*;
+use crate::proto::*;
+
+const CONN_RETRY_INTERVAL: Duration = Duration::from_secs(30);
+const CONN_MAX_RETRIES: usize = 10;
+const PING_INTERVAL: Duration = Duration::from_secs(10);
+const LOOP_DELAY: Duration = Duration::from_secs(1);
+
+// -- Protocol messages --
+
+#[derive(Serialize, Deserialize)]
+struct PingMessage {
+ pub id: u64,
+ pub peer_list_hash: hash::Digest,
+}
+
+impl Message for PingMessage {
+ const KIND: MessageKind = 0x42001000;
+ type Response = PingMessage;
+}
+
+#[derive(Serialize, Deserialize)]
+struct PeerListMessage {
+ pub list: Vec<(ed25519::PublicKey, SocketAddr)>,
+}
+
+impl Message for PeerListMessage {
+ const KIND: MessageKind = 0x42001001;
+ type Response = PeerListMessage;
+}
+
+// -- Algorithm data structures --
+
+#[derive(Debug)]
+struct PeerInfo {
+ addr: SocketAddr,
+ state: PeerConnState,
+ last_seen: Option<Instant>,
+ ping: VecDeque<Duration>,
+}
+
+#[derive(Copy, Clone, Debug)]
+pub struct PeerInfoPub {
+ pub id: ed25519::PublicKey,
+ pub addr: SocketAddr,
+ pub state: PeerConnState,
+ pub last_seen: Option<Instant>,
+ pub avg_ping: Option<Duration>,
+ pub max_ping: Option<Duration>,
+ pub med_ping: Option<Duration>,
+}
+
+// PeerConnState: possible states for our tentative connections to given peer
+// This module is only interested in recording connection info for outgoing
+// TCP connections
+#[derive(Copy, Clone, Debug, PartialEq)]
+pub enum PeerConnState {
+ // This entry represents ourself
+ Ourself,
+
+ // We currently have a connection to this peer
+ Connected,
+
+ // Our next connection tentative (the nth, where n is the first value)
+ // will be at given Instant
+ Waiting(usize, Instant),
+
+ // A connection tentative is in progress
+ Trying(usize),
+
+ // We abandonned trying to connect to this peer (too many failed attempts)
+ Abandonned,
+}
+
+struct KnownHosts {
+ list: HashMap<ed25519::PublicKey, PeerInfo>,
+ hash: hash::Digest,
+}
+
+impl KnownHosts {
+ fn new() -> Self {
+ let list = HashMap::new();
+ let hash = Self::calculate_hash(&list);
+ Self { list, hash }
+ }
+ fn update_hash(&mut self) {
+ self.hash = Self::calculate_hash(&self.list);
+ }
+ fn map_into_vec(
+ input: &HashMap<ed25519::PublicKey, PeerInfo>,
+ ) -> Vec<(ed25519::PublicKey, SocketAddr)> {
+ let mut list = Vec::with_capacity(input.len());
+ for (id, peer) in input.iter() {
+ if peer.state == PeerConnState::Connected || peer.state == PeerConnState::Ourself {
+ list.push((id.clone(), peer.addr));
+ }
+ }
+ list
+ }
+ fn calculate_hash(input: &HashMap<ed25519::PublicKey, PeerInfo>) -> hash::Digest {
+ let mut list = Self::map_into_vec(input);
+ list.sort();
+ let mut hash_state = hash::State::new();
+ for (id, addr) in list {
+ hash_state.update(&id[..]);
+ hash_state.update(&format!("{}", addr).into_bytes()[..]);
+ }
+ hash_state.finalize()
+ }
+}
+
+pub struct FullMeshPeeringStrategy {
+ netapp: Arc<NetApp>,
+ known_hosts: RwLock<KnownHosts>,
+ next_ping_id: AtomicU64,
+}
+
+impl FullMeshPeeringStrategy {
+ pub fn new(
+ netapp: Arc<NetApp>,
+ bootstrap_list: Vec<(ed25519::PublicKey, SocketAddr)>,
+ ) -> Arc<Self> {
+ let mut known_hosts = KnownHosts::new();
+ for (pk, addr) in bootstrap_list {
+ if pk != netapp.pubkey {
+ known_hosts.list.insert(
+ pk,
+ PeerInfo {
+ addr: addr,
+ state: PeerConnState::Waiting(0, Instant::now()),
+ last_seen: None,
+ ping: VecDeque::new(),
+ },
+ );
+ }
+ }
+
+ let strat = Arc::new(Self {
+ netapp: netapp.clone(),
+ known_hosts: RwLock::new(known_hosts),
+ next_ping_id: AtomicU64::new(42),
+ });
+
+ let strat2 = strat.clone();
+ netapp.add_msg_handler::<PingMessage, _, _>(
+ move |from: ed25519::PublicKey, ping: PingMessage| {
+ let ping_resp = PingMessage {
+ id: ping.id,
+ peer_list_hash: strat2.known_hosts.read().unwrap().hash,
+ };
+ async move {
+ debug!("Ping from {}", hex::encode(&from));
+ Ok(ping_resp)
+ }
+ },
+ );
+
+ let strat2 = strat.clone();
+ netapp.add_msg_handler::<PeerListMessage, _, _>(
+ move |_from: ed25519::PublicKey, peer_list: PeerListMessage| {
+ strat2.handle_peer_list(&peer_list.list[..]);
+ let peer_list = KnownHosts::map_into_vec(&strat2.known_hosts.read().unwrap().list);
+ let resp = PeerListMessage { list: peer_list };
+ async move { Ok(resp) }
+ },
+ );
+
+ let strat2 = strat.clone();
+ netapp.on_connected.store(Some(Arc::new(Box::new(
+ move |pk: ed25519::PublicKey, addr: SocketAddr, is_incoming: bool| {
+ let strat2 = strat2.clone();
+ tokio::spawn(strat2.on_connected(pk, addr, is_incoming));
+ },
+ ))));
+
+ let strat2 = strat.clone();
+ netapp.on_disconnected.store(Some(Arc::new(Box::new(
+ move |pk: ed25519::PublicKey, is_incoming: bool| {
+ let strat2 = strat2.clone();
+ tokio::spawn(strat2.on_disconnected(pk, is_incoming));
+ },
+ ))));
+
+ strat
+ }
+
+ pub async fn run(self: Arc<Self>) {
+ loop {
+ // 1. Read current state: get list of connected peers (ping them)
+ let known_hosts = self.known_hosts.read().unwrap();
+ debug!("known_hosts: {} peers", known_hosts.list.len());
+
+ let mut to_ping = vec![];
+ let mut to_retry = vec![];
+ for (id, info) in known_hosts.list.iter() {
+ debug!("{}, {:?}", hex::encode(id), info);
+ match info.state {
+ PeerConnState::Connected => {
+ let must_ping = match info.last_seen {
+ None => true,
+ Some(t) => Instant::now() - t > PING_INTERVAL,
+ };
+ if must_ping {
+ to_ping.push(id.clone());
+ }
+ }
+ PeerConnState::Waiting(_, t) => {
+ if Instant::now() >= t {
+ to_retry.push(id.clone());
+ }
+ }
+ _ => (),
+ }
+ }
+ drop(known_hosts);
+
+ // 2. Dispatch ping to hosts
+ trace!("to_ping: {} peers", to_retry.len());
+ for id in to_ping {
+ tokio::spawn(self.clone().ping(id));
+ }
+
+ // 3. Try reconnects
+ trace!("to_retry: {} peers", to_retry.len());
+ if !to_retry.is_empty() {
+ let mut known_hosts = self.known_hosts.write().unwrap();
+ for id in to_retry {
+ if let Some(h) = known_hosts.list.get_mut(&id) {
+ if let PeerConnState::Waiting(i, _) = h.state {
+ info!(
+ "Retrying connection to {} at {} ({})",
+ hex::encode(&id),
+ h.addr,
+ i + 1
+ );
+ h.state = PeerConnState::Trying(i);
+ tokio::spawn(self.clone().try_connect(id, h.addr.clone()));
+ }
+ }
+ }
+ }
+
+ // 4. Sleep before next loop iteration
+ tokio::time::delay_for(LOOP_DELAY).await;
+ }
+ }
+
+ async fn ping(self: Arc<Self>, id: ed25519::PublicKey) {
+ let peer = {
+ match self.netapp.client_conns.read().unwrap().get(&id) {
+ None => {
+ warn!("Should ping {}, but no connection", hex::encode(id));
+ return;
+ }
+ Some(peer) => peer.clone(),
+ }
+ };
+
+ let peer_list_hash = self.known_hosts.read().unwrap().hash;
+ let ping_id = self.next_ping_id.fetch_add(1u64, atomic::Ordering::Relaxed);
+ let ping_time = Instant::now();
+ let ping_msg = PingMessage {
+ id: ping_id,
+ peer_list_hash,
+ };
+
+ debug!(
+ "Sending ping {} to {} at {:?}",
+ ping_id,
+ hex::encode(id),
+ ping_time
+ );
+ match peer.clone().request(ping_msg, prio::HIGH).await {
+ Err(e) => warn!("Error pinging {}: {}", hex::encode(id), e),
+ Ok(ping_resp) => {
+ let resp_time = Instant::now();
+ debug!(
+ "Got ping response from {} at {:?}",
+ hex::encode(id),
+ resp_time
+ );
+ {
+ let mut known_hosts = self.known_hosts.write().unwrap();
+ if let Some(host) = known_hosts.list.get_mut(&id) {
+ host.last_seen = Some(resp_time);
+ host.ping.push_back(resp_time - ping_time);
+ while host.ping.len() > 10 {
+ host.ping.pop_front();
+ }
+ }
+ }
+ if ping_resp.peer_list_hash != peer_list_hash {
+ self.exchange_peers(peer).await;
+ }
+ }
+ }
+ }
+
+ async fn exchange_peers(self: Arc<Self>, peer: Arc<ClientConn>) {
+ let peer_list = KnownHosts::map_into_vec(&self.known_hosts.read().unwrap().list);
+ let pex_message = PeerListMessage { list: peer_list };
+ match peer.request(pex_message, prio::BACKGROUND).await {
+ Err(e) => warn!("Error doing peer exchange: {}", e),
+ Ok(resp) => {
+ self.handle_peer_list(&resp.list[..]);
+ }
+ }
+ }
+
+ fn handle_peer_list(&self, list: &[(ed25519::PublicKey, SocketAddr)]) {
+ let mut known_hosts = self.known_hosts.write().unwrap();
+ for (id, addr) in list.iter() {
+ if !known_hosts.list.contains_key(id) {
+ known_hosts.list.insert(*id, self.new_peer(id, *addr));
+ }
+ }
+ }
+
+ async fn try_connect(self: Arc<Self>, id: ed25519::PublicKey, addr: SocketAddr) {
+ let conn_result = self.netapp.clone().try_connect(addr, id.clone()).await;
+ if let Err(e) = conn_result {
+ warn!("Error connecting to {}: {}", hex::encode(id), e);
+ let mut known_hosts = self.known_hosts.write().unwrap();
+ if let Some(host) = known_hosts.list.get_mut(&id) {
+ host.state = match host.state {
+ PeerConnState::Trying(i) => {
+ if i >= CONN_MAX_RETRIES {
+ PeerConnState::Abandonned
+ } else {
+ PeerConnState::Waiting(i + 1, Instant::now() + CONN_RETRY_INTERVAL)
+ }
+ }
+ _ => PeerConnState::Waiting(0, Instant::now() + CONN_RETRY_INTERVAL),
+ };
+ }
+ }
+ }
+
+ async fn on_connected(
+ self: Arc<Self>,
+ pk: ed25519::PublicKey,
+ addr: SocketAddr,
+ is_incoming: bool,
+ ) {
+ if is_incoming {
+ if !self.known_hosts.read().unwrap().list.contains_key(&pk) {
+ self.known_hosts
+ .write()
+ .unwrap()
+ .list
+ .insert(pk, self.new_peer(&pk, addr));
+ }
+ } else {
+ info!("Successfully connected to {} at {}", hex::encode(&pk), addr);
+ let mut known_hosts = self.known_hosts.write().unwrap();
+ if let Some(host) = known_hosts.list.get_mut(&pk) {
+ host.state = PeerConnState::Connected;
+ known_hosts.update_hash();
+ }
+ }
+ }
+
+ async fn on_disconnected(self: Arc<Self>, pk: ed25519::PublicKey, is_incoming: bool) {
+ if !is_incoming {
+ info!("Connection to {} was closed", hex::encode(pk));
+ let mut known_hosts = self.known_hosts.write().unwrap();
+ if let Some(host) = known_hosts.list.get_mut(&pk) {
+ host.state = PeerConnState::Waiting(0, Instant::now());
+ known_hosts.update_hash();
+ }
+ }
+ }
+
+ pub fn get_peer_list(&self) -> Vec<PeerInfoPub> {
+ let known_hosts = self.known_hosts.read().unwrap();
+ let mut ret = Vec::with_capacity(known_hosts.list.len());
+ for (id, info) in known_hosts.list.iter() {
+ let mut pings = info.ping.iter().cloned().collect::<Vec<_>>();
+ pings.sort();
+ if pings.len() > 0 {
+ ret.push(PeerInfoPub {
+ id: id.clone(),
+ addr: info.addr,
+ state: info.state,
+ last_seen: info.last_seen,
+ avg_ping: Some(
+ pings
+ .iter()
+ .fold(Duration::from_secs(0), |x, y| x + *y)
+ .div_f64(pings.len() as f64),
+ ),
+ max_ping: pings.last().cloned(),
+ med_ping: Some(pings[pings.len() / 2]),
+ });
+ } else {
+ ret.push(PeerInfoPub {
+ id: id.clone(),
+ addr: info.addr,
+ state: info.state,
+ last_seen: info.last_seen,
+ avg_ping: None,
+ max_ping: None,
+ med_ping: None,
+ });
+ }
+ }
+ ret
+ }
+
+ fn new_peer(&self, id: &ed25519::PublicKey, addr: SocketAddr) -> PeerInfo {
+ let state = if *id == self.netapp.pubkey {
+ PeerConnState::Ourself
+ } else {
+ PeerConnState::Waiting(0, Instant::now())
+ };
+ PeerInfo {
+ addr,
+ state,
+ last_seen: None,
+ ping: VecDeque::new(),
+ }
+ }
+}
diff --git a/src/peering/mod.rs b/src/peering/mod.rs
new file mode 100644
index 0000000..beb2e18
--- /dev/null
+++ b/src/peering/mod.rs
@@ -0,0 +1,2 @@
+pub mod basalt;
+pub mod fullmesh;
diff --git a/src/proto.rs b/src/proto.rs
new file mode 100644
index 0000000..58c914e
--- /dev/null
+++ b/src/proto.rs
@@ -0,0 +1,251 @@
+use std::collections::{BTreeMap, HashMap, VecDeque};
+use std::sync::Arc;
+
+use log::trace;
+
+use async_trait::async_trait;
+
+use async_std::io::prelude::WriteExt;
+use async_std::io::ReadExt;
+
+use tokio::io::{ReadHalf, WriteHalf};
+use tokio::net::TcpStream;
+use tokio::sync::{mpsc, watch};
+
+use crate::error::*;
+
+use kuska_handshake::async_std::{BoxStreamRead, BoxStreamWrite, TokioCompat};
+
+const MAX_CHUNK_SIZE: usize = 0x4000;
+
+pub mod prio {
+ pub const HIGH: u8 = 0x20;
+ pub const NORMAL: u8 = 0x40;
+ pub const BACKGROUND: u8 = 0x80;
+
+ pub const PRIMARY: u8 = 0x00;
+ pub const SECONDARY: u8 = 0x01;
+}
+
+pub type RequestID = u16;
+pub type RequestPriority = u8;
+
+struct SendQueueItem {
+ id: RequestID,
+ prio: RequestPriority,
+ data: Vec<u8>,
+ cursor: usize,
+}
+
+struct SendQueue {
+ items: BTreeMap<u8, VecDeque<SendQueueItem>>,
+}
+
+impl SendQueue {
+ fn new() -> Self {
+ Self {
+ items: BTreeMap::new(),
+ }
+ }
+ fn push(&mut self, item: SendQueueItem) {
+ let prio = item.prio;
+ let mut items_at_prio = self
+ .items
+ .remove(&prio)
+ .unwrap_or(VecDeque::with_capacity(4));
+ items_at_prio.push_back(item);
+ self.items.insert(prio, items_at_prio);
+ }
+ fn pop(&mut self) -> Option<SendQueueItem> {
+ match self.items.pop_first() {
+ None => None,
+ Some((prio, mut items_at_prio)) => {
+ let ret = items_at_prio.pop_front();
+ if !items_at_prio.is_empty() {
+ self.items.insert(prio, items_at_prio);
+ }
+ ret
+ }
+ }
+ }
+}
+
+#[async_trait]
+pub(crate) trait SendLoop: Sync {
+ async fn send_loop(
+ self: Arc<Self>,
+ mut msg_recv: mpsc::UnboundedReceiver<(RequestID, RequestPriority, Vec<u8>)>,
+ mut write: BoxStreamWrite<TokioCompat<WriteHalf<TcpStream>>>,
+ mut must_exit: watch::Receiver<bool>,
+ ) -> Result<(), Error> {
+ let mut sending = SendQueue::new();
+ while !*must_exit.borrow() {
+ if let Ok((id, prio, data)) = msg_recv.try_recv() {
+ trace!("send_loop: got {}, {} bytes", id, data.len());
+ sending.push(SendQueueItem {
+ id,
+ prio,
+ data,
+ cursor: 0,
+ });
+ } else if let Some(mut item) = sending.pop() {
+ trace!(
+ "send_loop: sending bytes for {} ({} bytes, {} already sent)",
+ item.id,
+ item.data.len(),
+ item.cursor
+ );
+ let header_id = u16::to_be_bytes(item.id);
+ if write_all_or_exit(&header_id[..], &mut write, &mut must_exit)
+ .await?
+ .is_none()
+ {
+ break;
+ }
+
+ if item.data.len() - item.cursor > MAX_CHUNK_SIZE {
+ let header_size = u16::to_be_bytes(MAX_CHUNK_SIZE as u16 | 0x8000);
+ if write_all_or_exit(&header_size[..], &mut write, &mut must_exit)
+ .await?
+ .is_none()
+ {
+ break;
+ }
+
+ let new_cursor = item.cursor + MAX_CHUNK_SIZE as usize;
+ if write_all_or_exit(
+ &item.data[item.cursor..new_cursor],
+ &mut write,
+ &mut must_exit,
+ )
+ .await?
+ .is_none()
+ {
+ break;
+ }
+ item.cursor = new_cursor;
+
+ sending.push(item);
+ } else {
+ let send_len = (item.data.len() - item.cursor) as u16;
+
+ let header_size = u16::to_be_bytes(send_len);
+ if write_all_or_exit(&header_size[..], &mut write, &mut must_exit)
+ .await?
+ .is_none()
+ {
+ break;
+ }
+
+ if write_all_or_exit(&item.data[item.cursor..], &mut write, &mut must_exit)
+ .await?
+ .is_none()
+ {
+ break;
+ }
+ }
+ write.flush().await.log_err("Could not flush in send_loop");
+ } else {
+ let (id, prio, data) = msg_recv
+ .recv()
+ .await
+ .ok_or(Error::Message("Connection closed.".into()))?;
+ trace!("send_loop: got {}, {} bytes", id, data.len());
+ sending.push(SendQueueItem {
+ id,
+ prio,
+ data,
+ cursor: 0,
+ });
+ }
+ }
+ Ok(())
+ }
+}
+
+#[async_trait]
+pub(crate) trait RecvLoop: Sync + 'static {
+ async fn recv_handler(self: Arc<Self>, id: RequestID, msg: Vec<u8>);
+
+ async fn recv_loop(
+ self: Arc<Self>,
+ mut read: BoxStreamRead<TokioCompat<ReadHalf<TcpStream>>>,
+ mut must_exit: watch::Receiver<bool>,
+ ) -> Result<(), Error> {
+ let mut receiving = HashMap::new();
+ while !*must_exit.borrow() {
+ trace!("recv_loop: reading packet");
+ let mut header_id = [0u8; 2];
+ if read_exact_or_exit(&mut header_id[..], &mut read, &mut must_exit)
+ .await?
+ .is_none()
+ {
+ break;
+ }
+ let id = RequestID::from_be_bytes(header_id);
+ trace!("recv_loop: got header id: {:04x}", id);
+
+ let mut header_size = [0u8; 2];
+ if read_exact_or_exit(&mut header_size[..], &mut read, &mut must_exit)
+ .await?
+ .is_none()
+ {
+ break;
+ }
+ let size = RequestID::from_be_bytes(header_size);
+ trace!("recv_loop: got header size: {:04x}", id);
+
+ let has_cont = (size & 0x8000) != 0;
+ let size = size & !0x8000;
+
+ let mut next_slice = vec![0; size as usize];
+ if read_exact_or_exit(&mut next_slice[..], &mut read, &mut must_exit)
+ .await?
+ .is_none()
+ {
+ break;
+ }
+ trace!("recv_loop: read {} bytes", size);
+
+ let mut msg_bytes = receiving.remove(&id).unwrap_or(vec![]);
+ msg_bytes.extend_from_slice(&next_slice[..]);
+
+ if has_cont {
+ receiving.insert(id, msg_bytes);
+ } else {
+ tokio::spawn(self.clone().recv_handler(id, msg_bytes));
+ }
+ }
+ Ok(())
+ }
+}
+
+async fn read_exact_or_exit(
+ buf: &mut [u8],
+ read: &mut BoxStreamRead<TokioCompat<ReadHalf<TcpStream>>>,
+ must_exit: &mut watch::Receiver<bool>,
+) -> Result<Option<()>, Error> {
+ tokio::select!(
+ res = read.read_exact(buf) => Ok(Some(res?)),
+ _ = await_exit(must_exit) => Ok(None),
+ )
+}
+
+async fn write_all_or_exit(
+ buf: &[u8],
+ write: &mut BoxStreamWrite<TokioCompat<WriteHalf<TcpStream>>>,
+ must_exit: &mut watch::Receiver<bool>,
+) -> Result<Option<()>, Error> {
+ tokio::select!(
+ res = write.write_all(buf) => Ok(Some(res?)),
+ _ = await_exit(must_exit) => Ok(None),
+ )
+}
+
+async fn await_exit(must_exit: &mut watch::Receiver<bool>) {
+ loop {
+ if must_exit.recv().await == Some(true) {
+ return;
+ }
+ }
+}
diff --git a/src/util.rs b/src/util.rs
new file mode 100644
index 0000000..e83822e
--- /dev/null
+++ b/src/util.rs
@@ -0,0 +1,14 @@
+use serde::Serialize;
+
+// util
+pub fn rmp_to_vec_all_named<T>(val: &T) -> Result<Vec<u8>, rmp_serde::encode::Error>
+where
+ T: Serialize + ?Sized,
+{
+ let mut wr = Vec::with_capacity(128);
+ let mut se = rmp_serde::Serializer::new(&mut wr)
+ .with_struct_map()
+ .with_string_variants();
+ val.serialize(&mut se)?;
+ Ok(wr)
+}