diff options
author | Quentin <quentin@dufour.io> | 2023-12-27 16:35:43 +0000 |
---|---|---|
committer | Quentin <quentin@dufour.io> | 2023-12-27 16:35:43 +0000 |
commit | 6ff3c6f71efd802da422a371e6168ae528fb2ddc (patch) | |
tree | 62b5d7d9bc7fd2bf3defd1a85ae1b3f34cd4b8ee | |
parent | 609dde413972ebeeb8cd658a5ec9f62b34b5c402 (diff) | |
parent | ea4cd48bba96027882a637df08e313af92a3db46 (diff) | |
download | aerogramme-6ff3c6f71efd802da422a371e6168ae528fb2ddc.tar.gz aerogramme-6ff3c6f71efd802da422a371e6168ae528fb2ddc.zip |
Add storage behind a trait
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/aerogramme/pulls/32
-rw-r--r-- | Cargo.lock | 1048 | ||||
-rw-r--r-- | Cargo.toml | 13 | ||||
-rw-r--r-- | flake.nix | 2 | ||||
-rw-r--r-- | src/bayou.rs | 281 | ||||
-rw-r--r-- | src/config.rs | 152 | ||||
-rw-r--r-- | src/future_rest_admin_api.txt | 174 | ||||
-rw-r--r-- | src/k2v_util.rs | 11 | ||||
-rw-r--r-- | src/login/ldap_provider.rs | 141 | ||||
-rw-r--r-- | src/login/mod.rs | 640 | ||||
-rw-r--r-- | src/login/static_provider.rs | 181 | ||||
-rw-r--r-- | src/mail/incoming.rs | 188 | ||||
-rw-r--r-- | src/mail/mailbox.rs | 159 | ||||
-rw-r--r-- | src/mail/unique_ident.rs | 2 | ||||
-rw-r--r-- | src/mail/user.rs | 55 | ||||
-rw-r--r-- | src/main.rs | 539 | ||||
-rw-r--r-- | src/server.rs | 70 | ||||
-rw-r--r-- | src/storage/garage.rs | 489 | ||||
-rw-r--r-- | src/storage/in_memory.rs | 334 | ||||
-rw-r--r-- | src/storage/mod.rs | 179 | ||||
-rw-r--r-- | src/time.rs | 9 | ||||
-rw-r--r-- | src/timestamp.rs | 65 |
21 files changed, 3098 insertions, 1634 deletions
@@ -33,8 +33,10 @@ dependencies = [ "anyhow", "argon2", "async-trait", + "aws-config", + "aws-sdk-s3", "backtrace", - "base64 0.13.1", + "base64 0.21.2", "boitalettres", "chrono", "clap", @@ -42,7 +44,7 @@ dependencies = [ "eml-codec", "futures", "hex", - "hyper-rustls 0.24.1", + "hyper-rustls", "im", "imap-codec", "itertools", @@ -50,13 +52,10 @@ dependencies = [ "lazy_static", "ldap3", "log", + "nix", "rand", "rmp-serde", "rpassword", - "rusoto_core", - "rusoto_credential", - "rusoto_s3", - "rusoto_signature", "serde", "smtp-message", "smtp-server", @@ -71,6 +70,15 @@ dependencies = [ ] [[package]] +name = "aho-corasick" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2969dcb958b36655471fc61f7e416fa76033bdd4bfed0678d8fee1e2d07a1f0" +dependencies = [ + "memchr", +] + +[[package]] name = "android-tzdata" version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -93,12 +101,13 @@ checksum = "3b13c32d80ecc7ab747b80c3784bce54ee8a7a0cc4fbda9bf4cda2cf6fe90854" [[package]] name = "argon2" -version = "0.3.4" +version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "25df3c03f1040d0069fcd3907e24e36d59f9b6fa07ba49be0eb25a794f036ba7" +checksum = "17ba4cac0a46bc1d2912652a751c47f2a9f3a7fe89bcae2275d418f5270402f9" dependencies = [ "base64ct", "blake2", + "cpufeatures", "password-hash", ] @@ -180,7 +189,7 @@ dependencies = [ "async-lock", "async-task", "concurrent-queue", - "fastrand", + "fastrand 1.9.0", "futures-lite", "slab", ] @@ -394,6 +403,438 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" [[package]] +name = "aws-config" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11382bd8ac4c6c182a9775990935f96c916a865f1414486595f18eb8cfa9d90b" +dependencies = [ + "aws-credential-types", + "aws-http", + "aws-runtime", + "aws-sdk-sso", + "aws-sdk-ssooidc", + "aws-sdk-sts", + "aws-smithy-async", + "aws-smithy-http 0.60.1", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types 1.1.1", + "aws-types", + "bytes", + "fastrand 2.0.1", + "hex", + "http", + "hyper", + "ring 0.17.7", + "time 0.3.23", + "tokio", + "tracing", + "zeroize", +] + +[[package]] +name = "aws-credential-types" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70a1629320d319dc715c6189b172349186557e209d2a7b893ff3d14efd33a47c" +dependencies = [ + "aws-smithy-async", + "aws-smithy-runtime-api", + "aws-smithy-types 1.1.1", + "zeroize", +] + +[[package]] +name = "aws-http" +version = "0.60.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30e4199d5d62ab09be6a64650c06cc5c4aa45806fed4c74bc4a5c8eaf039a6fa" +dependencies = [ + "aws-smithy-runtime-api", + "aws-smithy-types 1.1.1", + "aws-types", + "bytes", + "http", + "http-body", + "pin-project-lite 0.2.10", + "tracing", +] + +[[package]] +name = "aws-runtime" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87116d357c905b53f1828d15366363fd27b330a0393cbef349e653f686d36bad" +dependencies = [ + "aws-credential-types", + "aws-http", + "aws-sigv4 1.1.1", + "aws-smithy-async", + "aws-smithy-eventstream", + "aws-smithy-http 0.60.1", + "aws-smithy-runtime-api", + "aws-smithy-types 1.1.1", + "aws-types", + "fastrand 2.0.1", + "http", + "percent-encoding", + "tracing", + "uuid", +] + +[[package]] +name = "aws-sdk-s3" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "27f89aed8bb54ff816b860cda5d6f010cf54b7b64a46bb66e3edd1063ac5f36e" +dependencies = [ + "aws-credential-types", + "aws-http", + "aws-runtime", + "aws-sigv4 1.1.1", + "aws-smithy-async", + "aws-smithy-checksums", + "aws-smithy-eventstream", + "aws-smithy-http 0.60.1", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types 1.1.1", + "aws-smithy-xml", + "aws-types", + "bytes", + "http", + "http-body", + "once_cell", + "percent-encoding", + "regex-lite", + "tracing", + "url", +] + +[[package]] +name = "aws-sdk-sso" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7c8539671a6e0ca087464f784576b42796ed934a566b3c1263dcf2a88494d067" +dependencies = [ + "aws-credential-types", + "aws-http", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http 0.60.1", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types 1.1.1", + "aws-types", + "bytes", + "http", + "once_cell", + "regex-lite", + "tracing", +] + +[[package]] +name = "aws-sdk-ssooidc" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f96ac3757df21b29102edd909884ceb8d118b96b5e45546b8bb59b8a9a8eb85a" +dependencies = [ + "aws-credential-types", + "aws-http", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http 0.60.1", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types 1.1.1", + "aws-types", + "bytes", + "http", + "once_cell", + "regex-lite", + "tracing", +] + +[[package]] +name = "aws-sdk-sts" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fe238802a9ad92721c23fd0b1d52284ec9e3565d6d0a12243608833e2655abb" +dependencies = [ + "aws-credential-types", + "aws-http", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http 0.60.1", + "aws-smithy-json", + "aws-smithy-query", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types 1.1.1", + "aws-smithy-xml", + "aws-types", + "http", + "once_cell", + "regex-lite", + "tracing", +] + +[[package]] +name = "aws-sigv4" +version = "0.55.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d2ce6f507be68e968a33485ced670111d1cbad161ddbbab1e313c03d37d8f4c" +dependencies = [ + "aws-smithy-http 0.55.3", + "form_urlencoded", + "hex", + "hmac", + "http", + "once_cell", + "percent-encoding", + "regex", + "sha2", + "time 0.3.23", + "tracing", +] + +[[package]] +name = "aws-sigv4" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d222297ca90209dc62245f0a490355795f29de362eb5c19caea4f7f55fe69078" +dependencies = [ + "aws-credential-types", + "aws-smithy-eventstream", + "aws-smithy-http 0.60.1", + "aws-smithy-runtime-api", + "aws-smithy-types 1.1.1", + "bytes", + "crypto-bigint 0.5.5", + "form_urlencoded", + "hex", + "hmac", + "http", + "once_cell", + "p256", + "percent-encoding", + "ring 0.17.7", + "sha2", + "subtle", + "time 0.3.23", + "tracing", + "zeroize", +] + +[[package]] +name = "aws-smithy-async" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e9f65000917e3aa94c259d67fe01fa9e4cd456187d026067d642436e6311a81" +dependencies = [ + "futures-util", + "pin-project-lite 0.2.10", + "tokio", +] + +[[package]] +name = "aws-smithy-checksums" +version = "0.60.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c2a63681f82fb85ca58d566534b7dc619c782fee0c61c1aa51e2b560c21cb4f" +dependencies = [ + "aws-smithy-http 0.60.1", + "aws-smithy-types 1.1.1", + "bytes", + "crc32c", + "crc32fast", + "hex", + "http", + "http-body", + "md-5", + "pin-project-lite 0.2.10", + "sha1", + "sha2", + "tracing", +] + +[[package]] +name = "aws-smithy-eventstream" +version = "0.60.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a85e16fa903c70c49ab3785e5f4ac2ad2171b36e0616f321011fa57962404bb6" +dependencies = [ + "aws-smithy-types 1.1.1", + "bytes", + "crc32fast", +] + +[[package]] +name = "aws-smithy-http" +version = "0.55.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b3b693869133551f135e1f2c77cb0b8277d9e3e17feaf2213f735857c4f0d28" +dependencies = [ + "aws-smithy-types 0.55.3", + "bytes", + "bytes-utils", + "futures-core", + "http", + "http-body", + "hyper", + "once_cell", + "percent-encoding", + "pin-project-lite 0.2.10", + "pin-utils", + "tracing", +] + +[[package]] +name = "aws-smithy-http" +version = "0.60.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e4e816425a6b9caea4929ac97d0cb33674849bd5f0086418abc0d02c63f7a1bf" +dependencies = [ + "aws-smithy-eventstream", + "aws-smithy-runtime-api", + "aws-smithy-types 1.1.1", + "bytes", + "bytes-utils", + "futures-core", + "http", + "http-body", + "once_cell", + "percent-encoding", + "pin-project-lite 0.2.10", + "pin-utils", + "tracing", +] + +[[package]] +name = "aws-smithy-json" +version = "0.60.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ab3f6d49e08df2f8d05e1bb5b68998e1e67b76054d3c43e7b954becb9a5e9ac" +dependencies = [ + "aws-smithy-types 1.1.1", +] + +[[package]] +name = "aws-smithy-query" +version = "0.60.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f94a7a3aa509ff9e8b8d80749851d04e5eee0954c43f2e7d6396c4740028737" +dependencies = [ + "aws-smithy-types 1.1.1", + "urlencoding", +] + +[[package]] +name = "aws-smithy-runtime" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8da5b0a3617390e769576321816112f711c13d7e1114685e022505cf51fe5e48" +dependencies = [ + "aws-smithy-async", + "aws-smithy-http 0.60.1", + "aws-smithy-runtime-api", + "aws-smithy-types 1.1.1", + "bytes", + "fastrand 2.0.1", + "h2", + "http", + "http-body", + "hyper", + "hyper-rustls", + "once_cell", + "pin-project-lite 0.2.10", + "pin-utils", + "rustls 0.21.10", + "tokio", + "tracing", +] + +[[package]] +name = "aws-smithy-runtime-api" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2404c9eb08bfe9af255945254d9afc69a367b7ee008b8db75c05e3bca485fc65" +dependencies = [ + "aws-smithy-async", + "aws-smithy-types 1.1.1", + "bytes", + "http", + "pin-project-lite 0.2.10", + "tokio", + "tracing", + "zeroize", +] + +[[package]] +name = "aws-smithy-types" +version = "0.55.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16a3d0bf4f324f4ef9793b86a1701d9700fbcdbd12a846da45eed104c634c6e8" +dependencies = [ + "base64-simd", + "itoa", + "num-integer", + "ryu", + "time 0.3.23", +] + +[[package]] +name = "aws-smithy-types" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2aba8136605d14ac88f57dc3a693a9f8a4eab4a3f52bc03ff13746f0cd704e97" +dependencies = [ + "base64-simd", + "bytes", + "bytes-utils", + "futures-core", + "http", + "http-body", + "itoa", + "num-integer", + "pin-project-lite 0.2.10", + "pin-utils", + "ryu", + "serde", + "time 0.3.23", + "tokio", + "tokio-util", +] + +[[package]] +name = "aws-smithy-xml" +version = "0.60.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e8f03926587fc881b12b102048bb04305bf7fb8c83e776f0ccc51eaa2378263" +dependencies = [ + "xmlparser", +] + +[[package]] +name = "aws-types" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e5d5ee29077e0fcd5ddd0c227b521a33aaf02434b7cdba1c55eec5c1f18ac47" +dependencies = [ + "aws-credential-types", + "aws-smithy-async", + "aws-smithy-runtime-api", + "aws-smithy-types 1.1.1", + "http", + "rustc_version", + "tracing", +] + +[[package]] name = "backtrace" version = "0.3.68" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -409,6 +850,12 @@ dependencies = [ ] [[package]] +name = "base16ct" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "349a06037c7bf932dd7e7d1f653678b2038b9ad46a74102f1fc7bd7872678cce" + +[[package]] name = "base64" version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -421,6 +868,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "604178f6c5c21f02dc555784810edfb88d34ac2c73b2eae109655649ee73ce3d" [[package]] +name = "base64-simd" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "339abbe78e73178762e23bea9dfd08e697eb3f3301cd4be981c0f78ba5859195" +dependencies = [ + "outref", + "vsimd", +] + +[[package]] name = "base64ct" version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -433,6 +890,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] +name = "bitflags" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "327762f6e5a765692301e5bb513e0d9fef63be86bbc14528052b1cd3e6f03e07" + +[[package]] name = "bitmaps" version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -459,16 +922,7 @@ version = "0.10.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "46502ad458c9a52b69d4d4d32775c788b7a1b85e8bc9d482d92250fc0e3f8efe" dependencies = [ - "digest 0.10.7", -] - -[[package]] -name = "block-buffer" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4152116fd6e9dadb291ae18fc1ec3575ed6d84c29642d97890f4b4a3417297e4" -dependencies = [ - "generic-array", + "digest", ] [[package]] @@ -490,7 +944,7 @@ dependencies = [ "async-lock", "async-task", "atomic-waker", - "fastrand", + "fastrand 1.9.0", "futures-lite", "log", ] @@ -534,12 +988,23 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be" [[package]] +name = "bytes-utils" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7dafe3a8757b027e2be6e4e5601ed563c55989fcf1546e933c66c8eb3a058d35" +dependencies = [ + "bytes", + "either", +] + +[[package]] name = "cc" -version = "1.0.79" +version = "1.0.83" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50d30906286121d95be3d479533b458f87493b30a4b5f79a607db8f5d11aa91f" +checksum = "f1174fb0b6ec23863f8b971027804a42614e347eafb0a95bf0b12cdae21fc4d0" dependencies = [ "jobserver", + "libc", ] [[package]] @@ -558,7 +1023,6 @@ dependencies = [ "iana-time-zone", "js-sys", "num-traits", - "serde", "time 0.1.45", "wasm-bindgen", "winapi", @@ -571,7 +1035,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ea181bf566f71cb9a5d17a59e1871af638180a18fb0035c92ae62b705207123" dependencies = [ "atty", - "bitflags", + "bitflags 1.3.2", "clap_derive", "clap_lex", "indexmap", @@ -613,6 +1077,12 @@ dependencies = [ ] [[package]] +name = "const-oid" +version = "0.9.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8" + +[[package]] name = "core-foundation" version = "0.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -638,6 +1108,15 @@ dependencies = [ ] [[package]] +name = "crc32c" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8f48d60e5b4d2c53d5c2b1d8a58c849a70ae5e5509b08a48d047e3b65714a74" +dependencies = [ + "rustc_version", +] + +[[package]] name = "crc32fast" version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -714,32 +1193,54 @@ dependencies = [ ] [[package]] -name = "crypto-common" -version = "0.1.6" +name = "crypto-bigint" +version = "0.4.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3" +checksum = "ef2b4b23cddf68b89b8f8069890e8c270d54e2d5fe1b143820234805e4cb17ef" dependencies = [ "generic-array", - "typenum", + "rand_core", + "subtle", + "zeroize", ] [[package]] -name = "crypto-mac" -version = "0.11.1" +name = "crypto-bigint" +version = "0.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1d1a86f49236c215f271d40892d5fc950490551400b02ef360692c29815c714" +checksum = "0dc92fb57ca44df6db8059111ab3af99a63d5d0f8375d9972e319a379c6bab76" dependencies = [ - "generic-array", + "rand_core", "subtle", ] [[package]] +name = "crypto-common" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3" +dependencies = [ + "generic-array", + "typenum", +] + +[[package]] name = "data-encoding" version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2e66c9d817f1720209181c316d28635c050fa304f9c79e47a520882661b7308" [[package]] +name = "der" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1a467a65c5e759bce6e65eaf91cc29f466cdc57cb65777bd646872a8a1fd4de" +dependencies = [ + "const-oid", + "zeroize", +] + +[[package]] name = "der-parser" version = "7.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -766,46 +1267,16 @@ dependencies = [ [[package]] name = "digest" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d3dd60d1080a57a05ab032377049e0591415d2b31afd7028356dbf3cc6dcb066" -dependencies = [ - "generic-array", -] - -[[package]] -name = "digest" version = "0.10.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" dependencies = [ - "block-buffer 0.10.4", + "block-buffer", "crypto-common", "subtle", ] [[package]] -name = "dirs-next" -version = "2.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b98cf8ebf19c3d1b223e151f99a4f9f0690dca41414773390fc824184ac833e1" -dependencies = [ - "cfg-if", - "dirs-sys-next", -] - -[[package]] -name = "dirs-sys-next" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ebda144c4fe02d1f7ea1a7d9641b6fc6b580adcfa024ae48797ecdeb6825b4d" -dependencies = [ - "libc", - "redox_users", - "winapi", -] - -[[package]] name = "displaydoc" version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -827,6 +1298,18 @@ dependencies = [ ] [[package]] +name = "ecdsa" +version = "0.14.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "413301934810f597c1d19ca71c8710e99a3f1ba28a0d2ebc01551a2daeea3c5c" +dependencies = [ + "der", + "elliptic-curve", + "rfc6979", + "signature", +] + +[[package]] name = "ed25519" version = "1.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -842,6 +1325,26 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a26ae43d7bcc3b814de94796a5e736d4029efb0ee900c12e2d54c993ad1a1e07" [[package]] +name = "elliptic-curve" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7bb888ab5300a19b8e5bceef25ac745ad065f3c9f7efc6de1b91958110891d3" +dependencies = [ + "base16ct", + "crypto-bigint 0.4.9", + "der", + "digest", + "ff", + "generic-array", + "group", + "pkcs8", + "rand_core", + "sec1", + "subtle", + "zeroize", +] + +[[package]] name = "eml-codec" version = "0.1.2" source = "git+https://git.deuxfleurs.fr/Deuxfleurs/eml-codec.git?branch=main#a7bd3c475a58e42b86c163ec075ce01ddae7e60a" @@ -898,6 +1401,22 @@ dependencies = [ ] [[package]] +name = "fastrand" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25cbce373ec4653f1a01a31e8a5e5ec0c622dc27ff9c4e6606eefef5cbbed4a5" + +[[package]] +name = "ff" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d013fc25338cc558c5c2cfbad646908fb23591e2404481826742b651c9af7160" +dependencies = [ + "rand_core", + "subtle", +] + +[[package]] name = "fnv" version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -972,7 +1491,7 @@ version = "1.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49a9d51ce47660b1e808d3c990b4709f2f415d928835a17dfd16991515c46bce" dependencies = [ - "fastrand", + "fastrand 1.9.0", "futures-core", "futures-io", "memchr", @@ -1062,6 +1581,17 @@ dependencies = [ ] [[package]] +name = "group" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5dfbfb3a6cfbd390d5c9564ab283a0349b9b9fcd46a706c1eb10e0db70bfbac7" +dependencies = [ + "ff", + "rand_core", + "subtle", +] + +[[package]] name = "h2" version = "0.3.20" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1125,12 +1655,11 @@ checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" [[package]] name = "hmac" -version = "0.11.0" +version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2a2a2320eb7ec0ebe8da8f744d7812d9fc4cb4d09344ac01898dbcb6a20ae69b" +checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e" dependencies = [ - "crypto-mac", - "digest 0.9.0", + "digest", ] [[package]] @@ -1193,21 +1722,6 @@ dependencies = [ [[package]] name = "hyper-rustls" -version = "0.23.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1788965e61b367cd03a62950836d5cd41560c3577d90e40e0819373194d1661c" -dependencies = [ - "http", - "hyper", - "log", - "rustls 0.20.8", - "rustls-native-certs", - "tokio", - "tokio-rustls 0.23.4", -] - -[[package]] -name = "hyper-rustls" version = "0.24.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8d78e1e73ec14cf7375674f74d7dde185c8206fd9dea6fb6295e8a98098aaa97" @@ -1216,7 +1730,7 @@ dependencies = [ "http", "hyper", "log", - "rustls 0.21.5", + "rustls 0.21.10", "rustls-native-certs", "tokio", "tokio-rustls 0.24.1", @@ -1357,18 +1871,20 @@ dependencies = [ [[package]] name = "k2v-client" -version = "0.1.1" -source = "git+https://git.deuxfleurs.fr/Deuxfleurs/garage.git?tag=v0.8.2#000006d689d2d8075599fbe1653605566ae9d36e" +version = "0.0.4" +source = "git+https://git.deuxfleurs.fr/Deuxfleurs/garage.git?tag=v0.9.0#952c9570c494468643353ee1ae9052b510353665" dependencies = [ + "aws-sigv4 0.55.3", "base64 0.21.2", + "hex", "http", - "hyper-rustls 0.23.2", + "hyper", + "hyper-rustls", "log", - "rusoto_core", - "rusoto_credential", - "rusoto_signature", + "percent-encoding", "serde", "serde_json", + "sha2", "thiserror", "tokio", ] @@ -1414,7 +1930,7 @@ dependencies = [ "log", "nom 2.2.1", "percent-encoding", - "ring", + "ring 0.16.20", "rustls 0.20.8", "rustls-native-certs", "thiserror", @@ -1433,7 +1949,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6607c62aa161d23d17a9072cc5da0be67cdfc89d3afb1e8d9c842bebc2525ffe" dependencies = [ "arrayvec", - "bitflags", + "bitflags 1.3.2", "cfg-if", "ryu", "static_assertions", @@ -1441,9 +1957,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.147" +version = "0.2.151" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4668fb0ea861c1df094127ac5f1da3409a82116a4ba74fca2e58ef927159bb3" +checksum = "302d7ab3130588088d277783b1e2d2e10c9e9e4a16dd9050e6ec93fb3e7048f4" [[package]] name = "libsodium-sys" @@ -1490,13 +2006,12 @@ checksum = "2532096657941c2fea9c289d370a250971c689d4f143798ff67113ec042024a5" [[package]] name = "md-5" -version = "0.9.1" +version = "0.10.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b5a279bb9607f9f53c22d496eade00d138d1bdcccd07d74650387cf94942a15" +checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf" dependencies = [ - "block-buffer 0.9.0", - "digest 0.9.0", - "opaque-debug", + "cfg-if", + "digest", ] [[package]] @@ -1564,6 +2079,17 @@ dependencies = [ ] [[package]] +name = "nix" +version = "0.27.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2eb04e9c688eff1c89d72b407f168cf79bb9e867a9d3323ed6c01519eb9cc053" +dependencies = [ + "bitflags 2.4.1", + "cfg-if", + "libc", +] + +[[package]] name = "nom" version = "2.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1667,12 +2193,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d" [[package]] -name = "opaque-debug" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" - -[[package]] name = "openssl-probe" version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1685,12 +2205,29 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4d5d9eb14b174ee9aa2ef96dc2b94637a2d4b6e7cb873c7e171f0c20c6cf3eac" [[package]] +name = "outref" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4030760ffd992bef45b0ae3f10ce1aba99e33464c90d14dd7c039884963ddc7a" + +[[package]] name = "overload" version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" [[package]] +name = "p256" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51f44edd08f51e2ade572f141051021c5af22677e42b7dd28a88155151c33594" +dependencies = [ + "ecdsa", + "elliptic-curve", + "sha2", +] + +[[package]] name = "parking" version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1714,16 +2251,16 @@ checksum = "93f00c865fe7cabf650081affecd3871070f26767e7b2070a3ffae14c654b447" dependencies = [ "cfg-if", "libc", - "redox_syscall 0.3.5", + "redox_syscall", "smallvec", "windows-targets", ] [[package]] name = "password-hash" -version = "0.3.2" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d791538a6dcc1e7cb7fe6f6b58aca40e7f79403c45b2bc274008b5e647af1d8" +checksum = "346f04948ba92c43e8469c1ee6736c7563d71012b17d40745260fe106aac2166" dependencies = [ "base64ct", "rand_core", @@ -1781,6 +2318,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" [[package]] +name = "pkcs8" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9eca2c590a5f85da82668fa685c09ce2888b9430e83299debf1f34b65fd4a4ba" +dependencies = [ + "der", + "spki", +] + +[[package]] name = "pkg-config" version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1793,7 +2340,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4b2d323e8ca7996b3e23126511a523f7e62924d93ecd5ae73b333815b0eb3dce" dependencies = [ "autocfg", - "bitflags", + "bitflags 1.3.2", "cfg-if", "concurrent-queue", "libc", @@ -1897,49 +2444,75 @@ dependencies = [ [[package]] name = "redox_syscall" -version = "0.2.16" +version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a" +checksum = "567664f262709473930a4bf9e51bf2ebf3348f2e748ccc50dea20646858f8f29" dependencies = [ - "bitflags", + "bitflags 1.3.2", ] [[package]] -name = "redox_syscall" -version = "0.3.5" +name = "regex" +version = "1.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "567664f262709473930a4bf9e51bf2ebf3348f2e748ccc50dea20646858f8f29" +checksum = "12de2eff854e5fa4b1295edd650e227e9d8fb0c9e90b12e7f36d6a6811791a29" dependencies = [ - "bitflags", + "aho-corasick", + "memchr", + "regex-automata 0.3.7", + "regex-syntax 0.7.5", ] [[package]] -name = "redox_users" -version = "0.4.3" +name = "regex-automata" +version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b033d837a7cf162d7993aded9304e30a83213c648b6e389db233191f891e5c2b" +checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" dependencies = [ - "getrandom", - "redox_syscall 0.2.16", - "thiserror", + "regex-syntax 0.6.29", ] [[package]] name = "regex-automata" -version = "0.1.10" +version = "0.3.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" +checksum = "49530408a136e16e5b486e883fbb6ba058e8e4e8ae6621a77b048b314336e629" dependencies = [ - "regex-syntax", + "aho-corasick", + "memchr", + "regex-syntax 0.7.5", ] [[package]] +name = "regex-lite" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30b661b2f27137bdbc16f00eda72866a92bb28af1753ffbd56744fb6e2e9cd8e" + +[[package]] name = "regex-syntax" version = "0.6.29" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" [[package]] +name = "regex-syntax" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dbb5fb1acd8a1a18b3dd5be62d25485eb770e05afb408a9627d14d451bae12da" + +[[package]] +name = "rfc6979" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7743f17af12fa0b03b803ba12cd6a8d9483a587e89c69445e3909655c0b9fabb" +dependencies = [ + "crypto-bigint 0.4.9", + "hmac", + "zeroize", +] + +[[package]] name = "ring" version = "0.16.20" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1948,13 +2521,27 @@ dependencies = [ "cc", "libc", "once_cell", - "spin", - "untrusted", + "spin 0.5.2", + "untrusted 0.7.1", "web-sys", "winapi", ] [[package]] +name = "ring" +version = "0.17.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "688c63d65483050968b2a8937f7995f443e27041a0f7700aa59b0822aedebb74" +dependencies = [ + "cc", + "getrandom", + "libc", + "spin 0.9.8", + "untrusted 0.9.0", + "windows-sys", +] + +[[package]] name = "rmp" version = "0.8.12" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1998,88 +2585,6 @@ dependencies = [ ] [[package]] -name = "rusoto_core" -version = "0.48.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1db30db44ea73551326269adcf7a2169428a054f14faf9e1768f2163494f2fa2" -dependencies = [ - "async-trait", - "base64 0.13.1", - "bytes", - "crc32fast", - "futures", - "http", - "hyper", - "hyper-rustls 0.23.2", - "lazy_static", - "log", - "rusoto_credential", - "rusoto_signature", - "rustc_version", - "serde", - "serde_json", - "tokio", - "xml-rs", -] - -[[package]] -name = "rusoto_credential" -version = "0.48.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee0a6c13db5aad6047b6a44ef023dbbc21a056b6dab5be3b79ce4283d5c02d05" -dependencies = [ - "async-trait", - "chrono", - "dirs-next", - "futures", - "hyper", - "serde", - "serde_json", - "shlex", - "tokio", - "zeroize", -] - -[[package]] -name = "rusoto_s3" -version = "0.48.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7aae4677183411f6b0b412d66194ef5403293917d66e70ab118f07cc24c5b14d" -dependencies = [ - "async-trait", - "bytes", - "futures", - "rusoto_core", - "xml-rs", -] - -[[package]] -name = "rusoto_signature" -version = "0.48.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a5ae95491c8b4847931e291b151127eccd6ff8ca13f33603eb3d0035ecb05272" -dependencies = [ - "base64 0.13.1", - "bytes", - "chrono", - "digest 0.9.0", - "futures", - "hex", - "hmac", - "http", - "hyper", - "log", - "md-5", - "percent-encoding", - "pin-project-lite 0.2.10", - "rusoto_credential", - "rustc_version", - "serde", - "sha2", - "tokio", -] - -[[package]] name = "rustc-demangle" version = "0.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2109,7 +2614,7 @@ version = "0.37.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4d69718bf81c6127a49dc64e44a742e8bb9213c0ff8869a22c308f84c1d4ab06" dependencies = [ - "bitflags", + "bitflags 1.3.2", "errno", "io-lifetimes", "libc", @@ -2124,19 +2629,19 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fff78fc74d175294f4e83b28343315ffcfb114b156f0185e9741cb5570f50e2f" dependencies = [ "log", - "ring", + "ring 0.16.20", "sct", "webpki", ] [[package]] name = "rustls" -version = "0.21.5" +version = "0.21.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "79ea77c539259495ce8ca47f53e66ae0330a8819f67e23ac96ca02f50e7b7d36" +checksum = "f9d5a6813c0759e4609cd494e8e725babae6a2ca7b62a5536a13daaec6fcb7ba" dependencies = [ "log", - "ring", + "ring 0.17.7", "rustls-webpki", "sct", ] @@ -2164,12 +2669,12 @@ dependencies = [ [[package]] name = "rustls-webpki" -version = "0.101.1" +version = "0.101.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "15f36a6828982f422756984e47912a7a51dcbc2a197aa791158f8ca61cd8204e" +checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765" dependencies = [ - "ring", - "untrusted", + "ring 0.17.7", + "untrusted 0.9.0", ] [[package]] @@ -2208,8 +2713,22 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d53dcdb7c9f8158937a7981b48accfd39a43af418591a5d008c7b22b5e1b7ca4" dependencies = [ - "ring", - "untrusted", + "ring 0.16.20", + "untrusted 0.7.1", +] + +[[package]] +name = "sec1" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3be24c1842290c45df0a7bf069e0c268a747ad05a192f2fd7dcfdbc1cba40928" +dependencies = [ + "base16ct", + "der", + "generic-array", + "pkcs8", + "subtle", + "zeroize", ] [[package]] @@ -2218,7 +2737,7 @@ version = "2.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05b64fb303737d99b81884b2c63433e9ae28abebe5eb5045dcdd175dc2ecf4de" dependencies = [ - "bitflags", + "bitflags 1.3.2", "core-foundation", "core-foundation-sys", "libc", @@ -2273,16 +2792,25 @@ dependencies = [ ] [[package]] +name = "sha1" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + +[[package]] name = "sha2" -version = "0.9.9" +version = "0.10.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d58a1e1bf39749807d89cf2d98ac2dfa0ff1cb3faa38fbb64dd88ac8013d800" +checksum = "793db75ad2bcafc3ffa7c68b215fee268f537982cd901d132f89c6343f3a3dc8" dependencies = [ - "block-buffer 0.9.0", "cfg-if", "cpufeatures", - "digest 0.9.0", - "opaque-debug", + "digest", ] [[package]] @@ -2295,12 +2823,6 @@ dependencies = [ ] [[package]] -name = "shlex" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43b2853a4d09f215c24cc5489c992ce46052d359b5109343cbafbf26bc62f8a3" - -[[package]] name = "signal-hook" version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2324,6 +2846,10 @@ name = "signature" version = "1.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "74233d3b3b2f6d4b006dc19dee745e73e2a6bfb6f93607cd3b02bd5b00797d7c" +dependencies = [ + "digest", + "rand_core", +] [[package]] name = "sized-chunks" @@ -2378,7 +2904,7 @@ dependencies = [ "lazy_static", "nom 6.1.2", "pin-project", - "regex-automata", + "regex-automata 0.1.10", "serde", ] @@ -2434,6 +2960,22 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" [[package]] +name = "spin" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" + +[[package]] +name = "spki" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67cf02bbac7a337dc36e4f5a693db6c21e7863f45070f7064577eb4367a3212b" +dependencies = [ + "base64ct", + "der", +] + +[[package]] name = "static_assertions" version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2447,9 +2989,9 @@ checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" [[package]] name = "subtle" -version = "2.4.1" +version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601" +checksum = "81cdd64d312baedb58e21336b31bc043b77e01cc99033ce76ef539f78e965ebc" [[package]] name = "syn" @@ -2637,7 +3179,7 @@ version = "0.24.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" dependencies = [ - "rustls 0.21.5", + "rustls 0.21.10", "tokio", ] @@ -2847,6 +3389,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" [[package]] +name = "untrusted" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" + +[[package]] name = "url" version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2858,6 +3406,18 @@ dependencies = [ ] [[package]] +name = "urlencoding" +version = "2.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" + +[[package]] +name = "uuid" +version = "1.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e395fcf16a7a3d8127ec99782007af141946b4795001f876d54fb0d55978560" + +[[package]] name = "valuable" version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2876,6 +3436,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" [[package]] +name = "vsimd" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c3082ca00d5a5ef149bb8b555a72ae84c9c59f7250f013ac822ac2e49b19c64" + +[[package]] name = "waker-fn" version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2994,8 +3560,8 @@ version = "0.22.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f095d78192e208183081cc07bc5515ef55216397af48b873e5edcd72637fa1bd" dependencies = [ - "ring", - "untrusted", + "ring 0.16.20", + "untrusted 0.7.1", ] [[package]] @@ -3129,10 +3695,10 @@ dependencies = [ ] [[package]] -name = "xml-rs" -version = "0.8.16" +name = "xmlparser" +version = "0.13.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "47430998a7b5d499ccee752b41567bc3afc57e1327dc855b1a2aa44ce29b5fa1" +checksum = "66fee0b777b0f5ac1c69bb06d361268faafa61cd4682ae064a171c16c433e9e4" [[package]] name = "zeroize" @@ -7,11 +7,13 @@ license = "AGPL-3.0" description = "Encrypted mail storage over Garage" [dependencies] +aws-config = { version = "1.1.1", features = ["behavior-version-latest"] } +aws-sdk-s3 = "1.9.0" anyhow = "1.0.28" -argon2 = "0.3" +argon2 = "0.5" async-trait = "0.1" backtrace = "0.3" -base64 = "0.13" +base64 = "0.21" clap = { version = "3.1.18", features = ["derive", "env"] } duplexify = "1.1.0" eml-codec = { git = "https://git.deuxfleurs.fr/Deuxfleurs/eml-codec.git", branch = "main" } @@ -22,11 +24,8 @@ itertools = "0.10" lazy_static = "1.4" ldap3 = { version = "0.10", default-features = false, features = ["tls-rustls"] } log = "0.4" -rusoto_core = { version = "0.48.0", default_features = false, features = ["rustls"] } -rusoto_credential = "0.48.0" -rusoto_s3 = { version = "0.48.0", default_features = false, features = ["rustls"] } hyper-rustls = { version = "0.24", features = ["http2"] } -rusoto_signature = "0.48.0" +nix = { version = "0.27", features = ["signal"] } serde = "1.0.137" rand = "0.8.5" rmp-serde = "0.15" @@ -44,7 +43,7 @@ tower = "0.4" imap-codec = { git = "https://github.com/superboum/imap-codec.git", branch = "v0.5.x" } chrono = { version = "0.4", default-features = false, features = ["alloc"] } -k2v-client = { git = "https://git.deuxfleurs.fr/Deuxfleurs/garage.git", tag = "v0.8.2" } +k2v-client = { git = "https://git.deuxfleurs.fr/Deuxfleurs/garage.git", tag = "v0.9.0" } boitalettres = { git = "https://git.deuxfleurs.fr/quentin/boitalettres.git", branch = "expose-mydatetime" } smtp-message = { git = "http://github.com/Alexis211/kannader", branch = "feature/lmtp" } smtp-server = { git = "http://github.com/Alexis211/kannader", branch = "feature/lmtp" } @@ -58,10 +58,12 @@ buildInputs = [ cargo2nix.packages.x86_64-linux.default fenix.packages.x86_64-linux.minimal.toolchain + fenix.packages.x86_64-linux.rust-analyzer ]; shellHook = '' echo "AEROGRAME DEVELOPMENT SHELL ${fenix.packages.x86_64-linux.minimal.rustc}" export RUST_SRC_PATH="${fenix.packages.x86_64-linux.latest.rust-src}/lib/rustlib/src/rust/library" + export RUST_ANALYZER_INTERNALS_DO_NOT_USE='this is unstable' ''; }; diff --git a/src/bayou.rs b/src/bayou.rs index 9f70017..7253a30 100644 --- a/src/bayou.rs +++ b/src/bayou.rs @@ -1,4 +1,3 @@ -use std::str::FromStr; use std::sync::{Arc, Weak}; use std::time::{Duration, Instant}; @@ -6,18 +5,12 @@ use anyhow::{anyhow, bail, Result}; use log::{debug, error, info}; use rand::prelude::*; use serde::{Deserialize, Serialize}; -use tokio::io::AsyncReadExt; use tokio::sync::{watch, Notify}; -use k2v_client::{BatchDeleteOp, BatchReadOp, CausalityToken, Filter, K2vClient, K2vValue}; -use rusoto_s3::{ - DeleteObjectRequest, GetObjectRequest, ListObjectsV2Request, PutObjectRequest, S3Client, S3, -}; - use crate::cryptoblob::*; -use crate::k2v_util::k2v_wait_value_changed; use crate::login::Credentials; -use crate::time::now_msec; +use crate::storage; +use crate::timestamp::*; const KEEP_STATE_EVERY: usize = 64; @@ -48,12 +41,10 @@ pub trait BayouState: } pub struct Bayou<S: BayouState> { - bucket: String, path: String, key: Key, - k2v: K2vClient, - s3: S3Client, + storage: storage::Store, checkpoint: (Timestamp, S), history: Vec<(Timestamp, S::Op, Option<S>)>, @@ -62,28 +53,27 @@ pub struct Bayou<S: BayouState> { last_try_checkpoint: Option<Instant>, watch: Arc<K2vWatch>, - last_sync_watch_ct: Option<CausalityToken>, + last_sync_watch_ct: storage::RowRef, } impl<S: BayouState> Bayou<S> { - pub fn new(creds: &Credentials, path: String) -> Result<Self> { - let k2v_client = creds.k2v_client()?; - let s3_client = creds.s3_client()?; + pub async fn new(creds: &Credentials, path: String) -> Result<Self> { + let storage = creds.storage.build().await?; - let watch = K2vWatch::new(creds, path.clone(), WATCH_SK.to_string())?; + //let target = k2v_client.row(&path, WATCH_SK); + let target = storage::RowRef::new(&path, WATCH_SK); + let watch = K2vWatch::new(creds, target.clone()).await?; Ok(Self { - bucket: creds.bucket().to_string(), path, + storage, key: creds.keys.master.clone(), - k2v: k2v_client, - s3: s3_client, checkpoint: (Timestamp::zero(), S::default()), history: vec![], last_sync: None, last_try_checkpoint: None, watch, - last_sync_watch_ct: None, + last_sync_watch_ct: target, }) } @@ -103,18 +93,11 @@ impl<S: BayouState> Bayou<S> { } else { debug!("(sync) loading checkpoint: {}", key); - let gor = GetObjectRequest { - bucket: self.bucket.clone(), - key: key.to_string(), - ..Default::default() - }; - - let obj_res = self.s3.get_object(gor).await?; - - let obj_body = obj_res.body.ok_or(anyhow!("Missing object body"))?; - let mut buf = Vec::with_capacity(obj_res.content_length.unwrap_or(128) as usize); - obj_body.into_async_read().read_to_end(&mut buf).await?; - + let buf = self + .storage + .blob_fetch(&storage::BlobRef(key.to_string())) + .await? + .value; debug!("(sync) checkpoint body length: {}", buf.len()); let ck = open_deserialize::<S>(&buf, &self.key)?; @@ -146,42 +129,34 @@ impl<S: BayouState> Bayou<S> { let ts_ser = self.checkpoint.0.to_string(); debug!("(sync) looking up operations starting at {}", ts_ser); let ops_map = self - .k2v - .read_batch(&[BatchReadOp { - partition_key: &self.path, - filter: Filter { - start: Some(&ts_ser), - end: Some(WATCH_SK), - prefix: None, - limit: None, - reverse: false, - }, - single_item: false, - conflicts_only: false, - tombstones: false, - }]) - .await? - .into_iter() - .next() - .ok_or(anyhow!("Missing K2V result"))? - .items; + .storage + .row_fetch(&storage::Selector::Range { + shard: &self.path, + sort_begin: &ts_ser, + sort_end: WATCH_SK, + }) + .await?; let mut ops = vec![]; - for (tsstr, val) in ops_map { - let ts = tsstr + for row_value in ops_map { + let row = row_value.row_ref; + let sort_key = row.uid.sort; + let ts = sort_key .parse::<Timestamp>() - .map_err(|_| anyhow!("Invalid operation timestamp: {}", tsstr))?; - if val.value.len() != 1 { - bail!("Invalid operation, has {} values", val.value.len()); + .map_err(|_| anyhow!("Invalid operation timestamp: {}", sort_key))?; + + let val = row_value.value; + if val.len() != 1 { + bail!("Invalid operation, has {} values", val.len()); } - match &val.value[0] { - K2vValue::Value(v) => { + match &val[0] { + storage::Alternative::Value(v) => { let op = open_deserialize::<S::Op>(v, &self.key)?; - debug!("(sync) operation {}: {} {:?}", tsstr, base64::encode(v), op); + debug!("(sync) operation {}: {:?}", sort_key, op); ops.push((ts, op)); } - K2vValue::Tombstone => { - unreachable!(); + storage::Alternative::Tombstone => { + continue; } } } @@ -276,15 +251,12 @@ impl<S: BayouState> Bayou<S> { .map(|(ts, _, _)| ts) .unwrap_or(&self.checkpoint.0), ); - self.k2v - .insert_item( - &self.path, - &ts.to_string(), - seal_serialize(&op, &self.key)?, - None, - ) - .await?; + let row_val = storage::RowVal::new( + storage::RowRef::new(&self.path, &ts.to_string()), + seal_serialize(&op, &self.key)?, + ); + self.storage.row_insert(vec![row_val]).await?; self.watch.notify.notify_one(); let new_state = self.state().apply(&op); @@ -384,13 +356,11 @@ impl<S: BayouState> Bayou<S> { let cryptoblob = seal_serialize(&state_cp, &self.key)?; debug!("(cp) checkpoint body length: {}", cryptoblob.len()); - let por = PutObjectRequest { - bucket: self.bucket.clone(), - key: format!("{}/checkpoint/{}", self.path, ts_cp.to_string()), - body: Some(cryptoblob.into()), - ..Default::default() - }; - self.s3.put_object(por).await?; + let blob_val = storage::BlobVal::new( + storage::BlobRef(format!("{}/checkpoint/{}", self.path, ts_cp.to_string())), + cryptoblob.into(), + ); + self.storage.blob_insert(blob_val).await?; // Drop old checkpoints (but keep at least CHECKPOINTS_TO_KEEP of them) let ecp_len = existing_checkpoints.len(); @@ -400,25 +370,20 @@ impl<S: BayouState> Bayou<S> { // Delete blobs for (_ts, key) in existing_checkpoints[..last_to_keep].iter() { debug!("(cp) drop old checkpoint {}", key); - let dor = DeleteObjectRequest { - bucket: self.bucket.clone(), - key: key.to_string(), - ..Default::default() - }; - self.s3.delete_object(dor).await?; + self.storage + .blob_rm(&storage::BlobRef(key.to_string())) + .await?; } // Delete corresponding range of operations let ts_ser = existing_checkpoints[last_to_keep].0.to_string(); - self.k2v - .delete_batch(&[BatchDeleteOp { - partition_key: &self.path, - prefix: None, - start: None, - end: Some(&ts_ser), - single_item: false, - }]) - .await?; + self.storage + .row_rm(&storage::Selector::Range { + shard: &self.path, + sort_begin: "", + sort_end: &ts_ser, + }) + .await? } Ok(()) @@ -437,22 +402,14 @@ impl<S: BayouState> Bayou<S> { async fn list_checkpoints(&self) -> Result<Vec<(Timestamp, String)>> { let prefix = format!("{}/checkpoint/", self.path); - let lor = ListObjectsV2Request { - bucket: self.bucket.clone(), - max_keys: Some(1000), - prefix: Some(prefix.clone()), - ..Default::default() - }; - - let checkpoints_res = self.s3.list_objects_v2(lor).await?; + let checkpoints_res = self.storage.blob_list(&prefix).await?; let mut checkpoints = vec![]; - for object in checkpoints_res.contents.unwrap_or_default() { - if let Some(key) = object.key { - if let Some(ckid) = key.strip_prefix(&prefix) { - if let Ok(ts) = ckid.parse::<Timestamp>() { - checkpoints.push((ts, key)); - } + for object in checkpoints_res { + let key = object.0; + if let Some(ckid) = key.strip_prefix(&prefix) { + if let Ok(ts) = ckid.parse::<Timestamp>() { + checkpoints.push((ts, key.into())); } } } @@ -464,68 +421,66 @@ impl<S: BayouState> Bayou<S> { // ---- Bayou watch in K2V ---- struct K2vWatch { - pk: String, - sk: String, - rx: watch::Receiver<Option<CausalityToken>>, + target: storage::RowRef, + rx: watch::Receiver<storage::RowRef>, notify: Notify, } impl K2vWatch { /// Creates a new watch and launches subordinate threads. /// These threads hold Weak pointers to the struct; - /// the exit when the Arc is dropped. - fn new(creds: &Credentials, pk: String, sk: String) -> Result<Arc<Self>> { - let (tx, rx) = watch::channel::<Option<CausalityToken>>(None); + /// they exit when the Arc is dropped. + async fn new(creds: &Credentials, target: storage::RowRef) -> Result<Arc<Self>> { + let storage = creds.storage.build().await?; + + let (tx, rx) = watch::channel::<storage::RowRef>(target.clone()); let notify = Notify::new(); - let watch = Arc::new(K2vWatch { pk, sk, rx, notify }); + let watch = Arc::new(K2vWatch { target, rx, notify }); - tokio::spawn(Self::background_task( - Arc::downgrade(&watch), - creds.k2v_client()?, - tx, - )); + tokio::spawn(Self::background_task(Arc::downgrade(&watch), storage, tx)); Ok(watch) } async fn background_task( self_weak: Weak<Self>, - k2v: K2vClient, - tx: watch::Sender<Option<CausalityToken>>, + storage: storage::Store, + tx: watch::Sender<storage::RowRef>, ) { - let mut ct = None; + let mut row = match Weak::upgrade(&self_weak) { + Some(this) => this.target.clone(), + None => { + error!("can't start loop"); + return; + } + }; + while let Some(this) = Weak::upgrade(&self_weak) { debug!( - "bayou k2v watch bg loop iter ({}, {}): ct = {:?}", - this.pk, this.sk, ct + "bayou k2v watch bg loop iter ({}, {})", + this.target.uid.shard, this.target.uid.sort ); tokio::select!( _ = tokio::time::sleep(Duration::from_secs(60)) => continue, - update = k2v_wait_value_changed(&k2v, &this.pk, &this.sk, &ct) => { + update = storage.row_poll(&row) => { match update { Err(e) => { error!("Error in bayou k2v wait value changed: {}", e); tokio::time::sleep(Duration::from_secs(30)).await; } - Ok(cv) => { - if tx.send(Some(cv.causality.clone())).is_err() { + Ok(new_value) => { + row = new_value.row_ref; + if tx.send(row.clone()).is_err() { break; } - ct = Some(cv.causality); } } } _ = this.notify.notified() => { let rand = u128::to_be_bytes(thread_rng().gen()).to_vec(); - if let Err(e) = k2v - .insert_item( - &this.pk, - &this.sk, - rand, - ct.clone(), - ) - .await + let row_val = storage::RowVal::new(row.clone(), rand); + if let Err(e) = storage.row_insert(vec![row_val]).await { error!("Error in bayou k2v watch updater loop: {}", e); tokio::time::sleep(Duration::from_secs(30)).await; @@ -536,59 +491,3 @@ impl K2vWatch { info!("bayou k2v watch bg loop exiting"); } } - -// ---- TIMESTAMP CLASS ---- - -#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)] -pub struct Timestamp { - pub msec: u64, - pub rand: u64, -} - -impl Timestamp { - #[allow(dead_code)] - // 2023-05-15 try to make clippy happy and not sure if this fn will be used in the future. - pub fn now() -> Self { - let mut rng = thread_rng(); - Self { - msec: now_msec(), - rand: rng.gen::<u64>(), - } - } - - pub fn after(other: &Self) -> Self { - let mut rng = thread_rng(); - Self { - msec: std::cmp::max(now_msec(), other.msec + 1), - rand: rng.gen::<u64>(), - } - } - - pub fn zero() -> Self { - Self { msec: 0, rand: 0 } - } -} - -impl ToString for Timestamp { - fn to_string(&self) -> String { - let mut bytes = [0u8; 16]; - bytes[0..8].copy_from_slice(&u64::to_be_bytes(self.msec)); - bytes[8..16].copy_from_slice(&u64::to_be_bytes(self.rand)); - hex::encode(bytes) - } -} - -impl FromStr for Timestamp { - type Err = &'static str; - - fn from_str(s: &str) -> Result<Timestamp, &'static str> { - let bytes = hex::decode(s).map_err(|_| "invalid hex")?; - if bytes.len() != 16 { - return Err("bad length"); - } - Ok(Self { - msec: u64::from_be_bytes(bytes[0..8].try_into().unwrap()), - rand: u64::from_be_bytes(bytes[8..16].try_into().unwrap()), - }) - } -} diff --git a/src/config.rs b/src/config.rs index 074c192..1438910 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,5 +1,5 @@ use std::collections::HashMap; -use std::io::Read; +use std::io::{Read, Write}; use std::net::SocketAddr; use std::path::PathBuf; @@ -7,77 +7,141 @@ use anyhow::Result; use serde::{Deserialize, Serialize}; #[derive(Serialize, Deserialize, Debug, Clone)] -pub struct Config { - pub s3_endpoint: String, - pub k2v_endpoint: String, - pub aws_region: String, +pub struct CompanionConfig { + pub pid: Option<PathBuf>, + pub imap: ImapConfig, - pub login_static: Option<LoginStaticConfig>, - pub login_ldap: Option<LoginLdapConfig>, + #[serde(flatten)] + pub users: LoginStaticConfig, +} - pub lmtp: Option<LmtpConfig>, - pub imap: Option<ImapConfig>, +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct ProviderConfig { + pub pid: Option<PathBuf>, + pub imap: ImapConfig, + pub lmtp: LmtpConfig, + pub users: UserManagement, } #[derive(Serialize, Deserialize, Debug, Clone)] -pub struct LoginStaticConfig { - pub default_bucket: Option<String>, - pub users: HashMap<String, LoginStaticUser>, +#[serde(tag = "user_driver")] +pub enum UserManagement { + Static(LoginStaticConfig), + Ldap(LoginLdapConfig), } #[derive(Serialize, Deserialize, Debug, Clone)] -pub struct LoginStaticUser { - #[serde(default)] - pub email_addresses: Vec<String>, - pub password: String, +pub struct LmtpConfig { + pub bind_addr: SocketAddr, + pub hostname: String, +} - pub aws_access_key_id: String, - pub aws_secret_access_key: String, - pub bucket: Option<String>, +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct ImapConfig { + pub bind_addr: SocketAddr, +} - pub user_secret: String, - #[serde(default)] - pub alternate_user_secrets: Vec<String>, +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct LoginStaticConfig { + pub user_list: PathBuf, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +#[serde(tag = "storage_driver")] +pub enum LdapStorage { + Garage(LdapGarageConfig), + InMemory, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct LdapGarageConfig { + pub s3_endpoint: String, + pub k2v_endpoint: String, + pub aws_region: String, - pub master_key: Option<String>, - pub secret_key: Option<String>, + pub aws_access_key_id_attr: String, + pub aws_secret_access_key_attr: String, + pub bucket_attr: Option<String>, + pub default_bucket: Option<String>, } #[derive(Serialize, Deserialize, Debug, Clone)] pub struct LoginLdapConfig { + // LDAP connection info pub ldap_server: String, - #[serde(default)] pub pre_bind_on_login: bool, pub bind_dn: Option<String>, pub bind_password: Option<String>, - pub search_base: String, + + // Schema-like info required for Aerogramme's logic pub username_attr: String, #[serde(default = "default_mail_attr")] pub mail_attr: String, - pub aws_access_key_id_attr: String, - pub aws_secret_access_key_attr: String, - pub user_secret_attr: String, - pub alternate_user_secrets_attr: Option<String>, + // The field that will contain the crypto root thingy + pub crypto_root_attr: String, - pub bucket: Option<String>, - pub bucket_attr: Option<String>, + // Storage related thing + #[serde(flatten)] + pub storage: LdapStorage, } +// ---- + #[derive(Serialize, Deserialize, Debug, Clone)] -pub struct LmtpConfig { - pub bind_addr: SocketAddr, - pub hostname: String, +#[serde(tag = "storage_driver")] +pub enum StaticStorage { + Garage(StaticGarageConfig), + InMemory, } #[derive(Serialize, Deserialize, Debug, Clone)] -pub struct ImapConfig { - pub bind_addr: SocketAddr, +pub struct StaticGarageConfig { + pub s3_endpoint: String, + pub k2v_endpoint: String, + pub aws_region: String, + + pub aws_access_key_id: String, + pub aws_secret_access_key: String, + pub bucket: String, } -pub fn read_config(config_file: PathBuf) -> Result<Config> { +pub type UserList = HashMap<String, UserEntry>; + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct UserEntry { + #[serde(default)] + pub email_addresses: Vec<String>, + pub password: String, + pub crypto_root: String, + + #[serde(flatten)] + pub storage: StaticStorage, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct SetupEntry { + #[serde(default)] + pub email_addresses: Vec<String>, + + #[serde(default)] + pub clear_password: Option<String>, + + #[serde(flatten)] + pub storage: StaticStorage, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +#[serde(tag = "role")] +pub enum AnyConfig { + Companion(CompanionConfig), + Provider(ProviderConfig), +} + +// --- +pub fn read_config<T: serde::de::DeserializeOwned>(config_file: PathBuf) -> Result<T> { let mut file = std::fs::OpenOptions::new() .read(true) .open(config_file.as_path())?; @@ -88,6 +152,18 @@ pub fn read_config(config_file: PathBuf) -> Result<Config> { Ok(toml::from_str(&config)?) } +pub fn write_config<T: Serialize>(config_file: PathBuf, config: &T) -> Result<()> { + let mut file = std::fs::OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(config_file.as_path())?; + + file.write_all(toml::to_string(config)?.as_bytes())?; + + Ok(()) +} + fn default_mail_attr() -> String { "mail".into() } diff --git a/src/future_rest_admin_api.txt b/src/future_rest_admin_api.txt new file mode 100644 index 0000000..19ece27 --- /dev/null +++ b/src/future_rest_admin_api.txt @@ -0,0 +1,174 @@ + Command::FirstLogin { + creds, + user_secrets, + } => { + let creds = make_storage_creds(creds); + let user_secrets = make_user_secrets(user_secrets); + + println!("Please enter your password for key decryption."); + println!("If you are using LDAP login, this must be your LDAP password."); + println!("If you are using the static login provider, enter any password, and this will also become your password for local IMAP access."); + let password = rpassword::prompt_password("Enter password: ")?; + let password_confirm = rpassword::prompt_password("Confirm password: ")?; + if password != password_confirm { + bail!("Passwords don't match."); + } + + CryptoKeys::init(&creds, &user_secrets, &password).await?; + + println!(""); + println!("Cryptographic key setup is complete."); + println!(""); + println!("If you are using the static login provider, add the following section to your .toml configuration file:"); + println!(""); + dump_config(&password, &creds); + } + Command::InitializeLocalKeys { creds } => { + let creds = make_storage_creds(creds); + + println!("Please enter a password for local IMAP access."); + println!("This password is not used for key decryption, your keys will be printed below (do not lose them!)"); + println!( + "If you plan on using LDAP login, stop right here and use `first-login` instead" + ); + let password = rpassword::prompt_password("Enter password: ")?; + let password_confirm = rpassword::prompt_password("Confirm password: ")?; + if password != password_confirm { + bail!("Passwords don't match."); + } + + let master = gen_key(); + let (_, secret) = gen_keypair(); + let keys = CryptoKeys::init_without_password(&creds, &master, &secret).await?; + + println!(""); + println!("Cryptographic key setup is complete."); + println!(""); + println!("Add the following section to your .toml configuration file:"); + println!(""); + dump_config(&password, &creds); + dump_keys(&keys); + } + Command::AddPassword { + creds, + user_secrets, + gen, + } => { + let creds = make_storage_creds(creds); + let user_secrets = make_user_secrets(user_secrets); + + let existing_password = + rpassword::prompt_password("Enter existing password to decrypt keys: ")?; + let new_password = if gen { + let password = base64::encode_config( + &u128::to_be_bytes(thread_rng().gen())[..10], + base64::URL_SAFE_NO_PAD, + ); + println!("Your new password: {}", password); + println!("Keep it safe!"); + password + } else { + let password = rpassword::prompt_password("Enter new password: ")?; + let password_confirm = rpassword::prompt_password("Confirm new password: ")?; + if password != password_confirm { + bail!("Passwords don't match."); + } + password + }; + + let keys = CryptoKeys::open(&creds, &user_secrets, &existing_password).await?; + keys.add_password(&creds, &user_secrets, &new_password) + .await?; + println!(""); + println!("New password added successfully."); + } + Command::DeletePassword { + creds, + user_secrets, + allow_delete_all, + } => { + let creds = make_storage_creds(creds); + let user_secrets = make_user_secrets(user_secrets); + + let existing_password = rpassword::prompt_password("Enter password to delete: ")?; + + let keys = match allow_delete_all { + true => Some(CryptoKeys::open(&creds, &user_secrets, &existing_password).await?), + false => None, + }; + + CryptoKeys::delete_password(&creds, &existing_password, allow_delete_all).await?; + + println!(""); + println!("Password was deleted successfully."); + + if let Some(keys) = keys { + println!("As a reminder, here are your cryptographic keys:"); + dump_keys(&keys); + } + } + Command::ShowKeys { + creds, + user_secrets, + } => { + let creds = make_storage_creds(creds); + let user_secrets = make_user_secrets(user_secrets); + + let existing_password = rpassword::prompt_password("Enter key decryption password: ")?; + + let keys = CryptoKeys::open(&creds, &user_secrets, &existing_password).await?; + dump_keys(&keys); + } + } + + Ok(()) +} + +fn make_storage_creds(c: StorageCredsArgs) -> StorageCredentials { + let s3_region = Region { + name: c.region.clone(), + endpoint: c.s3_endpoint, + }; + let k2v_region = Region { + name: c.region, + endpoint: c.k2v_endpoint, + }; + StorageCredentials { + k2v_region, + s3_region, + aws_access_key_id: c.aws_access_key_id, + aws_secret_access_key: c.aws_secret_access_key, + bucket: c.bucket, + } +} + +fn make_user_secrets(c: UserSecretsArgs) -> UserSecrets { + UserSecrets { + user_secret: c.user_secret, + alternate_user_secrets: c + .alternate_user_secrets + .split(',') + .map(|x| x.trim()) + .filter(|x| !x.is_empty()) + .map(|x| x.to_string()) + .collect(), + } +} + +fn dump_config(password: &str, creds: &StorageCredentials) { + println!("[login_static.users.<username>]"); + println!( + "password = \"{}\"", + hash_password(password).expect("unable to hash password") + ); + println!("aws_access_key_id = \"{}\"", creds.aws_access_key_id); + println!( + "aws_secret_access_key = \"{}\"", + creds.aws_secret_access_key + ); +} + +fn dump_keys(keys: &CryptoKeys) { + println!("master_key = \"{}\"", base64::encode(&keys.master)); + println!("secret_key = \"{}\"", base64::encode(&keys.secret)); +} diff --git a/src/k2v_util.rs b/src/k2v_util.rs index 9dadab4..3cd969b 100644 --- a/src/k2v_util.rs +++ b/src/k2v_util.rs @@ -1,14 +1,10 @@ +/* use anyhow::Result; - -use k2v_client::{CausalValue, CausalityToken, K2vClient}; - // ---- UTIL: function to wait for a value to have changed in K2V ---- pub async fn k2v_wait_value_changed( - k2v: &K2vClient, - pk: &str, - sk: &str, - prev_ct: &Option<CausalityToken>, + k2v: &storage::RowStore, + key: &storage::RowRef, ) -> Result<CausalValue> { loop { if let Some(ct) = prev_ct { @@ -27,3 +23,4 @@ pub async fn k2v_wait_value_changed( } } } +*/ diff --git a/src/login/ldap_provider.rs b/src/login/ldap_provider.rs index 2eeb6d9..81e5879 100644 --- a/src/login/ldap_provider.rs +++ b/src/login/ldap_provider.rs @@ -5,10 +5,9 @@ use log::debug; use crate::config::*; use crate::login::*; +use crate::storage; pub struct LdapLoginProvider { - k2v_region: Region, - s3_region: Region, ldap_server: String, pre_bind_on_login: bool, @@ -18,13 +17,10 @@ pub struct LdapLoginProvider { attrs_to_retrieve: Vec<String>, username_attr: String, mail_attr: String, + crypto_root_attr: String, - aws_access_key_id_attr: String, - aws_secret_access_key_attr: String, - user_secret_attr: String, - alternate_user_secrets_attr: Option<String>, - - bucket_source: BucketSource, + storage_specific: StorageSpecific, + in_memory_store: storage::in_memory::MemDb, } enum BucketSource { @@ -32,8 +28,16 @@ enum BucketSource { Attr(String), } +enum StorageSpecific { + InMemory, + Garage { + from_config: LdapGarageConfig, + bucket_source: BucketSource, + }, +} + impl LdapLoginProvider { - pub fn new(config: LoginLdapConfig, k2v_region: Region, s3_region: Region) -> Result<Self> { + pub fn new(config: LoginLdapConfig) -> Result<Self> { let bind_dn_and_pw = match (config.bind_dn, config.bind_password) { (Some(dn), Some(pw)) => Some((dn, pw)), (None, None) => None, @@ -42,12 +46,6 @@ impl LdapLoginProvider { ), }; - let bucket_source = match (config.bucket, config.bucket_attr) { - (Some(b), None) => BucketSource::Constant(b), - (None, Some(a)) => BucketSource::Attr(a), - _ => bail!("Must set `bucket` or `bucket_attr`, but not both"), - }; - if config.pre_bind_on_login && bind_dn_and_pw.is_none() { bail!("Cannot use `pre_bind_on_login` without setting `bind_dn` and `bind_password`"); } @@ -55,20 +53,32 @@ impl LdapLoginProvider { let mut attrs_to_retrieve = vec![ config.username_attr.clone(), config.mail_attr.clone(), - config.aws_access_key_id_attr.clone(), - config.aws_secret_access_key_attr.clone(), - config.user_secret_attr.clone(), + config.crypto_root_attr.clone(), ]; - if let Some(a) = &config.alternate_user_secrets_attr { - attrs_to_retrieve.push(a.clone()); - } - if let BucketSource::Attr(a) = &bucket_source { - attrs_to_retrieve.push(a.clone()); - } + + // storage specific + let specific = match config.storage { + LdapStorage::InMemory => StorageSpecific::InMemory, + LdapStorage::Garage(grgconf) => { + let bucket_source = + match (grgconf.default_bucket.clone(), grgconf.bucket_attr.clone()) { + (Some(b), None) => BucketSource::Constant(b), + (None, Some(a)) => BucketSource::Attr(a), + _ => bail!("Must set `bucket` or `bucket_attr`, but not both"), + }; + + if let BucketSource::Attr(a) = &bucket_source { + attrs_to_retrieve.push(a.clone()); + } + + StorageSpecific::Garage { + from_config: grgconf, + bucket_source, + } + } + }; Ok(Self { - k2v_region, - s3_region, ldap_server: config.ldap_server, pre_bind_on_login: config.pre_bind_on_login, bind_dn_and_pw, @@ -76,29 +86,43 @@ impl LdapLoginProvider { attrs_to_retrieve, username_attr: config.username_attr, mail_attr: config.mail_attr, - aws_access_key_id_attr: config.aws_access_key_id_attr, - aws_secret_access_key_attr: config.aws_secret_access_key_attr, - user_secret_attr: config.user_secret_attr, - alternate_user_secrets_attr: config.alternate_user_secrets_attr, - bucket_source, + crypto_root_attr: config.crypto_root_attr, + storage_specific: specific, + in_memory_store: storage::in_memory::MemDb::new(), }) } - fn storage_creds_from_ldap_user(&self, user: &SearchEntry) -> Result<StorageCredentials> { - let aws_access_key_id = get_attr(user, &self.aws_access_key_id_attr)?; - let aws_secret_access_key = get_attr(user, &self.aws_secret_access_key_attr)?; - let bucket = match &self.bucket_source { - BucketSource::Constant(b) => b.clone(), - BucketSource::Attr(a) => get_attr(user, a)?, + async fn storage_creds_from_ldap_user(&self, user: &SearchEntry) -> Result<Builder> { + let storage: Builder = match &self.storage_specific { + StorageSpecific::InMemory => { + self.in_memory_store + .builder(&get_attr(user, &self.username_attr)?) + .await + } + StorageSpecific::Garage { + from_config, + bucket_source, + } => { + let aws_access_key_id = get_attr(user, &from_config.aws_access_key_id_attr)?; + let aws_secret_access_key = + get_attr(user, &from_config.aws_secret_access_key_attr)?; + let bucket = match bucket_source { + BucketSource::Constant(b) => b.clone(), + BucketSource::Attr(a) => get_attr(user, &a)?, + }; + + storage::garage::GarageBuilder::new(storage::garage::GarageConf { + region: from_config.aws_region.clone(), + s3_endpoint: from_config.s3_endpoint.clone(), + k2v_endpoint: from_config.k2v_endpoint.clone(), + aws_access_key_id, + aws_secret_access_key, + bucket, + })? + } }; - Ok(StorageCredentials { - k2v_region: self.k2v_region.clone(), - s3_region: self.s3_region.clone(), - aws_access_key_id, - aws_secret_access_key, - bucket, - }) + Ok(storage) } } @@ -148,22 +172,16 @@ impl LoginProvider for LdapLoginProvider { .context("Invalid password")?; debug!("Ldap login with user name {} successfull", username); - let storage = self.storage_creds_from_ldap_user(&user)?; + // cryptography + let crstr = get_attr(&user, &self.crypto_root_attr)?; + let cr = CryptoRoot(crstr); + let keys = cr.crypto_keys(password)?; - let user_secret = get_attr(&user, &self.user_secret_attr)?; - let alternate_user_secrets = match &self.alternate_user_secrets_attr { - None => vec![], - Some(a) => user.attrs.get(a).cloned().unwrap_or_default(), - }; - let user_secrets = UserSecrets { - user_secret, - alternate_user_secrets, - }; + // storage + let storage = self.storage_creds_from_ldap_user(&user).await?; drop(ldap); - let keys = CryptoKeys::open(&storage, &user_secrets, password).await?; - Ok(Credentials { storage, keys }) } @@ -201,11 +219,14 @@ impl LoginProvider for LdapLoginProvider { let user = SearchEntry::construct(matches.into_iter().next().unwrap()); debug!("Found matching LDAP user for email {}: {}", email, user.dn); - let storage = self.storage_creds_from_ldap_user(&user)?; - drop(ldap); + // cryptography + let crstr = get_attr(&user, &self.crypto_root_attr)?; + let cr = CryptoRoot(crstr); + let public_key = cr.public_key()?; - let k2v_client = storage.k2v_client()?; - let (_, public_key) = CryptoKeys::load_salt_and_public(&k2v_client).await?; + // storage + let storage = self.storage_creds_from_ldap_user(&user).await?; + drop(ldap); Ok(PublicCredentials { storage, diff --git a/src/login/mod.rs b/src/login/mod.rs index 3fab90a..2926738 100644 --- a/src/login/mod.rs +++ b/src/login/mod.rs @@ -1,20 +1,15 @@ pub mod ldap_provider; pub mod static_provider; -use std::collections::BTreeMap; +use base64::Engine; use std::sync::Arc; use anyhow::{anyhow, bail, Context, Result}; use async_trait::async_trait; -use k2v_client::{ - BatchInsertOp, BatchReadOp, CausalValue, CausalityToken, Filter, K2vClient, K2vValue, -}; use rand::prelude::*; -use rusoto_core::HttpClient; -use rusoto_credential::{AwsCredentials, StaticProvider}; -use rusoto_s3::S3Client; use crate::cryptoblob::*; +use crate::storage::*; /// The trait LoginProvider defines the interface for a login provider that allows /// to retrieve storage and cryptographic credentials for access to a user account @@ -38,7 +33,7 @@ pub type ArcLoginProvider = Arc<dyn LoginProvider + Send + Sync>; #[derive(Clone, Debug)] pub struct Credentials { /// The storage credentials are used to authenticate access to the underlying storage (S3, K2V) - pub storage: StorageCredentials, + pub storage: Builder, /// The cryptographic keys are used to encrypt and decrypt data stored in S3 and K2V pub keys: CryptoKeys, } @@ -46,32 +41,93 @@ pub struct Credentials { #[derive(Clone, Debug)] pub struct PublicCredentials { /// The storage credentials are used to authenticate access to the underlying storage (S3, K2V) - pub storage: StorageCredentials, + pub storage: Builder, pub public_key: PublicKey, } -/// The struct StorageCredentials contains access key to an S3 and K2V bucket -#[derive(Clone, Debug, Hash, PartialEq, Eq)] -pub struct StorageCredentials { - pub s3_region: Region, - pub k2v_region: Region, - - pub aws_access_key_id: String, - pub aws_secret_access_key: String, - pub bucket: String, -} - -/// The struct UserSecrets represents intermediary secrets that are mixed in with the user's -/// password when decrypting the cryptographic keys that are stored in their bucket. -/// These secrets should be stored somewhere else (e.g. in the LDAP server or in the -/// local config file), as an additionnal authentification factor so that the password -/// isn't enough just alone to decrypt the content of a user's bucket. -pub struct UserSecrets { - /// The main user secret that will be used to encrypt keys when a new password is added - pub user_secret: String, - /// Alternative user secrets that will be tried when decrypting keys that were encrypted - /// with old passwords - pub alternate_user_secrets: Vec<String>, +use serde::{Deserialize, Serialize}; +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct CryptoRoot(pub String); + +impl CryptoRoot { + pub fn create_pass(password: &str, k: &CryptoKeys) -> Result<Self> { + let bytes = k.password_seal(password)?; + let b64 = base64::engine::general_purpose::STANDARD_NO_PAD.encode(bytes); + let cr = format!("aero:cryptoroot:pass:{}", b64); + Ok(Self(cr)) + } + + pub fn create_cleartext(k: &CryptoKeys) -> Self { + let bytes = k.serialize(); + let b64 = base64::engine::general_purpose::STANDARD_NO_PAD.encode(bytes); + let cr = format!("aero:cryptoroot:cleartext:{}", b64); + Self(cr) + } + + pub fn create_incoming(pk: &PublicKey) -> Self { + let bytes: &[u8] = &pk[..]; + let b64 = base64::engine::general_purpose::STANDARD_NO_PAD.encode(bytes); + let cr = format!("aero:cryptoroot:incoming:{}", b64); + Self(cr) + } + + pub fn public_key(&self) -> Result<PublicKey> { + match self.0.splitn(4, ':').collect::<Vec<&str>>()[..] { + ["aero", "cryptoroot", "pass", b64blob] => { + let blob = base64::engine::general_purpose::STANDARD_NO_PAD.decode(b64blob)?; + if blob.len() < 32 { + bail!( + "Decoded data is {} bytes long, expect at least 32 bytes", + blob.len() + ); + } + PublicKey::from_slice(&blob[..32]).context("must be a valid public key") + } + ["aero", "cryptoroot", "cleartext", b64blob] => { + let blob = base64::engine::general_purpose::STANDARD_NO_PAD.decode(b64blob)?; + Ok(CryptoKeys::deserialize(&blob)?.public) + } + ["aero", "cryptoroot", "incoming", b64blob] => { + let blob = base64::engine::general_purpose::STANDARD_NO_PAD.decode(b64blob)?; + if blob.len() < 32 { + bail!( + "Decoded data is {} bytes long, expect at least 32 bytes", + blob.len() + ); + } + PublicKey::from_slice(&blob[..32]).context("must be a valid public key") + } + ["aero", "cryptoroot", "keyring", _] => { + bail!("keyring is not yet implemented!") + } + _ => bail!(format!( + "passed string '{}' is not a valid cryptoroot", + self.0 + )), + } + } + pub fn crypto_keys(&self, password: &str) -> Result<CryptoKeys> { + match self.0.splitn(4, ':').collect::<Vec<&str>>()[..] { + ["aero", "cryptoroot", "pass", b64blob] => { + let blob = base64::engine::general_purpose::STANDARD_NO_PAD.decode(b64blob)?; + CryptoKeys::password_open(password, &blob) + } + ["aero", "cryptoroot", "cleartext", b64blob] => { + let blob = base64::engine::general_purpose::STANDARD_NO_PAD.decode(b64blob)?; + CryptoKeys::deserialize(&blob) + } + ["aero", "cryptoroot", "incoming", _] => { + bail!("incoming cryptoroot does not contain a crypto key!") + } + ["aero", "cryptoroot", "keyring", _] => { + bail!("keyring is not yet implemented!") + } + _ => bail!(format!( + "passed string '{}' is not a valid cryptoroot", + self.0 + )), + } + } } /// The struct CryptoKeys contains the cryptographic keys used to encrypt and decrypt @@ -86,426 +142,22 @@ pub struct CryptoKeys { pub public: PublicKey, } -/// A custom S3 region, composed of a region name and endpoint. -/// We use this instead of rusoto_signature::Region so that we can -/// derive Hash and Eq -#[derive(Clone, Debug, Hash, PartialEq, Eq)] -pub struct Region { - pub name: String, - pub endpoint: String, -} - -impl Region { - pub fn as_rusoto_region(&self) -> rusoto_signature::Region { - rusoto_signature::Region::Custom { - name: self.name.clone(), - endpoint: self.endpoint.clone(), - } - } -} - // ---- -impl Credentials { - pub fn k2v_client(&self) -> Result<K2vClient> { - self.storage.k2v_client() - } - pub fn s3_client(&self) -> Result<S3Client> { - self.storage.s3_client() - } - pub fn bucket(&self) -> &str { - self.storage.bucket.as_str() - } -} - -impl StorageCredentials { - pub fn k2v_client(&self) -> Result<K2vClient> { - let aws_creds = AwsCredentials::new( - self.aws_access_key_id.clone(), - self.aws_secret_access_key.clone(), - None, - None, - ); - - Ok(K2vClient::new( - self.k2v_region.as_rusoto_region(), - self.bucket.clone(), - aws_creds, - None, - )?) - } - - pub fn s3_client(&self) -> Result<S3Client> { - let aws_creds_provider = StaticProvider::new_minimal( - self.aws_access_key_id.clone(), - self.aws_secret_access_key.clone(), - ); - - let connector = hyper_rustls::HttpsConnectorBuilder::new() - .with_native_roots() - .https_or_http() - .enable_http1() - .enable_http2() - .build(); - let client = HttpClient::from_connector(connector); - - Ok(S3Client::new_with( - client, - aws_creds_provider, - self.s3_region.as_rusoto_region(), - )) - } -} - impl CryptoKeys { - pub async fn init( - storage: &StorageCredentials, - user_secrets: &UserSecrets, - password: &str, - ) -> Result<Self> { - // Check that salt and public don't exist already - let k2v = storage.k2v_client()?; - let (salt_ct, public_ct) = Self::check_uninitialized(&k2v).await?; - - // Generate salt for password identifiers - let mut ident_salt = [0u8; 32]; - thread_rng().fill(&mut ident_salt); - - // Generate (public, private) key pair and master key + /// Initialize a new cryptography root + pub fn init() -> Self { let (public, secret) = gen_keypair(); let master = gen_key(); - let keys = CryptoKeys { + CryptoKeys { master, secret, public, - }; - - // Generate short password digest (= password identity) - let ident = argon2_kdf(&ident_salt, password.as_bytes(), 16)?; - - // Generate salt for KDF - let mut kdf_salt = [0u8; 32]; - thread_rng().fill(&mut kdf_salt); - - // Calculate key for password secret box - let password_key = user_secrets.derive_password_key(&kdf_salt, password)?; - - // Seal a secret box that contains our crypto keys - let password_sealed = seal(&keys.serialize(), &password_key)?; - - let password_sortkey = format!("password:{}", hex::encode(&ident)); - let password_blob = [&kdf_salt[..], &password_sealed].concat(); - - // Write values to storage - k2v.insert_batch(&[ - k2v_insert_single_key("keys", "salt", salt_ct, ident_salt), - k2v_insert_single_key("keys", "public", public_ct, keys.public), - k2v_insert_single_key("keys", &password_sortkey, None, &password_blob), - ]) - .await - .context("InsertBatch for salt, public, and password")?; - - Ok(keys) - } - - pub async fn init_without_password( - storage: &StorageCredentials, - master: &Key, - secret: &SecretKey, - ) -> Result<Self> { - // Check that salt and public don't exist already - let k2v = storage.k2v_client()?; - let (salt_ct, public_ct) = Self::check_uninitialized(&k2v).await?; - - // Generate salt for password identifiers - let mut ident_salt = [0u8; 32]; - thread_rng().fill(&mut ident_salt); - - // Create CryptoKeys struct from given keys - let public = secret.public_key(); - let keys = CryptoKeys { - master: master.clone(), - secret: secret.clone(), - public, - }; - - // Write values to storage - k2v.insert_batch(&[ - k2v_insert_single_key("keys", "salt", salt_ct, ident_salt), - k2v_insert_single_key("keys", "public", public_ct, keys.public), - ]) - .await - .context("InsertBatch for salt and public")?; - - Ok(keys) - } - - pub async fn open( - storage: &StorageCredentials, - user_secrets: &UserSecrets, - password: &str, - ) -> Result<Self> { - let k2v = storage.k2v_client()?; - let (ident_salt, expected_public) = Self::load_salt_and_public(&k2v).await?; - - // Generate short password digest (= password identity) - let ident = argon2_kdf(&ident_salt, password.as_bytes(), 16)?; - - // Lookup password blob - let password_sortkey = format!("password:{}", hex::encode(&ident)); - - let password_blob = { - let mut val = match k2v.read_item("keys", &password_sortkey).await { - Err(k2v_client::Error::NotFound) => { - bail!("invalid password") - } - x => x?, - }; - if val.value.len() != 1 { - bail!("multiple values for password in storage"); - } - match val.value.pop().unwrap() { - K2vValue::Value(v) => v, - K2vValue::Tombstone => bail!("invalid password"), - } - }; - - // Try to open blob - let kdf_salt = &password_blob[..32]; - let password_openned = - user_secrets.try_open_encrypted_keys(kdf_salt, password, &password_blob[32..])?; - - let keys = Self::deserialize(&password_openned)?; - if keys.public != expected_public { - bail!("Password public key doesn't match stored public key"); - } - - Ok(keys) - } - - pub async fn open_without_password( - storage: &StorageCredentials, - master: &Key, - secret: &SecretKey, - ) -> Result<Self> { - let k2v = storage.k2v_client()?; - let (_ident_salt, expected_public) = Self::load_salt_and_public(&k2v).await?; - - // Create CryptoKeys struct from given keys - let public = secret.public_key(); - let keys = CryptoKeys { - master: master.clone(), - secret: secret.clone(), - public, - }; - - // Check public key matches - if keys.public != expected_public { - bail!("Given public key doesn't match stored public key"); } - - Ok(keys) - } - - pub async fn add_password( - &self, - storage: &StorageCredentials, - user_secrets: &UserSecrets, - password: &str, - ) -> Result<()> { - let k2v = storage.k2v_client()?; - let (ident_salt, _public) = Self::load_salt_and_public(&k2v).await?; - - // Generate short password digest (= password identity) - let ident = argon2_kdf(&ident_salt, password.as_bytes(), 16)?; - - // Generate salt for KDF - let mut kdf_salt = [0u8; 32]; - thread_rng().fill(&mut kdf_salt); - - // Calculate key for password secret box - let password_key = user_secrets.derive_password_key(&kdf_salt, password)?; - - // Seal a secret box that contains our crypto keys - let password_sealed = seal(&self.serialize(), &password_key)?; - - let password_sortkey = format!("password:{}", hex::encode(&ident)); - let password_blob = [&kdf_salt[..], &password_sealed].concat(); - - // List existing passwords to overwrite existing entry if necessary - let ct = match k2v.read_item("keys", &password_sortkey).await { - Err(k2v_client::Error::NotFound) => None, - v => { - let entry = v?; - if entry.value.iter().any(|x| matches!(x, K2vValue::Value(_))) { - bail!("password already exists"); - } - Some(entry.causality) - } - }; - - // Write values to storage - k2v.insert_batch(&[k2v_insert_single_key( - "keys", - &password_sortkey, - ct, - &password_blob, - )]) - .await - .context("InsertBatch for new password")?; - - Ok(()) - } - - pub async fn delete_password( - storage: &StorageCredentials, - password: &str, - allow_delete_all: bool, - ) -> Result<()> { - let k2v = storage.k2v_client()?; - let (ident_salt, _public) = Self::load_salt_and_public(&k2v).await?; - - // Generate short password digest (= password identity) - let ident = argon2_kdf(&ident_salt, password.as_bytes(), 16)?; - let password_sortkey = format!("password:{}", hex::encode(&ident)); - - // List existing passwords - let existing_passwords = Self::list_existing_passwords(&k2v).await?; - - // Check password is there - let pw = existing_passwords - .get(&password_sortkey) - .ok_or(anyhow!("password does not exist"))?; - - if !allow_delete_all && existing_passwords.len() < 2 { - bail!("No other password exists, not deleting last password."); - } - - k2v.delete_item("keys", &password_sortkey, pw.causality.clone()) - .await - .context("DeleteItem for password")?; - - Ok(()) - } - - // ---- STORAGE UTIL ---- - - async fn check_uninitialized( - k2v: &K2vClient, - ) -> Result<(Option<CausalityToken>, Option<CausalityToken>)> { - let params = k2v - .read_batch(&[ - k2v_read_single_key("keys", "salt", true), - k2v_read_single_key("keys", "public", true), - ]) - .await - .context("ReadBatch for salt and public in check_uninitialized")?; - if params.len() != 2 { - bail!( - "Invalid response from k2v storage: {:?} (expected two items)", - params - ); - } - if params[0].items.len() > 1 || params[1].items.len() > 1 { - bail!( - "invalid response from k2v storage: {:?} (several items in single_item read)", - params - ); - } - - let salt_ct = match params[0].items.iter().next() { - None => None, - Some((_, CausalValue { causality, value })) => { - if value.iter().any(|x| matches!(x, K2vValue::Value(_))) { - bail!("key storage already initialized"); - } - Some(causality.clone()) - } - }; - - let public_ct = match params[1].items.iter().next() { - None => None, - Some((_, CausalValue { causality, value })) => { - if value.iter().any(|x| matches!(x, K2vValue::Value(_))) { - bail!("key storage already initialized"); - } - Some(causality.clone()) - } - }; - - Ok((salt_ct, public_ct)) - } - - pub async fn load_salt_and_public(k2v: &K2vClient) -> Result<([u8; 32], PublicKey)> { - let mut params = k2v - .read_batch(&[ - k2v_read_single_key("keys", "salt", false), - k2v_read_single_key("keys", "public", false), - ]) - .await - .context("ReadBatch for salt and public in load_salt_and_public")?; - if params.len() != 2 { - bail!( - "Invalid response from k2v storage: {:?} (expected two items)", - params - ); - } - if params[0].items.len() != 1 || params[1].items.len() != 1 { - bail!("cryptographic keys not initialized for user"); - } - - // Retrieve salt from given response - let salt_vals = &mut params[0].items.iter_mut().next().unwrap().1.value; - if salt_vals.len() != 1 { - bail!("Multiple values for `salt`"); - } - let salt: Vec<u8> = match &mut salt_vals[0] { - K2vValue::Value(v) => std::mem::take(v), - K2vValue::Tombstone => bail!("salt is a tombstone"), - }; - if salt.len() != 32 { - bail!("`salt` is not 32 bytes long"); - } - let mut salt_constlen = [0u8; 32]; - salt_constlen.copy_from_slice(&salt); - - // Retrieve public from given response - let public_vals = &mut params[1].items.iter_mut().next().unwrap().1.value; - if public_vals.len() != 1 { - bail!("Multiple values for `public`"); - } - let public: Vec<u8> = match &mut public_vals[0] { - K2vValue::Value(v) => std::mem::take(v), - K2vValue::Tombstone => bail!("public is a tombstone"), - }; - let public = PublicKey::from_slice(&public).ok_or(anyhow!("Invalid public key length"))?; - - Ok((salt_constlen, public)) - } - - async fn list_existing_passwords(k2v: &K2vClient) -> Result<BTreeMap<String, CausalValue>> { - let mut res = k2v - .read_batch(&[BatchReadOp { - partition_key: "keys", - filter: Filter { - start: None, - end: None, - prefix: Some("password:"), - limit: None, - reverse: false, - }, - conflicts_only: false, - tombstones: false, - single_item: false, - }]) - .await - .context("ReadBatch for prefix password: in list_existing_passwords")?; - if res.len() != 1 { - bail!("unexpected k2v result: {:?}, expected one item", res); - } - Ok(res.pop().unwrap().items) } + // Clear text serialize/deserialize + /// Serialize the root as bytes without encryption fn serialize(&self) -> [u8; 64] { let mut res = [0u8; 64]; res[..32].copy_from_slice(self.master.as_ref()); @@ -513,6 +165,7 @@ impl CryptoKeys { res } + /// Deserialize a clear text crypto root without encryption fn deserialize(bytes: &[u8]) -> Result<Self> { if bytes.len() != 64 { bail!("Invalid length: {}, expected 64", bytes.len()); @@ -526,91 +179,66 @@ impl CryptoKeys { public, }) } -} -impl UserSecrets { - fn derive_password_key_with(user_secret: &str, kdf_salt: &[u8], password: &str) -> Result<Key> { - let tmp = format!("{}\n\n{}", user_secret, password); - Ok(Key::from_slice(&argon2_kdf(kdf_salt, tmp.as_bytes(), 32)?).unwrap()) - } + // Password sealed keys serialize/deserialize + pub fn password_open(password: &str, blob: &[u8]) -> Result<Self> { + let _pubkey = &blob[0..32]; + let kdf_salt = &blob[32..64]; + let password_openned = try_open_encrypted_keys(kdf_salt, password, &blob[64..])?; - fn derive_password_key(&self, kdf_salt: &[u8], password: &str) -> Result<Key> { - Self::derive_password_key_with(&self.user_secret, kdf_salt, password) + let keys = Self::deserialize(&password_openned)?; + Ok(keys) } - fn try_open_encrypted_keys( - &self, - kdf_salt: &[u8], - password: &str, - encrypted_keys: &[u8], - ) -> Result<Vec<u8>> { - let secrets_to_try = - std::iter::once(&self.user_secret).chain(self.alternate_user_secrets.iter()); - for user_secret in secrets_to_try { - let password_key = Self::derive_password_key_with(user_secret, kdf_salt, password)?; - if let Ok(res) = open(encrypted_keys, &password_key) { - return Ok(res); - } - } - bail!("Unable to decrypt password blob."); + pub fn password_seal(&self, password: &str) -> Result<Vec<u8>> { + let mut kdf_salt = [0u8; 32]; + thread_rng().fill(&mut kdf_salt); + + // Calculate key for password secret box + let password_key = derive_password_key(&kdf_salt, password)?; + + // Seal a secret box that contains our crypto keys + let password_sealed = seal(&self.serialize(), &password_key)?; + + // Create blob + let password_blob = [&self.public[..], &kdf_salt[..], &password_sealed].concat(); + + Ok(password_blob) } } +fn derive_password_key(kdf_salt: &[u8], password: &str) -> Result<Key> { + Ok(Key::from_slice(&argon2_kdf(kdf_salt, password.as_bytes(), 32)?).unwrap()) +} + +fn try_open_encrypted_keys( + kdf_salt: &[u8], + password: &str, + encrypted_keys: &[u8], +) -> Result<Vec<u8>> { + let password_key = derive_password_key(kdf_salt, password)?; + open(encrypted_keys, &password_key) +} + // ---- UTIL ---- pub fn argon2_kdf(salt: &[u8], password: &[u8], output_len: usize) -> Result<Vec<u8>> { - use argon2::{Algorithm, Argon2, ParamsBuilder, PasswordHasher, Version}; + use argon2::{password_hash, Algorithm, Argon2, ParamsBuilder, PasswordHasher, Version}; - let mut params = ParamsBuilder::new(); - params + let params = ParamsBuilder::new() .output_len(output_len) - .map_err(|e| anyhow!("Invalid output length: {}", e))?; - - let params = params - .params() + .build() .map_err(|e| anyhow!("Invalid argon2 params: {}", e))?; let argon2 = Argon2::new(Algorithm::default(), Version::default(), params); - let salt = base64::encode_config(salt, base64::STANDARD_NO_PAD); + let b64_salt = base64::engine::general_purpose::STANDARD_NO_PAD.encode(salt); + let valid_salt = password_hash::Salt::from_b64(&b64_salt) + .map_err(|e| anyhow!("Invalid salt, error {}", e))?; let hash = argon2 - .hash_password(password, &salt) + .hash_password(password, valid_salt) .map_err(|e| anyhow!("Unable to hash: {}", e))?; let hash = hash.hash.ok_or(anyhow!("Missing output"))?; assert!(hash.len() == output_len); Ok(hash.as_bytes().to_vec()) } - -pub fn k2v_read_single_key<'a>( - partition_key: &'a str, - sort_key: &'a str, - tombstones: bool, -) -> BatchReadOp<'a> { - BatchReadOp { - partition_key, - filter: Filter { - start: Some(sort_key), - end: None, - prefix: None, - limit: None, - reverse: false, - }, - conflicts_only: false, - tombstones, - single_item: true, - } -} - -pub fn k2v_insert_single_key<'a>( - partition_key: &'a str, - sort_key: &'a str, - causality: Option<CausalityToken>, - value: impl AsRef<[u8]>, -) -> BatchInsertOp<'a> { - BatchInsertOp { - partition_key, - sort_key, - causality, - value: K2vValue::Value(value.as_ref().to_vec()), - } -} diff --git a/src/login/static_provider.rs b/src/login/static_provider.rs index b9be5a6..1e1ecbf 100644 --- a/src/login/static_provider.rs +++ b/src/login/static_provider.rs @@ -1,45 +1,89 @@ use std::collections::HashMap; +use std::path::PathBuf; use std::sync::Arc; +use tokio::signal::unix::{signal, SignalKind}; +use tokio::sync::watch; use anyhow::{anyhow, bail, Result}; use async_trait::async_trait; use crate::config::*; -use crate::cryptoblob::{Key, SecretKey}; use crate::login::*; +use crate::storage; -pub struct StaticLoginProvider { - default_bucket: Option<String>, - users: HashMap<String, Arc<LoginStaticUser>>, - users_by_email: HashMap<String, Arc<LoginStaticUser>>, +pub struct ContextualUserEntry { + pub username: String, + pub config: UserEntry, +} - k2v_region: Region, - s3_region: Region, +#[derive(Default)] +pub struct UserDatabase { + users: HashMap<String, Arc<ContextualUserEntry>>, + users_by_email: HashMap<String, Arc<ContextualUserEntry>>, } -impl StaticLoginProvider { - pub fn new(config: LoginStaticConfig, k2v_region: Region, s3_region: Region) -> Result<Self> { - let users = config - .users +pub struct StaticLoginProvider { + user_db: watch::Receiver<UserDatabase>, + in_memory_store: storage::in_memory::MemDb, +} + +pub async fn update_user_list(config: PathBuf, up: watch::Sender<UserDatabase>) -> Result<()> { + let mut stream = signal(SignalKind::user_defined1()) + .expect("failed to install SIGUSR1 signal hander for reload"); + + loop { + let ulist: UserList = match read_config(config.clone()) { + Ok(x) => x, + Err(e) => { + tracing::warn!(path=%config.as_path().to_string_lossy(), error=%e, "Unable to load config"); + stream.recv().await; + continue; + } + }; + + let users = ulist .into_iter() - .map(|(k, v)| (k, Arc::new(v))) + .map(|(username, config)| { + ( + username.clone(), + Arc::new(ContextualUserEntry { username, config }), + ) + }) .collect::<HashMap<_, _>>(); + let mut users_by_email = HashMap::new(); for (_, u) in users.iter() { - for m in u.email_addresses.iter() { + for m in u.config.email_addresses.iter() { if users_by_email.contains_key(m) { - bail!("Several users have same email address: {}", m); + tracing::warn!("Several users have the same email address: {}", m); + stream.recv().await; + continue; } users_by_email.insert(m.clone(), u.clone()); } } - Ok(Self { - default_bucket: config.default_bucket, + tracing::info!("{} users loaded", users.len()); + up.send(UserDatabase { users, users_by_email, - k2v_region, - s3_region, + }) + .context("update user db config")?; + stream.recv().await; + tracing::info!("Received SIGUSR1, reloading"); + } +} + +impl StaticLoginProvider { + pub async fn new(config: LoginStaticConfig) -> Result<Self> { + let (tx, mut rx) = watch::channel(UserDatabase::default()); + + tokio::spawn(update_user_list(config.user_list, tx)); + rx.changed().await?; + + Ok(Self { + user_db: rx, + in_memory_store: storage::in_memory::MemDb::new(), }) } } @@ -48,82 +92,67 @@ impl StaticLoginProvider { impl LoginProvider for StaticLoginProvider { async fn login(&self, username: &str, password: &str) -> Result<Credentials> { tracing::debug!(user=%username, "login"); - let user = match self.users.get(username) { - None => bail!("User {} does not exist", username), - Some(u) => u, + let user = { + let user_db = self.user_db.borrow(); + match user_db.users.get(username) { + None => bail!("User {} does not exist", username), + Some(u) => u.clone(), + } }; tracing::debug!(user=%username, "verify password"); - if !verify_password(password, &user.password)? { + if !verify_password(password, &user.config.password)? { bail!("Wrong password"); } - tracing::debug!(user=%username, "fetch bucket"); - let bucket = user - .bucket - .clone() - .or_else(|| self.default_bucket.clone()) - .ok_or(anyhow!( - "No bucket configured and no default bucket specieid" - ))?; - tracing::debug!(user=%username, "fetch keys"); - let storage = StorageCredentials { - k2v_region: self.k2v_region.clone(), - s3_region: self.s3_region.clone(), - aws_access_key_id: user.aws_access_key_id.clone(), - aws_secret_access_key: user.aws_secret_access_key.clone(), - bucket, - }; - - let keys = match (&user.master_key, &user.secret_key) { - (Some(m), Some(s)) => { - let master_key = - Key::from_slice(&base64::decode(m)?).ok_or(anyhow!("Invalid master key"))?; - let secret_key = SecretKey::from_slice(&base64::decode(s)?) - .ok_or(anyhow!("Invalid secret key"))?; - CryptoKeys::open_without_password(&storage, &master_key, &secret_key).await? + let storage: storage::Builder = match &user.config.storage { + StaticStorage::InMemory => self.in_memory_store.builder(username).await, + StaticStorage::Garage(grgconf) => { + storage::garage::GarageBuilder::new(storage::garage::GarageConf { + region: grgconf.aws_region.clone(), + k2v_endpoint: grgconf.k2v_endpoint.clone(), + s3_endpoint: grgconf.s3_endpoint.clone(), + aws_access_key_id: grgconf.aws_access_key_id.clone(), + aws_secret_access_key: grgconf.aws_secret_access_key.clone(), + bucket: grgconf.bucket.clone(), + })? } - (None, None) => { - let user_secrets = UserSecrets { - user_secret: user.user_secret.clone(), - alternate_user_secrets: user.alternate_user_secrets.clone(), - }; - CryptoKeys::open(&storage, &user_secrets, password).await? - } - _ => bail!( - "Either both master and secret key or none of them must be specified for user" - ), }; + let cr = CryptoRoot(user.config.crypto_root.clone()); + let keys = cr.crypto_keys(password)?; + tracing::debug!(user=%username, "logged"); Ok(Credentials { storage, keys }) } async fn public_login(&self, email: &str) -> Result<PublicCredentials> { - let user = match self.users_by_email.get(email) { - None => bail!("No user for email address {}", email), - Some(u) => u, + let user = { + let user_db = self.user_db.borrow(); + match user_db.users_by_email.get(email) { + None => bail!("Email {} does not exist", email), + Some(u) => u.clone(), + } }; - - let bucket = user - .bucket - .clone() - .or_else(|| self.default_bucket.clone()) - .ok_or(anyhow!( - "No bucket configured and no default bucket specieid" - ))?; - - let storage = StorageCredentials { - k2v_region: self.k2v_region.clone(), - s3_region: self.s3_region.clone(), - aws_access_key_id: user.aws_access_key_id.clone(), - aws_secret_access_key: user.aws_secret_access_key.clone(), - bucket, + tracing::debug!(user=%user.username, "public_login"); + + let storage: storage::Builder = match &user.config.storage { + StaticStorage::InMemory => self.in_memory_store.builder(&user.username).await, + StaticStorage::Garage(grgconf) => { + storage::garage::GarageBuilder::new(storage::garage::GarageConf { + region: grgconf.aws_region.clone(), + k2v_endpoint: grgconf.k2v_endpoint.clone(), + s3_endpoint: grgconf.s3_endpoint.clone(), + aws_access_key_id: grgconf.aws_access_key_id.clone(), + aws_secret_access_key: grgconf.aws_secret_access_key.clone(), + bucket: grgconf.bucket.clone(), + })? + } }; - let k2v_client = storage.k2v_client()?; - let (_, public_key) = CryptoKeys::load_salt_and_public(&k2v_client).await?; + let cr = CryptoRoot(user.config.crypto_root.clone()); + let public_key = cr.public_key()?; Ok(PublicCredentials { storage, diff --git a/src/mail/incoming.rs b/src/mail/incoming.rs index b7d2f48..04d2ef1 100644 --- a/src/mail/incoming.rs +++ b/src/mail/incoming.rs @@ -1,28 +1,25 @@ -use std::collections::HashMap; +//use std::collections::HashMap; use std::convert::TryFrom; use std::sync::{Arc, Weak}; use std::time::Duration; use anyhow::{anyhow, bail, Result}; +use base64::Engine; use futures::{future::BoxFuture, FutureExt}; -use k2v_client::{CausalityToken, K2vClient, K2vValue}; -use rusoto_s3::{ - DeleteObjectRequest, GetObjectRequest, ListObjectsV2Request, PutObjectRequest, S3Client, S3, -}; -use tokio::io::AsyncReadExt; +//use tokio::io::AsyncReadExt; use tokio::sync::watch; use tracing::{error, info, warn}; use crate::cryptoblob; -use crate::k2v_util::k2v_wait_value_changed; use crate::login::{Credentials, PublicCredentials}; use crate::mail::mailbox::Mailbox; use crate::mail::uidindex::ImapUidvalidity; use crate::mail::unique_ident::*; use crate::mail::user::User; use crate::mail::IMF; -use crate::time::now_msec; +use crate::storage; +use crate::timestamp::now_msec; const INCOMING_PK: &str = "incoming"; const INCOMING_LOCK_SK: &str = "lock"; @@ -54,24 +51,23 @@ async fn incoming_mail_watch_process_internal( creds: Credentials, mut rx_inbox_id: watch::Receiver<Option<(UniqueIdent, ImapUidvalidity)>>, ) -> Result<()> { - let mut lock_held = k2v_lock_loop(creds.k2v_client()?, INCOMING_PK, INCOMING_LOCK_SK); - - let k2v = creds.k2v_client()?; - let s3 = creds.s3_client()?; + let mut lock_held = k2v_lock_loop( + creds.storage.build().await?, + storage::RowRef::new(INCOMING_PK, INCOMING_LOCK_SK), + ); + let storage = creds.storage.build().await?; let mut inbox: Option<Arc<Mailbox>> = None; - let mut prev_ct: Option<CausalityToken> = None; + let mut incoming_key = storage::RowRef::new(INCOMING_PK, INCOMING_WATCH_SK); loop { - let new_mail = if *lock_held.borrow() { + let maybe_updated_incoming_key = if *lock_held.borrow() { info!("incoming lock held"); let wait_new_mail = async { loop { - match k2v_wait_value_changed(&k2v, INCOMING_PK, INCOMING_WATCH_SK, &prev_ct) - .await - { - Ok(cv) => break cv, + match storage.row_poll(&incoming_key).await { + Ok(row_val) => break row_val.row_ref, Err(e) => { error!("Error in wait_new_mail: {}", e); tokio::time::sleep(Duration::from_secs(30)).await; @@ -81,10 +77,10 @@ async fn incoming_mail_watch_process_internal( }; tokio::select! { - cv = wait_new_mail => Some(cv.causality), - _ = tokio::time::sleep(MAIL_CHECK_INTERVAL) => prev_ct.clone(), - _ = lock_held.changed() => None, - _ = rx_inbox_id.changed() => None, + inc_k = wait_new_mail => Some(inc_k), + _ = tokio::time::sleep(MAIL_CHECK_INTERVAL) => Some(incoming_key.clone()), + _ = lock_held.changed() => None, + _ = rx_inbox_id.changed() => None, } } else { info!("incoming lock not held"); @@ -123,10 +119,10 @@ async fn incoming_mail_watch_process_internal( // If we were able to open INBOX, and we have mail, // fetch new mail - if let (Some(inbox), Some(new_ct)) = (&inbox, new_mail) { - match handle_incoming_mail(&user, &s3, inbox, &lock_held).await { + if let (Some(inbox), Some(updated_incoming_key)) = (&inbox, maybe_updated_incoming_key) { + match handle_incoming_mail(&user, &storage, inbox, &lock_held).await { Ok(()) => { - prev_ct = Some(new_ct); + incoming_key = updated_incoming_key; } Err(e) => { error!("Could not fetch incoming mail: {}", e); @@ -141,27 +137,20 @@ async fn incoming_mail_watch_process_internal( async fn handle_incoming_mail( user: &Arc<User>, - s3: &S3Client, + storage: &storage::Store, inbox: &Arc<Mailbox>, lock_held: &watch::Receiver<bool>, ) -> Result<()> { - let lor = ListObjectsV2Request { - bucket: user.creds.storage.bucket.clone(), - max_keys: Some(1000), - prefix: Some("incoming/".into()), - ..Default::default() - }; - let mails_res = s3.list_objects_v2(lor).await?; + let mails_res = storage.blob_list("incoming/").await?; - for object in mails_res.contents.unwrap_or_default() { + for object in mails_res { if !*lock_held.borrow() { break; } - if let Some(key) = object.key { - if let Some(mail_id) = key.strip_prefix("incoming/") { - if let Ok(mail_id) = mail_id.parse::<UniqueIdent>() { - move_incoming_message(user, s3, inbox, mail_id).await?; - } + let key = object.0; + if let Some(mail_id) = key.strip_prefix("incoming/") { + if let Ok(mail_id) = mail_id.parse::<UniqueIdent>() { + move_incoming_message(user, storage, inbox, mail_id).await?; } } } @@ -171,7 +160,7 @@ async fn handle_incoming_mail( async fn move_incoming_message( user: &Arc<User>, - s3: &S3Client, + storage: &storage::Store, inbox: &Arc<Mailbox>, id: UniqueIdent, ) -> Result<()> { @@ -180,22 +169,15 @@ async fn move_incoming_message( let object_key = format!("incoming/{}", id); // 1. Fetch message from S3 - let gor = GetObjectRequest { - bucket: user.creds.storage.bucket.clone(), - key: object_key.clone(), - ..Default::default() - }; - let get_result = s3.get_object(gor).await?; + let object = storage.blob_fetch(&storage::BlobRef(object_key)).await?; // 1.a decrypt message key from headers - info!("Object metadata: {:?}", get_result.metadata); - let key_encrypted_b64 = get_result - .metadata - .as_ref() - .ok_or(anyhow!("Missing key in metadata"))? + //info!("Object metadata: {:?}", get_result.metadata); + let key_encrypted_b64 = object + .meta .get(MESSAGE_KEY) .ok_or(anyhow!("Missing key in metadata"))?; - let key_encrypted = base64::decode(key_encrypted_b64)?; + let key_encrypted = base64::engine::general_purpose::STANDARD.decode(key_encrypted_b64)?; let message_key = sodiumoxide::crypto::sealedbox::open( &key_encrypted, &user.creds.keys.public, @@ -206,38 +188,28 @@ async fn move_incoming_message( cryptoblob::Key::from_slice(&message_key).ok_or(anyhow!("Invalid message key"))?; // 1.b retrieve message body - let obj_body = get_result.body.ok_or(anyhow!("Missing object body"))?; - let mut mail_buf = Vec::with_capacity(get_result.content_length.unwrap_or(128) as usize); - obj_body - .into_async_read() - .read_to_end(&mut mail_buf) - .await?; - let plain_mail = cryptoblob::open(&mail_buf, &message_key) + let obj_body = object.value; + let plain_mail = cryptoblob::open(&obj_body, &message_key) .map_err(|_| anyhow!("Cannot decrypt email content"))?; // 2 parse mail and add to inbox let msg = IMF::try_from(&plain_mail[..]).map_err(|_| anyhow!("Invalid email body"))?; inbox - .append_from_s3(msg, id, &object_key, message_key) + .append_from_s3(msg, id, object.blob_ref.clone(), message_key) .await?; // 3 delete from incoming - let dor = DeleteObjectRequest { - bucket: user.creds.storage.bucket.clone(), - key: object_key.clone(), - ..Default::default() - }; - s3.delete_object(dor).await?; + storage.blob_rm(&object.blob_ref).await?; Ok(()) } // ---- UTIL: K2V locking loop, use this to try to grab a lock using a K2V entry as a signal ---- -fn k2v_lock_loop(k2v: K2vClient, pk: &'static str, sk: &'static str) -> watch::Receiver<bool> { +fn k2v_lock_loop(storage: storage::Store, row_ref: storage::RowRef) -> watch::Receiver<bool> { let (held_tx, held_rx) = watch::channel(false); - tokio::spawn(k2v_lock_loop_internal(k2v, pk, sk, held_tx)); + tokio::spawn(k2v_lock_loop_internal(storage, row_ref, held_tx)); held_rx } @@ -246,13 +218,12 @@ fn k2v_lock_loop(k2v: K2vClient, pk: &'static str, sk: &'static str) -> watch::R enum LockState { Unknown, Empty, - Held(UniqueIdent, u64, CausalityToken), + Held(UniqueIdent, u64, storage::RowRef), } async fn k2v_lock_loop_internal( - k2v: K2vClient, - pk: &'static str, - sk: &'static str, + storage: storage::Store, + row_ref: storage::RowRef, held_tx: watch::Sender<bool>, ) { let (state_tx, mut state_rx) = watch::channel::<LockState>(LockState::Unknown); @@ -262,10 +233,10 @@ async fn k2v_lock_loop_internal( // Loop 1: watch state of lock in K2V, save that in corresponding watch channel let watch_lock_loop: BoxFuture<Result<()>> = async { - let mut ct = None; + let mut ct = row_ref.clone(); loop { info!("k2v watch lock loop iter: ct = {:?}", ct); - match k2v_wait_value_changed(&k2v, pk, sk, &ct).await { + match storage.row_poll(&ct).await { Err(e) => { error!( "Error in k2v wait value changed: {} ; assuming we no longer hold lock.", @@ -277,7 +248,7 @@ async fn k2v_lock_loop_internal( Ok(cv) => { let mut lock_state = None; for v in cv.value.iter() { - if let K2vValue::Value(vbytes) = v { + if let storage::Alternative::Value(vbytes) = v { if vbytes.len() == 32 { let ts = u64::from_be_bytes(vbytes[..8].try_into().unwrap()); let pid = UniqueIdent(vbytes[8..].try_into().unwrap()); @@ -290,16 +261,18 @@ async fn k2v_lock_loop_internal( } } } + let new_ct = cv.row_ref; + info!( "k2v watch lock loop: changed, old ct = {:?}, new ct = {:?}, v = {:?}", - ct, cv.causality, lock_state + ct, new_ct, lock_state ); state_tx.send( lock_state - .map(|(pid, ts)| LockState::Held(pid, ts, cv.causality.clone())) + .map(|(pid, ts)| LockState::Held(pid, ts, new_ct.clone())) .unwrap_or(LockState::Empty), )?; - ct = Some(cv.causality); + ct = new_ct; } } } @@ -385,7 +358,14 @@ async fn k2v_lock_loop_internal( now_msec() + LOCK_DURATION.as_millis() as u64, )); lock[8..].copy_from_slice(&our_pid.0); - if let Err(e) = k2v.insert_item(pk, sk, lock, ct).await { + let row = match ct { + Some(existing) => existing, + None => row_ref.clone(), + }; + if let Err(e) = storage + .row_insert(vec![storage::RowVal::new(row, lock)]) + .await + { error!("Could not take lock: {}", e); tokio::time::sleep(Duration::from_secs(30)).await; } @@ -401,7 +381,7 @@ async fn k2v_lock_loop_internal( info!("lock loop exited, releasing"); if !held_tx.is_closed() { - warn!("wierd..."); + warn!("weird..."); let _ = held_tx.send(false); } @@ -411,7 +391,10 @@ async fn k2v_lock_loop_internal( _ => None, }; if let Some(ct) = release { - let _ = k2v.delete_item(pk, sk, ct.clone()).await; + match storage.row_rm(&storage::Selector::Single(&ct)).await { + Err(e) => warn!("Unable to release lock {:?}: {}", ct, e), + Ok(_) => (), + }; } } @@ -433,43 +416,30 @@ impl EncryptedMessage { } pub async fn deliver_to(self: Arc<Self>, creds: PublicCredentials) -> Result<()> { - let s3_client = creds.storage.s3_client()?; - let k2v_client = creds.storage.k2v_client()?; + let storage = creds.storage.build().await?; // Get causality token of previous watch key - let watch_ct = match k2v_client.read_item(INCOMING_PK, INCOMING_WATCH_SK).await { - Err(_) => None, - Ok(cv) => Some(cv.causality), + let query = storage::RowRef::new(INCOMING_PK, INCOMING_WATCH_SK); + let watch_ct = match storage.row_fetch(&storage::Selector::Single(&query)).await { + Err(_) => query, + Ok(cv) => cv.into_iter().next().map(|v| v.row_ref).unwrap_or(query), }; // Write mail to encrypted storage let encrypted_key = sodiumoxide::crypto::sealedbox::seal(self.key.as_ref(), &creds.public_key); - let key_header = base64::encode(&encrypted_key); - - let por = PutObjectRequest { - bucket: creds.storage.bucket.clone(), - key: format!("incoming/{}", gen_ident()), - metadata: Some( - [(MESSAGE_KEY.to_string(), key_header)] - .into_iter() - .collect::<HashMap<_, _>>(), - ), - body: Some(self.encrypted_body.clone().into()), - ..Default::default() - }; - s3_client.put_object(por).await?; + let key_header = base64::engine::general_purpose::STANDARD.encode(&encrypted_key); - // Update watch key to signal new mail - k2v_client - .insert_item( - INCOMING_PK, - INCOMING_WATCH_SK, - gen_ident().0.to_vec(), - watch_ct, - ) - .await?; + let blob_val = storage::BlobVal::new( + storage::BlobRef(format!("incoming/{}", gen_ident())), + self.encrypted_body.clone().into(), + ) + .with_meta(MESSAGE_KEY.to_string(), key_header); + storage.blob_insert(blob_val).await?; + // Update watch key to signal new mail + let watch_val = storage::RowVal::new(watch_ct.clone(), gen_ident().0.to_vec()); + storage.row_insert(vec![watch_val]).await?; Ok(()) } } diff --git a/src/mail/mailbox.rs b/src/mail/mailbox.rs index d92140d..e424ba3 100644 --- a/src/mail/mailbox.rs +++ b/src/mail/mailbox.rs @@ -1,11 +1,5 @@ use anyhow::{anyhow, bail, Result}; -use k2v_client::K2vClient; -use k2v_client::{BatchReadOp, Filter, K2vValue}; -use rusoto_s3::{ - CopyObjectRequest, DeleteObjectRequest, GetObjectRequest, PutObjectRequest, S3Client, S3, -}; use serde::{Deserialize, Serialize}; -use tokio::io::AsyncReadExt; use tokio::sync::RwLock; use crate::bayou::Bayou; @@ -14,7 +8,8 @@ use crate::login::Credentials; use crate::mail::uidindex::*; use crate::mail::unique_ident::*; use crate::mail::IMF; -use crate::time::now_msec; +use crate::storage::{self, BlobRef, BlobVal, RowRef, RowVal, Selector, Store}; +use crate::timestamp::now_msec; pub struct Mailbox { pub(super) id: UniqueIdent, @@ -30,7 +25,7 @@ impl Mailbox { let index_path = format!("index/{}", id); let mail_path = format!("mail/{}", id); - let mut uid_index = Bayou::<UidIndex>::new(creds, index_path)?; + let mut uid_index = Bayou::<UidIndex>::new(creds, index_path).await?; uid_index.sync().await?; let uidvalidity = uid_index.state().uidvalidity; @@ -48,10 +43,8 @@ impl Mailbox { let mbox = RwLock::new(MailboxInternal { id, - bucket: creds.bucket().to_string(), encryption_key: creds.keys.master.clone(), - k2v: creds.k2v_client()?, - s3: creds.s3_client()?, + storage: creds.storage.build().await?, uid_index, mail_path, }); @@ -121,13 +114,13 @@ impl Mailbox { &self, msg: IMF<'a>, ident: UniqueIdent, - s3_key: &str, + blob_ref: storage::BlobRef, message_key: Key, ) -> Result<()> { self.mbox .write() .await - .append_from_s3(msg, ident, s3_key, message_key) + .append_from_s3(msg, ident, blob_ref, message_key) .await } @@ -182,13 +175,9 @@ struct MailboxInternal { // 2023-05-15 will probably be used later. #[allow(dead_code)] id: UniqueIdent, - bucket: String, mail_path: String, encryption_key: Key, - - k2v: K2vClient, - s3: S3Client, - + storage: Store, uid_index: Bayou<UidIndex>, } @@ -209,33 +198,19 @@ impl MailboxInternal { let ids = ids.iter().map(|x| x.to_string()).collect::<Vec<_>>(); let ops = ids .iter() - .map(|id| BatchReadOp { - partition_key: &self.mail_path, - filter: Filter { - start: Some(id), - end: None, - prefix: None, - limit: None, - reverse: false, - }, - single_item: true, - conflicts_only: false, - tombstones: false, - }) + .map(|id| RowRef::new(self.mail_path.as_str(), id.as_str())) .collect::<Vec<_>>(); - let res_vec = self.k2v.read_batch(&ops).await?; + let res_vec = self.storage.row_fetch(&Selector::List(ops)).await?; let mut meta_vec = vec![]; - for (op, res) in ops.iter().zip(res_vec.into_iter()) { - if res.items.len() != 1 { - bail!("Expected 1 item, got {}", res.items.len()); - } - let (_, cv) = res.items.iter().next().unwrap(); + for res in res_vec.into_iter() { let mut meta_opt = None; - for v in cv.value.iter() { + + // Resolve conflicts + for v in res.value.iter() { match v { - K2vValue::Tombstone => (), - K2vValue::Value(v) => { + storage::Alternative::Tombstone => (), + storage::Alternative::Value(v) => { let meta = open_deserialize::<MailMeta>(v, &self.encryption_key)?; match meta_opt.as_mut() { None => { @@ -251,7 +226,7 @@ impl MailboxInternal { if let Some(meta) = meta_opt { meta_vec.push(meta); } else { - bail!("No valid meta value in k2v for {:?}", op.filter.start); + bail!("No valid meta value in k2v for {:?}", res.row_ref); } } @@ -259,19 +234,12 @@ impl MailboxInternal { } async fn fetch_full(&self, id: UniqueIdent, message_key: &Key) -> Result<Vec<u8>> { - let gor = GetObjectRequest { - bucket: self.bucket.clone(), - key: format!("{}/{}", self.mail_path, id), - ..Default::default() - }; - - let obj_res = self.s3.get_object(gor).await?; - - let obj_body = obj_res.body.ok_or(anyhow!("Missing object body"))?; - let mut buf = Vec::with_capacity(obj_res.content_length.unwrap_or(128) as usize); - obj_body.into_async_read().read_to_end(&mut buf).await?; - - cryptoblob::open(&buf, message_key) + let obj_res = self + .storage + .blob_fetch(&BlobRef(format!("{}/{}", self.mail_path, id))) + .await?; + let body = obj_res.value; + cryptoblob::open(&body, message_key) } // ---- Functions for changing the mailbox ---- @@ -304,13 +272,12 @@ impl MailboxInternal { async { // Encrypt and save mail body let message_blob = cryptoblob::seal(mail.raw, &message_key)?; - let por = PutObjectRequest { - bucket: self.bucket.clone(), - key: format!("{}/{}", self.mail_path, ident), - body: Some(message_blob.into()), - ..Default::default() - }; - self.s3.put_object(por).await?; + self.storage + .blob_insert(BlobVal::new( + BlobRef(format!("{}/{}", self.mail_path, ident)), + message_blob, + )) + .await?; Ok::<_, anyhow::Error>(()) }, async { @@ -322,8 +289,11 @@ impl MailboxInternal { rfc822_size: mail.raw.len(), }; let meta_blob = seal_serialize(&meta, &self.encryption_key)?; - self.k2v - .insert_item(&self.mail_path, &ident.to_string(), meta_blob, None) + self.storage + .row_insert(vec![RowVal::new( + RowRef::new(&self.mail_path, &ident.to_string()), + meta_blob, + )]) .await?; Ok::<_, anyhow::Error>(()) }, @@ -349,20 +319,14 @@ impl MailboxInternal { &mut self, mail: IMF<'a>, ident: UniqueIdent, - s3_key: &str, + blob_src: storage::BlobRef, message_key: Key, ) -> Result<()> { futures::try_join!( async { // Copy mail body from previous location - let cor = CopyObjectRequest { - bucket: self.bucket.clone(), - key: format!("{}/{}", self.mail_path, ident), - copy_source: format!("{}/{}", self.bucket, s3_key), - metadata_directive: Some("REPLACE".into()), - ..Default::default() - }; - self.s3.copy_object(cor).await?; + let blob_dst = BlobRef(format!("{}/{}", self.mail_path, ident)); + self.storage.blob_copy(&blob_src, &blob_dst).await?; Ok::<_, anyhow::Error>(()) }, async { @@ -374,8 +338,11 @@ impl MailboxInternal { rfc822_size: mail.raw.len(), }; let meta_blob = seal_serialize(&meta, &self.encryption_key)?; - self.k2v - .insert_item(&self.mail_path, &ident.to_string(), meta_blob, None) + self.storage + .row_insert(vec![RowVal::new( + RowRef::new(&self.mail_path, &ident.to_string()), + meta_blob, + )]) .await?; Ok::<_, anyhow::Error>(()) }, @@ -400,21 +367,26 @@ impl MailboxInternal { futures::try_join!( async { // Delete mail body from S3 - let dor = DeleteObjectRequest { - bucket: self.bucket.clone(), - key: format!("{}/{}", self.mail_path, ident), - ..Default::default() - }; - self.s3.delete_object(dor).await?; + self.storage + .blob_rm(&BlobRef(format!("{}/{}", self.mail_path, ident))) + .await?; Ok::<_, anyhow::Error>(()) }, async { // Delete mail meta from K2V let sk = ident.to_string(); - let v = self.k2v.read_item(&self.mail_path, &sk).await?; - self.k2v - .delete_item(&self.mail_path, &sk, v.causality) + let res = self + .storage + .row_fetch(&storage::Selector::Single(&RowRef::new( + &self.mail_path, + &sk, + ))) .await?; + if let Some(row_val) = res.into_iter().next() { + self.storage + .row_rm(&storage::Selector::Single(&row_val.row_ref)) + .await?; + } Ok::<_, anyhow::Error>(()) } )?; @@ -445,7 +417,7 @@ impl MailboxInternal { source_id: UniqueIdent, new_id: UniqueIdent, ) -> Result<()> { - if self.bucket != from.bucket || self.encryption_key != from.encryption_key { + if self.encryption_key != from.encryption_key { bail!("Message to be copied/moved does not belong to same account."); } @@ -460,23 +432,20 @@ impl MailboxInternal { futures::try_join!( async { - // Copy mail body from S3 - let cor = CopyObjectRequest { - bucket: self.bucket.clone(), - key: format!("{}/{}", self.mail_path, new_id), - copy_source: format!("{}/{}/{}", from.bucket, from.mail_path, source_id), - ..Default::default() - }; - - self.s3.copy_object(cor).await?; + let dst = BlobRef(format!("{}/{}", self.mail_path, new_id)); + let src = BlobRef(format!("{}/{}", from.mail_path, source_id)); + self.storage.blob_copy(&src, &dst).await?; Ok::<_, anyhow::Error>(()) }, async { // Copy mail meta in K2V let meta = &from.fetch_meta(&[source_id]).await?[0]; let meta_blob = seal_serialize(meta, &self.encryption_key)?; - self.k2v - .insert_item(&self.mail_path, &new_id.to_string(), meta_blob, None) + self.storage + .row_insert(vec![RowVal::new( + RowRef::new(&self.mail_path, &new_id.to_string()), + meta_blob, + )]) .await?; Ok::<_, anyhow::Error>(()) }, diff --git a/src/mail/unique_ident.rs b/src/mail/unique_ident.rs index 267f66e..0e629db 100644 --- a/src/mail/unique_ident.rs +++ b/src/mail/unique_ident.rs @@ -5,7 +5,7 @@ use lazy_static::lazy_static; use rand::prelude::*; use serde::{de::Error, Deserialize, Deserializer, Serialize, Serializer}; -use crate::time::now_msec; +use crate::timestamp::now_msec; /// An internal Mail Identifier is composed of two components: /// - a process identifier, 128 bits, itself composed of: diff --git a/src/mail/user.rs b/src/mail/user.rs index 5523c2a..da0d509 100644 --- a/src/mail/user.rs +++ b/src/mail/user.rs @@ -2,18 +2,18 @@ use std::collections::{BTreeMap, HashMap}; use std::sync::{Arc, Weak}; use anyhow::{anyhow, bail, Result}; -use k2v_client::{CausalityToken, K2vClient, K2vValue}; use lazy_static::lazy_static; use serde::{Deserialize, Serialize}; use tokio::sync::watch; use crate::cryptoblob::{open_deserialize, seal_serialize}; -use crate::login::{Credentials, StorageCredentials}; +use crate::login::Credentials; use crate::mail::incoming::incoming_mail_watch_process; use crate::mail::mailbox::Mailbox; use crate::mail::uidindex::ImapUidvalidity; use crate::mail::unique_ident::{gen_ident, UniqueIdent}; -use crate::time::now_msec; +use crate::storage; +use crate::timestamp::now_msec; pub const MAILBOX_HIERARCHY_DELIMITER: char = '.'; @@ -33,7 +33,7 @@ const MAILBOX_LIST_SK: &str = "list"; pub struct User { pub username: String, pub creds: Credentials, - pub k2v: K2vClient, + pub storage: storage::Store, pub mailboxes: std::sync::Mutex<HashMap<UniqueIdent, Weak<Mailbox>>>, tx_inbox_id: watch::Sender<Option<(UniqueIdent, ImapUidvalidity)>>, @@ -41,7 +41,7 @@ pub struct User { impl User { pub async fn new(username: String, creds: Credentials) -> Result<Arc<Self>> { - let cache_key = (username.clone(), creds.storage.clone()); + let cache_key = (username.clone(), creds.storage.unique()); { let cache = USER_CACHE.lock().unwrap(); @@ -165,6 +165,7 @@ impl User { list.rename_mailbox(name, &nnew)?; } } + self.save_mailbox_list(&list, ct).await?; } Ok(()) @@ -173,14 +174,14 @@ impl User { // ---- Internal user & mailbox management ---- async fn open(username: String, creds: Credentials) -> Result<Arc<Self>> { - let k2v = creds.k2v_client()?; + let storage = creds.storage.build().await?; let (tx_inbox_id, rx_inbox_id) = watch::channel(None); let user = Arc::new(Self { username, creds: creds.clone(), - k2v, + storage, tx_inbox_id, mailboxes: std::sync::Mutex::new(HashMap::new()), }); @@ -223,32 +224,42 @@ impl User { // ---- Mailbox list management ---- - async fn load_mailbox_list(&self) -> Result<(MailboxList, Option<CausalityToken>)> { - let (mut list, ct) = match self.k2v.read_item(MAILBOX_LIST_PK, MAILBOX_LIST_SK).await { - Err(k2v_client::Error::NotFound) => (MailboxList::new(), None), + async fn load_mailbox_list(&self) -> Result<(MailboxList, Option<storage::RowRef>)> { + let row_ref = storage::RowRef::new(MAILBOX_LIST_PK, MAILBOX_LIST_SK); + let (mut list, row) = match self + .storage + .row_fetch(&storage::Selector::Single(&row_ref)) + .await + { + Err(storage::StorageError::NotFound) => (MailboxList::new(), None), Err(e) => return Err(e.into()), - Ok(cv) => { + Ok(rv) => { let mut list = MailboxList::new(); - for v in cv.value { - if let K2vValue::Value(vbytes) = v { + let (row_ref, row_vals) = match rv.into_iter().next() { + Some(row_val) => (row_val.row_ref, row_val.value), + None => (row_ref, vec![]), + }; + + for v in row_vals { + if let storage::Alternative::Value(vbytes) = v { let list2 = open_deserialize::<MailboxList>(&vbytes, &self.creds.keys.master)?; list.merge(list2); } } - (list, Some(cv.causality)) + (list, Some(row_ref)) } }; - self.ensure_inbox_exists(&mut list, &ct).await?; + self.ensure_inbox_exists(&mut list, &row).await?; - Ok((list, ct)) + Ok((list, row)) } async fn ensure_inbox_exists( &self, list: &mut MailboxList, - ct: &Option<CausalityToken>, + ct: &Option<storage::RowRef>, ) -> Result<bool> { // If INBOX doesn't exist, create a new mailbox with that name // and save new mailbox list. @@ -277,12 +288,12 @@ impl User { async fn save_mailbox_list( &self, list: &MailboxList, - ct: Option<CausalityToken>, + ct: Option<storage::RowRef>, ) -> Result<()> { let list_blob = seal_serialize(list, &self.creds.keys.master)?; - self.k2v - .insert_item(MAILBOX_LIST_PK, MAILBOX_LIST_SK, list_blob, ct) - .await?; + let rref = ct.unwrap_or(storage::RowRef::new(MAILBOX_LIST_PK, MAILBOX_LIST_SK)); + let row_val = storage::RowVal::new(rref, list_blob); + self.storage.row_insert(vec![row_val]).await?; Ok(()) } } @@ -455,6 +466,6 @@ enum CreatedMailbox { // ---- User cache ---- lazy_static! { - static ref USER_CACHE: std::sync::Mutex<HashMap<(String, StorageCredentials), Weak<User>>> = + static ref USER_CACHE: std::sync::Mutex<HashMap<(String, storage::UnicityBuffer), Weak<User>>> = std::sync::Mutex::new(HashMap::new()); } diff --git a/src/main.rs b/src/main.rs index 4ca07d0..3221c2e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,3 +1,5 @@ +#![feature(async_fn_in_trait)] + mod bayou; mod config; mod cryptoblob; @@ -7,16 +9,17 @@ mod lmtp; mod login; mod mail; mod server; -mod time; +mod storage; +mod timestamp; +use std::io::Read; use std::path::PathBuf; -use anyhow::{bail, Result}; +use anyhow::{bail, Context, Result}; use clap::{Parser, Subcommand}; -use rand::prelude::*; +use nix::{sys::signal, unistd::Pid}; use config::*; -use cryptoblob::*; use login::{static_provider::*, *}; use server::Server; @@ -25,92 +28,118 @@ use server::Server; struct Args { #[clap(subcommand)] command: Command, + + #[clap(short, long, env = "CONFIG_FILE", default_value = "aerogramme.toml")] + config_file: PathBuf, } #[derive(Subcommand, Debug)] enum Command { - /// Runs the IMAP+LMTP server daemon - Server { - #[clap(short, long, env = "CONFIG_FILE", default_value = "aerogramme.toml")] - config_file: PathBuf, - }, - /// TEST TEST TEST - Test { - #[clap(short, long, env = "CONFIG_FILE", default_value = "aerogramme.toml")] - config_file: PathBuf, - }, - /// Initializes key pairs for a user and adds a key decryption password - FirstLogin { - #[clap(flatten)] - creds: StorageCredsArgs, - #[clap(flatten)] - user_secrets: UserSecretsArgs, + #[clap(subcommand)] + /// A daemon to be run by the end user, on a personal device + Companion(CompanionCommand), + + #[clap(subcommand)] + /// A daemon to be run by the service provider, on a server + Provider(ProviderCommand), + + #[clap(subcommand)] + /// Specific tooling, should not be part of a normal workflow, for debug & experimentation only + Tools(ToolsCommand), + //Test, +} + +#[derive(Subcommand, Debug)] +enum ToolsCommand { + /// Manage crypto roots + #[clap(subcommand)] + CryptoRoot(CryptoRootCommand), + + PasswordHash { + #[clap(env = "AEROGRAMME_PASSWORD")] + maybe_password: Option<String>, }, - /// Initializes key pairs for a user and dumps keys to stdout for usage with static - /// login provider - InitializeLocalKeys { - #[clap(flatten)] - creds: StorageCredsArgs, +} + +#[derive(Subcommand, Debug)] +enum CryptoRootCommand { + /// Generate a new crypto-root protected with a password + New { + #[clap(env = "AEROGRAMME_PASSWORD")] + maybe_password: Option<String>, }, - /// Adds a key decryption password for a user - AddPassword { - #[clap(flatten)] - creds: StorageCredsArgs, - #[clap(flatten)] - user_secrets: UserSecretsArgs, - /// Automatically generate password - #[clap(short, long)] - gen: bool, + /// Generate a new clear text crypto-root, store it securely! + NewClearText, + /// Change the password of a crypto key + ChangePassword { + #[clap(env = "AEROGRAMME_OLD_PASSWORD")] + maybe_old_password: Option<String>, + + #[clap(env = "AEROGRAMME_NEW_PASSWORD")] + maybe_new_password: Option<String>, + + #[clap(short, long, env = "AEROGRAMME_CRYPTO_ROOT")] + crypto_root: String, }, - /// Deletes a key decription password for a user - DeletePassword { - #[clap(flatten)] - creds: StorageCredsArgs, - #[clap(flatten)] - user_secrets: UserSecretsArgs, - /// Allow to delete all passwords - #[clap(long)] - allow_delete_all: bool, + /// From a given crypto-key, derive one containing only the public key + DeriveIncoming { + #[clap(short, long, env = "AEROGRAMME_CRYPTO_ROOT")] + crypto_root: String, }, - /// Dumps all encryption keys for user - ShowKeys { - #[clap(flatten)] - creds: StorageCredsArgs, - #[clap(flatten)] - user_secrets: UserSecretsArgs, +} + +#[derive(Subcommand, Debug)] +enum CompanionCommand { + /// Runs the IMAP proxy + Daemon, + Reload { + #[clap(short, long, env = "AEROGRAMME_PID")] + pid: Option<i32>, }, + Wizard, + #[clap(subcommand)] + Account(AccountManagement), } -#[derive(Parser, Debug)] -struct StorageCredsArgs { - /// Name of the region to use - #[clap(short = 'r', long, env = "AWS_REGION")] - region: String, - /// Url of the endpoint to connect to for K2V - #[clap(short = 'k', long, env = "K2V_ENDPOINT")] - k2v_endpoint: String, - /// Url of the endpoint to connect to for S3 - #[clap(short = 's', long, env = "S3_ENDPOINT")] - s3_endpoint: String, - /// Access key ID - #[clap(short = 'A', long, env = "AWS_ACCESS_KEY_ID")] - aws_access_key_id: String, - /// Access key ID - #[clap(short = 'S', long, env = "AWS_SECRET_ACCESS_KEY")] - aws_secret_access_key: String, - /// Bucket name - #[clap(short = 'b', long, env = "BUCKET")] - bucket: String, +#[derive(Subcommand, Debug)] +enum ProviderCommand { + /// Runs the IMAP+LMTP server daemon + Daemon, + /// Reload the daemon + Reload { + #[clap(short, long, env = "AEROGRAMME_PID")] + pid: Option<i32>, + }, + /// Manage static accounts + #[clap(subcommand)] + Account(AccountManagement), } -#[derive(Parser, Debug)] -struct UserSecretsArgs { - /// User secret - #[clap(short = 'U', long, env = "USER_SECRET")] - user_secret: String, - /// Alternate user secrets (comma-separated list of strings) - #[clap(long, env = "ALTERNATE_USER_SECRETS", default_value = "")] - alternate_user_secrets: String, +#[derive(Subcommand, Debug)] +enum AccountManagement { + /// Add an account + Add { + #[clap(short, long)] + login: String, + #[clap(short, long)] + setup: PathBuf, + }, + /// Delete an account + Delete { + #[clap(short, long)] + login: String, + }, + /// Change password for a given account + ChangePassword { + #[clap(env = "AEROGRAMME_OLD_PASSWORD")] + maybe_old_password: Option<String>, + + #[clap(env = "AEROGRAMME_NEW_PASSWORD")] + maybe_new_password: Option<String>, + + #[clap(short, long)] + login: String, + }, } #[tokio::main] @@ -129,191 +158,219 @@ async fn main() -> Result<()> { tracing_subscriber::fmt::init(); let args = Args::parse(); + let any_config = read_config(args.config_file)?; - match args.command { - Command::Server { config_file } => { - let config = read_config(config_file)?; - - let server = Server::new(config).await?; - server.run().await?; + match (&args.command, any_config) { + (Command::Companion(subcommand), AnyConfig::Companion(config)) => match subcommand { + CompanionCommand::Daemon => { + let server = Server::from_companion_config(config).await?; + server.run().await?; + } + CompanionCommand::Reload { pid } => reload(*pid, config.pid)?, + CompanionCommand::Wizard => { + unimplemented!(); + } + CompanionCommand::Account(cmd) => { + let user_file = config.users.user_list; + account_management(&args.command, cmd, user_file)?; + } + }, + (Command::Provider(subcommand), AnyConfig::Provider(config)) => match subcommand { + ProviderCommand::Daemon => { + let server = Server::from_provider_config(config).await?; + server.run().await?; + } + ProviderCommand::Reload { pid } => reload(*pid, config.pid)?, + ProviderCommand::Account(cmd) => { + let user_file = match config.users { + UserManagement::Static(conf) => conf.user_list, + UserManagement::Ldap(_) => { + panic!("LDAP account management is not supported from Aerogramme.") + } + }; + account_management(&args.command, cmd, user_file)?; + } + }, + (Command::Provider(_), AnyConfig::Companion(_)) => { + bail!("Your want to run a 'Provider' command but your configuration file has role 'Companion'."); } - Command::Test { config_file } => { - let config = read_config(config_file)?; - - let _server = Server::new(config).await?; - //server.test().await?; + (Command::Companion(_), AnyConfig::Provider(_)) => { + bail!("Your want to run a 'Companion' command but your configuration file has role 'Provider'."); } - Command::FirstLogin { - creds, - user_secrets, - } => { - let creds = make_storage_creds(creds); - let user_secrets = make_user_secrets(user_secrets); - - println!("Please enter your password for key decryption."); - println!("If you are using LDAP login, this must be your LDAP password."); - println!("If you are using the static login provider, enter any password, and this will also become your password for local IMAP access."); - let password = rpassword::prompt_password("Enter password: ")?; - let password_confirm = rpassword::prompt_password("Confirm password: ")?; - if password != password_confirm { - bail!("Passwords don't match."); + (Command::Tools(subcommand), _) => match subcommand { + ToolsCommand::PasswordHash { maybe_password } => { + let password = match maybe_password { + Some(pwd) => pwd.clone(), + None => rpassword::prompt_password("Enter password: ")?, + }; + println!("{}", hash_password(&password)?); } + ToolsCommand::CryptoRoot(crcommand) => match crcommand { + CryptoRootCommand::New { maybe_password } => { + let password = match maybe_password { + Some(pwd) => pwd.clone(), + None => { + let password = rpassword::prompt_password("Enter password: ")?; + let password_confirm = + rpassword::prompt_password("Confirm password: ")?; + if password != password_confirm { + bail!("Passwords don't match."); + } + password + } + }; + let crypto_keys = CryptoKeys::init(); + let cr = CryptoRoot::create_pass(&password, &crypto_keys)?; + println!("{}", cr.0); + } + CryptoRootCommand::NewClearText => { + let crypto_keys = CryptoKeys::init(); + let cr = CryptoRoot::create_cleartext(&crypto_keys); + println!("{}", cr.0); + } + CryptoRootCommand::ChangePassword { + maybe_old_password, + maybe_new_password, + crypto_root, + } => { + let old_password = match maybe_old_password { + Some(pwd) => pwd.to_string(), + None => rpassword::prompt_password("Enter old password: ")?, + }; + + let new_password = match maybe_new_password { + Some(pwd) => pwd.to_string(), + None => { + let password = rpassword::prompt_password("Enter new password: ")?; + let password_confirm = + rpassword::prompt_password("Confirm new password: ")?; + if password != password_confirm { + bail!("Passwords don't match."); + } + password + } + }; + + let keys = CryptoRoot(crypto_root.to_string()).crypto_keys(&old_password)?; + let cr = CryptoRoot::create_pass(&new_password, &keys)?; + println!("{}", cr.0); + } + CryptoRootCommand::DeriveIncoming { crypto_root } => { + let pubkey = CryptoRoot(crypto_root.to_string()).public_key()?; + let cr = CryptoRoot::create_incoming(&pubkey); + println!("{}", cr.0); + } + }, + }, + } - CryptoKeys::init(&creds, &user_secrets, &password).await?; + Ok(()) +} - println!(""); - println!("Cryptographic key setup is complete."); - println!(""); - println!("If you are using the static login provider, add the following section to your .toml configuration file:"); - println!(""); - dump_config(&password, &creds); +fn reload(pid: Option<i32>, pid_path: Option<PathBuf>) -> Result<()> { + let final_pid = match (pid, pid_path) { + (Some(pid), _) => pid, + (_, Some(path)) => { + let mut f = std::fs::OpenOptions::new().read(true).open(path)?; + let mut pidstr = String::new(); + f.read_to_string(&mut pidstr)?; + pidstr.parse::<i32>()? } - Command::InitializeLocalKeys { creds } => { - let creds = make_storage_creds(creds); - - println!("Please enter a password for local IMAP access."); - println!("This password is not used for key decryption, your keys will be printed below (do not lose them!)"); - println!( - "If you plan on using LDAP login, stop right here and use `first-login` instead" - ); - let password = rpassword::prompt_password("Enter password: ")?; - let password_confirm = rpassword::prompt_password("Confirm password: ")?; - if password != password_confirm { - bail!("Passwords don't match."); - } + _ => bail!("Unable to infer your daemon's PID"), + }; + let pid = Pid::from_raw(final_pid); + signal::kill(pid, signal::Signal::SIGUSR1)?; + Ok(()) +} - let master = gen_key(); - let (_, secret) = gen_keypair(); - let keys = CryptoKeys::init_without_password(&creds, &master, &secret).await?; - - println!(""); - println!("Cryptographic key setup is complete."); - println!(""); - println!("Add the following section to your .toml configuration file:"); - println!(""); - dump_config(&password, &creds); - dump_keys(&keys); - } - Command::AddPassword { - creds, - user_secrets, - gen, - } => { - let creds = make_storage_creds(creds); - let user_secrets = make_user_secrets(user_secrets); - - let existing_password = - rpassword::prompt_password("Enter existing password to decrypt keys: ")?; - let new_password = if gen { - let password = base64::encode_config( - &u128::to_be_bytes(thread_rng().gen())[..10], - base64::URL_SAFE_NO_PAD, - ); - println!("Your new password: {}", password); - println!("Keep it safe!"); - password - } else { - let password = rpassword::prompt_password("Enter new password: ")?; - let password_confirm = rpassword::prompt_password("Confirm new password: ")?; - if password != password_confirm { - bail!("Passwords don't match."); +fn account_management(root: &Command, cmd: &AccountManagement, users: PathBuf) -> Result<()> { + let mut ulist: UserList = + read_config(users.clone()).context(format!("'{:?}' must be a user database", users))?; + + match cmd { + AccountManagement::Add { login, setup } => { + tracing::debug!(user = login, "will-create"); + let stp: SetupEntry = read_config(setup.clone()) + .context(format!("'{:?}' must be a setup file", setup))?; + tracing::debug!(user = login, "loaded setup entry"); + + let password = match stp.clear_password { + Some(pwd) => pwd, + None => { + let password = rpassword::prompt_password("Enter password: ")?; + let password_confirm = rpassword::prompt_password("Confirm password: ")?; + if password != password_confirm { + bail!("Passwords don't match."); + } + password } - password }; - let keys = CryptoKeys::open(&creds, &user_secrets, &existing_password).await?; - keys.add_password(&creds, &user_secrets, &new_password) - .await?; - println!(""); - println!("New password added successfully."); - } - Command::DeletePassword { - creds, - user_secrets, - allow_delete_all, - } => { - let creds = make_storage_creds(creds); - let user_secrets = make_user_secrets(user_secrets); - - let existing_password = rpassword::prompt_password("Enter password to delete: ")?; - - let keys = match allow_delete_all { - true => Some(CryptoKeys::open(&creds, &user_secrets, &existing_password).await?), - false => None, + let crypto_keys = CryptoKeys::init(); + let crypto_root = match root { + Command::Provider(_) => CryptoRoot::create_pass(&password, &crypto_keys)?, + Command::Companion(_) => CryptoRoot::create_cleartext(&crypto_keys), + _ => unreachable!(), }; - CryptoKeys::delete_password(&creds, &existing_password, allow_delete_all).await?; + let hash = hash_password(password.as_str()).context("unable to hash password")?; - println!(""); - println!("Password was deleted successfully."); + ulist.insert( + login.clone(), + UserEntry { + email_addresses: stp.email_addresses, + password: hash, + crypto_root: crypto_root.0, + storage: stp.storage, + }, + ); - if let Some(keys) = keys { - println!("As a reminder, here are your cryptographic keys:"); - dump_keys(&keys); - } + write_config(users.clone(), &ulist)?; + } + AccountManagement::Delete { login } => { + tracing::debug!(user = login, "will-delete"); + ulist.remove(login); + write_config(users.clone(), &ulist)?; } - Command::ShowKeys { - creds, - user_secrets, + AccountManagement::ChangePassword { + maybe_old_password, + maybe_new_password, + login, } => { - let creds = make_storage_creds(creds); - let user_secrets = make_user_secrets(user_secrets); + let mut user = ulist.remove(login).context("user must exist first")?; - let existing_password = rpassword::prompt_password("Enter key decryption password: ")?; - - let keys = CryptoKeys::open(&creds, &user_secrets, &existing_password).await?; - dump_keys(&keys); - } - } + let old_password = match maybe_old_password { + Some(pwd) => pwd.to_string(), + None => rpassword::prompt_password("Enter old password: ")?, + }; - Ok(()) -} + if !verify_password(&old_password, &user.password)? { + bail!(format!("invalid password for login {}", login)); + } -fn make_storage_creds(c: StorageCredsArgs) -> StorageCredentials { - let s3_region = Region { - name: c.region.clone(), - endpoint: c.s3_endpoint, - }; - let k2v_region = Region { - name: c.region, - endpoint: c.k2v_endpoint, - }; - StorageCredentials { - k2v_region, - s3_region, - aws_access_key_id: c.aws_access_key_id, - aws_secret_access_key: c.aws_secret_access_key, - bucket: c.bucket, - } -} + let crypto_keys = CryptoRoot(user.crypto_root).crypto_keys(&old_password)?; + + let new_password = match maybe_new_password { + Some(pwd) => pwd.to_string(), + None => { + let password = rpassword::prompt_password("Enter new password: ")?; + let password_confirm = rpassword::prompt_password("Confirm new password: ")?; + if password != password_confirm { + bail!("Passwords don't match."); + } + password + } + }; + let new_hash = hash_password(&new_password)?; + let new_crypto_root = CryptoRoot::create_pass(&new_password, &crypto_keys)?; -fn make_user_secrets(c: UserSecretsArgs) -> UserSecrets { - UserSecrets { - user_secret: c.user_secret, - alternate_user_secrets: c - .alternate_user_secrets - .split(',') - .map(|x| x.trim()) - .filter(|x| !x.is_empty()) - .map(|x| x.to_string()) - .collect(), - } -} + user.password = new_hash; + user.crypto_root = new_crypto_root.0; -fn dump_config(password: &str, creds: &StorageCredentials) { - println!("[login_static.users.<username>]"); - println!( - "password = \"{}\"", - hash_password(password).expect("unable to hash password") - ); - println!("aws_access_key_id = \"{}\"", creds.aws_access_key_id); - println!( - "aws_secret_access_key = \"{}\"", - creds.aws_secret_access_key - ); -} + ulist.insert(login.clone(), user); + write_config(users.clone(), &ulist)?; + } + }; -fn dump_keys(keys: &CryptoKeys) { - println!("master_key = \"{}\"", base64::encode(&keys.master)); - println!("secret_key = \"{}\"", base64::encode(&keys.secret)); + Ok(()) } diff --git a/src/server.rs b/src/server.rs index f0eb35f..28e0b27 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,6 +1,8 @@ +use std::io::Write; +use std::path::PathBuf; use std::sync::Arc; -use anyhow::{bail, Result}; +use anyhow::Result; use futures::try_join; use log::*; use tokio::sync::watch; @@ -9,31 +11,59 @@ use crate::config::*; use crate::imap; use crate::lmtp::*; use crate::login::ArcLoginProvider; -use crate::login::{ldap_provider::*, static_provider::*, Region}; +use crate::login::{ldap_provider::*, static_provider::*}; pub struct Server { lmtp_server: Option<Arc<LmtpServer>>, imap_server: Option<imap::Server>, + pid_file: Option<PathBuf>, } impl Server { - pub async fn new(config: Config) -> Result<Self> { - let (login, lmtp_conf, imap_conf) = build(config)?; + pub async fn from_companion_config(config: CompanionConfig) -> Result<Self> { + tracing::info!("Init as companion"); + let login = Arc::new(StaticLoginProvider::new(config.users).await?); - let lmtp_server = lmtp_conf.map(|cfg| LmtpServer::new(cfg, login.clone())); - let imap_server = match imap_conf { - Some(cfg) => Some(imap::new(cfg, login.clone()).await?), - None => None, + let lmtp_server = None; + let imap_server = Some(imap::new(config.imap, login.clone()).await?); + Ok(Self { + lmtp_server, + imap_server, + pid_file: config.pid, + }) + } + + pub async fn from_provider_config(config: ProviderConfig) -> Result<Self> { + tracing::info!("Init as provider"); + let login: ArcLoginProvider = match config.users { + UserManagement::Static(x) => Arc::new(StaticLoginProvider::new(x).await?), + UserManagement::Ldap(x) => Arc::new(LdapLoginProvider::new(x)?), }; + let lmtp_server = Some(LmtpServer::new(config.lmtp, login.clone())); + let imap_server = Some(imap::new(config.imap, login.clone()).await?); + Ok(Self { lmtp_server, imap_server, + pid_file: config.pid, }) } pub async fn run(self) -> Result<()> { - tracing::info!("Starting Aerogramme..."); + let pid = std::process::id(); + tracing::info!(pid = pid, "Starting main loops"); + + // write the pid file + if let Some(pid_file) = self.pid_file { + let mut file = std::fs::OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(pid_file)?; + file.write_all(pid.to_string().as_bytes())?; + drop(file); + } let (exit_signal, provoke_exit) = watch_ctrl_c(); let _exit_on_err = move |err: anyhow::Error| { @@ -60,28 +90,6 @@ impl Server { } } -fn build(config: Config) -> Result<(ArcLoginProvider, Option<LmtpConfig>, Option<ImapConfig>)> { - let s3_region = Region { - name: config.aws_region.clone(), - endpoint: config.s3_endpoint, - }; - let k2v_region = Region { - name: config.aws_region, - endpoint: config.k2v_endpoint, - }; - - let lp: ArcLoginProvider = match (config.login_static, config.login_ldap) { - (Some(st), None) => Arc::new(StaticLoginProvider::new(st, k2v_region, s3_region)?), - (None, Some(ld)) => Arc::new(LdapLoginProvider::new(ld, k2v_region, s3_region)?), - (Some(_), Some(_)) => { - bail!("A single login provider must be set up in config file") - } - (None, None) => bail!("No login provider is set up in config file"), - }; - - Ok((lp, config.lmtp, config.imap)) -} - pub fn watch_ctrl_c() -> (watch::Receiver<bool>, Arc<watch::Sender<bool>>) { let (send_cancel, watch_cancel) = watch::channel(false); let send_cancel = Arc::new(send_cancel); diff --git a/src/storage/garage.rs b/src/storage/garage.rs new file mode 100644 index 0000000..90b84d6 --- /dev/null +++ b/src/storage/garage.rs @@ -0,0 +1,489 @@ +use crate::storage::*; +use aws_sdk_s3::{self as s3, error::SdkError, operation::get_object::GetObjectError}; +use serde::Serialize; + +#[derive(Clone, Debug, Serialize)] +pub struct GarageConf { + pub region: String, + pub s3_endpoint: String, + pub k2v_endpoint: String, + pub aws_access_key_id: String, + pub aws_secret_access_key: String, + pub bucket: String, +} + +#[derive(Clone, Debug)] +pub struct GarageBuilder { + conf: GarageConf, + unicity: Vec<u8>, +} + +impl GarageBuilder { + pub fn new(conf: GarageConf) -> anyhow::Result<Arc<Self>> { + let mut unicity: Vec<u8> = vec![]; + unicity.extend_from_slice(file!().as_bytes()); + unicity.append(&mut rmp_serde::to_vec(&conf)?); + Ok(Arc::new(Self { conf, unicity })) + } +} + +#[async_trait] +impl IBuilder for GarageBuilder { + async fn build(&self) -> Result<Store, StorageError> { + let s3_creds = s3::config::Credentials::new( + self.conf.aws_access_key_id.clone(), + self.conf.aws_secret_access_key.clone(), + None, + None, + "aerogramme", + ); + + let sdk_config = aws_config::from_env() + .region(aws_config::Region::new(self.conf.region.clone())) + .credentials_provider(s3_creds) + .endpoint_url(self.conf.s3_endpoint.clone()) + .load() + .await; + + let s3_config = aws_sdk_s3::config::Builder::from(&sdk_config) + .force_path_style(true) + .build(); + + let s3_client = aws_sdk_s3::Client::from_conf(s3_config); + + let k2v_config = k2v_client::K2vClientConfig { + endpoint: self.conf.k2v_endpoint.clone(), + region: self.conf.region.clone(), + aws_access_key_id: self.conf.aws_access_key_id.clone(), + aws_secret_access_key: self.conf.aws_secret_access_key.clone(), + bucket: self.conf.bucket.clone(), + user_agent: None, + }; + + let k2v_client = match k2v_client::K2vClient::new(k2v_config) { + Err(e) => { + tracing::error!("unable to build k2v client: {}", e); + return Err(StorageError::Internal); + } + Ok(v) => v, + }; + + Ok(Box::new(GarageStore { + bucket: self.conf.bucket.clone(), + s3: s3_client, + k2v: k2v_client, + })) + } + fn unique(&self) -> UnicityBuffer { + UnicityBuffer(self.unicity.clone()) + } +} + +pub struct GarageStore { + bucket: String, + s3: s3::Client, + k2v: k2v_client::K2vClient, +} + +fn causal_to_row_val(row_ref: RowRef, causal_value: k2v_client::CausalValue) -> RowVal { + let new_row_ref = row_ref.with_causality(causal_value.causality.into()); + let row_values = causal_value + .value + .into_iter() + .map(|k2v_value| match k2v_value { + k2v_client::K2vValue::Tombstone => Alternative::Tombstone, + k2v_client::K2vValue::Value(v) => Alternative::Value(v), + }) + .collect::<Vec<_>>(); + + RowVal { + row_ref: new_row_ref, + value: row_values, + } +} + +#[async_trait] +impl IStore for GarageStore { + async fn row_fetch<'a>(&self, select: &Selector<'a>) -> Result<Vec<RowVal>, StorageError> { + let (pk_list, batch_op) = match select { + Selector::Range { + shard, + sort_begin, + sort_end, + } => ( + vec![shard.to_string()], + vec![k2v_client::BatchReadOp { + partition_key: shard, + filter: k2v_client::Filter { + start: Some(sort_begin), + end: Some(sort_end), + ..k2v_client::Filter::default() + }, + ..k2v_client::BatchReadOp::default() + }], + ), + Selector::List(row_ref_list) => ( + row_ref_list + .iter() + .map(|row_ref| row_ref.uid.shard.to_string()) + .collect::<Vec<_>>(), + row_ref_list + .iter() + .map(|row_ref| k2v_client::BatchReadOp { + partition_key: &row_ref.uid.shard, + filter: k2v_client::Filter { + start: Some(&row_ref.uid.sort), + ..k2v_client::Filter::default() + }, + single_item: true, + ..k2v_client::BatchReadOp::default() + }) + .collect::<Vec<_>>(), + ), + Selector::Prefix { shard, sort_prefix } => ( + vec![shard.to_string()], + vec![k2v_client::BatchReadOp { + partition_key: shard, + filter: k2v_client::Filter { + prefix: Some(sort_prefix), + ..k2v_client::Filter::default() + }, + ..k2v_client::BatchReadOp::default() + }], + ), + Selector::Single(row_ref) => { + let causal_value = match self + .k2v + .read_item(&row_ref.uid.shard, &row_ref.uid.sort) + .await + { + Err(k2v_client::Error::NotFound) => { + tracing::debug!( + "K2V item not found shard={}, sort={}, bucket={}", + row_ref.uid.shard, + row_ref.uid.sort, + self.bucket, + ); + return Err(StorageError::NotFound); + } + Err(e) => { + tracing::error!( + "K2V read item shard={}, sort={}, bucket={} failed: {}", + row_ref.uid.shard, + row_ref.uid.sort, + self.bucket, + e + ); + return Err(StorageError::Internal); + } + Ok(v) => v, + }; + + let row_val = causal_to_row_val((*row_ref).clone(), causal_value); + return Ok(vec![row_val]); + } + }; + + let all_raw_res = match self.k2v.read_batch(&batch_op).await { + Err(e) => { + tracing::error!( + "k2v read batch failed for {:?}, bucket {} with err: {}", + select, + self.bucket, + e + ); + return Err(StorageError::Internal); + } + Ok(v) => v, + }; + + let row_vals = all_raw_res + .into_iter() + .fold(vec![], |mut acc, v| { + acc.extend(v.items); + acc + }) + .into_iter() + .zip(pk_list.into_iter()) + .map(|((sk, cv), pk)| causal_to_row_val(RowRef::new(&pk, &sk), cv)) + .collect::<Vec<_>>(); + + Ok(row_vals) + } + async fn row_rm<'a>(&self, select: &Selector<'a>) -> Result<(), StorageError> { + let del_op = match select { + Selector::Range { + shard, + sort_begin, + sort_end, + } => vec![k2v_client::BatchDeleteOp { + partition_key: shard, + prefix: None, + start: Some(sort_begin), + end: Some(sort_end), + single_item: false, + }], + Selector::List(row_ref_list) => { + // Insert null values with causality token = delete + let batch_op = row_ref_list + .iter() + .map(|v| k2v_client::BatchInsertOp { + partition_key: &v.uid.shard, + sort_key: &v.uid.sort, + causality: v.causality.clone().map(|ct| ct.into()), + value: k2v_client::K2vValue::Tombstone, + }) + .collect::<Vec<_>>(); + + return match self.k2v.insert_batch(&batch_op).await { + Err(e) => { + tracing::error!("Unable to delete the list of values: {}", e); + Err(StorageError::Internal) + } + Ok(_) => Ok(()), + }; + } + Selector::Prefix { shard, sort_prefix } => vec![k2v_client::BatchDeleteOp { + partition_key: shard, + prefix: Some(sort_prefix), + start: None, + end: None, + single_item: false, + }], + Selector::Single(row_ref) => { + // Insert null values with causality token = delete + let batch_op = vec![k2v_client::BatchInsertOp { + partition_key: &row_ref.uid.shard, + sort_key: &row_ref.uid.sort, + causality: row_ref.causality.clone().map(|ct| ct.into()), + value: k2v_client::K2vValue::Tombstone, + }]; + + return match self.k2v.insert_batch(&batch_op).await { + Err(e) => { + tracing::error!("Unable to delete the list of values: {}", e); + Err(StorageError::Internal) + } + Ok(_) => Ok(()), + }; + } + }; + + // Finally here we only have prefix & range + match self.k2v.delete_batch(&del_op).await { + Err(e) => { + tracing::error!("delete batch error: {}", e); + Err(StorageError::Internal) + } + Ok(_) => Ok(()), + } + } + + async fn row_insert(&self, values: Vec<RowVal>) -> Result<(), StorageError> { + let batch_ops = values + .iter() + .map(|v| k2v_client::BatchInsertOp { + partition_key: &v.row_ref.uid.shard, + sort_key: &v.row_ref.uid.sort, + causality: v.row_ref.causality.clone().map(|ct| ct.into()), + value: v + .value + .iter() + .next() + .map(|cv| match cv { + Alternative::Value(buff) => k2v_client::K2vValue::Value(buff.clone()), + Alternative::Tombstone => k2v_client::K2vValue::Tombstone, + }) + .unwrap_or(k2v_client::K2vValue::Tombstone), + }) + .collect::<Vec<_>>(); + + match self.k2v.insert_batch(&batch_ops).await { + Err(e) => { + tracing::error!("k2v can't insert some value: {}", e); + Err(StorageError::Internal) + } + Ok(v) => Ok(v), + } + } + async fn row_poll(&self, value: &RowRef) -> Result<RowVal, StorageError> { + loop { + if let Some(ct) = &value.causality { + match self + .k2v + .poll_item(&value.uid.shard, &value.uid.sort, ct.clone().into(), None) + .await + { + Err(e) => { + tracing::error!("Unable to poll item: {}", e); + return Err(StorageError::Internal); + } + Ok(None) => continue, + Ok(Some(cv)) => return Ok(causal_to_row_val(value.clone(), cv)), + } + } else { + match self.k2v.read_item(&value.uid.shard, &value.uid.sort).await { + Err(k2v_client::Error::NotFound) => { + self.k2v + .insert_item(&value.uid.shard, &value.uid.sort, vec![0u8], None) + .await + .map_err(|e| { + tracing::error!("Unable to insert item in polling logic: {}", e); + StorageError::Internal + })?; + } + Err(e) => { + tracing::error!("Unable to read item in polling logic: {}", e); + return Err(StorageError::Internal); + } + Ok(cv) => return Ok(causal_to_row_val(value.clone(), cv)), + } + } + } + } + + async fn blob_fetch(&self, blob_ref: &BlobRef) -> Result<BlobVal, StorageError> { + let maybe_out = self + .s3 + .get_object() + .bucket(self.bucket.to_string()) + .key(blob_ref.0.to_string()) + .send() + .await; + + let object_output = match maybe_out { + Ok(output) => output, + Err(SdkError::ServiceError(x)) => match x.err() { + GetObjectError::NoSuchKey(_) => return Err(StorageError::NotFound), + e => { + tracing::warn!("Blob Fetch Error, Service Error: {}", e); + return Err(StorageError::Internal); + } + }, + Err(e) => { + tracing::warn!("Blob Fetch Error, {}", e); + return Err(StorageError::Internal); + } + }; + + let buffer = match object_output.body.collect().await { + Ok(aggreg) => aggreg.to_vec(), + Err(e) => { + tracing::warn!("Fetching body failed with {}", e); + return Err(StorageError::Internal); + } + }; + + let mut bv = BlobVal::new(blob_ref.clone(), buffer); + if let Some(meta) = object_output.metadata { + bv.meta = meta; + } + tracing::debug!("Fetched {}/{}", self.bucket, blob_ref.0); + Ok(bv) + } + async fn blob_insert(&self, blob_val: BlobVal) -> Result<(), StorageError> { + let streamable_value = s3::primitives::ByteStream::from(blob_val.value); + + let maybe_send = self + .s3 + .put_object() + .bucket(self.bucket.to_string()) + .key(blob_val.blob_ref.0.to_string()) + .set_metadata(Some(blob_val.meta)) + .body(streamable_value) + .send() + .await; + + match maybe_send { + Err(e) => { + tracing::error!("unable to send object: {}", e); + Err(StorageError::Internal) + } + Ok(_) => { + tracing::debug!("Inserted {}/{}", self.bucket, blob_val.blob_ref.0); + Ok(()) + } + } + } + async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result<(), StorageError> { + let maybe_copy = self + .s3 + .copy_object() + .bucket(self.bucket.to_string()) + .key(dst.0.clone()) + .copy_source(format!("/{}/{}", self.bucket.to_string(), src.0.clone())) + .send() + .await; + + match maybe_copy { + Err(e) => { + tracing::error!( + "unable to copy object {} to {} (bucket: {}), error: {}", + src.0, + dst.0, + self.bucket, + e + ); + Err(StorageError::Internal) + } + Ok(_) => { + tracing::debug!("copied {} to {} (bucket: {})", src.0, dst.0, self.bucket); + Ok(()) + } + } + } + async fn blob_list(&self, prefix: &str) -> Result<Vec<BlobRef>, StorageError> { + let maybe_list = self + .s3 + .list_objects_v2() + .bucket(self.bucket.to_string()) + .prefix(prefix) + .into_paginator() + .send() + .try_collect() + .await; + + match maybe_list { + Err(e) => { + tracing::error!( + "listing prefix {} on bucket {} failed: {}", + prefix, + self.bucket, + e + ); + Err(StorageError::Internal) + } + Ok(pagin_list_out) => Ok(pagin_list_out + .into_iter() + .map(|list_out| list_out.contents.unwrap_or(vec![])) + .flatten() + .map(|obj| BlobRef(obj.key.unwrap_or(String::new()))) + .collect::<Vec<_>>()), + } + } + async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError> { + let maybe_delete = self + .s3 + .delete_object() + .bucket(self.bucket.to_string()) + .key(blob_ref.0.clone()) + .send() + .await; + + match maybe_delete { + Err(e) => { + tracing::error!( + "unable to delete {} (bucket: {}), error {}", + blob_ref.0, + self.bucket, + e + ); + Err(StorageError::Internal) + } + Ok(_) => { + tracing::debug!("deleted {} (bucket: {})", blob_ref.0, self.bucket); + Ok(()) + } + } + } +} diff --git a/src/storage/in_memory.rs b/src/storage/in_memory.rs new file mode 100644 index 0000000..3c3a94c --- /dev/null +++ b/src/storage/in_memory.rs @@ -0,0 +1,334 @@ +use crate::storage::*; +use std::collections::{BTreeMap, HashMap}; +use std::ops::Bound::{self, Excluded, Included, Unbounded}; +use std::sync::{Arc, RwLock}; +use tokio::sync::Notify; + +/// This implementation is very inneficient, and not completely correct +/// Indeed, when the connector is dropped, the memory is freed. +/// It means that when a user disconnects, its data are lost. +/// It's intended only for basic debugging, do not use it for advanced tests... + +#[derive(Debug, Default)] +pub struct MemDb(tokio::sync::Mutex<HashMap<String, Arc<MemBuilder>>>); +impl MemDb { + pub fn new() -> Self { + Self(tokio::sync::Mutex::new(HashMap::new())) + } + + pub async fn builder(&self, username: &str) -> Arc<MemBuilder> { + let mut global_storage = self.0.lock().await; + global_storage + .entry(username.to_string()) + .or_insert(MemBuilder::new(username)) + .clone() + } +} + +#[derive(Debug, Clone)] +enum InternalData { + Tombstone, + Value(Vec<u8>), +} +impl InternalData { + fn to_alternative(&self) -> Alternative { + match self { + Self::Tombstone => Alternative::Tombstone, + Self::Value(x) => Alternative::Value(x.clone()), + } + } +} + +#[derive(Debug)] +struct InternalRowVal { + data: Vec<InternalData>, + version: u64, + change: Arc<Notify>, +} +impl std::default::Default for InternalRowVal { + fn default() -> Self { + Self { + data: vec![], + version: 1, + change: Arc::new(Notify::new()), + } + } +} +impl InternalRowVal { + fn concurrent_values(&self) -> Vec<Alternative> { + self.data.iter().map(InternalData::to_alternative).collect() + } + + fn to_row_val(&self, row_ref: RowRef) -> RowVal { + RowVal { + row_ref: row_ref.with_causality(self.version.to_string()), + value: self.concurrent_values(), + } + } +} + +#[derive(Debug, Default, Clone)] +struct InternalBlobVal { + data: Vec<u8>, + metadata: HashMap<String, String>, +} +impl InternalBlobVal { + fn to_blob_val(&self, bref: &BlobRef) -> BlobVal { + BlobVal { + blob_ref: bref.clone(), + meta: self.metadata.clone(), + value: self.data.clone(), + } + } +} + +type ArcRow = Arc<RwLock<HashMap<String, BTreeMap<String, InternalRowVal>>>>; +type ArcBlob = Arc<RwLock<BTreeMap<String, InternalBlobVal>>>; + +#[derive(Clone, Debug)] +pub struct MemBuilder { + unicity: Vec<u8>, + row: ArcRow, + blob: ArcBlob, +} + +impl MemBuilder { + pub fn new(user: &str) -> Arc<Self> { + tracing::debug!("initialize membuilder for {}", user); + let mut unicity: Vec<u8> = vec![]; + unicity.extend_from_slice(file!().as_bytes()); + unicity.extend_from_slice(user.as_bytes()); + Arc::new(Self { + unicity, + row: Arc::new(RwLock::new(HashMap::new())), + blob: Arc::new(RwLock::new(BTreeMap::new())), + }) + } +} + +#[async_trait] +impl IBuilder for MemBuilder { + async fn build(&self) -> Result<Store, StorageError> { + Ok(Box::new(MemStore { + row: self.row.clone(), + blob: self.blob.clone(), + })) + } + + fn unique(&self) -> UnicityBuffer { + UnicityBuffer(self.unicity.clone()) + } +} + +pub struct MemStore { + row: ArcRow, + blob: ArcBlob, +} + +fn prefix_last_bound(prefix: &str) -> Bound<String> { + let mut sort_end = prefix.to_string(); + match sort_end.pop() { + None => Unbounded, + Some(ch) => { + let nc = char::from_u32(ch as u32 + 1).unwrap(); + sort_end.push(nc); + Excluded(sort_end) + } + } +} + +impl MemStore { + fn row_rm_single(&self, entry: &RowRef) -> Result<(), StorageError> { + tracing::trace!(entry=%entry, command="row_rm_single"); + let mut store = self.row.write().or(Err(StorageError::Internal))?; + let shard = &entry.uid.shard; + let sort = &entry.uid.sort; + + let cauz = match entry.causality.as_ref().map(|v| v.parse::<u64>()) { + Some(Ok(v)) => v, + _ => 0, + }; + + let bt = store.entry(shard.to_string()).or_default(); + let intval = bt.entry(sort.to_string()).or_default(); + + if cauz == intval.version { + intval.data.clear(); + } + intval.data.push(InternalData::Tombstone); + intval.version += 1; + intval.change.notify_waiters(); + + Ok(()) + } +} + +#[async_trait] +impl IStore for MemStore { + async fn row_fetch<'a>(&self, select: &Selector<'a>) -> Result<Vec<RowVal>, StorageError> { + tracing::trace!(select=%select, command="row_fetch"); + let store = self.row.read().or(Err(StorageError::Internal))?; + + match select { + Selector::Range { + shard, + sort_begin, + sort_end, + } => Ok(store + .get(*shard) + .unwrap_or(&BTreeMap::new()) + .range(( + Included(sort_begin.to_string()), + Excluded(sort_end.to_string()), + )) + .map(|(k, v)| v.to_row_val(RowRef::new(shard, k))) + .collect::<Vec<_>>()), + Selector::List(rlist) => { + let mut acc = vec![]; + for row_ref in rlist { + let maybe_intval = store + .get(&row_ref.uid.shard) + .map(|v| v.get(&row_ref.uid.sort)) + .flatten(); + if let Some(intval) = maybe_intval { + acc.push(intval.to_row_val(row_ref.clone())); + } + } + Ok(acc) + } + Selector::Prefix { shard, sort_prefix } => { + let last_bound = prefix_last_bound(sort_prefix); + + Ok(store + .get(*shard) + .unwrap_or(&BTreeMap::new()) + .range((Included(sort_prefix.to_string()), last_bound)) + .map(|(k, v)| v.to_row_val(RowRef::new(shard, k))) + .collect::<Vec<_>>()) + } + Selector::Single(row_ref) => { + let intval = store + .get(&row_ref.uid.shard) + .ok_or(StorageError::NotFound)? + .get(&row_ref.uid.sort) + .ok_or(StorageError::NotFound)?; + Ok(vec![intval.to_row_val((*row_ref).clone())]) + } + } + } + + async fn row_rm<'a>(&self, select: &Selector<'a>) -> Result<(), StorageError> { + tracing::trace!(select=%select, command="row_rm"); + + let values = match select { + Selector::Range { .. } | Selector::Prefix { .. } => self + .row_fetch(select) + .await? + .into_iter() + .map(|rv| rv.row_ref) + .collect::<Vec<_>>(), + Selector::List(rlist) => rlist.clone(), + Selector::Single(row_ref) => vec![(*row_ref).clone()], + }; + + for v in values.into_iter() { + self.row_rm_single(&v)?; + } + Ok(()) + } + + async fn row_insert(&self, values: Vec<RowVal>) -> Result<(), StorageError> { + tracing::trace!(entries=%values.iter().map(|v| v.row_ref.to_string()).collect::<Vec<_>>().join(","), command="row_insert"); + let mut store = self.row.write().or(Err(StorageError::Internal))?; + for v in values.into_iter() { + let shard = v.row_ref.uid.shard; + let sort = v.row_ref.uid.sort; + + let val = match v.value.into_iter().next() { + Some(Alternative::Value(x)) => x, + _ => vec![], + }; + + let cauz = match v.row_ref.causality.map(|v| v.parse::<u64>()) { + Some(Ok(v)) => v, + _ => 0, + }; + + let bt = store.entry(shard).or_default(); + let intval = bt.entry(sort).or_default(); + + if cauz == intval.version { + intval.data.clear(); + } + intval.data.push(InternalData::Value(val)); + intval.version += 1; + intval.change.notify_waiters(); + } + Ok(()) + } + async fn row_poll(&self, value: &RowRef) -> Result<RowVal, StorageError> { + tracing::trace!(entry=%value, command="row_poll"); + let shard = &value.uid.shard; + let sort = &value.uid.sort; + let cauz = match value.causality.as_ref().map(|v| v.parse::<u64>()) { + Some(Ok(v)) => v, + _ => 0, + }; + + let notify_me = { + let mut store = self.row.write().or(Err(StorageError::Internal))?; + let bt = store.entry(shard.to_string()).or_default(); + let intval = bt.entry(sort.to_string()).or_default(); + + if intval.version != cauz { + return Ok(intval.to_row_val(value.clone())); + } + intval.change.clone() + }; + + notify_me.notified().await; + + let res = self.row_fetch(&Selector::Single(value)).await?; + res.into_iter().next().ok_or(StorageError::NotFound) + } + + async fn blob_fetch(&self, blob_ref: &BlobRef) -> Result<BlobVal, StorageError> { + tracing::trace!(entry=%blob_ref, command="blob_fetch"); + let store = self.blob.read().or(Err(StorageError::Internal))?; + store + .get(&blob_ref.0) + .ok_or(StorageError::NotFound) + .map(|v| v.to_blob_val(blob_ref)) + } + async fn blob_insert(&self, blob_val: BlobVal) -> Result<(), StorageError> { + tracing::trace!(entry=%blob_val.blob_ref, command="blob_insert"); + let mut store = self.blob.write().or(Err(StorageError::Internal))?; + let entry = store.entry(blob_val.blob_ref.0.clone()).or_default(); + entry.data = blob_val.value.clone(); + entry.metadata = blob_val.meta.clone(); + Ok(()) + } + async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result<(), StorageError> { + tracing::trace!(src=%src, dst=%dst, command="blob_copy"); + let mut store = self.blob.write().or(Err(StorageError::Internal))?; + let blob_src = store.entry(src.0.clone()).or_default().clone(); + store.insert(dst.0.clone(), blob_src); + Ok(()) + } + async fn blob_list(&self, prefix: &str) -> Result<Vec<BlobRef>, StorageError> { + tracing::trace!(prefix = prefix, command = "blob_list"); + let store = self.blob.read().or(Err(StorageError::Internal))?; + let last_bound = prefix_last_bound(prefix); + let blist = store + .range((Included(prefix.to_string()), last_bound)) + .map(|(k, _)| BlobRef(k.to_string())) + .collect::<Vec<_>>(); + Ok(blist) + } + async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError> { + tracing::trace!(entry=%blob_ref, command="blob_rm"); + let mut store = self.blob.write().or(Err(StorageError::Internal))?; + store.remove(&blob_ref.0); + Ok(()) + } +} diff --git a/src/storage/mod.rs b/src/storage/mod.rs new file mode 100644 index 0000000..1f86f71 --- /dev/null +++ b/src/storage/mod.rs @@ -0,0 +1,179 @@ +/* + * + * This abstraction goal is to leverage all the semantic of Garage K2V+S3, + * to be as tailored as possible to it ; it aims to be a zero-cost abstraction + * compared to when we where directly using the K2V+S3 client. + * + * My idea: we can encapsulate the causality token + * into the object system so it is not exposed. + */ + +pub mod garage; +pub mod in_memory; + +use async_trait::async_trait; +use std::collections::HashMap; +use std::hash::Hash; +use std::sync::Arc; + +#[derive(Debug, Clone)] +pub enum Alternative { + Tombstone, + Value(Vec<u8>), +} +type ConcurrentValues = Vec<Alternative>; + +#[derive(Debug, Clone)] +pub enum StorageError { + NotFound, + Internal, +} +impl std::fmt::Display for StorageError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str("Storage Error: ")?; + match self { + Self::NotFound => f.write_str("Item not found"), + Self::Internal => f.write_str("An internal error occured"), + } + } +} +impl std::error::Error for StorageError {} + +#[derive(Debug, Clone, PartialEq)] +pub struct RowUid { + pub shard: String, + pub sort: String, +} + +#[derive(Debug, Clone, PartialEq)] +pub struct RowRef { + pub uid: RowUid, + pub causality: Option<String>, +} +impl std::fmt::Display for RowRef { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "RowRef({}, {}, {:?})", + self.uid.shard, self.uid.sort, self.causality + ) + } +} + +impl RowRef { + pub fn new(shard: &str, sort: &str) -> Self { + Self { + uid: RowUid { + shard: shard.to_string(), + sort: sort.to_string(), + }, + causality: None, + } + } + pub fn with_causality(mut self, causality: String) -> Self { + self.causality = Some(causality); + self + } +} + +#[derive(Debug, Clone)] +pub struct RowVal { + pub row_ref: RowRef, + pub value: ConcurrentValues, +} + +impl RowVal { + pub fn new(row_ref: RowRef, value: Vec<u8>) -> Self { + Self { + row_ref, + value: vec![Alternative::Value(value)], + } + } +} + +#[derive(Debug, Clone)] +pub struct BlobRef(pub String); +impl std::fmt::Display for BlobRef { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "BlobRef({})", self.0) + } +} + +#[derive(Debug, Clone)] +pub struct BlobVal { + pub blob_ref: BlobRef, + pub meta: HashMap<String, String>, + pub value: Vec<u8>, +} +impl BlobVal { + pub fn new(blob_ref: BlobRef, value: Vec<u8>) -> Self { + Self { + blob_ref, + value, + meta: HashMap::new(), + } + } + + pub fn with_meta(mut self, k: String, v: String) -> Self { + self.meta.insert(k, v); + self + } +} + +#[derive(Debug)] +pub enum Selector<'a> { + Range { + shard: &'a str, + sort_begin: &'a str, + sort_end: &'a str, + }, + List(Vec<RowRef>), // list of (shard_key, sort_key) + #[allow(dead_code)] + Prefix { + shard: &'a str, + sort_prefix: &'a str, + }, + Single(&'a RowRef), +} +impl<'a> std::fmt::Display for Selector<'a> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Range { + shard, + sort_begin, + sort_end, + } => write!(f, "Range({}, [{}, {}[)", shard, sort_begin, sort_end), + Self::List(list) => write!(f, "List({:?})", list), + Self::Prefix { shard, sort_prefix } => write!(f, "Prefix({}, {})", shard, sort_prefix), + Self::Single(row_ref) => write!(f, "Single({})", row_ref), + } + } +} + +#[async_trait] +pub trait IStore { + async fn row_fetch<'a>(&self, select: &Selector<'a>) -> Result<Vec<RowVal>, StorageError>; + async fn row_rm<'a>(&self, select: &Selector<'a>) -> Result<(), StorageError>; + async fn row_insert(&self, values: Vec<RowVal>) -> Result<(), StorageError>; + async fn row_poll(&self, value: &RowRef) -> Result<RowVal, StorageError>; + + async fn blob_fetch(&self, blob_ref: &BlobRef) -> Result<BlobVal, StorageError>; + async fn blob_insert(&self, blob_val: BlobVal) -> Result<(), StorageError>; + async fn blob_copy(&self, src: &BlobRef, dst: &BlobRef) -> Result<(), StorageError>; + async fn blob_list(&self, prefix: &str) -> Result<Vec<BlobRef>, StorageError>; + async fn blob_rm(&self, blob_ref: &BlobRef) -> Result<(), StorageError>; +} + +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +pub struct UnicityBuffer(Vec<u8>); + +#[async_trait] +pub trait IBuilder: std::fmt::Debug { + async fn build(&self) -> Result<Store, StorageError>; + + /// Returns an opaque buffer that uniquely identifies this builder + fn unique(&self) -> UnicityBuffer; +} + +pub type Builder = Arc<dyn IBuilder + Send + Sync>; +pub type Store = Box<dyn IStore + Send + Sync>; diff --git a/src/time.rs b/src/time.rs deleted file mode 100644 index d34ee22..0000000 --- a/src/time.rs +++ /dev/null @@ -1,9 +0,0 @@ -use std::time::{SystemTime, UNIX_EPOCH}; - -/// Returns milliseconds since UNIX Epoch -pub fn now_msec() -> u64 { - SystemTime::now() - .duration_since(UNIX_EPOCH) - .expect("Fix your clock :o") - .as_millis() as u64 -} diff --git a/src/timestamp.rs b/src/timestamp.rs new file mode 100644 index 0000000..76cb74b --- /dev/null +++ b/src/timestamp.rs @@ -0,0 +1,65 @@ +use rand::prelude::*; +use std::str::FromStr; +use std::time::{SystemTime, UNIX_EPOCH}; + +/// Returns milliseconds since UNIX Epoch +pub fn now_msec() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("Fix your clock :o") + .as_millis() as u64 +} + +#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)] +pub struct Timestamp { + pub msec: u64, + pub rand: u64, +} + +impl Timestamp { + #[allow(dead_code)] + // 2023-05-15 try to make clippy happy and not sure if this fn will be used in the future. + pub fn now() -> Self { + let mut rng = thread_rng(); + Self { + msec: now_msec(), + rand: rng.gen::<u64>(), + } + } + + pub fn after(other: &Self) -> Self { + let mut rng = thread_rng(); + Self { + msec: std::cmp::max(now_msec(), other.msec + 1), + rand: rng.gen::<u64>(), + } + } + + pub fn zero() -> Self { + Self { msec: 0, rand: 0 } + } +} + +impl ToString for Timestamp { + fn to_string(&self) -> String { + let mut bytes = [0u8; 16]; + bytes[0..8].copy_from_slice(&u64::to_be_bytes(self.msec)); + bytes[8..16].copy_from_slice(&u64::to_be_bytes(self.rand)); + hex::encode(bytes) + } +} + +impl FromStr for Timestamp { + type Err = &'static str; + + fn from_str(s: &str) -> Result<Timestamp, &'static str> { + let bytes = hex::decode(s).map_err(|_| "invalid hex")?; + if bytes.len() != 16 { + return Err("bad length"); + } + Ok(Self { + msec: u64::from_be_bytes(bytes[0..8].try_into().unwrap()), + rand: u64::from_be_bytes(bytes[8..16].try_into().unwrap()), + }) + } +} |