aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock6
-rw-r--r--Cargo.nix32
-rw-r--r--README.md28
-rw-r--r--doc/book/design/goals.md2
-rw-r--r--nix/compile.nix4
-rw-r--r--src/api/Cargo.toml1
-rw-r--r--src/api/admin/api_server.rs7
-rw-r--r--src/api/generic_server.rs6
-rw-r--r--src/api/s3/get.rs86
-rw-r--r--src/block/manager.rs18
-rw-r--r--src/block/resync.rs14
-rw-r--r--src/garage/server.rs9
-rw-r--r--src/model/k2v/causality.rs2
-rw-r--r--src/model/k2v/item_table.rs8
-rw-r--r--src/model/k2v/rpc.rs36
-rw-r--r--src/rpc/Cargo.toml2
-rw-r--r--src/rpc/metrics.rs19
-rw-r--r--src/rpc/rpc_helper.rs71
-rw-r--r--src/rpc/system.rs16
-rw-r--r--src/table/Cargo.toml1
-rw-r--r--src/table/gc.rs10
-rw-r--r--src/table/sync.rs20
-rw-r--r--src/table/table.rs14
-rw-r--r--src/util/config.rs5
24 files changed, 216 insertions, 201 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 9b3ecd5d..e0695059 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1063,6 +1063,7 @@ dependencies = [
"serde_json",
"sha2 0.10.2",
"tokio",
+ "tokio-stream",
"tracing",
"url",
]
@@ -1178,6 +1179,7 @@ dependencies = [
"garage_db",
"garage_rpc",
"garage_util",
+ "hex",
"hexdump",
"opentelemetry",
"rand 0.8.5",
@@ -2032,9 +2034,9 @@ dependencies = [
[[package]]
name = "netapp"
-version = "0.5.1"
+version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "06c7cbf05d7cd6e4bc51340934d60c798010bf1856769cf8508caaac1db23db6"
+checksum = "4ffe47ac46d3b2ce2f736a70865492df082e042eb2bfdddfca3b8dd66bd9469d"
dependencies = [
"arc-swap",
"async-trait",
diff --git a/Cargo.nix b/Cargo.nix
index f251503b..7f729420 100644
--- a/Cargo.nix
+++ b/Cargo.nix
@@ -780,7 +780,7 @@ in
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "59a6001667ab124aebae2a495118e11d30984c3a653e99d86d58971708cf5e4b"; };
dependencies = {
- ${ if hostPlatform.config == "aarch64-linux-android" || hostPlatform.config == "aarch64-apple-darwin" || hostPlatform.parsed.cpu.name == "aarch64" && hostPlatform.parsed.kernel.name == "linux" then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; };
+ ${ if hostPlatform.config == "aarch64-apple-darwin" || hostPlatform.config == "aarch64-linux-android" || hostPlatform.parsed.cpu.name == "aarch64" && hostPlatform.parsed.kernel.name == "linux" then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; };
};
});
@@ -1428,7 +1428,7 @@ in
garage_web = rustPackages."unknown".garage_web."0.8.0" { inherit profileName; };
hex = rustPackages."registry+https://github.com/rust-lang/crates.io-index".hex."0.4.3" { inherit profileName; };
sodiumoxide = rustPackages."registry+https://github.com/rust-lang/crates.io-index".kuska-sodiumoxide."0.2.5-0" { inherit profileName; };
- netapp = rustPackages."registry+https://github.com/rust-lang/crates.io-index".netapp."0.5.1" { inherit profileName; };
+ netapp = rustPackages."registry+https://github.com/rust-lang/crates.io-index".netapp."0.5.2" { inherit profileName; };
opentelemetry = rustPackages."registry+https://github.com/rust-lang/crates.io-index".opentelemetry."0.17.0" { inherit profileName; };
opentelemetry_otlp = rustPackages."registry+https://github.com/rust-lang/crates.io-index".opentelemetry-otlp."0.10.0" { inherit profileName; };
opentelemetry_prometheus = rustPackages."registry+https://github.com/rust-lang/crates.io-index".opentelemetry-prometheus."0.10.0" { inherit profileName; };
@@ -1506,6 +1506,7 @@ in
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_web" then "serde_json" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde_json."1.0.81" { inherit profileName; };
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_web" then "sha2" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".sha2."0.10.2" { inherit profileName; };
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_web" then "tokio" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio."1.17.0" { inherit profileName; };
+ ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_web" then "tokio_stream" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio-stream."0.1.8" { inherit profileName; };
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_web" then "tracing" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".tracing."0.1.32" { inherit profileName; };
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_web" then "url" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".url."2.2.2" { inherit profileName; };
};
@@ -1599,7 +1600,7 @@ in
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_web" then "garage_table" else null } = rustPackages."unknown".garage_table."0.8.0" { inherit profileName; };
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_web" then "garage_util" else null } = rustPackages."unknown".garage_util."0.8.0" { inherit profileName; };
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_web" then "hex" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".hex."0.4.3" { inherit profileName; };
- ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_web" then "netapp" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".netapp."0.5.1" { inherit profileName; };
+ ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_web" then "netapp" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".netapp."0.5.2" { inherit profileName; };
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_web" then "opentelemetry" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".opentelemetry."0.17.0" { inherit profileName; };
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_web" then "rand" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand."0.8.5" { inherit profileName; };
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_web" then "rmp_serde" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rmp-serde."0.15.5" { inherit profileName; };
@@ -1637,7 +1638,7 @@ in
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_rpc" then "k8s_openapi" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".k8s-openapi."0.13.1" { inherit profileName; };
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_rpc" then "kube" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".kube."0.62.0" { inherit profileName; };
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_web" then "sodiumoxide" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".kuska-sodiumoxide."0.2.5-0" { inherit profileName; };
- ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_web" then "netapp" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".netapp."0.5.1" { inherit profileName; };
+ ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_web" then "netapp" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".netapp."0.5.2" { inherit profileName; };
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_rpc" then "openssl" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".openssl."0.10.38" { inherit profileName; };
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_web" then "opentelemetry" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".opentelemetry."0.17.0" { inherit profileName; };
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_web" then "pnet_datalink" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".pnet_datalink."0.28.0" { inherit profileName; };
@@ -1666,6 +1667,7 @@ in
garage_db = rustPackages."unknown".garage_db."0.8.0" { inherit profileName; };
garage_rpc = rustPackages."unknown".garage_rpc."0.8.0" { inherit profileName; };
garage_util = rustPackages."unknown".garage_util."0.8.0" { inherit profileName; };
+ hex = rustPackages."registry+https://github.com/rust-lang/crates.io-index".hex."0.4.3" { inherit profileName; };
hexdump = rustPackages."registry+https://github.com/rust-lang/crates.io-index".hexdump."0.1.1" { inherit profileName; };
opentelemetry = rustPackages."registry+https://github.com/rust-lang/crates.io-index".opentelemetry."0.17.0" { inherit profileName; };
rand = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand."0.8.5" { inherit profileName; };
@@ -1700,7 +1702,7 @@ in
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "http" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".http."0.2.6" { inherit profileName; };
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "hyper" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".hyper."0.14.18" { inherit profileName; };
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "lazy_static" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".lazy_static."1.4.0" { inherit profileName; };
- ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "netapp" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".netapp."0.5.1" { inherit profileName; };
+ ${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "netapp" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".netapp."0.5.2" { inherit profileName; };
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "opentelemetry" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".opentelemetry."0.17.0" { inherit profileName; };
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "rand" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand."0.8.5" { inherit profileName; };
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "rmp_serde" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rmp-serde."0.15.5" { inherit profileName; };
@@ -2742,7 +2744,7 @@ in
[ "os-poll" ]
];
dependencies = {
- ${ if hostPlatform.parsed.kernel.name == "wasi" || hostPlatform.isUnix then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; };
+ ${ if hostPlatform.isUnix || hostPlatform.parsed.kernel.name == "wasi" then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; };
log = rustPackages."registry+https://github.com/rust-lang/crates.io-index".log."0.4.16" { inherit profileName; };
${ if hostPlatform.isWindows then "miow" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".miow."0.3.7" { inherit profileName; };
${ if hostPlatform.isWindows then "ntapi" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".ntapi."0.3.7" { inherit profileName; };
@@ -2821,11 +2823,11 @@ in
};
});
- "registry+https://github.com/rust-lang/crates.io-index".netapp."0.5.1" = overridableMkRustCrate (profileName: rec {
+ "registry+https://github.com/rust-lang/crates.io-index".netapp."0.5.2" = overridableMkRustCrate (profileName: rec {
name = "netapp";
- version = "0.5.1";
+ version = "0.5.2";
registry = "registry+https://github.com/rust-lang/crates.io-index";
- src = fetchCratesIo { inherit name version; sha256 = "06c7cbf05d7cd6e4bc51340934d60c798010bf1856769cf8508caaac1db23db6"; };
+ src = fetchCratesIo { inherit name version; sha256 = "4ffe47ac46d3b2ce2f736a70865492df082e042eb2bfdddfca3b8dd66bd9469d"; };
features = builtins.concatLists [
(lib.optional (rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client") "default")
(lib.optional (rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_web") "opentelemetry")
@@ -3886,7 +3888,7 @@ in
];
dependencies = {
${ if hostPlatform.parsed.kernel.name == "android" || hostPlatform.parsed.kernel.name == "linux" then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; };
- ${ if hostPlatform.parsed.kernel.name == "dragonfly" || hostPlatform.parsed.kernel.name == "freebsd" || hostPlatform.parsed.kernel.name == "illumos" || hostPlatform.parsed.kernel.name == "netbsd" || hostPlatform.parsed.kernel.name == "openbsd" || hostPlatform.parsed.kernel.name == "solaris" || hostPlatform.parsed.kernel.name == "android" || hostPlatform.parsed.kernel.name == "linux" then "once_cell" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".once_cell."1.10.0" { inherit profileName; };
+ ${ if hostPlatform.parsed.kernel.name == "android" || hostPlatform.parsed.kernel.name == "linux" || hostPlatform.parsed.kernel.name == "dragonfly" || hostPlatform.parsed.kernel.name == "freebsd" || hostPlatform.parsed.kernel.name == "illumos" || hostPlatform.parsed.kernel.name == "netbsd" || hostPlatform.parsed.kernel.name == "openbsd" || hostPlatform.parsed.kernel.name == "solaris" then "once_cell" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".once_cell."1.10.0" { inherit profileName; };
${ if hostPlatform.parsed.cpu.name == "i686" || hostPlatform.parsed.cpu.name == "x86_64" || (hostPlatform.parsed.cpu.name == "aarch64" || hostPlatform.parsed.cpu.name == "armv6l" || hostPlatform.parsed.cpu.name == "armv7l") && (hostPlatform.parsed.kernel.name == "android" || hostPlatform.parsed.kernel.name == "fuchsia" || hostPlatform.parsed.kernel.name == "linux") then "spin" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".spin."0.5.2" { inherit profileName; };
untrusted = rustPackages."registry+https://github.com/rust-lang/crates.io-index".untrusted."0.7.1" { inherit profileName; };
${ if hostPlatform.parsed.cpu.name == "wasm32" && hostPlatform.parsed.vendor.name == "unknown" && hostPlatform.parsed.kernel.name == "unknown" && hostPlatform.parsed.abi.name == "" then "web_sys" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".web-sys."0.3.56" { inherit profileName; };
@@ -4496,7 +4498,7 @@ in
];
dependencies = {
bitflags = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bitflags."1.3.2" { inherit profileName; };
- ${ if hostPlatform.parsed.kernel.name == "android" || hostPlatform.parsed.kernel.name == "linux" then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; };
+ ${ if hostPlatform.parsed.kernel.name == "linux" || hostPlatform.parsed.kernel.name == "android" then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; };
${ if !(hostPlatform.parsed.kernel.name == "linux" || hostPlatform.parsed.kernel.name == "android") then "parking_lot" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".parking_lot."0.11.2" { inherit profileName; };
${ if !(hostPlatform.parsed.kernel.name == "linux" || hostPlatform.parsed.kernel.name == "android") then "parking_lot_core" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".parking_lot_core."0.8.5" { inherit profileName; };
static_init_macro = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".static_init_macro."1.0.2" { profileName = "__noProfile"; };
@@ -5596,11 +5598,11 @@ in
[ "default" ]
];
dependencies = {
- ${ if hostPlatform.config == "aarch64-uwp-windows-msvc" || hostPlatform.config == "aarch64-pc-windows-msvc" then "windows_aarch64_msvc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_aarch64_msvc."0.32.0" { inherit profileName; };
- ${ if hostPlatform.config == "i686-uwp-windows-gnu" || hostPlatform.config == "i686-pc-windows-gnu" then "windows_i686_gnu" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_i686_gnu."0.32.0" { inherit profileName; };
+ ${ if hostPlatform.config == "aarch64-pc-windows-msvc" || hostPlatform.config == "aarch64-uwp-windows-msvc" then "windows_aarch64_msvc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_aarch64_msvc."0.32.0" { inherit profileName; };
+ ${ if hostPlatform.config == "i686-pc-windows-gnu" || hostPlatform.config == "i686-uwp-windows-gnu" then "windows_i686_gnu" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_i686_gnu."0.32.0" { inherit profileName; };
${ if hostPlatform.config == "i686-pc-windows-msvc" || hostPlatform.config == "i686-uwp-windows-msvc" then "windows_i686_msvc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_i686_msvc."0.32.0" { inherit profileName; };
- ${ if hostPlatform.config == "x86_64-pc-windows-gnu" || hostPlatform.config == "x86_64-uwp-windows-gnu" then "windows_x86_64_gnu" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_x86_64_gnu."0.32.0" { inherit profileName; };
- ${ if hostPlatform.config == "x86_64-pc-windows-msvc" || hostPlatform.config == "x86_64-uwp-windows-msvc" then "windows_x86_64_msvc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_x86_64_msvc."0.32.0" { inherit profileName; };
+ ${ if hostPlatform.config == "x86_64-uwp-windows-gnu" || hostPlatform.config == "x86_64-pc-windows-gnu" then "windows_x86_64_gnu" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_x86_64_gnu."0.32.0" { inherit profileName; };
+ ${ if hostPlatform.config == "x86_64-uwp-windows-msvc" || hostPlatform.config == "x86_64-pc-windows-msvc" then "windows_x86_64_msvc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_x86_64_msvc."0.32.0" { inherit profileName; };
};
});
diff --git a/README.md b/README.md
index ec7e2485..9992fff2 100644
--- a/README.md
+++ b/README.md
@@ -15,18 +15,24 @@ Garage [![Build Status](https://drone.deuxfleurs.fr/api/badges/Deuxfleurs/garage
]
</p>
-Garage is a lightweight S3-compatible distributed object store, with the following goals:
+Garage is an S3-compatible distributed object storage service
+designed for self-hosting at a small-to-medium scale.
-- As self-contained as possible
-- Easy to set up
-- Highly resilient to network failures, network latency, disk failures, sysadmin failures
-- Relatively simple
-- Made for multi-datacenter deployments
+Garage is designed for storage clusters composed of nodes running
+at different physical locations,
+in order to easily provide a storage service that replicates data at these different
+locations and stays available even when some servers are unreachable.
+Garage also focuses on being lightweight, easy to operate, and highly resilient to
+machine failures.
-Non-goals include:
+Garage is built by [Deuxfleurs](https://deuxfleurs.fr),
+an experimental small-scale self hosted service provider,
+which has been using it in production since its first release in 2020.
-- Extremely high performance
-- Complete implementation of the S3 API
-- Erasure coding (our replication model is simply to copy the data as is on several nodes, in different datacenters if possible)
+Learn more on our dedicated documentation pages:
-Our main use case is to provide a distributed storage layer for small-scale self hosted services such as [Deuxfleurs](https://deuxfleurs.fr).
+- [Goals and use cases](https://garagehq.deuxfleurs.fr/documentation/design/goals/)
+- [Features](https://garagehq.deuxfleurs.fr/documentation/reference-manual/features/)
+- [Quick start](https://garagehq.deuxfleurs.fr/documentation/quick-start/)
+
+Garage is entirely free software released under the terms of the AGPLv3.
diff --git a/doc/book/design/goals.md b/doc/book/design/goals.md
index b97d73a9..9c2d89f0 100644
--- a/doc/book/design/goals.md
+++ b/doc/book/design/goals.md
@@ -14,10 +14,10 @@ website.
Garage is an opinionated object storage solutoin, we focus on the following **desirable properties**:
+ - **Internet enabled**: made for multi-sites (eg. datacenters, offices, households, etc.) interconnected through regular Internet connections.
- **Self-contained & lightweight**: works everywhere and integrates well in existing environments to target [hyperconverged infrastructures](https://en.wikipedia.org/wiki/Hyper-converged_infrastructure).
- **Highly resilient**: highly resilient to network failures, network latency, disk failures, sysadmin failures.
- **Simple**: simple to understand, simple to operate, simple to debug.
- - **Internet enabled**: made for multi-sites (eg. datacenters, offices, households, etc.) interconnected through regular Internet connections.
We also noted that the pursuit of some other goals are detrimental to our initial goals.
The following has been identified as **non-goals** (if these points matter to you, you should not use Garage):
diff --git a/nix/compile.nix b/nix/compile.nix
index 512a7354..4382a2be 100644
--- a/nix/compile.nix
+++ b/nix/compile.nix
@@ -137,8 +137,8 @@ let
/* [2] */ hardeningDisable = [ "pie" ];
};
overrideArgs = old: {
- /* [4] */ features = [ "bundled-libs" "sled" ]
- ++ (if release then [ "kubernetes-discovery" "telemetry-otlp" "metrics" "lmdb" "sqlite" ] else []);
+ /* [4] */ features = [ "bundled-libs" "sled" "metrics" ]
+ ++ (if release then [ "kubernetes-discovery" "telemetry-otlp" "lmdb" "sqlite" ] else []);
};
})
diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml
index cdfabcb8..7c3ed43b 100644
--- a/src/api/Cargo.toml
+++ b/src/api/Cargo.toml
@@ -38,6 +38,7 @@ futures = "0.3"
futures-util = "0.3"
pin-project = "1.0"
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
+tokio-stream = "0.1"
form_urlencoded = "1.0.0"
http = "0.2"
diff --git a/src/api/admin/api_server.rs b/src/api/admin/api_server.rs
index fb0078cc..0816bda1 100644
--- a/src/api/admin/api_server.rs
+++ b/src/api/admin/api_server.rs
@@ -34,7 +34,10 @@ pub struct AdminApiServer {
}
impl AdminApiServer {
- pub fn new(garage: Arc<Garage>) -> Self {
+ pub fn new(
+ garage: Arc<Garage>,
+ #[cfg(feature = "metrics")] exporter: PrometheusExporter,
+ ) -> Self {
let cfg = &garage.config.admin;
let metrics_token = cfg
.metrics_token
@@ -47,7 +50,7 @@ impl AdminApiServer {
Self {
garage,
#[cfg(feature = "metrics")]
- exporter: opentelemetry_prometheus::exporter().init(),
+ exporter,
metrics_token,
admin_token,
}
diff --git a/src/api/generic_server.rs b/src/api/generic_server.rs
index a48be1bc..62fe4e5a 100644
--- a/src/api/generic_server.rs
+++ b/src/api/generic_server.rs
@@ -174,7 +174,11 @@ impl<A: ApiHandler> ApiServer<A> {
let current_context = Context::current();
let current_span = current_context.span();
- current_span.update_name::<String>(format!("S3 API {}", endpoint.name()));
+ current_span.update_name::<String>(format!(
+ "{} API {}",
+ A::API_NAME_DISPLAY,
+ endpoint.name()
+ ));
current_span.set_attribute(KeyValue::new("endpoint", endpoint.name()));
endpoint.add_span_attributes(current_span);
diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs
index ae4c287d..2a99551a 100644
--- a/src/api/s3/get.rs
+++ b/src/api/s3/get.rs
@@ -2,16 +2,19 @@
use std::sync::Arc;
use std::time::{Duration, UNIX_EPOCH};
-use futures::stream::*;
+use futures::future;
+use futures::stream::{self, StreamExt};
use http::header::{
ACCEPT_RANGES, CONTENT_LENGTH, CONTENT_RANGE, CONTENT_TYPE, ETAG, IF_MODIFIED_SINCE,
IF_NONE_MATCH, LAST_MODIFIED, RANGE,
};
use hyper::{Body, Request, Response, StatusCode};
+use tokio::sync::mpsc;
use garage_rpc::rpc_helper::{netapp::stream::ByteStream, OrderTag};
use garage_table::EmptyKey;
use garage_util::data::*;
+use garage_util::error::OkOrMessage;
use garage_model::garage::Garage;
use garage_model::s3::object_table::*;
@@ -242,43 +245,56 @@ pub async fn handle_get(
Ok(resp_builder.body(body)?)
}
ObjectVersionData::FirstBlock(_, first_block_hash) => {
- let order_stream = OrderTag::stream();
-
- let read_first_block = garage
- .block_manager
- .rpc_get_block_streaming(first_block_hash, Some(order_stream.order(0)));
- let get_next_blocks = garage.version_table.get(&last_v.uuid, &EmptyKey);
+ let (tx, rx) = mpsc::channel(2);
- let (first_block_stream, version) =
- futures::try_join!(read_first_block, get_next_blocks)?;
- let version = version.ok_or(Error::NoSuchKey)?;
+ let order_stream = OrderTag::stream();
+ let first_block_hash = *first_block_hash;
+ let version_uuid = last_v.uuid;
+
+ tokio::spawn(async move {
+ match async {
+ let garage2 = garage.clone();
+ let version_fut = tokio::spawn(async move {
+ garage2.version_table.get(&version_uuid, &EmptyKey).await
+ });
+
+ let stream_block_0 = garage
+ .block_manager
+ .rpc_get_block_streaming(&first_block_hash, Some(order_stream.order(0)))
+ .await?;
+ tx.send(stream_block_0)
+ .await
+ .ok_or_message("channel closed")?;
+
+ let version = version_fut.await.unwrap()?.ok_or(Error::NoSuchKey)?;
+ for (i, (_, vb)) in version.blocks.items().iter().enumerate().skip(1) {
+ let stream_block_i = garage
+ .block_manager
+ .rpc_get_block_streaming(&vb.hash, Some(order_stream.order(i as u64)))
+ .await?;
+ tx.send(stream_block_i)
+ .await
+ .ok_or_message("channel closed")?;
+ }
- let mut blocks = version
- .blocks
- .items()
- .iter()
- .map(|(_, vb)| (vb.hash, None))
- .collect::<Vec<_>>();
- blocks[0].1 = Some(first_block_stream);
-
- let body_stream = futures::stream::iter(blocks)
- .enumerate()
- .map(move |(i, (hash, stream_opt))| {
- let garage = garage.clone();
- async move {
- if let Some(stream) = stream_opt {
- stream
- } else {
- garage
- .block_manager
- .rpc_get_block_streaming(&hash, Some(order_stream.order(i as u64)))
- .await
- .unwrap_or_else(|e| error_stream(i, e))
- }
+ Ok::<(), Error>(())
+ }
+ .await
+ {
+ Ok(()) => (),
+ Err(e) => {
+ let err = std::io::Error::new(
+ std::io::ErrorKind::Other,
+ format!("Error while getting object data: {}", e),
+ );
+ let _ = tx
+ .send(Box::pin(stream::once(future::ready(Err(err)))))
+ .await;
}
- })
- .buffered(2)
- .flatten();
+ }
+ });
+
+ let body_stream = tokio_stream::wrappers::ReceiverStream::new(rx).flatten();
let body = hyper::body::Body::wrap_stream(body_stream);
Ok(resp_builder.body(body)?)
diff --git a/src/block/manager.rs b/src/block/manager.rs
index ec694fc8..7f439b96 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -41,9 +41,6 @@ use crate::resync::*;
/// Size under which data will be stored inlined in database instead of as files
pub const INLINE_THRESHOLD: usize = 3072;
-// Timeout for RPCs that read and write blocks to remote nodes
-pub(crate) const BLOCK_RW_TIMEOUT: Duration = Duration::from_secs(60);
-
// The delay between the moment when the reference counter
// drops to zero, and the moment where we allow ourselves
// to delete the block locally.
@@ -183,7 +180,7 @@ impl BlockManager {
};
return Ok((header, stream));
}
- _ = tokio::time::sleep(BLOCK_RW_TIMEOUT) => {
+ _ = tokio::time::sleep(self.system.rpc.rpc_timeout()) => {
debug!("Node {:?} didn't return block in time, trying next.", node);
}
};
@@ -235,7 +232,7 @@ impl BlockManager {
}
}
}
- _ = tokio::time::sleep(BLOCK_RW_TIMEOUT) => {
+ _ = tokio::time::sleep(self.system.rpc.rpc_timeout()) => {
debug!("Node {:?} didn't return block in time, trying next.", node);
}
};
@@ -300,8 +297,7 @@ impl BlockManager {
&who[..],
put_block_rpc,
RequestStrategy::with_priority(PRIO_NORMAL | PRIO_SECONDARY)
- .with_quorum(self.replication.write_quorum())
- .with_timeout(BLOCK_RW_TIMEOUT),
+ .with_quorum(self.replication.write_quorum()),
)
.await?;
@@ -336,7 +332,10 @@ impl BlockManager {
// we will fecth it from someone.
let this = self.clone();
tokio::spawn(async move {
- if let Err(e) = this.resync.put_to_resync(&hash, 2 * BLOCK_RW_TIMEOUT) {
+ if let Err(e) = this
+ .resync
+ .put_to_resync(&hash, 2 * this.system.rpc.rpc_timeout())
+ {
error!("Block {:?} could not be put in resync queue: {}.", hash, e);
}
});
@@ -444,7 +443,8 @@ impl BlockManager {
Ok(c) => c,
Err(e) => {
// Not found but maybe we should have had it ??
- self.resync.put_to_resync(hash, 2 * BLOCK_RW_TIMEOUT)?;
+ self.resync
+ .put_to_resync(hash, 2 * self.system.rpc.rpc_timeout())?;
return Err(Into::into(e));
}
};
diff --git a/src/block/resync.rs b/src/block/resync.rs
index bde3e98c..ada3ac54 100644
--- a/src/block/resync.rs
+++ b/src/block/resync.rs
@@ -33,14 +33,6 @@ use garage_table::replication::TableReplication;
use crate::manager::*;
-// Timeout for RPCs that ask other nodes whether they need a copy
-// of a given block before we delete it locally
-// The timeout here is relatively low because we don't want to block
-// the entire resync loop when some nodes are not responding.
-// Nothing will be deleted if the nodes don't answer the queries,
-// we will just retry later.
-const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(15);
-
// The delay between the time where a resync operation fails
// and the time when it is retried, with exponential backoff
// (multiplied by 2, 4, 8, 16, etc. for every consecutive failure).
@@ -346,8 +338,7 @@ impl BlockResyncManager {
&manager.endpoint,
&who,
BlockRpc::NeedBlockQuery(*hash),
- RequestStrategy::with_priority(PRIO_BACKGROUND)
- .with_timeout(NEED_BLOCK_QUERY_TIMEOUT),
+ RequestStrategy::with_priority(PRIO_BACKGROUND),
)
.await?;
@@ -394,8 +385,7 @@ impl BlockResyncManager {
&need_nodes[..],
put_block_message,
RequestStrategy::with_priority(PRIO_BACKGROUND)
- .with_quorum(need_nodes.len())
- .with_timeout(BLOCK_RW_TIMEOUT),
+ .with_quorum(need_nodes.len()),
)
.await
.err_context("PutBlock RPC")?;
diff --git a/src/garage/server.rs b/src/garage/server.rs
index aeef02a2..28710a8e 100644
--- a/src/garage/server.rs
+++ b/src/garage/server.rs
@@ -32,6 +32,9 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
// ---- Initialize Garage internals ----
+ #[cfg(feature = "metrics")]
+ let metrics_exporter = opentelemetry_prometheus::exporter().init();
+
info!("Initializing background runner...");
let watch_cancel = netapp::util::watch_ctrl_c();
let (background, await_background_done) = BackgroundRunner::new(16, watch_cancel.clone());
@@ -50,7 +53,11 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
}
info!("Initialize Admin API server and metrics collector...");
- let admin_server = AdminApiServer::new(garage.clone());
+ let admin_server = AdminApiServer::new(
+ garage.clone(),
+ #[cfg(feature = "metrics")]
+ metrics_exporter,
+ );
info!("Launching internal Garage cluster communications...");
let run_system = tokio::spawn(garage.system.clone().run(watch_cancel.clone()));
diff --git a/src/model/k2v/causality.rs b/src/model/k2v/causality.rs
index 8c76a32b..9a692870 100644
--- a/src/model/k2v/causality.rs
+++ b/src/model/k2v/causality.rs
@@ -15,7 +15,7 @@ pub fn make_node_id(node_id: Uuid) -> K2VNodeId {
u64::from_be_bytes(tmp)
}
-#[derive(PartialEq, Debug, Serialize, Deserialize)]
+#[derive(PartialEq, Eq, Debug, Serialize, Deserialize)]
pub struct CausalContext {
pub vector_clock: BTreeMap<K2VNodeId, u64>,
}
diff --git a/src/model/k2v/item_table.rs b/src/model/k2v/item_table.rs
index baa1db4b..7860cb17 100644
--- a/src/model/k2v/item_table.rs
+++ b/src/model/k2v/item_table.rs
@@ -17,7 +17,7 @@ pub const CONFLICTS: &str = "conflicts";
pub const VALUES: &str = "values";
pub const BYTES: &str = "bytes";
-#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct K2VItem {
pub partition: K2VItemPartition,
pub sort_key: String,
@@ -25,19 +25,19 @@ pub struct K2VItem {
items: BTreeMap<K2VNodeId, DvvsEntry>,
}
-#[derive(PartialEq, Clone, Debug, Serialize, Deserialize, Hash, Eq)]
+#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize, Hash)]
pub struct K2VItemPartition {
pub bucket_id: Uuid,
pub partition_key: String,
}
-#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
struct DvvsEntry {
t_discard: u64,
values: Vec<(u64, DvvsValue)>,
}
-#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub enum DvvsValue {
Value(#[serde(with = "serde_bytes")] Vec<u8>),
Deleted,
diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs
index 90101d0f..a74df277 100644
--- a/src/model/k2v/rpc.rs
+++ b/src/model/k2v/rpc.rs
@@ -23,7 +23,6 @@ use garage_rpc::system::System;
use garage_rpc::*;
use garage_table::replication::{TableReplication, TableShardedReplication};
-use garage_table::table::TABLE_RPC_TIMEOUT;
use garage_table::{PartitionKey, Table};
use crate::k2v::causality::*;
@@ -117,7 +116,6 @@ impl K2VRpcHandler {
}),
RequestStrategy::with_priority(PRIO_NORMAL)
.with_quorum(1)
- .with_timeout(TABLE_RPC_TIMEOUT)
.interrupt_after_quorum(true),
)
.await?;
@@ -169,7 +167,6 @@ impl K2VRpcHandler {
K2VRpc::InsertManyItems(items),
RequestStrategy::with_priority(PRIO_NORMAL)
.with_quorum(1)
- .with_timeout(TABLE_RPC_TIMEOUT)
.interrupt_after_quorum(true),
)
.await?;
@@ -205,22 +202,23 @@ impl K2VRpcHandler {
.replication
.write_nodes(&poll_key.partition.hash());
- let resps = self
- .system
- .rpc
- .try_call_many(
- &self.endpoint,
- &nodes[..],
- K2VRpc::PollItem {
- key: poll_key,
- causal_context,
- timeout_msec,
- },
- RequestStrategy::with_priority(PRIO_NORMAL)
- .with_quorum(self.item_table.data.replication.read_quorum())
- .with_timeout(Duration::from_millis(timeout_msec) + TABLE_RPC_TIMEOUT),
- )
- .await?;
+ let rpc = self.system.rpc.try_call_many(
+ &self.endpoint,
+ &nodes[..],
+ K2VRpc::PollItem {
+ key: poll_key,
+ causal_context,
+ timeout_msec,
+ },
+ RequestStrategy::with_priority(PRIO_NORMAL)
+ .with_quorum(self.item_table.data.replication.read_quorum())
+ .without_timeout(),
+ );
+ let timeout_duration = Duration::from_millis(timeout_msec) + self.system.rpc.rpc_timeout();
+ let resps = select! {
+ r = rpc => r?,
+ _ = tokio::time::sleep(timeout_duration) => return Ok(None),
+ };
let mut resp: Option<K2VItem> = None;
for v in resps {
diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml
index e51f1f73..d61acea4 100644
--- a/src/rpc/Cargo.toml
+++ b/src/rpc/Cargo.toml
@@ -45,7 +45,7 @@ tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi
tokio-stream = { version = "0.1", features = ["net"] }
opentelemetry = "0.17"
-netapp = { version = "0.5.1", features = ["telemetry"] }
+netapp = { version = "0.5.2", features = ["telemetry"] }
hyper = { version = "0.14", features = ["client", "http1", "runtime", "tcp"] }
diff --git a/src/rpc/metrics.rs b/src/rpc/metrics.rs
index c900518c..61f8fa79 100644
--- a/src/rpc/metrics.rs
+++ b/src/rpc/metrics.rs
@@ -1,31 +1,18 @@
-use std::sync::Arc;
-
use opentelemetry::{global, metrics::*};
-use tokio::sync::Semaphore;
/// TableMetrics reference all counter used for metrics
pub struct RpcMetrics {
- pub(crate) _rpc_available_permits: ValueObserver<u64>,
-
pub(crate) rpc_counter: Counter<u64>,
pub(crate) rpc_timeout_counter: Counter<u64>,
pub(crate) rpc_netapp_error_counter: Counter<u64>,
pub(crate) rpc_garage_error_counter: Counter<u64>,
pub(crate) rpc_duration: ValueRecorder<f64>,
- pub(crate) rpc_queueing_time: ValueRecorder<f64>,
}
impl RpcMetrics {
- pub fn new(sem: Arc<Semaphore>) -> Self {
+ pub fn new() -> Self {
let meter = global::meter("garage_rpc");
RpcMetrics {
- _rpc_available_permits: meter
- .u64_value_observer("rpc.available_permits", move |observer| {
- observer.observe(sem.available_permits() as u64, &[])
- })
- .with_description("Number of available RPC permits")
- .init(),
-
rpc_counter: meter
.u64_counter("rpc.request_counter")
.with_description("Number of RPC requests emitted")
@@ -46,10 +33,6 @@ impl RpcMetrics {
.f64_value_recorder("rpc.duration")
.with_description("Duration of RPCs")
.init(),
- rpc_queueing_time: meter
- .f64_value_recorder("rpc.queueing_time")
- .with_description("Time RPC requests were queued for before being sent")
- .init(),
}
}
}
diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs
index 19abb4c5..949aced6 100644
--- a/src/rpc/rpc_helper.rs
+++ b/src/rpc/rpc_helper.rs
@@ -7,7 +7,7 @@ use futures::stream::futures_unordered::FuturesUnordered;
use futures::stream::StreamExt;
use futures_util::future::FutureExt;
use tokio::select;
-use tokio::sync::{watch, Semaphore};
+use tokio::sync::watch;
use opentelemetry::KeyValue;
use opentelemetry::{
@@ -32,32 +32,37 @@ use garage_util::metrics::RecordDuration;
use crate::metrics::RpcMetrics;
use crate::ring::Ring;
-const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
-
-// Don't allow more than 100 concurrent outgoing RPCs.
-const MAX_CONCURRENT_REQUESTS: usize = 100;
+// Default RPC timeout = 5 minutes
+const DEFAULT_TIMEOUT: Duration = Duration::from_secs(300);
/// Strategy to apply when making RPC
#[derive(Copy, Clone)]
pub struct RequestStrategy {
- /// Max time to wait for reponse
- pub rs_timeout: Duration,
/// Min number of response to consider the request successful
pub rs_quorum: Option<usize>,
/// Should requests be dropped after enough response are received
pub rs_interrupt_after_quorum: bool,
/// Request priority
pub rs_priority: RequestPriority,
+ /// Custom timeout for this request
+ rs_timeout: Timeout,
+}
+
+#[derive(Copy, Clone)]
+enum Timeout {
+ None,
+ Default,
+ Custom(Duration),
}
impl RequestStrategy {
/// Create a RequestStrategy with default timeout and not interrupting when quorum reached
pub fn with_priority(prio: RequestPriority) -> Self {
RequestStrategy {
- rs_timeout: DEFAULT_TIMEOUT,
rs_quorum: None,
rs_interrupt_after_quorum: false,
rs_priority: prio,
+ rs_timeout: Timeout::Default,
}
}
/// Set quorum to be reached for request
@@ -65,17 +70,22 @@ impl RequestStrategy {
self.rs_quorum = Some(quorum);
self
}
- /// Set timeout of the strategy
- pub fn with_timeout(mut self, timeout: Duration) -> Self {
- self.rs_timeout = timeout;
- self
- }
/// Set if requests can be dropped after quorum has been reached
/// In general true for read requests, and false for write
pub fn interrupt_after_quorum(mut self, interrupt: bool) -> Self {
self.rs_interrupt_after_quorum = interrupt;
self
}
+ /// Deactivate timeout for this request
+ pub fn without_timeout(mut self) -> Self {
+ self.rs_timeout = Timeout::None;
+ self
+ }
+ /// Set custom timeout for this request
+ pub fn with_custom_timeout(mut self, timeout: Duration) -> Self {
+ self.rs_timeout = Timeout::Custom(timeout);
+ self
+ }
}
#[derive(Clone)]
@@ -86,8 +96,8 @@ struct RpcHelperInner {
fullmesh: Arc<FullMeshPeeringStrategy>,
background: Arc<BackgroundRunner>,
ring: watch::Receiver<Arc<Ring>>,
- request_buffer_semaphore: Arc<Semaphore>,
metrics: RpcMetrics,
+ rpc_timeout: Duration,
}
impl RpcHelper {
@@ -96,21 +106,24 @@ impl RpcHelper {
fullmesh: Arc<FullMeshPeeringStrategy>,
background: Arc<BackgroundRunner>,
ring: watch::Receiver<Arc<Ring>>,
+ rpc_timeout: Option<Duration>,
) -> Self {
- let sem = Arc::new(Semaphore::new(MAX_CONCURRENT_REQUESTS));
-
- let metrics = RpcMetrics::new(sem.clone());
+ let metrics = RpcMetrics::new();
Self(Arc::new(RpcHelperInner {
our_node_id,
fullmesh,
background,
ring,
- request_buffer_semaphore: sem,
metrics,
+ rpc_timeout: rpc_timeout.unwrap_or(DEFAULT_TIMEOUT),
}))
}
+ pub fn rpc_timeout(&self) -> Duration {
+ self.0.rpc_timeout
+ }
+
pub async fn call<M, N, H, S>(
&self,
endpoint: &Endpoint<M, H>,
@@ -129,13 +142,6 @@ impl RpcHelper {
KeyValue::new("to", format!("{:?}", to)),
];
- let permit = self
- .0
- .request_buffer_semaphore
- .acquire()
- .record_duration(&self.0.metrics.rpc_queueing_time, &metric_tags)
- .await?;
-
self.0.metrics.rpc_counter.add(1, &metric_tags);
let node_id = to.into();
@@ -143,10 +149,16 @@ impl RpcHelper {
.call_streaming(&node_id, msg, strat.rs_priority)
.record_duration(&self.0.metrics.rpc_duration, &metric_tags);
+ let timeout = async {
+ match strat.rs_timeout {
+ Timeout::None => futures::future::pending().await,
+ Timeout::Default => tokio::time::sleep(self.0.rpc_timeout).await,
+ Timeout::Custom(t) => tokio::time::sleep(t).await,
+ }
+ };
+
select! {
res = rpc_call => {
- drop(permit);
-
if res.is_err() {
self.0.metrics.rpc_netapp_error_counter.add(1, &metric_tags);
}
@@ -158,8 +170,7 @@ impl RpcHelper {
Ok(res?)
}
- _ = tokio::time::sleep(strat.rs_timeout) => {
- drop(permit);
+ () = timeout => {
self.0.metrics.rpc_timeout_counter.add(1, &metric_tags);
Err(Error::Timeout)
}
@@ -413,7 +424,7 @@ impl RpcHelper {
.iter()
.find(|x| x.id.as_ref() == to.as_slice())
.and_then(|pi| pi.avg_ping)
- .unwrap_or_else(|| Duration::from_secs(1));
+ .unwrap_or_else(|| Duration::from_secs(10));
(
*to != self.0.our_node_id,
peer_zone != our_zone,
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index 2c6136a8..f8121193 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -37,7 +37,6 @@ use crate::rpc_helper::*;
const DISCOVERY_INTERVAL: Duration = Duration::from_secs(60);
const STATUS_EXCHANGE_INTERVAL: Duration = Duration::from_secs(10);
-const SYSTEM_RPC_TIMEOUT: Duration = Duration::from_secs(15);
/// Version tag used for version check upon Netapp connection.
/// Cluster nodes with different version tags are deemed
@@ -280,6 +279,9 @@ impl System {
let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, node_key);
let fullmesh = FullMeshPeeringStrategy::new(netapp.clone(), vec![], rpc_public_addr);
+ if let Some(ping_timeout) = config.rpc_ping_timeout_msec {
+ fullmesh.set_ping_timeout_millis(ping_timeout);
+ }
let system_endpoint = netapp.endpoint(SYSTEM_RPC_PATH.into());
@@ -317,7 +319,13 @@ impl System {
node_status: RwLock::new(HashMap::new()),
netapp: netapp.clone(),
fullmesh: fullmesh.clone(),
- rpc: RpcHelper::new(netapp.id.into(), fullmesh, background.clone(), ring.clone()),
+ rpc: RpcHelper::new(
+ netapp.id.into(),
+ fullmesh,
+ background.clone(),
+ ring.clone(),
+ config.rpc_timeout_msec.map(Duration::from_millis),
+ ),
system_endpoint,
replication_factor,
rpc_listen_addr: config.rpc_bind_addr,
@@ -600,7 +608,7 @@ impl System {
.broadcast(
&self.system_endpoint,
SystemRpc::AdvertiseStatus(local_status),
- RequestStrategy::with_priority(PRIO_HIGH).with_timeout(SYSTEM_RPC_TIMEOUT),
+ RequestStrategy::with_priority(PRIO_HIGH),
)
.await;
@@ -724,7 +732,7 @@ impl System {
&self.system_endpoint,
peer,
SystemRpc::PullClusterLayout,
- RequestStrategy::with_priority(PRIO_HIGH).with_timeout(SYSTEM_RPC_TIMEOUT),
+ RequestStrategy::with_priority(PRIO_HIGH),
)
.await;
if let Ok(SystemRpc::AdvertiseClusterLayout(layout)) = resp {
diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml
index ae52e8d7..38c6b41c 100644
--- a/src/table/Cargo.toml
+++ b/src/table/Cargo.toml
@@ -22,6 +22,7 @@ opentelemetry = "0.17"
async-trait = "0.1.7"
bytes = "1.0"
+hex = "0.4"
hexdump = "0.1"
tracing = "0.1.30"
rand = "0.8"
diff --git a/src/table/gc.rs b/src/table/gc.rs
index 6cae9701..83e7eeff 100644
--- a/src/table/gc.rs
+++ b/src/table/gc.rs
@@ -25,8 +25,6 @@ use crate::replication::*;
use crate::schema::*;
const TABLE_GC_BATCH_SIZE: usize = 1024;
-// Same timeout as NEED_BLOCK_QUERY_TIMEOUT in block manager
-const TABLE_GC_RPC_TIMEOUT: Duration = Duration::from_secs(15);
// GC delay for table entries: 1 day (24 hours)
// (the delay before the entry is added in the GC todo list
@@ -237,9 +235,7 @@ where
&self.endpoint,
&nodes[..],
GcRpc::Update(updates),
- RequestStrategy::with_priority(PRIO_BACKGROUND)
- .with_quorum(nodes.len())
- .with_timeout(TABLE_GC_RPC_TIMEOUT),
+ RequestStrategy::with_priority(PRIO_BACKGROUND).with_quorum(nodes.len()),
)
.await
.err_context("GC: send tombstones")?;
@@ -260,9 +256,7 @@ where
&self.endpoint,
&nodes[..],
GcRpc::DeleteIfEqualHash(deletes),
- RequestStrategy::with_priority(PRIO_BACKGROUND)
- .with_quorum(nodes.len())
- .with_timeout(TABLE_GC_RPC_TIMEOUT),
+ RequestStrategy::with_priority(PRIO_BACKGROUND).with_quorum(nodes.len()),
)
.await
.err_context("GC: remote delete tombstones")?;
diff --git a/src/table/sync.rs b/src/table/sync.rs
index 62b88a58..e34aa8d7 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -24,9 +24,6 @@ use crate::merkle::*;
use crate::replication::*;
use crate::*;
-// Sync RPC can contain a lot of data, so have a 1min timeout
-const TABLE_SYNC_RPC_TIMEOUT: Duration = Duration::from_secs(60);
-
// Do anti-entropy every 10 minutes
const ANTI_ENTROPY_INTERVAL: Duration = Duration::from_secs(10 * 60);
@@ -248,9 +245,7 @@ where
&self.endpoint,
nodes,
SyncRpc::Items(values),
- RequestStrategy::with_priority(PRIO_BACKGROUND)
- .with_quorum(nodes.len())
- .with_timeout(TABLE_SYNC_RPC_TIMEOUT),
+ RequestStrategy::with_priority(PRIO_BACKGROUND).with_quorum(nodes.len()),
)
.await?;
@@ -311,8 +306,7 @@ where
&self.endpoint,
who,
SyncRpc::RootCkHash(partition.partition, root_ck_hash),
- RequestStrategy::with_priority(PRIO_BACKGROUND)
- .with_timeout(TABLE_SYNC_RPC_TIMEOUT),
+ RequestStrategy::with_priority(PRIO_BACKGROUND),
)
.await?;
@@ -351,11 +345,11 @@ where
// Just send that item directly
if let Some(val) = self.data.store.get(&ik[..])? {
if blake2sum(&val[..]) != ivhash {
- warn!("({}) Hashes differ between stored value and Merkle tree, key: {:?} (if your server is very busy, don't worry, this happens when the Merkle tree can't be updated fast enough)", F::TABLE_NAME, ik);
+ debug!("({}) Hashes differ between stored value and Merkle tree, key: {} (if your server is very busy, don't worry, this happens when the Merkle tree can't be updated fast enough)", F::TABLE_NAME, hex::encode(ik));
}
todo_items.push(val.to_vec());
} else {
- warn!("({}) Item from Merkle tree not found in store: {:?} (if your server is very busy, don't worry, this happens when the Merkle tree can't be updated fast enough)", F::TABLE_NAME, ik);
+ debug!("({}) Item from Merkle tree not found in store: {} (if your server is very busy, don't worry, this happens when the Merkle tree can't be updated fast enough)", F::TABLE_NAME, hex::encode(ik));
}
}
MerkleNode::Intermediate(l) => {
@@ -368,8 +362,7 @@ where
&self.endpoint,
who,
SyncRpc::GetNode(key.clone()),
- RequestStrategy::with_priority(PRIO_BACKGROUND)
- .with_timeout(TABLE_SYNC_RPC_TIMEOUT),
+ RequestStrategy::with_priority(PRIO_BACKGROUND),
)
.await?
{
@@ -445,8 +438,7 @@ where
&self.endpoint,
who,
SyncRpc::Items(values),
- RequestStrategy::with_priority(PRIO_BACKGROUND)
- .with_timeout(TABLE_SYNC_RPC_TIMEOUT),
+ RequestStrategy::with_priority(PRIO_BACKGROUND),
)
.await?;
if let SyncRpc::Ok = rpc_resp {
diff --git a/src/table/table.rs b/src/table/table.rs
index 8e801be6..8a66c420 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -1,7 +1,6 @@
use std::borrow::Borrow;
use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::sync::Arc;
-use std::time::Duration;
use async_trait::async_trait;
use futures::stream::*;
@@ -31,8 +30,6 @@ use crate::schema::*;
use crate::sync::*;
use crate::util::*;
-pub const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(30);
-
pub struct Table<F: TableSchema + 'static, R: TableReplication + 'static> {
pub system: Arc<System>,
pub data: Arc<TableData<F, R>>,
@@ -124,8 +121,7 @@ where
&who[..],
rpc,
RequestStrategy::with_priority(PRIO_NORMAL)
- .with_quorum(self.data.replication.write_quorum())
- .with_timeout(TABLE_RPC_TIMEOUT),
+ .with_quorum(self.data.replication.write_quorum()),
)
.await?;
@@ -177,7 +173,7 @@ where
&self.endpoint,
node,
rpc,
- RequestStrategy::with_priority(PRIO_NORMAL).with_timeout(TABLE_RPC_TIMEOUT),
+ RequestStrategy::with_priority(PRIO_NORMAL),
)
.await?;
Ok::<_, Error>((node, resp))
@@ -234,7 +230,6 @@ where
rpc,
RequestStrategy::with_priority(PRIO_NORMAL)
.with_quorum(self.data.replication.read_quorum())
- .with_timeout(TABLE_RPC_TIMEOUT)
.interrupt_after_quorum(true),
)
.await?;
@@ -329,7 +324,6 @@ where
rpc,
RequestStrategy::with_priority(PRIO_NORMAL)
.with_quorum(self.data.replication.read_quorum())
- .with_timeout(TABLE_RPC_TIMEOUT)
.interrupt_after_quorum(true),
)
.await?;
@@ -406,9 +400,7 @@ where
&self.endpoint,
who,
TableRpc::<F>::Update(vec![what_enc]),
- RequestStrategy::with_priority(PRIO_NORMAL)
- .with_quorum(who.len())
- .with_timeout(TABLE_RPC_TIMEOUT),
+ RequestStrategy::with_priority(PRIO_NORMAL).with_quorum(who.len()),
)
.await?;
Ok(())
diff --git a/src/util/config.rs b/src/util/config.rs
index 5e113e13..2d4b4f57 100644
--- a/src/util/config.rs
+++ b/src/util/config.rs
@@ -41,6 +41,11 @@ pub struct Config {
/// Public IP address of this node
pub rpc_public_addr: Option<String>,
+ /// Timeout for Netapp's ping messagess
+ pub rpc_ping_timeout_msec: Option<u64>,
+ /// Timeout for Netapp RPC calls
+ pub rpc_timeout_msec: Option<u64>,
+
/// Bootstrap peers RPC address
#[serde(default)]
pub bootstrap_peers: Vec<String>,