aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock193
-rw-r--r--Cargo.nix254
-rw-r--r--Makefile22
-rw-r--r--src/block/Cargo.toml1
-rw-r--r--src/block/lib.rs1
-rw-r--r--src/block/manager.rs326
-rw-r--r--src/block/repair.rs468
-rw-r--r--src/db/Cargo.toml2
-rw-r--r--src/db/sqlite_adapter.rs2
-rw-r--r--src/garage/Cargo.toml2
-rw-r--r--src/garage/admin.rs29
-rw-r--r--src/garage/cli/cmd.rs8
-rw-r--r--src/garage/cli/structs.rs55
-rw-r--r--src/garage/cli/util.rs58
-rw-r--r--src/garage/repair/online.rs228
-rw-r--r--src/model/index_counter.rs169
-rw-r--r--src/rpc/system.rs6
-rw-r--r--src/table/gc.rs92
-rw-r--r--src/table/merkle.rs90
-rw-r--r--src/table/sync.rs198
-rw-r--r--src/util/Cargo.toml1
-rw-r--r--src/util/background.rs160
-rw-r--r--src/util/background/job_worker.rs51
-rw-r--r--src/util/background/mod.rs117
-rw-r--r--src/util/background/worker.rs261
-rw-r--r--src/util/lib.rs1
-rw-r--r--src/util/tranquilizer.rs25
27 files changed, 2084 insertions, 736 deletions
diff --git a/Cargo.lock b/Cargo.lock
index ecdf8a57..c45ee015 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -95,6 +95,15 @@ dependencies = [
[[package]]
name = "autocfg"
+version = "0.1.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0dde43e75fd43e8a1bf86103336bc699aa8d17ad1be60c76c0bdfd4828e19b78"
+dependencies = [
+ "autocfg 1.1.0",
+]
+
+[[package]]
+name = "autocfg"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
@@ -543,7 +552,7 @@ version = "0.9.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1145cf131a2c6ba0615079ab6a638f7e1973ac9c2634fcbeaaad6114246efe8c"
dependencies = [
- "autocfg",
+ "autocfg 1.1.0",
"cfg-if 1.0.0",
"crossbeam-utils 0.8.8",
"lazy_static",
@@ -959,6 +968,7 @@ dependencies = [
"futures",
"futures-util",
"garage_api",
+ "garage_block",
"garage_db",
"garage_model 0.7.0",
"garage_rpc 0.7.0",
@@ -984,6 +994,7 @@ dependencies = [
"sha2",
"static_init",
"structopt",
+ "timeago",
"tokio",
"toml",
"tracing",
@@ -1038,6 +1049,7 @@ dependencies = [
name = "garage_block"
version = "0.7.0"
dependencies = [
+ "arc-swap",
"async-trait",
"bytes 1.1.0",
"futures",
@@ -1065,11 +1077,11 @@ dependencies = [
"err-derive 0.3.1",
"heed",
"hexdump",
- "log",
"mktemp",
"pretty_env_logger",
"rusqlite",
"sled",
+ "tracing",
]
[[package]]
@@ -1258,6 +1270,7 @@ dependencies = [
name = "garage_util"
version = "0.7.0"
dependencies = [
+ "async-trait",
"blake2",
"chrono",
"err-derive 0.3.1",
@@ -1629,7 +1642,7 @@ version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "282a6247722caba404c065016bbfa522806e51714c34f5dfc3e4a3a46fcb4223"
dependencies = [
- "autocfg",
+ "autocfg 1.1.0",
"hashbrown",
]
@@ -1652,6 +1665,16 @@ dependencies = [
]
[[package]]
+name = "isolang"
+version = "1.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "265ef164908329e47e753c769b14cbb27434abf0c41984dca201484022f09ce5"
+dependencies = [
+ "phf",
+ "phf_codegen",
+]
+
+[[package]]
name = "itertools"
version = "0.4.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1975,7 +1998,7 @@ version = "0.6.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5aa361d4faea93603064a027415f07bd8e1d5c88c9fbf68bf56a285428fd79ce"
dependencies = [
- "autocfg",
+ "autocfg 1.1.0",
]
[[package]]
@@ -2137,7 +2160,7 @@ version = "0.1.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2cc698a63b549a70bc047073d2949cce27cd1c7b0a4a862d08a8031bc2801db"
dependencies = [
- "autocfg",
+ "autocfg 1.1.0",
"num-traits",
]
@@ -2147,7 +2170,7 @@ version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a64b1ec5cda2586e284722486d802acf1f7dbdc623e2bfc57e65ca1cd099290"
dependencies = [
- "autocfg",
+ "autocfg 1.1.0",
]
[[package]]
@@ -2216,7 +2239,7 @@ version = "0.9.72"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e46109c383602735fa0a2e48dd2b7c892b048e1bf69e5c3b1d804b7d9c203cb"
dependencies = [
- "autocfg",
+ "autocfg 1.1.0",
"cc",
"libc",
"openssl-src",
@@ -2387,6 +2410,44 @@ dependencies = [
]
[[package]]
+name = "phf"
+version = "0.7.24"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b3da44b85f8e8dfaec21adae67f95d93244b2ecf6ad2a692320598dcc8e6dd18"
+dependencies = [
+ "phf_shared",
+]
+
+[[package]]
+name = "phf_codegen"
+version = "0.7.24"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b03e85129e324ad4166b06b2c7491ae27fe3ec353af72e72cd1654c7225d517e"
+dependencies = [
+ "phf_generator",
+ "phf_shared",
+]
+
+[[package]]
+name = "phf_generator"
+version = "0.7.24"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "09364cc93c159b8b06b1f4dd8a4398984503483891b0c26b867cf431fb132662"
+dependencies = [
+ "phf_shared",
+ "rand 0.6.5",
+]
+
+[[package]]
+name = "phf_shared"
+version = "0.7.24"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "234f71a15de2288bcb7e3b6515828d22af7ec8598ee6d24c3b526fa0a80b67a0"
+dependencies = [
+ "siphasher",
+]
+
+[[package]]
name = "pin-project"
version = "0.4.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2642,17 +2703,46 @@ dependencies = [
[[package]]
name = "rand"
+version = "0.6.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6d71dacdc3c88c1fde3885a3be3fbab9f35724e6ce99467f7d9c5026132184ca"
+dependencies = [
+ "autocfg 0.1.8",
+ "libc",
+ "rand_chacha 0.1.1",
+ "rand_core 0.4.2",
+ "rand_hc",
+ "rand_isaac",
+ "rand_jitter",
+ "rand_os",
+ "rand_pcg",
+ "rand_xorshift",
+ "winapi",
+]
+
+[[package]]
+name = "rand"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404"
dependencies = [
"libc",
- "rand_chacha",
+ "rand_chacha 0.3.1",
"rand_core 0.6.3",
]
[[package]]
name = "rand_chacha"
+version = "0.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "556d3a1ca6600bfcbab7c7c91ccb085ac7fbbcd70e008a98742e7847f4f7bcef"
+dependencies = [
+ "autocfg 0.1.8",
+ "rand_core 0.3.1",
+]
+
+[[package]]
+name = "rand_chacha"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88"
@@ -2686,6 +2776,77 @@ dependencies = [
]
[[package]]
+name = "rand_hc"
+version = "0.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7b40677c7be09ae76218dc623efbf7b18e34bced3f38883af07bb75630a21bc4"
+dependencies = [
+ "rand_core 0.3.1",
+]
+
+[[package]]
+name = "rand_isaac"
+version = "0.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ded997c9d5f13925be2a6fd7e66bf1872597f759fd9dd93513dd7e92e5a5ee08"
+dependencies = [
+ "rand_core 0.3.1",
+]
+
+[[package]]
+name = "rand_jitter"
+version = "0.1.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1166d5c91dc97b88d1decc3285bb0a99ed84b05cfd0bc2341bdf2d43fc41e39b"
+dependencies = [
+ "libc",
+ "rand_core 0.4.2",
+ "winapi",
+]
+
+[[package]]
+name = "rand_os"
+version = "0.1.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7b75f676a1e053fc562eafbb47838d67c84801e38fc1ba459e8f180deabd5071"
+dependencies = [
+ "cloudabi",
+ "fuchsia-cprng",
+ "libc",
+ "rand_core 0.4.2",
+ "rdrand",
+ "winapi",
+]
+
+[[package]]
+name = "rand_pcg"
+version = "0.1.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "abf9b09b01790cfe0364f52bf32995ea3c39f4d2dd011eac241d2914146d0b44"
+dependencies = [
+ "autocfg 0.1.8",
+ "rand_core 0.4.2",
+]
+
+[[package]]
+name = "rand_xorshift"
+version = "0.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cbf7e9e623549b0e21f6e97cf8ecf247c1a8fd2e8a992ae265314300b2455d5c"
+dependencies = [
+ "rand_core 0.3.1",
+]
+
+[[package]]
+name = "rdrand"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "678054eb77286b51581ba43620cc911abf02758c91f93f479767aed0f90458b2"
+dependencies = [
+ "rand_core 0.3.1",
+]
+
+[[package]]
name = "redox_syscall"
version = "0.2.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -3108,6 +3269,12 @@ dependencies = [
]
[[package]]
+name = "siphasher"
+version = "0.2.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0b8de496cf83d4ed58b6be86c3a275b8602f6ffe98d3024a869e124147a9a3ac"
+
+[[package]]
name = "slab"
version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -3356,6 +3523,16 @@ dependencies = [
]
[[package]]
+name = "timeago"
+version = "0.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6ec32dde57efb15c035ac074118d7f32820451395f28cb0524a01d4e94983b26"
+dependencies = [
+ "chrono",
+ "isolang",
+]
+
+[[package]]
name = "tinyvec"
version = "1.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
diff --git a/Cargo.nix b/Cargo.nix
index e5155e61..37bf3186 100644
--- a/Cargo.nix
+++ b/Cargo.nix
@@ -172,6 +172,16 @@ in
};
});
+ "registry+https://github.com/rust-lang/crates.io-index".autocfg."0.1.8" = overridableMkRustCrate (profileName: rec {
+ name = "autocfg";
+ version = "0.1.8";
+ registry = "registry+https://github.com/rust-lang/crates.io-index";
+ src = fetchCratesIo { inherit name version; sha256 = "0dde43e75fd43e8a1bf86103336bc699aa8d17ad1be60c76c0bdfd4828e19b78"; };
+ dependencies = {
+ autocfg = rustPackages."registry+https://github.com/rust-lang/crates.io-index".autocfg."1.1.0" { inherit profileName; };
+ };
+ });
+
"registry+https://github.com/rust-lang/crates.io-index".autocfg."1.1.0" = overridableMkRustCrate (profileName: rec {
name = "autocfg";
version = "1.1.0";
@@ -1364,6 +1374,7 @@ in
futures = rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures."0.3.21" { inherit profileName; };
futures_util = rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures-util."0.3.21" { inherit profileName; };
garage_api = rustPackages."unknown".garage_api."0.7.0" { inherit profileName; };
+ garage_block = rustPackages."unknown".garage_block."0.7.0" { inherit profileName; };
garage_db = rustPackages."unknown".garage_db."0.8.0" { inherit profileName; };
garage_model = rustPackages."unknown".garage_model."0.7.0" { inherit profileName; };
garage_rpc = rustPackages."unknown".garage_rpc."0.7.0" { inherit profileName; };
@@ -1383,6 +1394,7 @@ in
serde = rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde."1.0.137" { inherit profileName; };
serde_bytes = rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde_bytes."0.11.5" { inherit profileName; };
structopt = rustPackages."registry+https://github.com/rust-lang/crates.io-index".structopt."0.3.26" { inherit profileName; };
+ timeago = rustPackages."registry+https://github.com/rust-lang/crates.io-index".timeago."0.3.1" { inherit profileName; };
tokio = rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio."1.17.0" { inherit profileName; };
toml = rustPackages."registry+https://github.com/rust-lang/crates.io-index".toml."0.5.8" { inherit profileName; };
tracing = rustPackages."registry+https://github.com/rust-lang/crates.io-index".tracing."0.1.32" { inherit profileName; };
@@ -1458,6 +1470,7 @@ in
registry = "unknown";
src = fetchCrateLocal (workspaceSrc + "/src/block");
dependencies = {
+ arc_swap = rustPackages."registry+https://github.com/rust-lang/crates.io-index".arc-swap."1.5.0" { inherit profileName; };
async_trait = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".async-trait."0.1.52" { profileName = "__noProfile"; };
bytes = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.1.0" { inherit profileName; };
futures = rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures."0.3.21" { inherit profileName; };
@@ -1493,10 +1506,10 @@ in
err_derive = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".err-derive."0.3.1" { profileName = "__noProfile"; };
heed = rustPackages."registry+https://github.com/rust-lang/crates.io-index".heed."0.11.0" { inherit profileName; };
hexdump = rustPackages."registry+https://github.com/rust-lang/crates.io-index".hexdump."0.1.1" { inherit profileName; };
- log = rustPackages."registry+https://github.com/rust-lang/crates.io-index".log."0.4.16" { inherit profileName; };
${ if rootFeatures' ? "garage_db" then "pretty_env_logger" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".pretty_env_logger."0.4.0" { inherit profileName; };
rusqlite = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rusqlite."0.27.0" { inherit profileName; };
sled = rustPackages."registry+https://github.com/rust-lang/crates.io-index".sled."0.34.7" { inherit profileName; };
+ tracing = rustPackages."registry+https://github.com/rust-lang/crates.io-index".tracing."0.1.32" { inherit profileName; };
};
devDependencies = {
mktemp = rustPackages."registry+https://github.com/rust-lang/crates.io-index".mktemp."0.4.1" { inherit profileName; };
@@ -1717,6 +1730,7 @@ in
(lib.optional (rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_util") "k2v")
];
dependencies = {
+ ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "async_trait" else null } = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".async-trait."0.1.52" { profileName = "__noProfile"; };
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "blake2" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".blake2."0.9.2" { inherit profileName; };
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "chrono" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".chrono."0.4.19" { inherit profileName; };
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "err_derive" else null } = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".err-derive."0.3.1" { profileName = "__noProfile"; };
@@ -2219,6 +2233,22 @@ in
};
});
+ "registry+https://github.com/rust-lang/crates.io-index".isolang."1.0.0" = overridableMkRustCrate (profileName: rec {
+ name = "isolang";
+ version = "1.0.0";
+ registry = "registry+https://github.com/rust-lang/crates.io-index";
+ src = fetchCratesIo { inherit name version; sha256 = "265ef164908329e47e753c769b14cbb27434abf0c41984dca201484022f09ce5"; };
+ features = builtins.concatLists [
+ [ "default" ]
+ ];
+ dependencies = {
+ phf = rustPackages."registry+https://github.com/rust-lang/crates.io-index".phf."0.7.24" { inherit profileName; };
+ };
+ buildDependencies = {
+ phf_codegen = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".phf_codegen."0.7.24" { profileName = "__noProfile"; };
+ };
+ });
+
"registry+https://github.com/rust-lang/crates.io-index".itertools."0.4.19" = overridableMkRustCrate (profileName: rec {
name = "itertools";
version = "0.4.19";
@@ -3242,6 +3272,48 @@ in
};
});
+ "registry+https://github.com/rust-lang/crates.io-index".phf."0.7.24" = overridableMkRustCrate (profileName: rec {
+ name = "phf";
+ version = "0.7.24";
+ registry = "registry+https://github.com/rust-lang/crates.io-index";
+ src = fetchCratesIo { inherit name version; sha256 = "b3da44b85f8e8dfaec21adae67f95d93244b2ecf6ad2a692320598dcc8e6dd18"; };
+ dependencies = {
+ phf_shared = rustPackages."registry+https://github.com/rust-lang/crates.io-index".phf_shared."0.7.24" { inherit profileName; };
+ };
+ });
+
+ "registry+https://github.com/rust-lang/crates.io-index".phf_codegen."0.7.24" = overridableMkRustCrate (profileName: rec {
+ name = "phf_codegen";
+ version = "0.7.24";
+ registry = "registry+https://github.com/rust-lang/crates.io-index";
+ src = fetchCratesIo { inherit name version; sha256 = "b03e85129e324ad4166b06b2c7491ae27fe3ec353af72e72cd1654c7225d517e"; };
+ dependencies = {
+ phf_generator = rustPackages."registry+https://github.com/rust-lang/crates.io-index".phf_generator."0.7.24" { inherit profileName; };
+ phf_shared = rustPackages."registry+https://github.com/rust-lang/crates.io-index".phf_shared."0.7.24" { inherit profileName; };
+ };
+ });
+
+ "registry+https://github.com/rust-lang/crates.io-index".phf_generator."0.7.24" = overridableMkRustCrate (profileName: rec {
+ name = "phf_generator";
+ version = "0.7.24";
+ registry = "registry+https://github.com/rust-lang/crates.io-index";
+ src = fetchCratesIo { inherit name version; sha256 = "09364cc93c159b8b06b1f4dd8a4398984503483891b0c26b867cf431fb132662"; };
+ dependencies = {
+ phf_shared = rustPackages."registry+https://github.com/rust-lang/crates.io-index".phf_shared."0.7.24" { inherit profileName; };
+ rand = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand."0.6.5" { inherit profileName; };
+ };
+ });
+
+ "registry+https://github.com/rust-lang/crates.io-index".phf_shared."0.7.24" = overridableMkRustCrate (profileName: rec {
+ name = "phf_shared";
+ version = "0.7.24";
+ registry = "registry+https://github.com/rust-lang/crates.io-index";
+ src = fetchCratesIo { inherit name version; sha256 = "234f71a15de2288bcb7e3b6515828d22af7ec8598ee6d24c3b526fa0a80b67a0"; };
+ dependencies = {
+ siphasher = rustPackages."registry+https://github.com/rust-lang/crates.io-index".siphasher."0.2.3" { inherit profileName; };
+ };
+ });
+
"registry+https://github.com/rust-lang/crates.io-index".pin-project."0.4.29" = overridableMkRustCrate (profileName: rec {
name = "pin-project";
version = "0.4.29";
@@ -3568,6 +3640,34 @@ in
};
});
+ "registry+https://github.com/rust-lang/crates.io-index".rand."0.6.5" = overridableMkRustCrate (profileName: rec {
+ name = "rand";
+ version = "0.6.5";
+ registry = "registry+https://github.com/rust-lang/crates.io-index";
+ src = fetchCratesIo { inherit name version; sha256 = "6d71dacdc3c88c1fde3885a3be3fbab9f35724e6ce99467f7d9c5026132184ca"; };
+ features = builtins.concatLists [
+ [ "alloc" ]
+ [ "default" ]
+ [ "rand_os" ]
+ [ "std" ]
+ ];
+ dependencies = {
+ ${ if hostPlatform.isUnix then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; };
+ rand_chacha = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand_chacha."0.1.1" { inherit profileName; };
+ rand_core = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand_core."0.4.2" { inherit profileName; };
+ rand_hc = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand_hc."0.1.0" { inherit profileName; };
+ rand_isaac = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand_isaac."0.1.1" { inherit profileName; };
+ rand_jitter = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand_jitter."0.1.4" { inherit profileName; };
+ rand_os = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand_os."0.1.3" { inherit profileName; };
+ rand_pcg = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand_pcg."0.1.2" { inherit profileName; };
+ rand_xorshift = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand_xorshift."0.1.1" { inherit profileName; };
+ ${ if hostPlatform.isWindows then "winapi" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".winapi."0.3.9" { inherit profileName; };
+ };
+ buildDependencies = {
+ autocfg = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".autocfg."0.1.8" { profileName = "__noProfile"; };
+ };
+ });
+
"registry+https://github.com/rust-lang/crates.io-index".rand."0.8.5" = overridableMkRustCrate (profileName: rec {
name = "rand";
version = "0.8.5";
@@ -3590,6 +3690,19 @@ in
};
});
+ "registry+https://github.com/rust-lang/crates.io-index".rand_chacha."0.1.1" = overridableMkRustCrate (profileName: rec {
+ name = "rand_chacha";
+ version = "0.1.1";
+ registry = "registry+https://github.com/rust-lang/crates.io-index";
+ src = fetchCratesIo { inherit name version; sha256 = "556d3a1ca6600bfcbab7c7c91ccb085ac7fbbcd70e008a98742e7847f4f7bcef"; };
+ dependencies = {
+ rand_core = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand_core."0.3.1" { inherit profileName; };
+ };
+ buildDependencies = {
+ autocfg = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".autocfg."0.1.8" { profileName = "__noProfile"; };
+ };
+ });
+
"registry+https://github.com/rust-lang/crates.io-index".rand_chacha."0.3.1" = overridableMkRustCrate (profileName: rec {
name = "rand_chacha";
version = "0.3.1";
@@ -3644,6 +3757,93 @@ in
};
});
+ "registry+https://github.com/rust-lang/crates.io-index".rand_hc."0.1.0" = overridableMkRustCrate (profileName: rec {
+ name = "rand_hc";
+ version = "0.1.0";
+ registry = "registry+https://github.com/rust-lang/crates.io-index";
+ src = fetchCratesIo { inherit name version; sha256 = "7b40677c7be09ae76218dc623efbf7b18e34bced3f38883af07bb75630a21bc4"; };
+ dependencies = {
+ rand_core = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand_core."0.3.1" { inherit profileName; };
+ };
+ });
+
+ "registry+https://github.com/rust-lang/crates.io-index".rand_isaac."0.1.1" = overridableMkRustCrate (profileName: rec {
+ name = "rand_isaac";
+ version = "0.1.1";
+ registry = "registry+https://github.com/rust-lang/crates.io-index";
+ src = fetchCratesIo { inherit name version; sha256 = "ded997c9d5f13925be2a6fd7e66bf1872597f759fd9dd93513dd7e92e5a5ee08"; };
+ dependencies = {
+ rand_core = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand_core."0.3.1" { inherit profileName; };
+ };
+ });
+
+ "registry+https://github.com/rust-lang/crates.io-index".rand_jitter."0.1.4" = overridableMkRustCrate (profileName: rec {
+ name = "rand_jitter";
+ version = "0.1.4";
+ registry = "registry+https://github.com/rust-lang/crates.io-index";
+ src = fetchCratesIo { inherit name version; sha256 = "1166d5c91dc97b88d1decc3285bb0a99ed84b05cfd0bc2341bdf2d43fc41e39b"; };
+ features = builtins.concatLists [
+ [ "std" ]
+ ];
+ dependencies = {
+ ${ if hostPlatform.parsed.kernel.name == "darwin" || hostPlatform.parsed.kernel.name == "ios" then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; };
+ rand_core = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand_core."0.4.2" { inherit profileName; };
+ ${ if hostPlatform.parsed.kernel.name == "windows" then "winapi" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".winapi."0.3.9" { inherit profileName; };
+ };
+ });
+
+ "registry+https://github.com/rust-lang/crates.io-index".rand_os."0.1.3" = overridableMkRustCrate (profileName: rec {
+ name = "rand_os";
+ version = "0.1.3";
+ registry = "registry+https://github.com/rust-lang/crates.io-index";
+ src = fetchCratesIo { inherit name version; sha256 = "7b75f676a1e053fc562eafbb47838d67c84801e38fc1ba459e8f180deabd5071"; };
+ dependencies = {
+ ${ if hostPlatform.parsed.kernel.name == "cloudabi" then "cloudabi" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".cloudabi."0.0.3" { inherit profileName; };
+ ${ if hostPlatform.parsed.kernel.name == "fuchsia" then "fuchsia_cprng" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".fuchsia-cprng."0.1.1" { inherit profileName; };
+ ${ if hostPlatform.isUnix then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; };
+ rand_core = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand_core."0.4.2" { inherit profileName; };
+ ${ if hostPlatform.parsed.abi.name == "sgx" then "rdrand" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rdrand."0.4.0" { inherit profileName; };
+ ${ if hostPlatform.isWindows then "winapi" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".winapi."0.3.9" { inherit profileName; };
+ };
+ });
+
+ "registry+https://github.com/rust-lang/crates.io-index".rand_pcg."0.1.2" = overridableMkRustCrate (profileName: rec {
+ name = "rand_pcg";
+ version = "0.1.2";
+ registry = "registry+https://github.com/rust-lang/crates.io-index";
+ src = fetchCratesIo { inherit name version; sha256 = "abf9b09b01790cfe0364f52bf32995ea3c39f4d2dd011eac241d2914146d0b44"; };
+ dependencies = {
+ rand_core = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand_core."0.4.2" { inherit profileName; };
+ };
+ buildDependencies = {
+ autocfg = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".autocfg."0.1.8" { profileName = "__noProfile"; };
+ };
+ });
+
+ "registry+https://github.com/rust-lang/crates.io-index".rand_xorshift."0.1.1" = overridableMkRustCrate (profileName: rec {
+ name = "rand_xorshift";
+ version = "0.1.1";
+ registry = "registry+https://github.com/rust-lang/crates.io-index";
+ src = fetchCratesIo { inherit name version; sha256 = "cbf7e9e623549b0e21f6e97cf8ecf247c1a8fd2e8a992ae265314300b2455d5c"; };
+ dependencies = {
+ rand_core = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand_core."0.3.1" { inherit profileName; };
+ };
+ });
+
+ "registry+https://github.com/rust-lang/crates.io-index".rdrand."0.4.0" = overridableMkRustCrate (profileName: rec {
+ name = "rdrand";
+ version = "0.4.0";
+ registry = "registry+https://github.com/rust-lang/crates.io-index";
+ src = fetchCratesIo { inherit name version; sha256 = "678054eb77286b51581ba43620cc911abf02758c91f93f479767aed0f90458b2"; };
+ features = builtins.concatLists [
+ [ "default" ]
+ [ "std" ]
+ ];
+ dependencies = {
+ rand_core = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand_core."0.3.1" { inherit profileName; };
+ };
+ });
+
"registry+https://github.com/rust-lang/crates.io-index".redox_syscall."0.2.11" = overridableMkRustCrate (profileName: rec {
name = "redox_syscall";
version = "0.2.11";
@@ -3738,7 +3938,7 @@ in
];
dependencies = {
${ if hostPlatform.parsed.kernel.name == "android" || hostPlatform.parsed.kernel.name == "linux" then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; };
- ${ if hostPlatform.parsed.kernel.name == "dragonfly" || hostPlatform.parsed.kernel.name == "freebsd" || hostPlatform.parsed.kernel.name == "illumos" || hostPlatform.parsed.kernel.name == "netbsd" || hostPlatform.parsed.kernel.name == "openbsd" || hostPlatform.parsed.kernel.name == "solaris" || hostPlatform.parsed.kernel.name == "android" || hostPlatform.parsed.kernel.name == "linux" then "once_cell" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".once_cell."1.10.0" { inherit profileName; };
+ ${ if hostPlatform.parsed.kernel.name == "android" || hostPlatform.parsed.kernel.name == "linux" || hostPlatform.parsed.kernel.name == "dragonfly" || hostPlatform.parsed.kernel.name == "freebsd" || hostPlatform.parsed.kernel.name == "illumos" || hostPlatform.parsed.kernel.name == "netbsd" || hostPlatform.parsed.kernel.name == "openbsd" || hostPlatform.parsed.kernel.name == "solaris" then "once_cell" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".once_cell."1.10.0" { inherit profileName; };
${ if hostPlatform.parsed.cpu.name == "i686" || hostPlatform.parsed.cpu.name == "x86_64" || (hostPlatform.parsed.cpu.name == "aarch64" || hostPlatform.parsed.cpu.name == "armv6l" || hostPlatform.parsed.cpu.name == "armv7l") && (hostPlatform.parsed.kernel.name == "android" || hostPlatform.parsed.kernel.name == "fuchsia" || hostPlatform.parsed.kernel.name == "linux") then "spin" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".spin."0.5.2" { inherit profileName; };
untrusted = rustPackages."registry+https://github.com/rust-lang/crates.io-index".untrusted."0.7.1" { inherit profileName; };
${ if hostPlatform.parsed.cpu.name == "wasm32" && hostPlatform.parsed.vendor.name == "unknown" && hostPlatform.parsed.kernel.name == "unknown" && hostPlatform.parsed.abi.name == "" then "web_sys" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".web-sys."0.3.56" { inherit profileName; };
@@ -4213,6 +4413,13 @@ in
};
});
+ "registry+https://github.com/rust-lang/crates.io-index".siphasher."0.2.3" = overridableMkRustCrate (profileName: rec {
+ name = "siphasher";
+ version = "0.2.3";
+ registry = "registry+https://github.com/rust-lang/crates.io-index";
+ src = fetchCratesIo { inherit name version; sha256 = "0b8de496cf83d4ed58b6be86c3a275b8602f6ffe98d3024a869e124147a9a3ac"; };
+ });
+
"registry+https://github.com/rust-lang/crates.io-index".slab."0.4.5" = overridableMkRustCrate (profileName: rec {
name = "slab";
version = "0.4.5";
@@ -4327,7 +4534,7 @@ in
];
dependencies = {
bitflags = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bitflags."1.3.2" { inherit profileName; };
- ${ if hostPlatform.parsed.kernel.name == "linux" || hostPlatform.parsed.kernel.name == "android" then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; };
+ ${ if hostPlatform.parsed.kernel.name == "android" || hostPlatform.parsed.kernel.name == "linux" then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; };
${ if !(hostPlatform.parsed.kernel.name == "linux" || hostPlatform.parsed.kernel.name == "android") then "parking_lot" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".parking_lot."0.11.2" { inherit profileName; };
${ if !(hostPlatform.parsed.kernel.name == "linux" || hostPlatform.parsed.kernel.name == "android") then "parking_lot_core" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".parking_lot_core."0.8.5" { inherit profileName; };
static_init_macro = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".static_init_macro."1.0.2" { profileName = "__noProfile"; };
@@ -4410,7 +4617,7 @@ in
[ "proc-macro" ]
[ "quote" ]
[ "visit" ]
- (lib.optional (rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client") "visit-mut")
+ [ "visit-mut" ]
];
dependencies = {
proc_macro2 = rustPackages."registry+https://github.com/rust-lang/crates.io-index".proc-macro2."1.0.36" { inherit profileName; };
@@ -4539,6 +4746,23 @@ in
};
});
+ "registry+https://github.com/rust-lang/crates.io-index".timeago."0.3.1" = overridableMkRustCrate (profileName: rec {
+ name = "timeago";
+ version = "0.3.1";
+ registry = "registry+https://github.com/rust-lang/crates.io-index";
+ src = fetchCratesIo { inherit name version; sha256 = "6ec32dde57efb15c035ac074118d7f32820451395f28cb0524a01d4e94983b26"; };
+ features = builtins.concatLists [
+ [ "chrono" ]
+ [ "default" ]
+ [ "isolang" ]
+ [ "translations" ]
+ ];
+ dependencies = {
+ chrono = rustPackages."registry+https://github.com/rust-lang/crates.io-index".chrono."0.4.19" { inherit profileName; };
+ isolang = rustPackages."registry+https://github.com/rust-lang/crates.io-index".isolang."1.0.0" { inherit profileName; };
+ };
+ });
+
"registry+https://github.com/rust-lang/crates.io-index".tinyvec."1.5.1" = overridableMkRustCrate (profileName: rec {
name = "tinyvec";
version = "1.5.1";
@@ -4883,19 +5107,19 @@ in
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "4a1bdf54a7c28a2bbf701e1d2233f6c77f473486b94bee4f9678da5a148dca7f"; };
features = builtins.concatLists [
- (lib.optional (rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client") "attributes")
- (lib.optional (rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client") "default")
+ [ "attributes" ]
+ [ "default" ]
(lib.optional (rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_web") "log")
(lib.optional (rootFeatures' ? "garage") "log-always")
- (lib.optional (rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client") "std")
- (lib.optional (rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client") "tracing-attributes")
+ [ "std" ]
+ [ "tracing-attributes" ]
];
dependencies = {
- ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "cfg_if" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".cfg-if."1.0.0" { inherit profileName; };
+ cfg_if = rustPackages."registry+https://github.com/rust-lang/crates.io-index".cfg-if."1.0.0" { inherit profileName; };
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_web" then "log" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".log."0.4.16" { inherit profileName; };
- ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "pin_project_lite" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".pin-project-lite."0.2.8" { inherit profileName; };
- ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "tracing_attributes" else null } = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".tracing-attributes."0.1.20" { profileName = "__noProfile"; };
- ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "tracing_core" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".tracing-core."0.1.23" { inherit profileName; };
+ pin_project_lite = rustPackages."registry+https://github.com/rust-lang/crates.io-index".pin-project-lite."0.2.8" { inherit profileName; };
+ tracing_attributes = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".tracing-attributes."0.1.20" { profileName = "__noProfile"; };
+ tracing_core = rustPackages."registry+https://github.com/rust-lang/crates.io-index".tracing-core."0.1.23" { inherit profileName; };
};
});
@@ -5330,10 +5554,10 @@ in
[ "default" ]
];
dependencies = {
- ${ if hostPlatform.config == "aarch64-uwp-windows-msvc" || hostPlatform.config == "aarch64-pc-windows-msvc" then "windows_aarch64_msvc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_aarch64_msvc."0.32.0" { inherit profileName; };
- ${ if hostPlatform.config == "i686-pc-windows-gnu" || hostPlatform.config == "i686-uwp-windows-gnu" then "windows_i686_gnu" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_i686_gnu."0.32.0" { inherit profileName; };
+ ${ if hostPlatform.config == "aarch64-pc-windows-msvc" || hostPlatform.config == "aarch64-uwp-windows-msvc" then "windows_aarch64_msvc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_aarch64_msvc."0.32.0" { inherit profileName; };
+ ${ if hostPlatform.config == "i686-uwp-windows-gnu" || hostPlatform.config == "i686-pc-windows-gnu" then "windows_i686_gnu" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_i686_gnu."0.32.0" { inherit profileName; };
${ if hostPlatform.config == "i686-uwp-windows-msvc" || hostPlatform.config == "i686-pc-windows-msvc" then "windows_i686_msvc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_i686_msvc."0.32.0" { inherit profileName; };
- ${ if hostPlatform.config == "x86_64-uwp-windows-gnu" || hostPlatform.config == "x86_64-pc-windows-gnu" then "windows_x86_64_gnu" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_x86_64_gnu."0.32.0" { inherit profileName; };
+ ${ if hostPlatform.config == "x86_64-pc-windows-gnu" || hostPlatform.config == "x86_64-uwp-windows-gnu" then "windows_x86_64_gnu" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_x86_64_gnu."0.32.0" { inherit profileName; };
${ if hostPlatform.config == "x86_64-uwp-windows-msvc" || hostPlatform.config == "x86_64-pc-windows-msvc" then "windows_x86_64_msvc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_x86_64_msvc."0.32.0" { inherit profileName; };
};
});
diff --git a/Makefile b/Makefile
index eeeffedb..1f0f3644 100644
--- a/Makefile
+++ b/Makefile
@@ -1,13 +1,27 @@
-.PHONY: doc all release shell
+.PHONY: doc all release shell run1 run2 run3
all:
clear; cargo build --all-features
-doc:
- cd doc/book; mdbook build
-
release:
nix-build --arg release true
shell:
nix-shell
+
+# ----
+
+run1:
+ RUST_LOG=garage=debug ./target/debug/garage -c tmp/config1.toml server
+run1rel:
+ RUST_LOG=garage=debug ./target/release/garage -c tmp/config1.toml server
+
+run2:
+ RUST_LOG=garage=debug ./target/debug/garage -c tmp/config2.toml server
+run2rel:
+ RUST_LOG=garage=debug ./target/release/garage -c tmp/config2.toml server
+
+run3:
+ RUST_LOG=garage=debug ./target/debug/garage -c tmp/config3.toml server
+run3rel:
+ RUST_LOG=garage=debug ./target/release/garage -c tmp/config3.toml server
diff --git a/src/block/Cargo.toml b/src/block/Cargo.toml
index 80346aca..2555a44a 100644
--- a/src/block/Cargo.toml
+++ b/src/block/Cargo.toml
@@ -21,6 +21,7 @@ garage_table = { version = "0.7.0", path = "../table" }
opentelemetry = "0.17"
+arc-swap = "1.5"
async-trait = "0.1.7"
bytes = "1.0"
hex = "0.4"
diff --git a/src/block/lib.rs b/src/block/lib.rs
index dc685657..ebdb95d8 100644
--- a/src/block/lib.rs
+++ b/src/block/lib.rs
@@ -2,6 +2,7 @@
extern crate tracing;
pub mod manager;
+pub mod repair;
mod block;
mod metrics;
diff --git a/src/block/manager.rs b/src/block/manager.rs
index 32ba0431..36166ae3 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -1,18 +1,17 @@
-use core::ops::Bound;
-
use std::convert::TryInto;
-use std::path::{Path, PathBuf};
+use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
+use arc_swap::ArcSwapOption;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use futures::future::*;
-use futures::select;
use tokio::fs;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
-use tokio::sync::{watch, Mutex, Notify};
+use tokio::select;
+use tokio::sync::{mpsc, watch, Mutex, Notify};
use opentelemetry::{
trace::{FutureExt as OtelFutureExt, TraceContextExt, Tracer},
@@ -22,6 +21,7 @@ use opentelemetry::{
use garage_db as db;
use garage_db::counted_tree_hack::CountedTree;
+use garage_util::background::*;
use garage_util::data::*;
use garage_util::error::*;
use garage_util::metrics::RecordDuration;
@@ -36,6 +36,7 @@ use garage_table::replication::{TableReplication, TableShardedReplication};
use crate::block::*;
use crate::metrics::*;
use crate::rc::*;
+use crate::repair::*;
/// Size under which data will be stored inlined in database instead of as files
pub const INLINE_THRESHOLD: usize = 3072;
@@ -93,16 +94,18 @@ pub struct BlockManager {
mutation_lock: Mutex<BlockManagerLocked>,
- rc: BlockRc,
+ pub(crate) rc: BlockRc,
resync_queue: CountedTree,
resync_notify: Notify,
resync_errors: CountedTree,
- system: Arc<System>,
+ pub(crate) system: Arc<System>,
endpoint: Arc<Endpoint<BlockRpc, Self>>,
metrics: BlockManagerMetrics,
+
+ tx_scrub_command: ArcSwapOption<mpsc::Sender<ScrubWorkerCommand>>,
}
// This custom struct contains functions that must only be ran
@@ -110,6 +113,12 @@ pub struct BlockManager {
// it INSIDE a Mutex.
struct BlockManagerLocked();
+enum ResyncIterResult {
+ BusyDidSomething,
+ BusyDidNothing,
+ IdleFor(Duration),
+}
+
impl BlockManager {
pub fn new(
db: &db::Db,
@@ -157,10 +166,11 @@ impl BlockManager {
system,
endpoint,
metrics,
+ tx_scrub_command: ArcSwapOption::new(None),
});
block_manager.endpoint.set_handler(block_manager.clone());
- block_manager.clone().spawn_background_worker();
+ block_manager.clone().spawn_background_workers();
block_manager
}
@@ -218,90 +228,6 @@ impl BlockManager {
Ok(())
}
- /// Launch the repair procedure on the data store
- ///
- /// This will list all blocks locally present, as well as those
- /// that are required because of refcount > 0, and will try
- /// to fix any mismatch between the two.
- pub async fn repair_data_store(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
- // 1. Repair blocks from RC table.
- let mut next_start: Option<Hash> = None;
- loop {
- // We have to do this complicated two-step process where we first read a bunch
- // of hashes from the RC table, and then insert them in the to-resync queue,
- // because of SQLite. Basically, as long as we have an iterator on a DB table,
- // we can't do anything else on the DB. The naive approach (which we had previously)
- // of just iterating on the RC table and inserting items one to one in the resync
- // queue can't work here, it would just provoke a deadlock in the SQLite adapter code.
- // This is mostly because the Rust bindings for SQLite assume a worst-case scenario
- // where SQLite is not compiled in thread-safe mode, so we have to wrap everything
- // in a mutex (see db/sqlite_adapter.rs and discussion in PR #322).
- let mut batch_of_hashes = vec![];
- let start_bound = match next_start.as_ref() {
- None => Bound::Unbounded,
- Some(x) => Bound::Excluded(x.as_slice()),
- };
- for entry in self
- .rc
- .rc
- .range::<&[u8], _>((start_bound, Bound::Unbounded))?
- {
- let (hash, _) = entry?;
- let hash = Hash::try_from(&hash[..]).unwrap();
- batch_of_hashes.push(hash);
- if batch_of_hashes.len() >= 1000 {
- break;
- }
- }
- if batch_of_hashes.is_empty() {
- break;
- }
-
- for hash in batch_of_hashes.into_iter() {
- self.put_to_resync(&hash, Duration::from_secs(0))?;
- next_start = Some(hash)
- }
-
- if *must_exit.borrow() {
- return Ok(());
- }
- }
-
- // 2. Repair blocks actually on disk
- // Lists all blocks on disk and adds them to the resync queue.
- // This allows us to find blocks we are storing but don't actually need,
- // so that we can offload them if necessary and then delete them locally.
- self.for_each_file(
- (),
- move |_, hash| async move {
- self.put_to_resync(&hash, Duration::from_secs(0))
- .map_err(Into::into)
- },
- must_exit,
- )
- .await
- }
-
- /// Verify integrity of each block on disk. Use `speed_limit` to limit the load generated by
- /// this function.
- pub async fn scrub_data_store(
- &self,
- must_exit: &watch::Receiver<bool>,
- tranquility: u32,
- ) -> Result<(), Error> {
- let tranquilizer = Tranquilizer::new(30);
- self.for_each_file(
- tranquilizer,
- move |mut tranquilizer, hash| async move {
- let _ = self.read_block(&hash).await;
- tranquilizer.tranquilize(tranquility).await;
- Ok(tranquilizer)
- },
- must_exit,
- )
- .await
- }
-
/// Get lenght of resync queue
pub fn resync_queue_len(&self) -> Result<usize, Error> {
// This currently can't return an error because the CountedTree hack
@@ -321,6 +247,17 @@ impl BlockManager {
Ok(self.rc.rc.len()?)
}
+ /// Send command to start/stop/manager scrub worker
+ pub async fn send_scrub_command(&self, cmd: ScrubWorkerCommand) {
+ let _ = self
+ .tx_scrub_command
+ .load()
+ .as_ref()
+ .unwrap()
+ .send(cmd)
+ .await;
+ }
+
//// ----- Managing the reference counter ----
/// Increment the number of time a block is used, putting it to resynchronization if it is
@@ -390,7 +327,7 @@ impl BlockManager {
}
/// Read block from disk, verifying it's integrity
- async fn read_block(&self, hash: &Hash) -> Result<BlockRpc, Error> {
+ pub(crate) async fn read_block(&self, hash: &Hash) -> Result<BlockRpc, Error> {
let data = self
.read_block_internal(hash)
.bound_record_duration(&self.metrics.block_read_duration)
@@ -554,18 +491,27 @@ impl BlockManager {
// for times that are earlier than the exponential back-off delay
// is a natural condition that is handled properly).
- fn spawn_background_worker(self: Arc<Self>) {
+ fn spawn_background_workers(self: Arc<Self>) {
// Launch a background workers for background resync loop processing
let background = self.system.background.clone();
+ let worker = ResyncWorker {
+ manager: self.clone(),
+ tranquilizer: Tranquilizer::new(30),
+ next_delay: Duration::from_secs(10),
+ };
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(10)).await;
- background.spawn_worker("block resync worker".into(), move |must_exit| {
- self.resync_loop(must_exit)
- });
+ background.spawn_worker(worker);
});
+
+ // Launch a background worker for data store scrubs
+ let (scrub_tx, scrub_rx) = mpsc::channel(1);
+ self.tx_scrub_command.store(Some(Arc::new(scrub_tx)));
+ let scrub_worker = ScrubWorker::new(self.clone(), scrub_rx);
+ self.system.background.spawn_worker(scrub_worker);
}
- fn put_to_resync(&self, hash: &Hash, delay: Duration) -> db::Result<()> {
+ pub(crate) fn put_to_resync(&self, hash: &Hash, delay: Duration) -> db::Result<()> {
let when = now_msec() + delay.as_millis() as u64;
self.put_to_resync_at(hash, when)
}
@@ -579,37 +525,7 @@ impl BlockManager {
Ok(())
}
- async fn resync_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) {
- let mut tranquilizer = Tranquilizer::new(30);
-
- while !*must_exit.borrow() {
- match self.resync_iter(&mut must_exit).await {
- Ok(true) => {
- tranquilizer.tranquilize(self.background_tranquility).await;
- }
- Ok(false) => {
- tranquilizer.reset();
- }
- Err(e) => {
- // The errors that we have here are only Sled errors
- // We don't really know how to handle them so just ¯\_(ツ)_/¯
- // (there is kind of an assumption that Sled won't error on us,
- // if it does there is not much we can do -- TODO should we just panic?)
- error!(
- "Could not do a resync iteration: {} (this is a very bad error)",
- e
- );
- tranquilizer.reset();
- }
- }
- }
- }
-
- // The result of resync_iter is:
- // - Ok(true) -> a block was processed (successfully or not)
- // - Ok(false) -> no block was processed, but we are ready for the next iteration
- // - Err(_) -> a Sled error occurred when reading/writing from resync_queue/resync_errors
- async fn resync_iter(&self, must_exit: &mut watch::Receiver<bool>) -> Result<bool, db::Error> {
+ async fn resync_iter(&self) -> Result<ResyncIterResult, db::Error> {
if let Some((time_bytes, hash_bytes)) = self.resync_queue.first()? {
let time_msec = u64::from_be_bytes(time_bytes[0..8].try_into().unwrap());
let now = now_msec();
@@ -629,7 +545,7 @@ impl BlockManager {
// (we want to do the remove after the insert to ensure
// that the item is not lost if we crash in-between)
self.resync_queue.remove(time_bytes)?;
- return Ok(false);
+ return Ok(ResyncIterResult::BusyDidNothing);
}
}
@@ -676,15 +592,11 @@ impl BlockManager {
self.resync_queue.remove(time_bytes)?;
}
- Ok(true)
+ Ok(ResyncIterResult::BusyDidSomething)
} else {
- let delay = tokio::time::sleep(Duration::from_millis(time_msec - now));
- select! {
- _ = delay.fuse() => {},
- _ = self.resync_notify.notified().fuse() => {},
- _ = must_exit.changed().fuse() => {},
- }
- Ok(false)
+ Ok(ResyncIterResult::IdleFor(Duration::from_millis(
+ time_msec - now,
+ )))
}
} else {
// Here we wait either for a notification that an item has been
@@ -693,13 +605,7 @@ impl BlockManager {
// between the time we checked the queue and the first poll
// to resync_notify.notified(): if that happens, we'll just loop
// back 10 seconds later, which is fine.
- let delay = tokio::time::sleep(Duration::from_secs(10));
- select! {
- _ = delay.fuse() => {},
- _ = self.resync_notify.notified().fuse() => {},
- _ = must_exit.changed().fuse() => {},
- }
- Ok(false)
+ Ok(ResyncIterResult::IdleFor(Duration::from_secs(10)))
}
}
@@ -814,72 +720,6 @@ impl BlockManager {
Ok(())
}
-
- // ---- Utility: iteration on files in the data directory ----
-
- async fn for_each_file<F, Fut, State>(
- &self,
- state: State,
- mut f: F,
- must_exit: &watch::Receiver<bool>,
- ) -> Result<(), Error>
- where
- F: FnMut(State, Hash) -> Fut + Send,
- Fut: Future<Output = Result<State, Error>> + Send,
- State: Send,
- {
- self.for_each_file_rec(&self.data_dir, state, &mut f, must_exit)
- .await
- .map(|_| ())
- }
-
- fn for_each_file_rec<'a, F, Fut, State>(
- &'a self,
- path: &'a Path,
- mut state: State,
- f: &'a mut F,
- must_exit: &'a watch::Receiver<bool>,
- ) -> BoxFuture<'a, Result<State, Error>>
- where
- F: FnMut(State, Hash) -> Fut + Send,
- Fut: Future<Output = Result<State, Error>> + Send,
- State: Send + 'a,
- {
- async move {
- let mut ls_data_dir = fs::read_dir(path).await?;
- while let Some(data_dir_ent) = ls_data_dir.next_entry().await? {
- if *must_exit.borrow() {
- break;
- }
-
- let name = data_dir_ent.file_name();
- let name = if let Ok(n) = name.into_string() {
- n
- } else {
- continue;
- };
- let ent_type = data_dir_ent.file_type().await?;
-
- let name = name.strip_suffix(".zst").unwrap_or(&name);
- if name.len() == 2 && hex::decode(&name).is_ok() && ent_type.is_dir() {
- state = self
- .for_each_file_rec(&data_dir_ent.path(), state, f, must_exit)
- .await?;
- } else if name.len() == 64 {
- let hash_bytes = if let Ok(h) = hex::decode(&name) {
- h
- } else {
- continue;
- };
- let mut hash = [0u8; 32];
- hash.copy_from_slice(&hash_bytes[..]);
- state = f(state, hash.into()).await?;
- }
- }
- Ok(state)
- }
- .boxed()
- }
}
#[async_trait]
@@ -898,6 +738,70 @@ impl EndpointHandler<BlockRpc> for BlockManager {
}
}
+struct ResyncWorker {
+ manager: Arc<BlockManager>,
+ tranquilizer: Tranquilizer,
+ next_delay: Duration,
+}
+
+#[async_trait]
+impl Worker for ResyncWorker {
+ fn name(&self) -> String {
+ "Block resync worker".into()
+ }
+
+ fn info(&self) -> Option<String> {
+ let mut ret = vec![];
+ let qlen = self.manager.resync_queue_len().unwrap_or(0);
+ let elen = self.manager.resync_errors_len().unwrap_or(0);
+ if qlen > 0 {
+ ret.push(format!("{} blocks in queue", qlen));
+ }
+ if elen > 0 {
+ ret.push(format!("{} blocks in error state", elen));
+ }
+ if !ret.is_empty() {
+ Some(ret.join(", "))
+ } else {
+ None
+ }
+ }
+
+ async fn work(
+ &mut self,
+ _must_exit: &mut watch::Receiver<bool>,
+ ) -> Result<WorkerStatus, Error> {
+ self.tranquilizer.reset();
+ match self.manager.resync_iter().await {
+ Ok(ResyncIterResult::BusyDidSomething) => Ok(self
+ .tranquilizer
+ .tranquilize_worker(self.manager.background_tranquility)),
+ Ok(ResyncIterResult::BusyDidNothing) => Ok(WorkerStatus::Busy),
+ Ok(ResyncIterResult::IdleFor(delay)) => {
+ self.next_delay = delay;
+ Ok(WorkerStatus::Idle)
+ }
+ Err(e) => {
+ // The errors that we have here are only Sled errors
+ // We don't really know how to handle them so just ¯\_(ツ)_/¯
+ // (there is kind of an assumption that Sled won't error on us,
+ // if it does there is not much we can do -- TODO should we just panic?)
+ // Here we just give the error to the worker manager,
+ // it will print it to the logs and increment a counter
+ Err(e.into())
+ }
+ }
+ }
+
+ async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerStatus {
+ select! {
+ _ = tokio::time::sleep(self.next_delay) => (),
+ _ = self.manager.resync_notify.notified() => (),
+ };
+ WorkerStatus::Busy
+ }
+}
+
struct BlockStatus {
exists: bool,
needed: RcEntry,
diff --git a/src/block/repair.rs b/src/block/repair.rs
new file mode 100644
index 00000000..284a8846
--- /dev/null
+++ b/src/block/repair.rs
@@ -0,0 +1,468 @@
+use core::ops::Bound;
+use std::path::PathBuf;
+use std::sync::Arc;
+use std::time::Duration;
+
+use async_trait::async_trait;
+use serde::{Deserialize, Serialize};
+use tokio::fs;
+use tokio::select;
+use tokio::sync::mpsc;
+use tokio::sync::watch;
+
+use garage_util::background::*;
+use garage_util::data::*;
+use garage_util::error::*;
+use garage_util::persister::Persister;
+use garage_util::time::*;
+use garage_util::tranquilizer::Tranquilizer;
+
+use crate::manager::*;
+
+const SCRUB_INTERVAL: Duration = Duration::from_secs(3600 * 24 * 30); // full scrub every 30 days
+
+pub struct RepairWorker {
+ manager: Arc<BlockManager>,
+ next_start: Option<Hash>,
+ block_iter: Option<BlockStoreIterator>,
+}
+
+impl RepairWorker {
+ pub fn new(manager: Arc<BlockManager>) -> Self {
+ Self {
+ manager,
+ next_start: None,
+ block_iter: None,
+ }
+ }
+}
+
+#[async_trait]
+impl Worker for RepairWorker {
+ fn name(&self) -> String {
+ "Block repair worker".into()
+ }
+
+ fn info(&self) -> Option<String> {
+ match self.block_iter.as_ref() {
+ None => {
+ let idx_bytes = self
+ .next_start
+ .as_ref()
+ .map(|x| x.as_slice())
+ .unwrap_or(&[]);
+ let idx_bytes = if idx_bytes.len() > 4 {
+ &idx_bytes[..4]
+ } else {
+ idx_bytes
+ };
+ Some(format!("Phase 1: {}", hex::encode(idx_bytes)))
+ }
+ Some(bi) => Some(format!("Phase 2: {:.2}% done", bi.progress() * 100.)),
+ }
+ }
+
+ async fn work(
+ &mut self,
+ _must_exit: &mut watch::Receiver<bool>,
+ ) -> Result<WorkerStatus, Error> {
+ match self.block_iter.as_mut() {
+ None => {
+ // Phase 1: Repair blocks from RC table.
+
+ // We have to do this complicated two-step process where we first read a bunch
+ // of hashes from the RC table, and then insert them in the to-resync queue,
+ // because of SQLite. Basically, as long as we have an iterator on a DB table,
+ // we can't do anything else on the DB. The naive approach (which we had previously)
+ // of just iterating on the RC table and inserting items one to one in the resync
+ // queue can't work here, it would just provoke a deadlock in the SQLite adapter code.
+ // This is mostly because the Rust bindings for SQLite assume a worst-case scenario
+ // where SQLite is not compiled in thread-safe mode, so we have to wrap everything
+ // in a mutex (see db/sqlite_adapter.rs and discussion in PR #322).
+ // TODO: maybe do this with tokio::task::spawn_blocking ?
+ let mut batch_of_hashes = vec![];
+ let start_bound = match self.next_start.as_ref() {
+ None => Bound::Unbounded,
+ Some(x) => Bound::Excluded(x.as_slice()),
+ };
+ for entry in self
+ .manager
+ .rc
+ .rc
+ .range::<&[u8], _>((start_bound, Bound::Unbounded))?
+ {
+ let (hash, _) = entry?;
+ let hash = Hash::try_from(&hash[..]).unwrap();
+ batch_of_hashes.push(hash);
+ if batch_of_hashes.len() >= 1000 {
+ break;
+ }
+ }
+ if batch_of_hashes.is_empty() {
+ // move on to phase 2
+ self.block_iter = Some(BlockStoreIterator::new(&self.manager));
+ return Ok(WorkerStatus::Busy);
+ }
+
+ for hash in batch_of_hashes.into_iter() {
+ self.manager.put_to_resync(&hash, Duration::from_secs(0))?;
+ self.next_start = Some(hash)
+ }
+
+ Ok(WorkerStatus::Busy)
+ }
+ Some(bi) => {
+ // Phase 2: Repair blocks actually on disk
+ // Lists all blocks on disk and adds them to the resync queue.
+ // This allows us to find blocks we are storing but don't actually need,
+ // so that we can offload them if necessary and then delete them locally.
+ if let Some(hash) = bi.next().await? {
+ self.manager.put_to_resync(&hash, Duration::from_secs(0))?;
+ Ok(WorkerStatus::Busy)
+ } else {
+ Ok(WorkerStatus::Done)
+ }
+ }
+ }
+ }
+
+ async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerStatus {
+ unreachable!()
+ }
+}
+
+// ----
+
+pub struct ScrubWorker {
+ manager: Arc<BlockManager>,
+ rx_cmd: mpsc::Receiver<ScrubWorkerCommand>,
+
+ work: ScrubWorkerState,
+ tranquilizer: Tranquilizer,
+
+ persister: Persister<ScrubWorkerPersisted>,
+ persisted: ScrubWorkerPersisted,
+}
+
+#[derive(Serialize, Deserialize)]
+struct ScrubWorkerPersisted {
+ tranquility: u32,
+ time_last_complete_scrub: u64,
+ corruptions_detected: u64,
+}
+
+enum ScrubWorkerState {
+ Running(BlockStoreIterator),
+ Paused(BlockStoreIterator, u64), // u64 = time when to resume scrub
+ Finished,
+}
+
+impl Default for ScrubWorkerState {
+ fn default() -> Self {
+ ScrubWorkerState::Finished
+ }
+}
+
+#[derive(Debug)]
+pub enum ScrubWorkerCommand {
+ Start,
+ Pause(Duration),
+ Resume,
+ Cancel,
+ SetTranquility(u32),
+}
+
+impl ScrubWorker {
+ pub fn new(manager: Arc<BlockManager>, rx_cmd: mpsc::Receiver<ScrubWorkerCommand>) -> Self {
+ let persister = Persister::new(&manager.system.metadata_dir, "scrub_info");
+ let persisted = match persister.load() {
+ Ok(v) => v,
+ Err(_) => ScrubWorkerPersisted {
+ time_last_complete_scrub: 0,
+ tranquility: 4,
+ corruptions_detected: 0,
+ },
+ };
+ Self {
+ manager,
+ rx_cmd,
+ work: ScrubWorkerState::Finished,
+ tranquilizer: Tranquilizer::new(30),
+ persister,
+ persisted,
+ }
+ }
+
+ async fn handle_cmd(&mut self, cmd: ScrubWorkerCommand) {
+ match cmd {
+ ScrubWorkerCommand::Start => {
+ self.work = match std::mem::take(&mut self.work) {
+ ScrubWorkerState::Finished => {
+ let iterator = BlockStoreIterator::new(&self.manager);
+ ScrubWorkerState::Running(iterator)
+ }
+ work => {
+ error!("Cannot start scrub worker: already running!");
+ work
+ }
+ };
+ }
+ ScrubWorkerCommand::Pause(dur) => {
+ self.work = match std::mem::take(&mut self.work) {
+ ScrubWorkerState::Running(it) | ScrubWorkerState::Paused(it, _) => {
+ ScrubWorkerState::Paused(it, now_msec() + dur.as_millis() as u64)
+ }
+ work => {
+ error!("Cannot pause scrub worker: not running!");
+ work
+ }
+ };
+ }
+ ScrubWorkerCommand::Resume => {
+ self.work = match std::mem::take(&mut self.work) {
+ ScrubWorkerState::Paused(it, _) => ScrubWorkerState::Running(it),
+ work => {
+ error!("Cannot resume scrub worker: not paused!");
+ work
+ }
+ };
+ }
+ ScrubWorkerCommand::Cancel => {
+ self.work = match std::mem::take(&mut self.work) {
+ ScrubWorkerState::Running(_) | ScrubWorkerState::Paused(_, _) => {
+ ScrubWorkerState::Finished
+ }
+ work => {
+ error!("Cannot cancel scrub worker: not running!");
+ work
+ }
+ }
+ }
+ ScrubWorkerCommand::SetTranquility(t) => {
+ self.persisted.tranquility = t;
+ if let Err(e) = self.persister.save_async(&self.persisted).await {
+ error!("Could not save new tranquilitiy value: {}", e);
+ }
+ }
+ }
+ }
+}
+
+#[async_trait]
+impl Worker for ScrubWorker {
+ fn name(&self) -> String {
+ "Block scrub worker".into()
+ }
+
+ fn info(&self) -> Option<String> {
+ let s = match &self.work {
+ ScrubWorkerState::Running(bsi) => format!(
+ "{:.2}% done (tranquility = {})",
+ bsi.progress() * 100.,
+ self.persisted.tranquility
+ ),
+ ScrubWorkerState::Paused(_bsi, rt) => {
+ format!("Paused, resumes at {}", msec_to_rfc3339(*rt))
+ }
+ ScrubWorkerState::Finished => format!(
+ "Last completed scrub: {}",
+ msec_to_rfc3339(self.persisted.time_last_complete_scrub)
+ ),
+ };
+ Some(format!(
+ "{} ; corruptions detected: {}",
+ s, self.persisted.corruptions_detected
+ ))
+ }
+
+ async fn work(
+ &mut self,
+ _must_exit: &mut watch::Receiver<bool>,
+ ) -> Result<WorkerStatus, Error> {
+ match self.rx_cmd.try_recv() {
+ Ok(cmd) => self.handle_cmd(cmd).await,
+ Err(mpsc::error::TryRecvError::Disconnected) => return Ok(WorkerStatus::Done),
+ Err(mpsc::error::TryRecvError::Empty) => (),
+ };
+
+ match &mut self.work {
+ ScrubWorkerState::Running(bsi) => {
+ self.tranquilizer.reset();
+ if let Some(hash) = bsi.next().await? {
+ match self.manager.read_block(&hash).await {
+ Err(Error::CorruptData(_)) => {
+ error!("Found corrupt data block during scrub: {:?}", hash);
+ self.persisted.corruptions_detected += 1;
+ self.persister.save_async(&self.persisted).await?;
+ }
+ Err(e) => return Err(e),
+ _ => (),
+ };
+ Ok(self
+ .tranquilizer
+ .tranquilize_worker(self.persisted.tranquility))
+ } else {
+ self.persisted.time_last_complete_scrub = now_msec();
+ self.persister.save_async(&self.persisted).await?;
+ self.work = ScrubWorkerState::Finished;
+ self.tranquilizer.clear();
+ Ok(WorkerStatus::Idle)
+ }
+ }
+ _ => Ok(WorkerStatus::Idle),
+ }
+ }
+
+ async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerStatus {
+ match &self.work {
+ ScrubWorkerState::Running(_) => return WorkerStatus::Busy,
+ ScrubWorkerState::Paused(_, resume_time) => {
+ let now = now_msec();
+ if now >= *resume_time {
+ self.handle_cmd(ScrubWorkerCommand::Resume).await;
+ return WorkerStatus::Busy;
+ }
+ let delay = Duration::from_millis(*resume_time - now);
+ select! {
+ _ = tokio::time::sleep(delay) => self.handle_cmd(ScrubWorkerCommand::Resume).await,
+ cmd = self.rx_cmd.recv() => if let Some(cmd) = cmd {
+ self.handle_cmd(cmd).await;
+ } else {
+ return WorkerStatus::Done;
+ }
+ }
+ }
+ ScrubWorkerState::Finished => {
+ let now = now_msec();
+ if now - self.persisted.time_last_complete_scrub
+ >= SCRUB_INTERVAL.as_millis() as u64
+ {
+ self.handle_cmd(ScrubWorkerCommand::Start).await;
+ return WorkerStatus::Busy;
+ }
+ let delay = SCRUB_INTERVAL
+ - Duration::from_millis(now - self.persisted.time_last_complete_scrub);
+ select! {
+ _ = tokio::time::sleep(delay) => self.handle_cmd(ScrubWorkerCommand::Start).await,
+ cmd = self.rx_cmd.recv() => if let Some(cmd) = cmd {
+ self.handle_cmd(cmd).await;
+ } else {
+ return WorkerStatus::Done;
+ }
+ }
+ }
+ }
+ match &self.work {
+ ScrubWorkerState::Running(_) => WorkerStatus::Busy,
+ _ => WorkerStatus::Idle,
+ }
+ }
+}
+
+// ----
+
+struct BlockStoreIterator {
+ path: Vec<ReadingDir>,
+}
+
+enum ReadingDir {
+ Pending(PathBuf),
+ Read {
+ subpaths: Vec<fs::DirEntry>,
+ pos: usize,
+ },
+}
+
+impl BlockStoreIterator {
+ fn new(manager: &BlockManager) -> Self {
+ let root_dir = manager.data_dir.clone();
+ Self {
+ path: vec![ReadingDir::Pending(root_dir)],
+ }
+ }
+
+ /// Returns progress done, between 0 and 1
+ fn progress(&self) -> f32 {
+ if self.path.is_empty() {
+ 1.0
+ } else {
+ let mut ret = 0.0;
+ let mut next_div = 1;
+ for p in self.path.iter() {
+ match p {
+ ReadingDir::Pending(_) => break,
+ ReadingDir::Read { subpaths, pos } => {
+ next_div *= subpaths.len();
+ ret += ((*pos - 1) as f32) / (next_div as f32);
+ }
+ }
+ }
+ ret
+ }
+ }
+
+ async fn next(&mut self) -> Result<Option<Hash>, Error> {
+ loop {
+ let last_path = match self.path.last_mut() {
+ None => return Ok(None),
+ Some(lp) => lp,
+ };
+
+ if let ReadingDir::Pending(path) = last_path {
+ let mut reader = fs::read_dir(&path).await?;
+ let mut subpaths = vec![];
+ while let Some(ent) = reader.next_entry().await? {
+ subpaths.push(ent);
+ }
+ *last_path = ReadingDir::Read { subpaths, pos: 0 };
+ }
+
+ let (subpaths, pos) = match *last_path {
+ ReadingDir::Read {
+ ref subpaths,
+ ref mut pos,
+ } => (subpaths, pos),
+ ReadingDir::Pending(_) => unreachable!(),
+ };
+
+ if *pos >= subpaths.len() {
+ self.path.pop();
+ continue;
+ }
+
+ let data_dir_ent = match subpaths.get(*pos) {
+ None => {
+ self.path.pop();
+ continue;
+ }
+ Some(ent) => {
+ *pos += 1;
+ ent
+ }
+ };
+
+ let name = data_dir_ent.file_name();
+ let name = if let Ok(n) = name.into_string() {
+ n
+ } else {
+ continue;
+ };
+ let ent_type = data_dir_ent.file_type().await?;
+
+ let name = name.strip_suffix(".zst").unwrap_or(&name);
+ if name.len() == 2 && hex::decode(&name).is_ok() && ent_type.is_dir() {
+ let path = data_dir_ent.path();
+ self.path.push(ReadingDir::Pending(path));
+ } else if name.len() == 64 {
+ let hash_bytes = if let Ok(h) = hex::decode(&name) {
+ h
+ } else {
+ continue;
+ };
+ let mut hash = [0u8; 32];
+ hash.copy_from_slice(&hash_bytes[..]);
+ return Ok(Some(hash.into()));
+ }
+ }
+ }
+}
diff --git a/src/db/Cargo.toml b/src/db/Cargo.toml
index 6d8f64be..f697054b 100644
--- a/src/db/Cargo.toml
+++ b/src/db/Cargo.toml
@@ -19,7 +19,7 @@ required-features = ["cli"]
[dependencies]
err-derive = "0.3"
hexdump = "0.1"
-log = "0.4"
+tracing = "0.1.30"
heed = "0.11"
rusqlite = { version = "0.27", features = ["bundled"] }
diff --git a/src/db/sqlite_adapter.rs b/src/db/sqlite_adapter.rs
index 68d96ca0..97a78b07 100644
--- a/src/db/sqlite_adapter.rs
+++ b/src/db/sqlite_adapter.rs
@@ -6,7 +6,7 @@ use std::pin::Pin;
use std::ptr::NonNull;
use std::sync::{Arc, Mutex, MutexGuard};
-use log::trace;
+use tracing::trace;
use rusqlite::{params, Connection, Rows, Statement, Transaction};
diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml
index 640e6975..8948e750 100644
--- a/src/garage/Cargo.toml
+++ b/src/garage/Cargo.toml
@@ -23,6 +23,7 @@ path = "tests/lib.rs"
[dependencies]
garage_db = { version = "0.8.0", path = "../db" }
garage_api = { version = "0.7.0", path = "../api" }
+garage_block = { version = "0.7.0", path = "../block" }
garage_model = { version = "0.7.0", path = "../model" }
garage_rpc = { version = "0.7.0", path = "../rpc" }
garage_table = { version = "0.7.0", path = "../table" }
@@ -31,6 +32,7 @@ garage_web = { version = "0.7.0", path = "../web" }
bytes = "1.0"
bytesize = "1.1"
+timeago = "0.3"
hex = "0.4"
tracing = { version = "0.1.30", features = ["log-always"] }
pretty_env_logger = "0.4"
diff --git a/src/garage/admin.rs b/src/garage/admin.rs
index 48914655..71ee608c 100644
--- a/src/garage/admin.rs
+++ b/src/garage/admin.rs
@@ -24,7 +24,7 @@ use garage_model::migrate::Migrate;
use garage_model::permission::*;
use crate::cli::*;
-use crate::repair::online::OnlineRepair;
+use crate::repair::online::launch_online_repair;
pub const ADMIN_RPC_PATH: &str = "garage/admin_rpc.rs/Rpc";
@@ -36,6 +36,7 @@ pub enum AdminRpc {
LaunchRepair(RepairOpt),
Migrate(MigrateOpt),
Stats(StatsOpt),
+ Worker(WorkerOpt),
// Replies
Ok(String),
@@ -47,6 +48,10 @@ pub enum AdminRpc {
},
KeyList(Vec<(String, String)>),
KeyInfo(Key, HashMap<Uuid, Bucket>),
+ WorkerList(
+ HashMap<usize, garage_util::background::WorkerInfo>,
+ WorkerListOpt,
+ ),
}
impl Rpc for AdminRpc {
@@ -693,15 +698,7 @@ impl AdminRpcHandler {
)))
}
} else {
- let repair = OnlineRepair {
- garage: self.garage.clone(),
- };
- self.garage
- .system
- .background
- .spawn_worker("Repair worker".into(), move |must_exit| async move {
- repair.repair_worker(opt, must_exit).await
- });
+ launch_online_repair(self.garage.clone(), opt).await;
Ok(AdminRpc::Ok(format!(
"Repair launched on {:?}",
self.garage.system.id
@@ -830,6 +827,17 @@ impl AdminRpcHandler {
Ok(())
}
+
+ // ----
+
+ async fn handle_worker_cmd(&self, opt: WorkerOpt) -> Result<AdminRpc, Error> {
+ match opt.cmd {
+ WorkerCmd::List { opt } => {
+ let workers = self.garage.background.get_worker_info();
+ Ok(AdminRpc::WorkerList(workers, opt))
+ }
+ }
+ }
}
#[async_trait]
@@ -845,6 +853,7 @@ impl EndpointHandler<AdminRpc> for AdminRpcHandler {
AdminRpc::Migrate(opt) => self.handle_migrate(opt.clone()).await,
AdminRpc::LaunchRepair(opt) => self.handle_launch_repair(opt.clone()).await,
AdminRpc::Stats(opt) => self.handle_stats(opt.clone()).await,
+ AdminRpc::Worker(opt) => self.handle_worker_cmd(opt.clone()).await,
m => Err(GarageError::unexpected_rpc_message(m).into()),
}
}
diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs
index 3a0bd956..1aa2c2ff 100644
--- a/src/garage/cli/cmd.rs
+++ b/src/garage/cli/cmd.rs
@@ -1,4 +1,5 @@
use std::collections::HashSet;
+use std::time::Duration;
use garage_util::error::*;
use garage_util::formater::format_table;
@@ -39,6 +40,7 @@ pub async fn cli_command_dispatch(
cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::LaunchRepair(ro)).await
}
Command::Stats(so) => cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::Stats(so)).await,
+ Command::Worker(wo) => cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::Worker(wo)).await,
_ => unreachable!(),
}
}
@@ -100,6 +102,7 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) ->
vec!["ID\tHostname\tAddress\tTags\tZone\tCapacity\tLast seen".to_string()];
for adv in status.iter().filter(|adv| !adv.is_up) {
if let Some(NodeRoleV(Some(cfg))) = layout.roles.get(&adv.id) {
+ let tf = timeago::Formatter::new();
failed_nodes.push(format!(
"{id:?}\t{host}\t{addr}\t[{tags}]\t{zone}\t{capacity}\t{last_seen}",
id = adv.id,
@@ -110,7 +113,7 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) ->
capacity = cfg.capacity_string(),
last_seen = adv
.last_seen_secs_ago
- .map(|s| format!("{}s ago", s))
+ .map(|s| tf.convert(Duration::from_secs(s)))
.unwrap_or_else(|| "never seen".into()),
));
}
@@ -182,6 +185,9 @@ pub async fn cmd_admin(
AdminRpc::KeyInfo(key, rb) => {
print_key_info(&key, &rb);
}
+ AdminRpc::WorkerList(wi, wlo) => {
+ print_worker_info(wi, wlo);
+ }
r => {
error!("Unexpected response: {:?}", r);
}
diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs
index 4f2efe19..bc44b5ef 100644
--- a/src/garage/cli/structs.rs
+++ b/src/garage/cli/structs.rs
@@ -45,6 +45,10 @@ pub enum Command {
/// Gather node statistics
#[structopt(name = "stats")]
Stats(StatsOpt),
+
+ /// Manage background workers
+ #[structopt(name = "worker")]
+ Worker(WorkerOpt),
}
#[derive(StructOpt, Debug)]
@@ -423,8 +427,29 @@ pub enum RepairWhat {
/// Verify integrity of all blocks on disc (extremely slow, i/o intensive)
#[structopt(name = "scrub")]
Scrub {
- /// Tranquility factor (see tranquilizer documentation)
- #[structopt(name = "tranquility", default_value = "2")]
+ #[structopt(subcommand)]
+ cmd: ScrubCmd,
+ },
+}
+
+#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
+pub enum ScrubCmd {
+ /// Start scrub
+ #[structopt(name = "start")]
+ Start,
+ /// Pause scrub (it will resume automatically after 24 hours)
+ #[structopt(name = "pause")]
+ Pause,
+ /// Resume paused scrub
+ #[structopt(name = "resume")]
+ Resume,
+ /// Cancel scrub in progress
+ #[structopt(name = "cancel")]
+ Cancel,
+ /// Set tranquility level for in-progress and future scrubs
+ #[structopt(name = "set-tranquility")]
+ SetTranquility {
+ #[structopt()]
tranquility: u32,
},
}
@@ -460,3 +485,29 @@ pub struct StatsOpt {
#[structopt(short = "d", long = "detailed")]
pub detailed: bool,
}
+
+#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
+pub struct WorkerOpt {
+ #[structopt(subcommand)]
+ pub cmd: WorkerCmd,
+}
+
+#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
+pub enum WorkerCmd {
+ /// List all workers on Garage node
+ #[structopt(name = "list")]
+ List {
+ #[structopt(flatten)]
+ opt: WorkerListOpt,
+ },
+}
+
+#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone, Copy)]
+pub struct WorkerListOpt {
+ /// Show only busy workers
+ #[structopt(short = "b", long = "busy")]
+ pub busy: bool,
+ /// Show only workers with errors
+ #[structopt(short = "e", long = "errors")]
+ pub errors: bool,
+}
diff --git a/src/garage/cli/util.rs b/src/garage/cli/util.rs
index 329e8a3e..8be56138 100644
--- a/src/garage/cli/util.rs
+++ b/src/garage/cli/util.rs
@@ -1,14 +1,19 @@
use std::collections::HashMap;
+use std::time::Duration;
+use garage_util::background::*;
use garage_util::crdt::*;
use garage_util::data::Uuid;
use garage_util::error::*;
use garage_util::formater::format_table;
+use garage_util::time::*;
use garage_model::bucket_table::*;
use garage_model::key_table::*;
use garage_model::s3::object_table::{BYTES, OBJECTS, UNFINISHED_UPLOADS};
+use crate::cli::structs::WorkerListOpt;
+
pub fn print_bucket_list(bl: Vec<Bucket>) {
println!("List of buckets:");
@@ -235,3 +240,56 @@ pub fn find_matching_node(
Ok(candidates[0])
}
}
+
+pub fn print_worker_info(wi: HashMap<usize, WorkerInfo>, wlo: WorkerListOpt) {
+ let mut wi = wi.into_iter().collect::<Vec<_>>();
+ wi.sort_by_key(|(tid, info)| {
+ (
+ match info.status {
+ WorkerStatus::Busy | WorkerStatus::Throttled(_) => 0,
+ WorkerStatus::Idle => 1,
+ WorkerStatus::Done => 2,
+ },
+ *tid,
+ )
+ });
+
+ let mut table = vec![];
+ for (tid, info) in wi.iter() {
+ if wlo.busy && !matches!(info.status, WorkerStatus::Busy | WorkerStatus::Throttled(_)) {
+ continue;
+ }
+ if wlo.errors && info.errors == 0 {
+ continue;
+ }
+
+ table.push(format!("{}\t{}\t{}", tid, info.status, info.name));
+ if let Some(i) = &info.info {
+ table.push(format!("\t\t {}", i));
+ }
+ let tf = timeago::Formatter::new();
+ let (err_ago, err_msg) = info
+ .last_error
+ .as_ref()
+ .map(|(m, t)| {
+ (
+ tf.convert(Duration::from_millis(now_msec() - t)),
+ m.as_str(),
+ )
+ })
+ .unwrap_or(("(?) ago".into(), "(?)"));
+ if info.consecutive_errors > 0 {
+ table.push(format!(
+ "\t\t {} consecutive errors ({} total), last {}",
+ info.consecutive_errors, info.errors, err_ago,
+ ));
+ table.push(format!("\t\t {}", err_msg));
+ } else if info.errors > 0 {
+ table.push(format!("\t\t ({} errors, last {})", info.errors, err_ago,));
+ if wlo.errors {
+ table.push(format!("\t\t {}", err_msg));
+ }
+ }
+ }
+ format_table(table);
+}
diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs
index d6a71742..eeb9cea3 100644
--- a/src/garage/repair/online.rs
+++ b/src/garage/repair/online.rs
@@ -1,89 +1,113 @@
use std::sync::Arc;
+use std::time::Duration;
+use async_trait::async_trait;
use tokio::sync::watch;
+use garage_block::repair::ScrubWorkerCommand;
use garage_model::garage::Garage;
use garage_model::s3::block_ref_table::*;
use garage_model::s3::object_table::*;
use garage_model::s3::version_table::*;
use garage_table::*;
+use garage_util::background::*;
use garage_util::error::Error;
use crate::*;
-pub struct OnlineRepair {
- pub garage: Arc<Garage>,
-}
-
-impl OnlineRepair {
- pub async fn repair_worker(&self, opt: RepairOpt, must_exit: watch::Receiver<bool>) {
- if let Err(e) = self.repair_worker_aux(opt, must_exit).await {
- warn!("Repair worker failed with error: {}", e);
+pub async fn launch_online_repair(garage: Arc<Garage>, opt: RepairOpt) {
+ match opt.what {
+ RepairWhat::Tables => {
+ info!("Launching a full sync of tables");
+ garage.bucket_table.syncer.add_full_sync();
+ garage.object_table.syncer.add_full_sync();
+ garage.version_table.syncer.add_full_sync();
+ garage.block_ref_table.syncer.add_full_sync();
+ garage.key_table.syncer.add_full_sync();
+ }
+ RepairWhat::Versions => {
+ info!("Repairing the versions table");
+ garage
+ .background
+ .spawn_worker(RepairVersionsWorker::new(garage.clone()));
+ }
+ RepairWhat::BlockRefs => {
+ info!("Repairing the block refs table");
+ garage
+ .background
+ .spawn_worker(RepairBlockrefsWorker::new(garage.clone()));
+ }
+ RepairWhat::Blocks => {
+ info!("Repairing the stored blocks");
+ garage
+ .background
+ .spawn_worker(garage_block::repair::RepairWorker::new(
+ garage.block_manager.clone(),
+ ));
+ }
+ RepairWhat::Scrub { cmd } => {
+ let cmd = match cmd {
+ ScrubCmd::Start => ScrubWorkerCommand::Start,
+ ScrubCmd::Pause => ScrubWorkerCommand::Pause(Duration::from_secs(3600 * 24)),
+ ScrubCmd::Resume => ScrubWorkerCommand::Resume,
+ ScrubCmd::Cancel => ScrubWorkerCommand::Cancel,
+ ScrubCmd::SetTranquility { tranquility } => {
+ ScrubWorkerCommand::SetTranquility(tranquility)
+ }
+ };
+ info!("Sending command to scrub worker: {:?}", cmd);
+ garage.block_manager.send_scrub_command(cmd).await;
}
}
+}
- async fn repair_worker_aux(
- &self,
- opt: RepairOpt,
- must_exit: watch::Receiver<bool>,
- ) -> Result<(), Error> {
- match opt.what {
- RepairWhat::Tables => {
- info!("Launching a full sync of tables");
- self.garage.bucket_table.syncer.add_full_sync();
- self.garage.object_table.syncer.add_full_sync();
- self.garage.version_table.syncer.add_full_sync();
- self.garage.block_ref_table.syncer.add_full_sync();
- self.garage.key_table.syncer.add_full_sync();
- }
- RepairWhat::Versions => {
- info!("Repairing the versions table");
- self.repair_versions(&must_exit).await?;
- }
- RepairWhat::BlockRefs => {
- info!("Repairing the block refs table");
- self.repair_block_ref(&must_exit).await?;
- }
- RepairWhat::Blocks => {
- info!("Repairing the stored blocks");
- self.garage
- .block_manager
- .repair_data_store(&must_exit)
- .await?;
- }
- RepairWhat::Scrub { tranquility } => {
- info!("Verifying integrity of stored blocks");
- self.garage
- .block_manager
- .scrub_data_store(&must_exit, tranquility)
- .await?;
- }
+// ----
+
+struct RepairVersionsWorker {
+ garage: Arc<Garage>,
+ pos: Vec<u8>,
+ counter: usize,
+}
+
+impl RepairVersionsWorker {
+ fn new(garage: Arc<Garage>) -> Self {
+ Self {
+ garage,
+ pos: vec![],
+ counter: 0,
}
- Ok(())
}
+}
- async fn repair_versions(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
- let mut pos = vec![];
- let mut i = 0;
+#[async_trait]
+impl Worker for RepairVersionsWorker {
+ fn name(&self) -> String {
+ "Version repair worker".into()
+ }
- while !*must_exit.borrow() {
- let item_bytes = match self.garage.version_table.data.store.get_gt(pos)? {
- Some((k, v)) => {
- pos = k;
- v
- }
- None => break,
- };
+ fn info(&self) -> Option<String> {
+ Some(format!("{} items done", self.counter))
+ }
- i += 1;
- if i % 1000 == 0 {
- info!("repair_versions: {}", i);
+ async fn work(
+ &mut self,
+ _must_exit: &mut watch::Receiver<bool>,
+ ) -> Result<WorkerStatus, Error> {
+ let item_bytes = match self.garage.version_table.data.store.get_gt(&self.pos)? {
+ Some((k, v)) => {
+ self.pos = k;
+ v
}
-
- let version = rmp_serde::decode::from_read_ref::<_, Version>(&item_bytes)?;
- if version.deleted.get() {
- continue;
+ None => {
+ info!("repair_versions: finished, done {}", self.counter);
+ return Ok(WorkerStatus::Done);
}
+ };
+
+ self.counter += 1;
+
+ let version = rmp_serde::decode::from_read_ref::<_, Version>(&item_bytes)?;
+ if !version.deleted.get() {
let object = self
.garage
.object_table
@@ -109,32 +133,62 @@ impl OnlineRepair {
.await?;
}
}
- info!("repair_versions: finished, done {}", i);
- Ok(())
+
+ Ok(WorkerStatus::Busy)
}
- async fn repair_block_ref(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
- let mut pos = vec![];
- let mut i = 0;
+ async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerStatus {
+ unreachable!()
+ }
+}
- while !*must_exit.borrow() {
- let item_bytes = match self.garage.block_ref_table.data.store.get_gt(pos)? {
- Some((k, v)) => {
- pos = k;
- v
- }
- None => break,
- };
+// ----
- i += 1;
- if i % 1000 == 0 {
- info!("repair_block_ref: {}", i);
- }
+struct RepairBlockrefsWorker {
+ garage: Arc<Garage>,
+ pos: Vec<u8>,
+ counter: usize,
+}
- let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(&item_bytes)?;
- if block_ref.deleted.get() {
- continue;
+impl RepairBlockrefsWorker {
+ fn new(garage: Arc<Garage>) -> Self {
+ Self {
+ garage,
+ pos: vec![],
+ counter: 0,
+ }
+ }
+}
+
+#[async_trait]
+impl Worker for RepairBlockrefsWorker {
+ fn name(&self) -> String {
+ "Block refs repair worker".into()
+ }
+
+ fn info(&self) -> Option<String> {
+ Some(format!("{} items done", self.counter))
+ }
+
+ async fn work(
+ &mut self,
+ _must_exit: &mut watch::Receiver<bool>,
+ ) -> Result<WorkerStatus, Error> {
+ let item_bytes = match self.garage.block_ref_table.data.store.get_gt(&self.pos)? {
+ Some((k, v)) => {
+ self.pos = k;
+ v
}
+ None => {
+ info!("repair_block_ref: finished, done {}", self.counter);
+ return Ok(WorkerStatus::Done);
+ }
+ };
+
+ self.counter += 1;
+
+ let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(&item_bytes)?;
+ if !block_ref.deleted.get() {
let version = self
.garage
.version_table
@@ -157,7 +211,11 @@ impl OnlineRepair {
.await?;
}
}
- info!("repair_block_ref: finished, done {}", i);
- Ok(())
+
+ Ok(WorkerStatus::Busy)
+ }
+
+ async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerStatus {
+ unreachable!()
}
}
diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs
index 36e8172b..9d5aa955 100644
--- a/src/model/index_counter.rs
+++ b/src/model/index_counter.rs
@@ -2,8 +2,8 @@ use core::ops::Bound;
use std::collections::{hash_map, BTreeMap, HashMap};
use std::marker::PhantomData;
use std::sync::Arc;
-use std::time::Duration;
+use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use tokio::sync::{mpsc, watch};
@@ -11,6 +11,7 @@ use garage_db as db;
use garage_rpc::ring::Ring;
use garage_rpc::system::System;
+use garage_util::background::*;
use garage_util::data::*;
use garage_util::error::*;
use garage_util::time::*;
@@ -171,11 +172,13 @@ impl<T: CountedItem> IndexCounter<T> {
),
});
- let this2 = this.clone();
- background.spawn_worker(
- format!("{} index counter propagator", T::COUNTER_TABLE_NAME),
- move |must_exit| this2.clone().propagate_loop(propagate_rx, must_exit),
- );
+ background.spawn_worker(IndexPropagatorWorker {
+ index_counter: this.clone(),
+ propagate_rx,
+ buf: HashMap::new(),
+ errors: 0,
+ });
+
this
}
@@ -239,68 +242,6 @@ impl<T: CountedItem> IndexCounter<T> {
Ok(())
}
- async fn propagate_loop(
- self: Arc<Self>,
- mut propagate_rx: mpsc::UnboundedReceiver<(T::CP, T::CS, LocalCounterEntry<T>)>,
- must_exit: watch::Receiver<bool>,
- ) {
- // This loop batches updates to counters to be sent all at once.
- // They are sent once the propagate_rx channel has been emptied (or is closed).
- let mut buf = HashMap::new();
- let mut errors = 0;
-
- loop {
- let (ent, closed) = match propagate_rx.try_recv() {
- Ok(ent) => (Some(ent), false),
- Err(mpsc::error::TryRecvError::Empty) if buf.is_empty() => {
- match propagate_rx.recv().await {
- Some(ent) => (Some(ent), false),
- None => (None, true),
- }
- }
- Err(mpsc::error::TryRecvError::Empty) => (None, false),
- Err(mpsc::error::TryRecvError::Disconnected) => (None, true),
- };
-
- if let Some((pk, sk, counters)) = ent {
- let tree_key = self.table.data.tree_key(&pk, &sk);
- let dist_entry = counters.into_counter_entry(self.this_node);
- match buf.entry(tree_key) {
- hash_map::Entry::Vacant(e) => {
- e.insert(dist_entry);
- }
- hash_map::Entry::Occupied(mut e) => {
- e.get_mut().merge(&dist_entry);
- }
- }
- // As long as we can add entries, loop back and add them to batch
- // before sending batch to other nodes
- continue;
- }
-
- if !buf.is_empty() {
- let entries = buf.iter().map(|(_k, v)| v);
- if let Err(e) = self.table.insert_many(entries).await {
- errors += 1;
- if errors >= 2 && *must_exit.borrow() {
- error!("({}) Could not propagate {} counter values: {}, these counters will not be updated correctly.", T::COUNTER_TABLE_NAME, buf.len(), e);
- break;
- }
- warn!("({}) Could not propagate {} counter values: {}, retrying in 5 seconds (retry #{})", T::COUNTER_TABLE_NAME, buf.len(), e, errors);
- tokio::time::sleep(Duration::from_secs(5)).await;
- continue;
- }
-
- buf.clear();
- errors = 0;
- }
-
- if closed || *must_exit.borrow() {
- break;
- }
- }
- }
-
pub fn offline_recount_all<TS, TR>(
&self,
counted_table: &Arc<Table<TS, TR>>,
@@ -437,6 +378,98 @@ impl<T: CountedItem> IndexCounter<T> {
}
}
+struct IndexPropagatorWorker<T: CountedItem> {
+ index_counter: Arc<IndexCounter<T>>,
+ propagate_rx: mpsc::UnboundedReceiver<(T::CP, T::CS, LocalCounterEntry<T>)>,
+
+ buf: HashMap<Vec<u8>, CounterEntry<T>>,
+ errors: usize,
+}
+
+impl<T: CountedItem> IndexPropagatorWorker<T> {
+ fn add_ent(&mut self, pk: T::CP, sk: T::CS, counters: LocalCounterEntry<T>) {
+ let tree_key = self.index_counter.table.data.tree_key(&pk, &sk);
+ let dist_entry = counters.into_counter_entry(self.index_counter.this_node);
+ match self.buf.entry(tree_key) {
+ hash_map::Entry::Vacant(e) => {
+ e.insert(dist_entry);
+ }
+ hash_map::Entry::Occupied(mut e) => {
+ e.get_mut().merge(&dist_entry);
+ }
+ }
+ }
+}
+
+#[async_trait]
+impl<T: CountedItem> Worker for IndexPropagatorWorker<T> {
+ fn name(&self) -> String {
+ format!("{} index counter propagator", T::COUNTER_TABLE_NAME)
+ }
+
+ fn info(&self) -> Option<String> {
+ if !self.buf.is_empty() {
+ Some(format!("{} items in queue", self.buf.len()))
+ } else {
+ None
+ }
+ }
+
+ async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerStatus, Error> {
+ // This loop batches updates to counters to be sent all at once.
+ // They are sent once the propagate_rx channel has been emptied (or is closed).
+ let closed = loop {
+ match self.propagate_rx.try_recv() {
+ Ok((pk, sk, counters)) => {
+ self.add_ent(pk, sk, counters);
+ }
+ Err(mpsc::error::TryRecvError::Empty) => break false,
+ Err(mpsc::error::TryRecvError::Disconnected) => break true,
+ }
+ };
+
+ if !self.buf.is_empty() {
+ let entries_k = self.buf.keys().take(100).cloned().collect::<Vec<_>>();
+ let entries = entries_k.iter().map(|k| self.buf.get(k).unwrap());
+ if let Err(e) = self.index_counter.table.insert_many(entries).await {
+ self.errors += 1;
+ if self.errors >= 2 && *must_exit.borrow() {
+ error!("({}) Could not propagate {} counter values: {}, these counters will not be updated correctly.", T::COUNTER_TABLE_NAME, self.buf.len(), e);
+ return Ok(WorkerStatus::Done);
+ }
+ // Propagate error up to worker manager, it will log it, increment a counter,
+ // and sleep for a certain delay (with exponential backoff), waiting for
+ // things to go back to normal
+ return Err(e);
+ } else {
+ for k in entries_k {
+ self.buf.remove(&k);
+ }
+ self.errors = 0;
+ }
+
+ return Ok(WorkerStatus::Busy);
+ } else if closed {
+ return Ok(WorkerStatus::Done);
+ } else {
+ return Ok(WorkerStatus::Idle);
+ }
+ }
+
+ async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerStatus {
+ match self.propagate_rx.recv().await {
+ Some((pk, sk, counters)) => {
+ self.add_ent(pk, sk, counters);
+ WorkerStatus::Busy
+ }
+ None => match self.buf.is_empty() {
+ false => WorkerStatus::Busy,
+ true => WorkerStatus::Done,
+ },
+ }
+ }
+}
+
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
struct LocalCounterEntry<T: CountedItem> {
pk: T::CP,
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index 1d7c3ea4..77b79864 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -2,7 +2,7 @@
use std::collections::HashMap;
use std::io::{Read, Write};
use std::net::{IpAddr, SocketAddr};
-use std::path::Path;
+use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
@@ -104,6 +104,9 @@ pub struct System {
/// The job runner of this node
pub background: Arc<BackgroundRunner>,
+
+ /// Path to metadata directory (usefull)
+ pub metadata_dir: PathBuf,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -295,6 +298,7 @@ impl System {
ring,
update_ring: Mutex::new(update_ring),
background,
+ metadata_dir: config.metadata_dir.clone(),
});
sys.system_endpoint.set_handler(sys.clone());
sys
diff --git a/src/table/gc.rs b/src/table/gc.rs
index e7fbbcb0..0899d5e5 100644
--- a/src/table/gc.rs
+++ b/src/table/gc.rs
@@ -8,12 +8,11 @@ use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
use futures::future::join_all;
-use futures::select;
-use futures_util::future::*;
use tokio::sync::watch;
use garage_db::counted_tree_hack::CountedTree;
+use garage_util::background::*;
use garage_util::data::*;
use garage_util::error::*;
use garage_util::time::*;
@@ -69,35 +68,11 @@ where
gc.endpoint.set_handler(gc.clone());
- let gc1 = gc.clone();
- system.background.spawn_worker(
- format!("GC loop for {}", F::TABLE_NAME),
- move |must_exit: watch::Receiver<bool>| gc1.gc_loop(must_exit),
- );
+ system.background.spawn_worker(GcWorker::new(gc.clone()));
gc
}
- async fn gc_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) {
- while !*must_exit.borrow() {
- match self.gc_loop_iter().await {
- Ok(None) => {
- // Stuff was done, loop immediately
- }
- Ok(Some(wait_delay)) => {
- // Nothing was done, wait specified delay.
- select! {
- _ = tokio::time::sleep(wait_delay).fuse() => {},
- _ = must_exit.changed().fuse() => {},
- }
- }
- Err(e) => {
- warn!("({}) Error doing GC: {}", F::TABLE_NAME, e);
- }
- }
- }
- }
-
async fn gc_loop_iter(&self) -> Result<Option<Duration>, Error> {
let now = now_msec();
@@ -328,6 +303,69 @@ where
}
}
+struct GcWorker<F, R>
+where
+ F: TableSchema + 'static,
+ R: TableReplication + 'static,
+{
+ gc: Arc<TableGc<F, R>>,
+ wait_delay: Duration,
+}
+
+impl<F, R> GcWorker<F, R>
+where
+ F: TableSchema + 'static,
+ R: TableReplication + 'static,
+{
+ fn new(gc: Arc<TableGc<F, R>>) -> Self {
+ Self {
+ gc,
+ wait_delay: Duration::from_secs(0),
+ }
+ }
+}
+
+#[async_trait]
+impl<F, R> Worker for GcWorker<F, R>
+where
+ F: TableSchema + 'static,
+ R: TableReplication + 'static,
+{
+ fn name(&self) -> String {
+ format!("{} GC", F::TABLE_NAME)
+ }
+
+ fn info(&self) -> Option<String> {
+ let l = self.gc.data.gc_todo_len().unwrap_or(0);
+ if l > 0 {
+ Some(format!("{} items in queue", l))
+ } else {
+ None
+ }
+ }
+
+ async fn work(
+ &mut self,
+ _must_exit: &mut watch::Receiver<bool>,
+ ) -> Result<WorkerStatus, Error> {
+ match self.gc.gc_loop_iter().await? {
+ None => Ok(WorkerStatus::Busy),
+ Some(delay) => {
+ self.wait_delay = delay;
+ Ok(WorkerStatus::Idle)
+ }
+ }
+ }
+
+ async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerStatus {
+ if *must_exit.borrow() {
+ return WorkerStatus::Done;
+ }
+ tokio::time::sleep(self.wait_delay).await;
+ WorkerStatus::Busy
+ }
+}
+
/// An entry stored in the gc_todo Sled tree associated with the table
/// Contains helper function for parsing, saving, and removing
/// such entry in Sled
diff --git a/src/table/merkle.rs b/src/table/merkle.rs
index 7685b193..21186220 100644
--- a/src/table/merkle.rs
+++ b/src/table/merkle.rs
@@ -1,14 +1,13 @@
use std::sync::Arc;
use std::time::Duration;
-use futures::select;
-use futures_util::future::*;
+use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use tokio::sync::watch;
use garage_db as db;
-use garage_util::background::BackgroundRunner;
+use garage_util::background::*;
use garage_util::data::*;
use garage_util::error::Error;
@@ -78,43 +77,17 @@ where
empty_node_hash,
});
- let ret2 = ret.clone();
- background.spawn_worker(
- format!("Merkle tree updater for {}", F::TABLE_NAME),
- |must_exit: watch::Receiver<bool>| ret2.updater_loop(must_exit),
- );
+ background.spawn_worker(MerkleWorker(ret.clone()));
ret
}
- async fn updater_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) {
- while !*must_exit.borrow() {
- match self.updater_loop_iter() {
- Ok(true) => (),
- Ok(false) => {
- select! {
- _ = self.data.merkle_todo_notify.notified().fuse() => {},
- _ = must_exit.changed().fuse() => {},
- }
- }
- Err(e) => {
- warn!(
- "({}) Error while updating Merkle tree item: {}",
- F::TABLE_NAME,
- e
- );
- tokio::time::sleep(Duration::from_secs(10)).await;
- }
- }
- }
- }
-
- fn updater_loop_iter(&self) -> Result<bool, Error> {
+ fn updater_loop_iter(&self) -> Result<WorkerStatus, Error> {
if let Some((key, valhash)) = self.data.merkle_todo.first()? {
self.update_item(&key, &valhash)?;
- Ok(true)
+ Ok(WorkerStatus::Busy)
} else {
- Ok(false)
+ Ok(WorkerStatus::Idle)
}
}
@@ -325,6 +298,57 @@ where
}
}
+struct MerkleWorker<F, R>(Arc<MerkleUpdater<F, R>>)
+where
+ F: TableSchema + 'static,
+ R: TableReplication + 'static;
+
+#[async_trait]
+impl<F, R> Worker for MerkleWorker<F, R>
+where
+ F: TableSchema + 'static,
+ R: TableReplication + 'static,
+{
+ fn name(&self) -> String {
+ format!("{} Merkle tree updater", F::TABLE_NAME)
+ }
+
+ fn info(&self) -> Option<String> {
+ let l = self.0.todo_len().unwrap_or(0);
+ if l > 0 {
+ Some(format!("{} items in queue", l))
+ } else {
+ None
+ }
+ }
+
+ async fn work(
+ &mut self,
+ _must_exit: &mut watch::Receiver<bool>,
+ ) -> Result<WorkerStatus, Error> {
+ let updater = self.0.clone();
+ tokio::task::spawn_blocking(move || {
+ for _i in 0..100 {
+ let s = updater.updater_loop_iter();
+ if !matches!(s, Ok(WorkerStatus::Busy)) {
+ return s;
+ }
+ }
+ Ok(WorkerStatus::Busy)
+ })
+ .await
+ .unwrap()
+ }
+
+ async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerStatus {
+ if *must_exit.borrow() {
+ return WorkerStatus::Done;
+ }
+ tokio::time::sleep(Duration::from_secs(10)).await;
+ WorkerStatus::Busy
+ }
+}
+
impl MerkleNodeKey {
fn encode(&self) -> Vec<u8> {
let mut ret = Vec::with_capacity(2 + self.prefix.len());
diff --git a/src/table/sync.rs b/src/table/sync.rs
index 4c83e991..a7e1994c 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -1,17 +1,17 @@
use std::collections::VecDeque;
-use std::sync::{Arc, Mutex};
+use std::sync::Arc;
use std::time::{Duration, Instant};
use async_trait::async_trait;
-use futures::select;
-use futures_util::future::*;
use futures_util::stream::*;
use opentelemetry::KeyValue;
use rand::Rng;
use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
+use tokio::select;
use tokio::sync::{mpsc, watch};
+use garage_util::background::*;
use garage_util::data::*;
use garage_util::error::Error;
@@ -34,7 +34,7 @@ pub struct TableSyncer<F: TableSchema + 'static, R: TableReplication + 'static>
data: Arc<TableData<F, R>>,
merkle: Arc<MerkleUpdater<F, R>>,
- todo: Mutex<SyncTodo>,
+ add_full_sync_tx: mpsc::UnboundedSender<()>,
endpoint: Arc<Endpoint<SyncRpc, Self>>,
}
@@ -52,10 +52,6 @@ impl Rpc for SyncRpc {
type Response = Result<SyncRpc, Error>;
}
-struct SyncTodo {
- todo: Vec<TodoPartition>,
-}
-
#[derive(Debug, Clone)]
struct TodoPartition {
partition: Partition,
@@ -80,118 +76,40 @@ where
.netapp
.endpoint(format!("garage_table/sync.rs/Rpc:{}", F::TABLE_NAME));
- let todo = SyncTodo { todo: vec![] };
+ let (add_full_sync_tx, add_full_sync_rx) = mpsc::unbounded_channel();
let syncer = Arc::new(Self {
system: system.clone(),
data,
merkle,
- todo: Mutex::new(todo),
+ add_full_sync_tx,
endpoint,
});
syncer.endpoint.set_handler(syncer.clone());
- let (busy_tx, busy_rx) = mpsc::unbounded_channel();
-
- let s1 = syncer.clone();
- system.background.spawn_worker(
- format!("table sync watcher for {}", F::TABLE_NAME),
- move |must_exit: watch::Receiver<bool>| s1.watcher_task(must_exit, busy_rx),
- );
-
- let s2 = syncer.clone();
- system.background.spawn_worker(
- format!("table syncer for {}", F::TABLE_NAME),
- move |must_exit: watch::Receiver<bool>| s2.syncer_task(must_exit, busy_tx),
- );
-
- let s3 = syncer.clone();
- tokio::spawn(async move {
- tokio::time::sleep(Duration::from_secs(20)).await;
- s3.add_full_sync();
+ system.background.spawn_worker(SyncWorker {
+ syncer: syncer.clone(),
+ ring_recv: system.ring.clone(),
+ ring: system.ring.borrow().clone(),
+ add_full_sync_rx,
+ todo: vec![],
+ next_full_sync: Instant::now() + Duration::from_secs(20),
});
syncer
}
- async fn watcher_task(
- self: Arc<Self>,
- mut must_exit: watch::Receiver<bool>,
- mut busy_rx: mpsc::UnboundedReceiver<bool>,
- ) {
- let mut prev_ring: Arc<Ring> = self.system.ring.borrow().clone();
- let mut ring_recv: watch::Receiver<Arc<Ring>> = self.system.ring.clone();
- let mut nothing_to_do_since = Some(Instant::now());
-
- while !*must_exit.borrow() {
- select! {
- _ = ring_recv.changed().fuse() => {
- let new_ring = ring_recv.borrow();
- if !Arc::ptr_eq(&new_ring, &prev_ring) {
- debug!("({}) Ring changed, adding full sync to syncer todo list", F::TABLE_NAME);
- self.add_full_sync();
- prev_ring = new_ring.clone();
- }
- }
- busy_opt = busy_rx.recv().fuse() => {
- if let Some(busy) = busy_opt {
- if busy {
- nothing_to_do_since = None;
- } else if nothing_to_do_since.is_none() {
- nothing_to_do_since = Some(Instant::now());
- }
- }
- }
- _ = must_exit.changed().fuse() => {},
- _ = tokio::time::sleep(Duration::from_secs(1)).fuse() => {
- if nothing_to_do_since.map(|t| Instant::now() - t >= ANTI_ENTROPY_INTERVAL).unwrap_or(false) {
- nothing_to_do_since = None;
- debug!("({}) Interval passed, adding full sync to syncer todo list", F::TABLE_NAME);
- self.add_full_sync();
- }
- }
- }
- }
- }
-
pub fn add_full_sync(&self) {
- self.todo
- .lock()
- .unwrap()
- .add_full_sync(&self.data, &self.system);
- }
-
- async fn syncer_task(
- self: Arc<Self>,
- mut must_exit: watch::Receiver<bool>,
- busy_tx: mpsc::UnboundedSender<bool>,
- ) {
- while !*must_exit.borrow() {
- let task = self.todo.lock().unwrap().pop_task();
- if let Some(partition) = task {
- busy_tx.send(true).unwrap();
- let res = self
- .clone()
- .sync_partition(&partition, &mut must_exit)
- .await;
- if let Err(e) = res {
- warn!(
- "({}) Error while syncing {:?}: {}",
- F::TABLE_NAME,
- partition,
- e
- );
- }
- } else {
- busy_tx.send(false).unwrap();
- tokio::time::sleep(Duration::from_secs(1)).await;
- }
+ if self.add_full_sync_tx.send(()).is_err() {
+ error!("({}) Could not add full sync", F::TABLE_NAME);
}
}
+ // ----
+
async fn sync_partition(
- self: Arc<Self>,
+ self: &Arc<Self>,
partition: &TodoPartition,
must_exit: &mut watch::Receiver<bool>,
) -> Result<(), Error> {
@@ -577,12 +495,22 @@ where
}
}
-impl SyncTodo {
- fn add_full_sync<F: TableSchema, R: TableReplication>(
- &mut self,
- data: &TableData<F, R>,
- system: &System,
- ) {
+// -------- Sync Worker ---------
+
+struct SyncWorker<F: TableSchema + 'static, R: TableReplication + 'static> {
+ syncer: Arc<TableSyncer<F, R>>,
+ ring_recv: watch::Receiver<Arc<Ring>>,
+ ring: Arc<Ring>,
+ add_full_sync_rx: mpsc::UnboundedReceiver<()>,
+ todo: Vec<TodoPartition>,
+ next_full_sync: Instant,
+}
+
+impl<F: TableSchema + 'static, R: TableReplication + 'static> SyncWorker<F, R> {
+ fn add_full_sync(&mut self) {
+ let system = &self.syncer.system;
+ let data = &self.syncer.data;
+
let my_id = system.id;
self.todo.clear();
@@ -623,6 +551,8 @@ impl SyncTodo {
retain,
});
}
+
+ self.next_full_sync = Instant::now() + ANTI_ENTROPY_INTERVAL;
}
fn pop_task(&mut self) -> Option<TodoPartition> {
@@ -641,6 +571,62 @@ impl SyncTodo {
}
}
+#[async_trait]
+impl<F: TableSchema + 'static, R: TableReplication + 'static> Worker for SyncWorker<F, R> {
+ fn name(&self) -> String {
+ format!("{} sync", F::TABLE_NAME)
+ }
+
+ fn info(&self) -> Option<String> {
+ let l = self.todo.len();
+ if l > 0 {
+ Some(format!("{} partitions remaining", l))
+ } else {
+ None
+ }
+ }
+
+ async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerStatus, Error> {
+ if let Some(partition) = self.pop_task() {
+ self.syncer.sync_partition(&partition, must_exit).await?;
+ Ok(WorkerStatus::Busy)
+ } else {
+ Ok(WorkerStatus::Idle)
+ }
+ }
+
+ async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerStatus {
+ if *must_exit.borrow() {
+ return WorkerStatus::Done;
+ }
+ select! {
+ s = self.add_full_sync_rx.recv() => {
+ if let Some(()) = s {
+ self.add_full_sync();
+ }
+ },
+ _ = self.ring_recv.changed() => {
+ let new_ring = self.ring_recv.borrow();
+ if !Arc::ptr_eq(&new_ring, &self.ring) {
+ self.ring = new_ring.clone();
+ drop(new_ring);
+ debug!("({}) Ring changed, adding full sync to syncer todo list", F::TABLE_NAME);
+ self.add_full_sync();
+ }
+ },
+ _ = tokio::time::sleep(self.next_full_sync - Instant::now()) => {
+ self.add_full_sync();
+ }
+ }
+ match self.todo.is_empty() {
+ false => WorkerStatus::Busy,
+ true => WorkerStatus::Idle,
+ }
+ }
+}
+
+// ---- UTIL ----
+
fn hash_of<T: Serialize>(x: &T) -> Result<Hash, Error> {
Ok(blake2sum(&rmp_to_vec_all_named(x)?[..]))
}
diff --git a/src/util/Cargo.toml b/src/util/Cargo.toml
index 5d073436..57c70ffb 100644
--- a/src/util/Cargo.toml
+++ b/src/util/Cargo.toml
@@ -16,6 +16,7 @@ path = "lib.rs"
[dependencies]
garage_db = { version = "0.8.0", path = "../db" }
+async-trait = "0.1"
blake2 = "0.9"
err-derive = "0.3"
xxhash-rust = { version = "0.8", default-features = false, features = ["xxh3"] }
diff --git a/src/util/background.rs b/src/util/background.rs
deleted file mode 100644
index d35425f5..00000000
--- a/src/util/background.rs
+++ /dev/null
@@ -1,160 +0,0 @@
-//! Job runner for futures and async functions
-use core::future::Future;
-use std::pin::Pin;
-use std::sync::Arc;
-use std::time::Duration;
-
-use futures::future::*;
-use futures::select;
-use futures::stream::FuturesUnordered;
-use futures::StreamExt;
-use tokio::sync::{mpsc, mpsc::error::TryRecvError, watch, Mutex};
-
-use crate::error::Error;
-
-type JobOutput = Result<(), Error>;
-type Job = Pin<Box<dyn Future<Output = JobOutput> + Send>>;
-
-/// Job runner for futures and async functions
-pub struct BackgroundRunner {
- stop_signal: watch::Receiver<bool>,
- queue_in: mpsc::UnboundedSender<(Job, bool)>,
- worker_in: mpsc::UnboundedSender<tokio::task::JoinHandle<()>>,
-}
-
-impl BackgroundRunner {
- /// Create a new BackgroundRunner
- pub fn new(
- n_runners: usize,
- stop_signal: watch::Receiver<bool>,
- ) -> (Arc<Self>, tokio::task::JoinHandle<()>) {
- let (worker_in, mut worker_out) = mpsc::unbounded_channel();
-
- let stop_signal_2 = stop_signal.clone();
- let await_all_done = tokio::spawn(async move {
- let mut workers = FuturesUnordered::new();
- let mut shutdown_timer = 0;
- loop {
- let closed = match worker_out.try_recv() {
- Ok(wkr) => {
- workers.push(wkr);
- false
- }
- Err(TryRecvError::Empty) => false,
- Err(TryRecvError::Disconnected) => true,
- };
- select! {
- res = workers.next() => {
- if let Some(Err(e)) = res {
- error!("Worker exited with error: {}", e);
- }
- }
- _ = tokio::time::sleep(Duration::from_secs(1)).fuse() => {
- if closed || *stop_signal_2.borrow() {
- shutdown_timer += 1;
- if shutdown_timer >= 10 {
- break;
- }
- }
- }
- }
- }
- });
-
- let (queue_in, queue_out) = mpsc::unbounded_channel();
- let queue_out = Arc::new(Mutex::new(queue_out));
-
- for i in 0..n_runners {
- let queue_out = queue_out.clone();
- let stop_signal = stop_signal.clone();
-
- worker_in
- .send(tokio::spawn(async move {
- loop {
- let (job, cancellable) = {
- select! {
- item = wait_job(&queue_out).fuse() => match item {
- // We received a task, process it
- Some(x) => x,
- // We received a signal that no more tasks will ever be sent
- // because the sending side was dropped. Exit now.
- None => break,
- },
- _ = tokio::time::sleep(Duration::from_secs(5)).fuse() => {
- if *stop_signal.borrow() {
- // Nothing has been going on for 5 secs, and we are shutting
- // down. Exit now.
- break;
- } else {
- // Nothing is going on but we don't want to exit.
- continue;
- }
- }
- }
- };
- if cancellable && *stop_signal.borrow() {
- continue;
- }
- if let Err(e) = job.await {
- error!("Job failed: {}", e)
- }
- }
- info!("Background worker {} exiting", i);
- }))
- .unwrap();
- }
-
- let bgrunner = Arc::new(Self {
- stop_signal,
- queue_in,
- worker_in,
- });
- (bgrunner, await_all_done)
- }
-
- /// Spawn a task to be run in background
- pub fn spawn<T>(&self, job: T)
- where
- T: Future<Output = JobOutput> + Send + 'static,
- {
- let boxed: Job = Box::pin(job);
- self.queue_in
- .send((boxed, false))
- .map_err(|_| "could not put job in queue")
- .unwrap();
- }
-
- /// Spawn a task to be run in background. It may get discarded before running if spawned while
- /// the runner is stopping
- pub fn spawn_cancellable<T>(&self, job: T)
- where
- T: Future<Output = JobOutput> + Send + 'static,
- {
- let boxed: Job = Box::pin(job);
- self.queue_in
- .send((boxed, true))
- .map_err(|_| "could not put job in queue")
- .unwrap();
- }
-
- pub fn spawn_worker<F, T>(&self, name: String, worker: F)
- where
- F: FnOnce(watch::Receiver<bool>) -> T + Send + 'static,
- T: Future<Output = ()> + Send + 'static,
- {
- let stop_signal = self.stop_signal.clone();
- let task = tokio::spawn(async move {
- info!("Worker started: {}", name);
- worker(stop_signal).await;
- info!("Worker exited: {}", name);
- });
- self.worker_in
- .send(task)
- .map_err(|_| "could not put job in queue")
- .unwrap();
- }
-}
-
-async fn wait_job(q: &Mutex<mpsc::UnboundedReceiver<(Job, bool)>>) -> Option<(Job, bool)> {
- q.lock().await.recv().await
-}
diff --git a/src/util/background/job_worker.rs b/src/util/background/job_worker.rs
new file mode 100644
index 00000000..fcdac582
--- /dev/null
+++ b/src/util/background/job_worker.rs
@@ -0,0 +1,51 @@
+//! Job worker: a generic worker that just processes incoming
+//! jobs one by one
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use tokio::sync::{mpsc, Mutex};
+
+use crate::background::worker::*;
+use crate::background::*;
+
+pub(crate) struct JobWorker {
+ pub(crate) index: usize,
+ pub(crate) job_chan: Arc<Mutex<mpsc::UnboundedReceiver<(Job, bool)>>>,
+ pub(crate) next_job: Option<Job>,
+}
+
+#[async_trait]
+impl Worker for JobWorker {
+ fn name(&self) -> String {
+ format!("Job worker #{}", self.index)
+ }
+
+ async fn work(
+ &mut self,
+ _must_exit: &mut watch::Receiver<bool>,
+ ) -> Result<WorkerStatus, Error> {
+ match self.next_job.take() {
+ None => return Ok(WorkerStatus::Idle),
+ Some(job) => {
+ job.await?;
+ Ok(WorkerStatus::Busy)
+ }
+ }
+ }
+
+ async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerStatus {
+ loop {
+ match self.job_chan.lock().await.recv().await {
+ Some((job, cancellable)) => {
+ if cancellable && *must_exit.borrow() {
+ continue;
+ }
+ self.next_job = Some(job);
+ return WorkerStatus::Busy;
+ }
+ None => return WorkerStatus::Done,
+ }
+ }
+ }
+}
diff --git a/src/util/background/mod.rs b/src/util/background/mod.rs
new file mode 100644
index 00000000..636b9c13
--- /dev/null
+++ b/src/util/background/mod.rs
@@ -0,0 +1,117 @@
+//! Job runner for futures and async functions
+
+pub mod job_worker;
+pub mod worker;
+
+use core::future::Future;
+
+use std::collections::HashMap;
+use std::pin::Pin;
+use std::sync::Arc;
+
+use serde::{Deserialize, Serialize};
+use tokio::sync::{mpsc, watch, Mutex};
+
+use crate::error::Error;
+use worker::WorkerProcessor;
+pub use worker::{Worker, WorkerStatus};
+
+pub(crate) type JobOutput = Result<(), Error>;
+pub(crate) type Job = Pin<Box<dyn Future<Output = JobOutput> + Send>>;
+
+/// Job runner for futures and async functions
+pub struct BackgroundRunner {
+ send_job: mpsc::UnboundedSender<(Job, bool)>,
+ send_worker: mpsc::UnboundedSender<Box<dyn Worker>>,
+ worker_info: Arc<std::sync::Mutex<HashMap<usize, WorkerInfo>>>,
+}
+
+#[derive(Clone, Serialize, Deserialize, Debug)]
+pub struct WorkerInfo {
+ pub name: String,
+ pub info: Option<String>,
+ pub status: WorkerStatus,
+ pub errors: usize,
+ pub consecutive_errors: usize,
+ pub last_error: Option<(String, u64)>,
+}
+
+impl BackgroundRunner {
+ /// Create a new BackgroundRunner
+ pub fn new(
+ n_runners: usize,
+ stop_signal: watch::Receiver<bool>,
+ ) -> (Arc<Self>, tokio::task::JoinHandle<()>) {
+ let (send_worker, worker_out) = mpsc::unbounded_channel::<Box<dyn Worker>>();
+
+ let worker_info = Arc::new(std::sync::Mutex::new(HashMap::new()));
+ let mut worker_processor =
+ WorkerProcessor::new(worker_out, stop_signal, worker_info.clone());
+
+ let await_all_done = tokio::spawn(async move {
+ worker_processor.run().await;
+ });
+
+ let (send_job, queue_out) = mpsc::unbounded_channel();
+ let queue_out = Arc::new(Mutex::new(queue_out));
+
+ for i in 0..n_runners {
+ let queue_out = queue_out.clone();
+
+ send_worker
+ .send(Box::new(job_worker::JobWorker {
+ index: i,
+ job_chan: queue_out.clone(),
+ next_job: None,
+ }))
+ .ok()
+ .unwrap();
+ }
+
+ let bgrunner = Arc::new(Self {
+ send_job,
+ send_worker,
+ worker_info,
+ });
+ (bgrunner, await_all_done)
+ }
+
+ pub fn get_worker_info(&self) -> HashMap<usize, WorkerInfo> {
+ self.worker_info.lock().unwrap().clone()
+ }
+
+ /// Spawn a task to be run in background
+ pub fn spawn<T>(&self, job: T)
+ where
+ T: Future<Output = JobOutput> + Send + 'static,
+ {
+ let boxed: Job = Box::pin(job);
+ self.send_job
+ .send((boxed, false))
+ .ok()
+ .expect("Could not put job in queue");
+ }
+
+ /// Spawn a task to be run in background. It may get discarded before running if spawned while
+ /// the runner is stopping
+ pub fn spawn_cancellable<T>(&self, job: T)
+ where
+ T: Future<Output = JobOutput> + Send + 'static,
+ {
+ let boxed: Job = Box::pin(job);
+ self.send_job
+ .send((boxed, true))
+ .ok()
+ .expect("Could not put job in queue");
+ }
+
+ pub fn spawn_worker<W>(&self, worker: W)
+ where
+ W: Worker + 'static,
+ {
+ self.send_worker
+ .send(Box::new(worker))
+ .ok()
+ .expect("Could not put worker in queue");
+ }
+}
diff --git a/src/util/background/worker.rs b/src/util/background/worker.rs
new file mode 100644
index 00000000..7fd63c2b
--- /dev/null
+++ b/src/util/background/worker.rs
@@ -0,0 +1,261 @@
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::time::{Duration, Instant};
+
+use async_trait::async_trait;
+use futures::future::*;
+use futures::stream::FuturesUnordered;
+use futures::StreamExt;
+use serde::{Deserialize, Serialize};
+use tokio::select;
+use tokio::sync::{mpsc, watch};
+use tracing::*;
+
+use crate::background::WorkerInfo;
+use crate::error::Error;
+use crate::time::now_msec;
+
+#[derive(PartialEq, Copy, Clone, Serialize, Deserialize, Debug)]
+pub enum WorkerStatus {
+ Busy,
+ Throttled(f32),
+ Idle,
+ Done,
+}
+
+impl std::fmt::Display for WorkerStatus {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ match self {
+ WorkerStatus::Busy => write!(f, "Busy"),
+ WorkerStatus::Throttled(t) => write!(f, "Thr:{:.3}", t),
+ WorkerStatus::Idle => write!(f, "Idle"),
+ WorkerStatus::Done => write!(f, "Done"),
+ }
+ }
+}
+
+#[async_trait]
+pub trait Worker: Send {
+ fn name(&self) -> String;
+
+ fn info(&self) -> Option<String> {
+ None
+ }
+
+ /// Work: do a basic unit of work, if one is available (otherwise, should return
+ /// WorkerStatus::Idle immediately). We will do our best to not interrupt this future in the
+ /// middle of processing, it will only be interrupted at the last minute when Garage is trying
+ /// to exit and this hasn't returned yet. This function may return an error to indicate that
+ /// its unit of work could not be processed due to an error: the error will be logged and
+ /// .work() will be called again immediately.
+ async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerStatus, Error>;
+
+ /// Wait for work: await for some task to become available. This future can be interrupted in
+ /// the middle for any reason. This future doesn't have to await on must_exit.changed(), we
+ /// are doing it for you. Therefore it only receives a read refernce to must_exit which allows
+ /// it to check if we are exiting.
+ async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerStatus;
+}
+
+pub(crate) struct WorkerProcessor {
+ stop_signal: watch::Receiver<bool>,
+ worker_chan: mpsc::UnboundedReceiver<Box<dyn Worker>>,
+ worker_info: Arc<std::sync::Mutex<HashMap<usize, WorkerInfo>>>,
+}
+
+impl WorkerProcessor {
+ pub(crate) fn new(
+ worker_chan: mpsc::UnboundedReceiver<Box<dyn Worker>>,
+ stop_signal: watch::Receiver<bool>,
+ worker_info: Arc<std::sync::Mutex<HashMap<usize, WorkerInfo>>>,
+ ) -> Self {
+ Self {
+ stop_signal,
+ worker_chan,
+ worker_info,
+ }
+ }
+
+ pub(crate) async fn run(&mut self) {
+ let mut workers = FuturesUnordered::new();
+ let mut next_task_id = 1;
+
+ while !*self.stop_signal.borrow() {
+ let await_next_worker = async {
+ if workers.is_empty() {
+ futures::future::pending().await
+ } else {
+ workers.next().await
+ }
+ };
+ select! {
+ new_worker_opt = self.worker_chan.recv() => {
+ if let Some(new_worker) = new_worker_opt {
+ let task_id = next_task_id;
+ next_task_id += 1;
+ let stop_signal = self.stop_signal.clone();
+ let stop_signal_worker = self.stop_signal.clone();
+ let mut worker = WorkerHandler {
+ task_id,
+ stop_signal,
+ stop_signal_worker,
+ worker: new_worker,
+ status: WorkerStatus::Busy,
+ errors: 0,
+ consecutive_errors: 0,
+ last_error: None,
+ };
+ workers.push(async move {
+ worker.step().await;
+ worker
+ }.boxed());
+ }
+ }
+ worker = await_next_worker => {
+ if let Some(mut worker) = worker {
+ trace!("{} (TID {}): {:?}", worker.worker.name(), worker.task_id, worker.status);
+
+ // Save worker info
+ let mut wi = self.worker_info.lock().unwrap();
+ match wi.get_mut(&worker.task_id) {
+ Some(i) => {
+ i.status = worker.status;
+ i.info = worker.worker.info();
+ i.errors = worker.errors;
+ i.consecutive_errors = worker.consecutive_errors;
+ if worker.last_error.is_some() {
+ i.last_error = worker.last_error.take();
+ }
+ }
+ None => {
+ wi.insert(worker.task_id, WorkerInfo {
+ name: worker.worker.name(),
+ status: worker.status,
+ info: worker.worker.info(),
+ errors: worker.errors,
+ consecutive_errors: worker.consecutive_errors,
+ last_error: worker.last_error.take(),
+ });
+ }
+ }
+
+ if worker.status == WorkerStatus::Done {
+ info!("Worker {} (TID {}) exited", worker.worker.name(), worker.task_id);
+ } else {
+ workers.push(async move {
+ worker.step().await;
+ worker
+ }.boxed());
+ }
+ }
+ }
+ _ = self.stop_signal.changed() => (),
+ }
+ }
+
+ // We are exiting, drain everything
+ let drain_half_time = Instant::now() + Duration::from_secs(5);
+ let drain_everything = async move {
+ while let Some(mut worker) = workers.next().await {
+ if worker.status == WorkerStatus::Done {
+ info!(
+ "Worker {} (TID {}) exited",
+ worker.worker.name(),
+ worker.task_id
+ );
+ } else if Instant::now() > drain_half_time {
+ warn!("Worker {} (TID {}) interrupted between two iterations in state {:?} (this should be fine)", worker.worker.name(), worker.task_id, worker.status);
+ } else {
+ workers.push(
+ async move {
+ worker.step().await;
+ worker
+ }
+ .boxed(),
+ );
+ }
+ }
+ };
+
+ select! {
+ _ = drain_everything => {
+ info!("All workers exited peacefully \\o/");
+ }
+ _ = tokio::time::sleep(Duration::from_secs(9)) => {
+ error!("Some workers could not exit in time, we are cancelling some things in the middle");
+ }
+ }
+ }
+}
+
+struct WorkerHandler {
+ task_id: usize,
+ stop_signal: watch::Receiver<bool>,
+ stop_signal_worker: watch::Receiver<bool>,
+ worker: Box<dyn Worker>,
+ status: WorkerStatus,
+ errors: usize,
+ consecutive_errors: usize,
+ last_error: Option<(String, u64)>,
+}
+
+impl WorkerHandler {
+ async fn step(&mut self) {
+ match self.status {
+ WorkerStatus::Busy => match self.worker.work(&mut self.stop_signal).await {
+ Ok(s) => {
+ self.status = s;
+ self.consecutive_errors = 0;
+ }
+ Err(e) => {
+ error!(
+ "Error in worker {} (TID {}): {}",
+ self.worker.name(),
+ self.task_id,
+ e
+ );
+ self.errors += 1;
+ self.consecutive_errors += 1;
+ self.last_error = Some((format!("{}", e), now_msec()));
+ // Sleep a bit so that error won't repeat immediately, exponential backoff
+ // strategy (min 1sec, max ~60sec)
+ self.status = WorkerStatus::Throttled(
+ (1.5f32).powf(std::cmp::min(10, self.consecutive_errors - 1) as f32),
+ );
+ }
+ },
+ WorkerStatus::Throttled(delay) => {
+ // Sleep for given delay and go back to busy state
+ if !*self.stop_signal.borrow() {
+ select! {
+ _ = tokio::time::sleep(Duration::from_secs_f32(delay)) => (),
+ _ = self.stop_signal.changed() => (),
+ }
+ }
+ self.status = WorkerStatus::Busy;
+ }
+ WorkerStatus::Idle => {
+ if *self.stop_signal.borrow() {
+ select! {
+ new_st = self.worker.wait_for_work(&self.stop_signal_worker) => {
+ self.status = new_st;
+ }
+ _ = tokio::time::sleep(Duration::from_secs(1)) => {
+ // stay in Idle state
+ }
+ }
+ } else {
+ select! {
+ new_st = self.worker.wait_for_work(&self.stop_signal_worker) => {
+ self.status = new_st;
+ }
+ _ = self.stop_signal.changed() => {
+ // stay in Idle state
+ }
+ }
+ }
+ }
+ WorkerStatus::Done => unreachable!(),
+ }
+ }
+}
diff --git a/src/util/lib.rs b/src/util/lib.rs
index 8ca6e310..fce151af 100644
--- a/src/util/lib.rs
+++ b/src/util/lib.rs
@@ -11,7 +11,6 @@ pub mod error;
pub mod formater;
pub mod metrics;
pub mod persister;
-//pub mod sled_counter;
pub mod time;
pub mod token_bucket;
pub mod tranquilizer;
diff --git a/src/util/tranquilizer.rs b/src/util/tranquilizer.rs
index 28711387..9c796f8b 100644
--- a/src/util/tranquilizer.rs
+++ b/src/util/tranquilizer.rs
@@ -3,6 +3,8 @@ use std::time::{Duration, Instant};
use tokio::time::sleep;
+use crate::background::WorkerStatus;
+
/// A tranquilizer is a helper object that is used to make
/// background operations not take up too much time.
///
@@ -33,7 +35,7 @@ impl Tranquilizer {
}
}
- pub async fn tranquilize(&mut self, tranquility: u32) {
+ fn tranquilize_internal(&mut self, tranquility: u32) -> Option<Duration> {
let observation = Instant::now() - self.last_step_begin;
self.observations.push_back(observation);
@@ -45,13 +47,32 @@ impl Tranquilizer {
if !self.observations.is_empty() {
let delay = (tranquility * self.sum_observations) / (self.observations.len() as u32);
+ Some(delay)
+ } else {
+ None
+ }
+ }
+
+ pub async fn tranquilize(&mut self, tranquility: u32) {
+ if let Some(delay) = self.tranquilize_internal(tranquility) {
sleep(delay).await;
+ self.reset();
}
+ }
- self.reset();
+ #[must_use]
+ pub fn tranquilize_worker(&mut self, tranquility: u32) -> WorkerStatus {
+ match self.tranquilize_internal(tranquility) {
+ Some(delay) => WorkerStatus::Throttled(delay.as_secs_f32()),
+ None => WorkerStatus::Busy,
+ }
}
pub fn reset(&mut self) {
self.last_step_begin = Instant::now();
}
+
+ pub fn clear(&mut self) {
+ self.observations.clear();
+ }
}