aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/api/Cargo.toml26
-rw-r--r--src/api/admin/api_server.rs56
-rw-r--r--src/api/admin/bucket.rs4
-rw-r--r--src/api/admin/cluster.rs2
-rw-r--r--src/api/admin/router.rs3
-rw-r--r--src/api/generic_server.rs16
-rw-r--r--src/api/k2v/api_server.rs3
-rw-r--r--src/api/k2v/batch.rs96
-rw-r--r--src/api/k2v/error.rs2
-rw-r--r--src/api/k2v/index.rs9
-rw-r--r--src/api/k2v/item.rs19
-rw-r--r--src/api/k2v/router.rs8
-rw-r--r--src/api/s3/bucket.rs2
-rw-r--r--src/api/s3/error.rs2
-rw-r--r--src/api/s3/list.rs11
-rw-r--r--src/api/s3/post_object.rs5
-rw-r--r--src/api/s3/put.rs96
-rw-r--r--src/api/signature/error.rs2
-rw-r--r--src/block/Cargo.toml14
-rw-r--r--src/block/manager.rs79
-rw-r--r--src/block/metrics.rs17
-rw-r--r--src/block/rc.rs14
-rw-r--r--src/block/repair.rs163
-rw-r--r--src/block/resync.rs108
-rw-r--r--src/db/Cargo.toml10
-rw-r--r--src/garage/Cargo.toml26
-rw-r--r--src/garage/admin.rs249
-rw-r--r--src/garage/cli/cmd.rs18
-rw-r--r--src/garage/cli/init.rs4
-rw-r--r--src/garage/cli/structs.rs36
-rw-r--r--src/garage/cli/util.rs10
-rw-r--r--src/garage/main.rs48
-rw-r--r--src/garage/repair/offline.rs9
-rw-r--r--src/garage/repair/online.rs6
-rw-r--r--src/garage/server.rs5
-rw-r--r--src/garage/tests/admin.rs27
-rw-r--r--src/garage/tests/bucket.rs4
-rw-r--r--src/garage/tests/common/client.rs12
-rw-r--r--src/garage/tests/common/custom_requester.rs9
-rw-r--r--src/garage/tests/common/garage.rs49
-rw-r--r--src/garage/tests/common/mod.rs13
-rw-r--r--src/garage/tests/k2v/batch.rs103
-rw-r--r--src/garage/tests/k2v/item.rs37
-rw-r--r--src/garage/tests/k2v/poll.rs173
-rw-r--r--src/garage/tests/s3/streaming_signature.rs2
-rw-r--r--src/garage/tests/s3/website.rs173
-rw-r--r--src/k2v-client/Cargo.toml19
-rw-r--r--src/k2v-client/bin/k2v-cli.rs203
-rw-r--r--src/k2v-client/lib.rs89
-rw-r--r--src/model/Cargo.toml20
-rw-r--r--src/model/garage.rs45
-rw-r--r--src/model/k2v/causality.rs65
-rw-r--r--src/model/k2v/item_table.rs17
-rw-r--r--src/model/k2v/mod.rs4
-rw-r--r--src/model/k2v/poll.rs50
-rw-r--r--src/model/k2v/rpc.rs275
-rw-r--r--src/model/k2v/seen.rs105
-rw-r--r--src/model/k2v/sub.rs110
-rw-r--r--src/rpc/Cargo.toml10
-rw-r--r--src/rpc/consul.rs2
-rw-r--r--src/rpc/lib.rs4
-rw-r--r--src/rpc/rpc_helper.rs5
-rw-r--r--src/rpc/system.rs104
-rw-r--r--src/rpc/system_metrics.rs77
-rw-r--r--src/table/Cargo.toml10
-rw-r--r--src/table/data.rs24
-rw-r--r--src/util/Cargo.toml15
-rw-r--r--src/util/background/mod.rs1
-rw-r--r--src/util/background/vars.rs113
-rw-r--r--src/util/build.rs8
-rw-r--r--src/util/config.rs176
-rw-r--r--src/util/data.rs4
-rw-r--r--src/util/forwarded_headers.rs63
-rw-r--r--src/util/lib.rs2
-rw-r--r--src/util/persister.rs34
-rw-r--r--src/util/time.rs2
-rw-r--r--src/util/token_bucket.rs40
-rw-r--r--src/util/version.rs4
-rw-r--r--src/web/Cargo.toml12
-rw-r--r--src/web/web_server.rs16
80 files changed, 2801 insertions, 707 deletions
diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml
index dba0bbef..9babec02 100644
--- a/src/api/Cargo.toml
+++ b/src/api/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "garage_api"
-version = "0.8.1"
+version = "0.8.2"
authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license = "AGPL-3.0"
@@ -14,35 +14,35 @@ path = "lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
-garage_model = { version = "0.8.1", path = "../model" }
-garage_table = { version = "0.8.1", path = "../table" }
-garage_block = { version = "0.8.1", path = "../block" }
-garage_util = { version = "0.8.1", path = "../util" }
-garage_rpc = { version = "0.8.1", path = "../rpc" }
+garage_model = { version = "0.8.2", path = "../model" }
+garage_table = { version = "0.8.2", path = "../table" }
+garage_block = { version = "0.8.2", path = "../block" }
+garage_util = { version = "0.8.2", path = "../util" }
+garage_rpc = { version = "0.8.2", path = "../rpc" }
async-trait = "0.1.7"
-base64 = "0.13"
+base64 = "0.21"
bytes = "1.0"
chrono = "0.4"
crypto-common = "0.1"
err-derive = "0.3"
hex = "0.4"
hmac = "0.12"
-idna = "0.2"
-tracing = "0.1.30"
+idna = "0.3"
+tracing = "0.1"
md-5 = "0.10"
nom = "7.1"
sha2 = "0.10"
futures = "0.3"
futures-util = "0.3"
-pin-project = "1.0.11"
+pin-project = "1.0.12"
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
tokio-stream = "0.1"
form_urlencoded = "1.0.0"
http = "0.2"
-httpdate = "0.3"
+httpdate = "1.0"
http-range = "0.1"
hyper = { version = "0.14", features = ["server", "http1", "runtime", "tcp", "stream"] }
multer = "2.0"
@@ -51,8 +51,8 @@ roxmltree = "0.14"
serde = { version = "1.0", features = ["derive"] }
serde_bytes = "0.11"
serde_json = "1.0"
-quick-xml = { version = "0.21", features = [ "serialize" ] }
-url = "2.1"
+quick-xml = { version = "0.26", features = [ "serialize" ] }
+url = "2.3"
opentelemetry = "0.17"
opentelemetry-prometheus = { version = "0.10", optional = true }
diff --git a/src/api/admin/api_server.rs b/src/api/admin/api_server.rs
index 2d325fb1..58dd38d8 100644
--- a/src/api/admin/api_server.rs
+++ b/src/api/admin/api_server.rs
@@ -1,3 +1,4 @@
+use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
@@ -77,6 +78,60 @@ impl AdminApiServer {
.body(Body::empty())?)
}
+ async fn handle_check_website_enabled(
+ &self,
+ req: Request<Body>,
+ ) -> Result<Response<Body>, Error> {
+ let query_params: HashMap<String, String> = req
+ .uri()
+ .query()
+ .map(|v| {
+ url::form_urlencoded::parse(v.as_bytes())
+ .into_owned()
+ .collect()
+ })
+ .unwrap_or_else(HashMap::new);
+
+ let has_domain_key = query_params.contains_key("domain");
+
+ if !has_domain_key {
+ return Err(Error::bad_request("No domain query string found"));
+ }
+
+ let domain = query_params
+ .get("domain")
+ .ok_or_internal_error("Could not parse domain query string")?;
+
+ let bucket_id = self
+ .garage
+ .bucket_helper()
+ .resolve_global_bucket_name(&domain)
+ .await?
+ .ok_or(HelperError::NoSuchBucket(domain.to_string()))?;
+
+ let bucket = self
+ .garage
+ .bucket_helper()
+ .get_existing_bucket(bucket_id)
+ .await?;
+
+ let bucket_state = bucket.state.as_option().unwrap();
+ let bucket_website_config = bucket_state.website_config.get();
+
+ match bucket_website_config {
+ Some(_v) => {
+ Ok(Response::builder()
+ .status(StatusCode::OK)
+ .body(Body::from(format!(
+ "Bucket '{domain}' is authorized for website hosting"
+ )))?)
+ }
+ None => Err(Error::bad_request(format!(
+ "Bucket '{domain}' is not authorized for website hosting"
+ ))),
+ }
+ }
+
fn handle_health(&self) -> Result<Response<Body>, Error> {
let health = self.garage.system.health();
@@ -174,6 +229,7 @@ impl ApiHandler for AdminApiServer {
match endpoint {
Endpoint::Options => self.handle_options(&req),
+ Endpoint::CheckWebsiteEnabled => self.handle_check_website_enabled(req).await,
Endpoint::Health => self.handle_health(),
Endpoint::Metrics => self.handle_metrics(),
Endpoint::GetClusterStatus => handle_get_cluster_status(&self.garage).await,
diff --git a/src/api/admin/bucket.rs b/src/api/admin/bucket.rs
index 65034852..e60f07ca 100644
--- a/src/api/admin/bucket.rs
+++ b/src/api/admin/bucket.rs
@@ -167,7 +167,7 @@ async fn bucket_info_results(
let quotas = state.quotas.get();
let res =
GetBucketInfoResult {
- id: hex::encode(&bucket.id),
+ id: hex::encode(bucket.id),
global_aliases: state
.aliases
.items()
@@ -575,6 +575,6 @@ pub async fn handle_local_unalias_bucket(
// ---- HELPER ----
fn parse_bucket_id(id: &str) -> Result<Uuid, Error> {
- let id_hex = hex::decode(&id).ok_or_bad_request("Invalid bucket id")?;
+ let id_hex = hex::decode(id).ok_or_bad_request("Invalid bucket id")?;
Ok(Uuid::try_from(&id_hex).ok_or_bad_request("Invalid bucket id")?)
}
diff --git a/src/api/admin/cluster.rs b/src/api/admin/cluster.rs
index 540c6009..b2508d2e 100644
--- a/src/api/admin/cluster.rs
+++ b/src/api/admin/cluster.rs
@@ -20,6 +20,7 @@ pub async fn handle_get_cluster_status(garage: &Arc<Garage>) -> Result<Response<
node: hex::encode(garage.system.id),
garage_version: garage_util::version::garage_version(),
garage_features: garage_util::version::garage_features(),
+ rust_version: garage_util::version::rust_version(),
db_engine: garage.db.engine(),
known_nodes: garage
.system
@@ -106,6 +107,7 @@ struct GetClusterStatusResponse {
node: String,
garage_version: &'static str,
garage_features: Option<&'static [&'static str]>,
+ rust_version: &'static str,
db_engine: String,
known_nodes: HashMap<String, KnownNodeResp>,
layout: GetClusterLayoutResponse,
diff --git a/src/api/admin/router.rs b/src/api/admin/router.rs
index 62e6abc3..0dcb1546 100644
--- a/src/api/admin/router.rs
+++ b/src/api/admin/router.rs
@@ -17,6 +17,7 @@ router_match! {@func
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Endpoint {
Options,
+ CheckWebsiteEnabled,
Health,
Metrics,
GetClusterStatus,
@@ -91,6 +92,7 @@ impl Endpoint {
let res = router_match!(@gen_path_parser (req.method(), path, query) [
OPTIONS _ => Options,
+ GET "/check" => CheckWebsiteEnabled,
GET "/health" => Health,
GET "/metrics" => Metrics,
GET "/v0/status" => GetClusterStatus,
@@ -136,6 +138,7 @@ impl Endpoint {
pub fn authorization_type(&self) -> Authorization {
match self {
Self::Health => Authorization::None,
+ Self::CheckWebsiteEnabled => Authorization::None,
Self::Metrics => Authorization::MetricsToken,
_ => Authorization::AdminToken,
}
diff --git a/src/api/generic_server.rs b/src/api/generic_server.rs
index 62fe4e5a..d0354d28 100644
--- a/src/api/generic_server.rs
+++ b/src/api/generic_server.rs
@@ -19,6 +19,7 @@ use opentelemetry::{
};
use garage_util::error::Error as GarageError;
+use garage_util::forwarded_headers;
use garage_util::metrics::{gen_trace_id, RecordDuration};
pub(crate) trait ApiEndpoint: Send + Sync + 'static {
@@ -125,7 +126,20 @@ impl<A: ApiHandler> ApiServer<A> {
addr: SocketAddr,
) -> Result<Response<Body>, GarageError> {
let uri = req.uri().clone();
- info!("{} {} {}", addr, req.method(), uri);
+
+ if let Ok(forwarded_for_ip_addr) =
+ forwarded_headers::handle_forwarded_for_headers(&req.headers())
+ {
+ info!(
+ "{} (via {}) {} {}",
+ forwarded_for_ip_addr,
+ addr,
+ req.method(),
+ uri
+ );
+ } else {
+ info!("{} {} {}", addr, req.method(), uri);
+ }
debug!("{:?}", req);
let tracer = opentelemetry::global::tracer("garage");
diff --git a/src/api/k2v/api_server.rs b/src/api/k2v/api_server.rs
index 084867b5..bb85b2e7 100644
--- a/src/api/k2v/api_server.rs
+++ b/src/api/k2v/api_server.rs
@@ -164,6 +164,9 @@ impl ApiHandler for K2VApiServer {
Endpoint::InsertBatch {} => handle_insert_batch(garage, bucket_id, req).await,
Endpoint::ReadBatch {} => handle_read_batch(garage, bucket_id, req).await,
Endpoint::DeleteBatch {} => handle_delete_batch(garage, bucket_id, req).await,
+ Endpoint::PollRange { partition_key } => {
+ handle_poll_range(garage, bucket_id, &partition_key, req).await
+ }
Endpoint::Options => unreachable!(),
};
diff --git a/src/api/k2v/batch.rs b/src/api/k2v/batch.rs
index 78035362..26d678da 100644
--- a/src/api/k2v/batch.rs
+++ b/src/api/k2v/batch.rs
@@ -1,10 +1,10 @@
use std::sync::Arc;
+use base64::prelude::*;
use hyper::{Body, Request, Response, StatusCode};
use serde::{Deserialize, Serialize};
use garage_util::data::*;
-use garage_util::error::Error as GarageError;
use garage_table::{EnumerationOrder, TableSchema};
@@ -25,15 +25,13 @@ pub async fn handle_insert_batch(
let mut items2 = vec![];
for it in items {
- let ct = it
- .ct
- .map(|s| CausalContext::parse(&s))
- .transpose()
- .ok_or_bad_request("Invalid causality token")?;
+ let ct = it.ct.map(|s| CausalContext::parse_helper(&s)).transpose()?;
let v = match it.v {
- Some(vs) => {
- DvvsValue::Value(base64::decode(vs).ok_or_bad_request("Invalid base64 value")?)
- }
+ Some(vs) => DvvsValue::Value(
+ BASE64_STANDARD
+ .decode(vs)
+ .ok_or_bad_request("Invalid base64 value")?,
+ ),
None => DvvsValue::Deleted,
};
items2.push((it.pk, it.sk, ct, v));
@@ -65,10 +63,7 @@ pub async fn handle_read_batch(
resps.push(resp?);
}
- let resp_json = serde_json::to_string_pretty(&resps).map_err(GarageError::from)?;
- Ok(Response::builder()
- .status(StatusCode::OK)
- .body(Body::from(resp_json))?)
+ Ok(json_ok_response(&resps)?)
}
async fn handle_read_batch_query(
@@ -160,10 +155,7 @@ pub async fn handle_delete_batch(
resps.push(resp?);
}
- let resp_json = serde_json::to_string_pretty(&resps).map_err(GarageError::from)?;
- Ok(Response::builder()
- .status(StatusCode::OK)
- .body(Body::from(resp_json))?)
+ Ok(json_ok_response(&resps)?)
}
async fn handle_delete_batch_query(
@@ -257,6 +249,53 @@ async fn handle_delete_batch_query(
})
}
+pub(crate) async fn handle_poll_range(
+ garage: Arc<Garage>,
+ bucket_id: Uuid,
+ partition_key: &str,
+ req: Request<Body>,
+) -> Result<Response<Body>, Error> {
+ use garage_model::k2v::sub::PollRange;
+
+ let query = parse_json_body::<PollRangeQuery>(req).await?;
+
+ let timeout_msec = query.timeout.unwrap_or(300).clamp(1, 600) * 1000;
+
+ let resp = garage
+ .k2v
+ .rpc
+ .poll_range(
+ PollRange {
+ partition: K2VItemPartition {
+ bucket_id,
+ partition_key: partition_key.to_string(),
+ },
+ start: query.start,
+ end: query.end,
+ prefix: query.prefix,
+ },
+ query.seen_marker,
+ timeout_msec,
+ )
+ .await?;
+
+ if let Some((items, seen_marker)) = resp {
+ let resp = PollRangeResponse {
+ items: items
+ .into_iter()
+ .map(|(_k, i)| ReadBatchResponseItem::from(i))
+ .collect::<Vec<_>>(),
+ seen_marker,
+ };
+
+ Ok(json_ok_response(&resp)?)
+ } else {
+ Ok(Response::builder()
+ .status(StatusCode::NOT_MODIFIED)
+ .body(Body::empty())?)
+ }
+}
+
#[derive(Deserialize)]
struct InsertBatchItem {
pk: String,
@@ -322,7 +361,7 @@ impl ReadBatchResponseItem {
.values()
.iter()
.map(|v| match v {
- DvvsValue::Value(x) => Some(base64::encode(x)),
+ DvvsValue::Value(x) => Some(BASE64_STANDARD.encode(x)),
DvvsValue::Deleted => None,
})
.collect::<Vec<_>>();
@@ -361,3 +400,24 @@ struct DeleteBatchResponse {
#[serde(rename = "deletedItems")]
deleted_items: usize,
}
+
+#[derive(Deserialize)]
+struct PollRangeQuery {
+ #[serde(default)]
+ prefix: Option<String>,
+ #[serde(default)]
+ start: Option<String>,
+ #[serde(default)]
+ end: Option<String>,
+ #[serde(default)]
+ timeout: Option<u64>,
+ #[serde(default, rename = "seenMarker")]
+ seen_marker: Option<String>,
+}
+
+#[derive(Serialize)]
+struct PollRangeResponse {
+ items: Vec<ReadBatchResponseItem>,
+ #[serde(rename = "seenMarker")]
+ seen_marker: String,
+}
diff --git a/src/api/k2v/error.rs b/src/api/k2v/error.rs
index 42491466..4eb017ab 100644
--- a/src/api/k2v/error.rs
+++ b/src/api/k2v/error.rs
@@ -19,7 +19,7 @@ pub enum Error {
// Category: cannot process
/// Authorization Header Malformed
- #[error(display = "Authorization header malformed, expected scope: {}", _0)]
+ #[error(display = "Authorization header malformed, unexpected scope: {}", _0)]
AuthorizationHeaderMalformed(String),
/// The object requested don't exists
diff --git a/src/api/k2v/index.rs b/src/api/k2v/index.rs
index 210950bf..6c1d4a91 100644
--- a/src/api/k2v/index.rs
+++ b/src/api/k2v/index.rs
@@ -1,10 +1,9 @@
use std::sync::Arc;
-use hyper::{Body, Response, StatusCode};
+use hyper::{Body, Response};
use serde::Serialize;
use garage_util::data::*;
-use garage_util::error::Error as GarageError;
use garage_rpc::ring::Ring;
use garage_table::util::*;
@@ -12,6 +11,7 @@ use garage_table::util::*;
use garage_model::garage::Garage;
use garage_model::k2v::item_table::{BYTES, CONFLICTS, ENTRIES, VALUES};
+use crate::helpers::*;
use crate::k2v::error::*;
use crate::k2v::range::read_range;
@@ -68,10 +68,7 @@ pub async fn handle_read_index(
next_start,
};
- let resp_json = serde_json::to_string_pretty(&resp).map_err(GarageError::from)?;
- Ok(Response::builder()
- .status(StatusCode::OK)
- .body(Body::from(resp_json))?)
+ Ok(json_ok_response(&resp)?)
}
#[derive(Serialize)]
diff --git a/src/api/k2v/item.rs b/src/api/k2v/item.rs
index f85138c7..e13a0f30 100644
--- a/src/api/k2v/item.rs
+++ b/src/api/k2v/item.rs
@@ -1,5 +1,6 @@
use std::sync::Arc;
+use base64::prelude::*;
use http::header;
use hyper::{Body, Request, Response, StatusCode};
@@ -81,7 +82,7 @@ impl ReturnFormat {
.iter()
.map(|v| match v {
DvvsValue::Deleted => serde_json::Value::Null,
- DvvsValue::Value(v) => serde_json::Value::String(base64::encode(v)),
+ DvvsValue::Value(v) => serde_json::Value::String(BASE64_STANDARD.encode(v)),
})
.collect::<Vec<_>>();
let json_body =
@@ -133,9 +134,8 @@ pub async fn handle_insert_item(
.get(X_GARAGE_CAUSALITY_TOKEN)
.map(|s| s.to_str())
.transpose()?
- .map(CausalContext::parse)
- .transpose()
- .ok_or_bad_request("Invalid causality token")?;
+ .map(CausalContext::parse_helper)
+ .transpose()?;
let body = hyper::body::to_bytes(req.into_body()).await?;
let value = DvvsValue::Value(body.to_vec());
@@ -169,9 +169,8 @@ pub async fn handle_delete_item(
.get(X_GARAGE_CAUSALITY_TOKEN)
.map(|s| s.to_str())
.transpose()?
- .map(CausalContext::parse)
- .transpose()
- .ok_or_bad_request("Invalid causality token")?;
+ .map(CausalContext::parse_helper)
+ .transpose()?;
let value = DvvsValue::Deleted;
@@ -208,15 +207,17 @@ pub async fn handle_poll_item(
let causal_context =
CausalContext::parse(&causality_token).ok_or_bad_request("Invalid causality token")?;
+ let timeout_msec = timeout_secs.unwrap_or(300).clamp(1, 600) * 1000;
+
let item = garage
.k2v
.rpc
- .poll(
+ .poll_item(
bucket_id,
partition_key,
sort_key,
causal_context,
- timeout_secs.unwrap_or(300) * 1000,
+ timeout_msec,
)
.await?;
diff --git a/src/api/k2v/router.rs b/src/api/k2v/router.rs
index e7a3dd69..1cc58be5 100644
--- a/src/api/k2v/router.rs
+++ b/src/api/k2v/router.rs
@@ -32,6 +32,9 @@ pub enum Endpoint {
causality_token: String,
timeout: Option<u64>,
},
+ PollRange {
+ partition_key: String,
+ },
ReadBatch {
},
ReadIndex {
@@ -113,6 +116,7 @@ impl Endpoint {
@gen_parser
(query.keyword.take().unwrap_or_default(), partition_key, query, None),
key: [
+ POLL_RANGE => PollRange,
],
no_key: [
EMPTY => ReadBatch,
@@ -142,6 +146,7 @@ impl Endpoint {
@gen_parser
(query.keyword.take().unwrap_or_default(), partition_key, query, None),
key: [
+ POLL_RANGE => PollRange,
],
no_key: [
EMPTY => InsertBatch,
@@ -234,7 +239,8 @@ impl Endpoint {
generateQueryParameters! {
keywords: [
"delete" => DELETE,
- "search" => SEARCH
+ "search" => SEARCH,
+ "poll_range" => POLL_RANGE
],
fields: [
"prefix" => prefix,
diff --git a/src/api/s3/bucket.rs b/src/api/s3/bucket.rs
index 8471385f..733981e1 100644
--- a/src/api/s3/bucket.rs
+++ b/src/api/s3/bucket.rs
@@ -305,7 +305,7 @@ fn parse_create_bucket_xml(xml_bytes: &[u8]) -> Option<Option<String>> {
let mut ret = None;
for item in cbc.children() {
if item.has_tag_name("LocationConstraint") {
- if ret != None {
+ if ret.is_some() {
return None;
}
ret = Some(item.text()?.to_string());
diff --git a/src/api/s3/error.rs b/src/api/s3/error.rs
index 67009d63..c50cff9f 100644
--- a/src/api/s3/error.rs
+++ b/src/api/s3/error.rs
@@ -21,7 +21,7 @@ pub enum Error {
// Category: cannot process
/// Authorization Header Malformed
- #[error(display = "Authorization header malformed, expected scope: {}", _0)]
+ #[error(display = "Authorization header malformed, unexpected scope: {}", _0)]
AuthorizationHeaderMalformed(String),
/// The object requested don't exists
diff --git a/src/api/s3/list.rs b/src/api/s3/list.rs
index e5f486c8..5cb0d65a 100644
--- a/src/api/s3/list.rs
+++ b/src/api/s3/list.rs
@@ -3,6 +3,7 @@ use std::collections::{BTreeMap, BTreeSet};
use std::iter::{Iterator, Peekable};
use std::sync::Arc;
+use base64::prelude::*;
use hyper::{Body, Response};
use garage_util::data::*;
@@ -129,11 +130,11 @@ pub async fn handle_list(
next_continuation_token: match (query.is_v2, &pagination) {
(true, Some(RangeBegin::AfterKey { key })) => Some(s3_xml::Value(format!(
"]{}",
- base64::encode(key.as_bytes())
+ BASE64_STANDARD.encode(key.as_bytes())
))),
(true, Some(RangeBegin::IncludingKey { key, .. })) => Some(s3_xml::Value(format!(
"[{}",
- base64::encode(key.as_bytes())
+ BASE64_STANDARD.encode(key.as_bytes())
))),
_ => None,
},
@@ -583,14 +584,16 @@ impl ListObjectsQuery {
(Some(token), _) => match &token[..1] {
"[" => Ok(RangeBegin::IncludingKey {
key: String::from_utf8(
- base64::decode(token[1..].as_bytes())
+ BASE64_STANDARD
+ .decode(token[1..].as_bytes())
.ok_or_bad_request("Invalid continuation token")?,
)?,
fallback_key: None,
}),
"]" => Ok(RangeBegin::AfterKey {
key: String::from_utf8(
- base64::decode(token[1..].as_bytes())
+ BASE64_STANDARD
+ .decode(token[1..].as_bytes())
.ok_or_bad_request("Invalid continuation token")?,
)?,
}),
diff --git a/src/api/s3/post_object.rs b/src/api/s3/post_object.rs
index d063faa4..f2098ab0 100644
--- a/src/api/s3/post_object.rs
+++ b/src/api/s3/post_object.rs
@@ -4,6 +4,7 @@ use std::ops::RangeInclusive;
use std::sync::Arc;
use std::task::{Context, Poll};
+use base64::prelude::*;
use bytes::Bytes;
use chrono::{DateTime, Duration, Utc};
use futures::{Stream, StreamExt};
@@ -138,7 +139,9 @@ pub async fn handle_post_object(
.get_existing_bucket(bucket_id)
.await?;
- let decoded_policy = base64::decode(&policy).ok_or_bad_request("Invalid policy")?;
+ let decoded_policy = BASE64_STANDARD
+ .decode(policy)
+ .ok_or_bad_request("Invalid policy")?;
let decoded_policy: Policy =
serde_json::from_slice(&decoded_policy).ok_or_bad_request("Invalid policy")?;
diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs
index 97b8e4e3..350ab884 100644
--- a/src/api/s3/put.rs
+++ b/src/api/s3/put.rs
@@ -1,6 +1,7 @@
use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::sync::Arc;
+use base64::prelude::*;
use futures::prelude::*;
use hyper::body::{Body, Bytes};
use hyper::header::{HeaderMap, HeaderValue};
@@ -119,6 +120,17 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
return Ok((version_uuid, data_md5sum_hex));
}
+ // The following consists in many steps that can each fail.
+ // Keep track that some cleanup will be needed if things fail
+ // before everything is finished (cleanup is done using the Drop trait).
+ let mut interrupted_cleanup = InterruptedCleanup(Some((
+ garage.clone(),
+ bucket.id,
+ key.into(),
+ version_uuid,
+ version_timestamp,
+ )));
+
// Write version identifier in object table so that we have a trace
// that we are uploading something
let mut object_version = ObjectVersion {
@@ -139,44 +151,27 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
// Transfer data and verify checksum
let first_block_hash = async_blake2sum(first_block.clone()).await;
- let tx_result = (|| async {
- let (total_size, data_md5sum, data_sha256sum) = read_and_put_blocks(
- &garage,
- &version,
- 1,
- first_block,
- first_block_hash,
- &mut chunker,
- )
- .await?;
-
- ensure_checksum_matches(
- data_md5sum.as_slice(),
- data_sha256sum,
- content_md5.as_deref(),
- content_sha256,
- )?;
-
- check_quotas(&garage, bucket, key, total_size).await?;
+ let (total_size, data_md5sum, data_sha256sum) = read_and_put_blocks(
+ &garage,
+ &version,
+ 1,
+ first_block,
+ first_block_hash,
+ &mut chunker,
+ )
+ .await?;
- Ok((total_size, data_md5sum))
- })()
- .await;
+ ensure_checksum_matches(
+ data_md5sum.as_slice(),
+ data_sha256sum,
+ content_md5.as_deref(),
+ content_sha256,
+ )?;
- // If something went wrong, clean up
- let (total_size, md5sum_arr) = match tx_result {
- Ok(rv) => rv,
- Err(e) => {
- // Mark object as aborted, this will free the blocks further down
- object_version.state = ObjectVersionState::Aborted;
- let object = Object::new(bucket.id, key.into(), vec![object_version.clone()]);
- garage.object_table.insert(&object).await?;
- return Err(e);
- }
- };
+ check_quotas(&garage, bucket, key, total_size).await?;
// Save final object state, marked as Complete
- let md5sum_hex = hex::encode(md5sum_arr);
+ let md5sum_hex = hex::encode(data_md5sum);
object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock(
ObjectVersionMeta {
headers,
@@ -188,6 +183,10 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
let object = Object::new(bucket.id, key.into(), vec![object_version]);
garage.object_table.insert(&object).await?;
+ // We were not interrupted, everything went fine.
+ // We won't have to clean up on drop.
+ interrupted_cleanup.cancel();
+
Ok((version_uuid, md5sum_hex))
}
@@ -209,7 +208,7 @@ fn ensure_checksum_matches(
}
}
if let Some(expected_md5) = content_md5 {
- if expected_md5.trim_matches('"') != base64::encode(data_md5sum) {
+ if expected_md5.trim_matches('"') != BASE64_STANDARD.encode(data_md5sum) {
return Err(Error::bad_request("Unable to validate content-md5"));
} else {
trace!("Successfully validated content-md5");
@@ -426,6 +425,33 @@ pub fn put_response(version_uuid: Uuid, md5sum_hex: String) -> Response<Body> {
.unwrap()
}
+struct InterruptedCleanup(Option<(Arc<Garage>, Uuid, String, Uuid, u64)>);
+
+impl InterruptedCleanup {
+ fn cancel(&mut self) {
+ drop(self.0.take());
+ }
+}
+impl Drop for InterruptedCleanup {
+ fn drop(&mut self) {
+ if let Some((garage, bucket_id, key, version_uuid, version_ts)) = self.0.take() {
+ tokio::spawn(async move {
+ let object_version = ObjectVersion {
+ uuid: version_uuid,
+ timestamp: version_ts,
+ state: ObjectVersionState::Aborted,
+ };
+ let object = Object::new(bucket_id, key, vec![object_version]);
+ if let Err(e) = garage.object_table.insert(&object).await {
+ warn!("Cannot cleanup after aborted PutObject: {}", e);
+ }
+ });
+ }
+ }
+}
+
+// ----
+
pub async fn handle_create_multipart_upload(
garage: Arc<Garage>,
req: &Request<Body>,
diff --git a/src/api/signature/error.rs b/src/api/signature/error.rs
index f5a067bd..f0d7c816 100644
--- a/src/api/signature/error.rs
+++ b/src/api/signature/error.rs
@@ -11,7 +11,7 @@ pub enum Error {
Common(CommonError),
/// Authorization Header Malformed
- #[error(display = "Authorization header malformed, expected scope: {}", _0)]
+ #[error(display = "Authorization header malformed, unexpected scope: {}", _0)]
AuthorizationHeaderMalformed(String),
// Category: bad request
diff --git a/src/block/Cargo.toml b/src/block/Cargo.toml
index 1e4eb64e..c6985754 100644
--- a/src/block/Cargo.toml
+++ b/src/block/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "garage_block"
-version = "0.8.1"
+version = "0.8.2"
authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license = "AGPL-3.0"
@@ -14,10 +14,10 @@ path = "lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
-garage_db = { version = "0.8.1", path = "../db" }
-garage_rpc = { version = "0.8.1", path = "../rpc" }
-garage_util = { version = "0.8.1", path = "../util" }
-garage_table = { version = "0.8.1", path = "../table" }
+garage_db = { version = "0.8.2", path = "../db" }
+garage_rpc = { version = "0.8.2", path = "../rpc" }
+garage_util = { version = "0.8.2", path = "../util" }
+garage_table = { version = "0.8.2", path = "../table" }
opentelemetry = "0.17"
@@ -25,11 +25,11 @@ arc-swap = "1.5"
async-trait = "0.1.7"
bytes = "1.0"
hex = "0.4"
-tracing = "0.1.30"
+tracing = "0.1"
rand = "0.8"
async-compression = { version = "0.3", features = ["tokio", "zstd"] }
-zstd = { version = "0.9", default-features = false }
+zstd = { version = "0.12", default-features = false }
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
serde_bytes = "0.11"
diff --git a/src/block/manager.rs b/src/block/manager.rs
index 1b5a5df0..26278974 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -6,6 +6,7 @@ use std::time::Duration;
use arc_swap::ArcSwapOption;
use async_trait::async_trait;
use bytes::Bytes;
+use rand::prelude::*;
use serde::{Deserialize, Serialize};
use futures::Stream;
@@ -23,10 +24,12 @@ use garage_rpc::rpc_helper::netapp::stream::{stream_asyncread, ByteStream};
use garage_db as db;
-use garage_util::background::BackgroundRunner;
+use garage_util::background::{vars, BackgroundRunner};
use garage_util::data::*;
use garage_util::error::*;
use garage_util::metrics::RecordDuration;
+use garage_util::persister::PersisterShared;
+use garage_util::time::msec_to_rfc3339;
use garage_rpc::rpc_helper::OrderTag;
use garage_rpc::system::System;
@@ -89,6 +92,7 @@ pub struct BlockManager {
pub(crate) metrics: BlockManagerMetrics,
+ pub scrub_persister: PersisterShared<ScrubWorkerPersisted>,
tx_scrub_command: ArcSwapOption<mpsc::Sender<ScrubWorkerCommand>>,
}
@@ -125,8 +129,14 @@ impl BlockManager {
.netapp
.endpoint("garage_block/manager.rs/Rpc".to_string());
- let metrics =
- BlockManagerMetrics::new(rc.rc.clone(), resync.queue.clone(), resync.errors.clone());
+ let metrics = BlockManagerMetrics::new(
+ compression_level,
+ rc.rc.clone(),
+ resync.queue.clone(),
+ resync.errors.clone(),
+ );
+
+ let scrub_persister = PersisterShared::new(&system.metadata_dir, "scrub_info");
let block_manager = Arc::new(Self {
replication,
@@ -138,9 +148,11 @@ impl BlockManager {
system,
endpoint,
metrics,
+ scrub_persister,
tx_scrub_command: ArcSwapOption::new(None),
});
block_manager.endpoint.set_handler(block_manager.clone());
+ block_manager.scrub_persister.set_with(|_| ()).unwrap();
block_manager
}
@@ -155,7 +167,31 @@ impl BlockManager {
// Spawn scrub worker
let (scrub_tx, scrub_rx) = mpsc::channel(1);
self.tx_scrub_command.store(Some(Arc::new(scrub_tx)));
- bg.spawn_worker(ScrubWorker::new(self.clone(), scrub_rx));
+ bg.spawn_worker(ScrubWorker::new(
+ self.clone(),
+ scrub_rx,
+ self.scrub_persister.clone(),
+ ));
+ }
+
+ pub fn register_bg_vars(&self, vars: &mut vars::BgVars) {
+ self.resync.register_bg_vars(vars);
+
+ vars.register_rw(
+ &self.scrub_persister,
+ "scrub-tranquility",
+ |p| p.get_with(|x| x.tranquility),
+ |p, tranquility| p.set_with(|x| x.tranquility = tranquility),
+ );
+ vars.register_ro(&self.scrub_persister, "scrub-last-completed", |p| {
+ p.get_with(|x| msec_to_rfc3339(x.time_last_complete_scrub))
+ });
+ vars.register_ro(&self.scrub_persister, "scrub-next-run", |p| {
+ p.get_with(|x| msec_to_rfc3339(x.time_next_run_scrub))
+ });
+ vars.register_ro(&self.scrub_persister, "scrub-corruptions_detected", |p| {
+ p.get_with(|x| x.corruptions_detected)
+ });
}
/// Ask nodes that might have a (possibly compressed) block for it
@@ -649,14 +685,21 @@ impl BlockManagerLocked {
}
};
- let mut path2 = path.clone();
- path2.set_extension("tmp");
- let mut f = fs::File::create(&path2).await?;
+ let mut path_tmp = path.clone();
+ let tmp_extension = format!("tmp{}", hex::encode(thread_rng().gen::<[u8; 4]>()));
+ path_tmp.set_extension(tmp_extension);
+
+ let mut delete_on_drop = DeleteOnDrop(Some(path_tmp.clone()));
+
+ let mut f = fs::File::create(&path_tmp).await?;
f.write_all(data).await?;
f.sync_all().await?;
drop(f);
- fs::rename(path2, path).await?;
+ fs::rename(path_tmp, path).await?;
+
+ delete_on_drop.cancel();
+
if let Some(to_delete) = to_delete {
fs::remove_file(to_delete).await?;
}
@@ -722,3 +765,23 @@ async fn read_stream_to_end(mut stream: ByteStream) -> Result<Bytes, Error> {
.concat()
.into())
}
+
+struct DeleteOnDrop(Option<PathBuf>);
+
+impl DeleteOnDrop {
+ fn cancel(&mut self) {
+ drop(self.0.take());
+ }
+}
+
+impl Drop for DeleteOnDrop {
+ fn drop(&mut self) {
+ if let Some(path) = self.0.take() {
+ tokio::spawn(async move {
+ if let Err(e) = fs::remove_file(&path).await {
+ debug!("DeleteOnDrop failed for {}: {}", path.display(), e);
+ }
+ });
+ }
+ }
+}
diff --git a/src/block/metrics.rs b/src/block/metrics.rs
index fbef95af..6659df32 100644
--- a/src/block/metrics.rs
+++ b/src/block/metrics.rs
@@ -5,6 +5,7 @@ use garage_db::counted_tree_hack::CountedTree;
/// TableMetrics reference all counter used for metrics
pub struct BlockManagerMetrics {
+ pub(crate) _compression_level: ValueObserver<u64>,
pub(crate) _rc_size: ValueObserver<u64>,
pub(crate) _resync_queue_len: ValueObserver<u64>,
pub(crate) _resync_errored_blocks: ValueObserver<u64>,
@@ -25,9 +26,23 @@ pub struct BlockManagerMetrics {
}
impl BlockManagerMetrics {
- pub fn new(rc_tree: db::Tree, resync_queue: CountedTree, resync_errors: CountedTree) -> Self {
+ pub fn new(
+ compression_level: Option<i32>,
+ rc_tree: db::Tree,
+ resync_queue: CountedTree,
+ resync_errors: CountedTree,
+ ) -> Self {
let meter = global::meter("garage_model/block");
Self {
+ _compression_level: meter
+ .u64_value_observer("block.compression_level", move |observer| {
+ match compression_level {
+ Some(v) => observer.observe(v as u64, &[]),
+ None => observer.observe(0_u64, &[]),
+ }
+ })
+ .with_description("Garage compression level for node")
+ .init(),
_rc_size: meter
.u64_value_observer("block.rc_size", move |observer| {
if let Ok(Some(v)) = rc_tree.fast_len() {
diff --git a/src/block/rc.rs b/src/block/rc.rs
index 8dae3960..94cb5eea 100644
--- a/src/block/rc.rs
+++ b/src/block/rc.rs
@@ -24,9 +24,9 @@ impl BlockRc {
tx: &mut db::Transaction,
hash: &Hash,
) -> db::TxOpResult<bool> {
- let old_rc = RcEntry::parse_opt(tx.get(&self.rc, &hash)?);
+ let old_rc = RcEntry::parse_opt(tx.get(&self.rc, hash)?);
match old_rc.increment().serialize() {
- Some(x) => tx.insert(&self.rc, &hash, x)?,
+ Some(x) => tx.insert(&self.rc, hash, x)?,
None => unreachable!(),
};
Ok(old_rc.is_zero())
@@ -39,10 +39,10 @@ impl BlockRc {
tx: &mut db::Transaction,
hash: &Hash,
) -> db::TxOpResult<bool> {
- let new_rc = RcEntry::parse_opt(tx.get(&self.rc, &hash)?).decrement();
+ let new_rc = RcEntry::parse_opt(tx.get(&self.rc, hash)?).decrement();
match new_rc.serialize() {
- Some(x) => tx.insert(&self.rc, &hash, x)?,
- None => tx.remove(&self.rc, &hash)?,
+ Some(x) => tx.insert(&self.rc, hash, x)?,
+ None => tx.remove(&self.rc, hash)?,
};
Ok(matches!(new_rc, RcEntry::Deletable { .. }))
}
@@ -57,10 +57,10 @@ impl BlockRc {
pub(crate) fn clear_deleted_block_rc(&self, hash: &Hash) -> Result<(), Error> {
let now = now_msec();
self.rc.db().transaction(|mut tx| {
- let rcval = RcEntry::parse_opt(tx.get(&self.rc, &hash)?);
+ let rcval = RcEntry::parse_opt(tx.get(&self.rc, hash)?);
match rcval {
RcEntry::Deletable { at_time } if now > at_time => {
- tx.remove(&self.rc, &hash)?;
+ tx.remove(&self.rc, hash)?;
}
_ => (),
};
diff --git a/src/block/repair.rs b/src/block/repair.rs
index a6ded65a..c89484d9 100644
--- a/src/block/repair.rs
+++ b/src/block/repair.rs
@@ -4,7 +4,7 @@ use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
-use serde::{Deserialize, Serialize};
+use rand::Rng;
use tokio::fs;
use tokio::select;
use tokio::sync::mpsc;
@@ -13,14 +13,14 @@ use tokio::sync::watch;
use garage_util::background::*;
use garage_util::data::*;
use garage_util::error::*;
-use garage_util::persister::Persister;
+use garage_util::persister::PersisterShared;
use garage_util::time::*;
use garage_util::tranquilizer::Tranquilizer;
use crate::manager::*;
-// Full scrub every 30 days
-const SCRUB_INTERVAL: Duration = Duration::from_secs(3600 * 24 * 30);
+// Full scrub every 25 days with a random element of 10 days mixed in below
+const SCRUB_INTERVAL: Duration = Duration::from_secs(3600 * 24 * 25);
// Scrub tranquility is initially set to 4, but can be changed in the CLI
// and the updated version is persisted over Garage restarts
const INITIAL_SCRUB_TRANQUILITY: u32 = 4;
@@ -161,6 +161,51 @@ impl Worker for RepairWorker {
// and whose parameter (esp. speed) can be controlled at runtime.
// ---- ---- ----
+mod v081 {
+ use serde::{Deserialize, Serialize};
+
+ #[derive(Serialize, Deserialize)]
+ pub struct ScrubWorkerPersisted {
+ pub tranquility: u32,
+ pub(crate) time_last_complete_scrub: u64,
+ pub(crate) corruptions_detected: u64,
+ }
+
+ impl garage_util::migrate::InitialFormat for ScrubWorkerPersisted {}
+}
+
+mod v082 {
+ use serde::{Deserialize, Serialize};
+
+ use super::v081;
+
+ #[derive(Serialize, Deserialize)]
+ pub struct ScrubWorkerPersisted {
+ pub tranquility: u32,
+ pub(crate) time_last_complete_scrub: u64,
+ pub(crate) time_next_run_scrub: u64,
+ pub(crate) corruptions_detected: u64,
+ }
+
+ impl garage_util::migrate::Migrate for ScrubWorkerPersisted {
+ type Previous = v081::ScrubWorkerPersisted;
+ const VERSION_MARKER: &'static [u8] = b"G082bswp";
+
+ fn migrate(old: v081::ScrubWorkerPersisted) -> ScrubWorkerPersisted {
+ use crate::repair::randomize_next_scrub_run_time;
+
+ ScrubWorkerPersisted {
+ tranquility: old.tranquility,
+ time_last_complete_scrub: old.time_last_complete_scrub,
+ time_next_run_scrub: randomize_next_scrub_run_time(old.time_last_complete_scrub),
+ corruptions_detected: old.corruptions_detected,
+ }
+ }
+ }
+}
+
+pub use v082::*;
+
pub struct ScrubWorker {
manager: Arc<BlockManager>,
rx_cmd: mpsc::Receiver<ScrubWorkerCommand>,
@@ -168,17 +213,33 @@ pub struct ScrubWorker {
work: ScrubWorkerState,
tranquilizer: Tranquilizer,
- persister: Persister<ScrubWorkerPersisted>,
- persisted: ScrubWorkerPersisted,
+ persister: PersisterShared<ScrubWorkerPersisted>,
+}
+
+fn randomize_next_scrub_run_time(timestamp: u64) -> u64 {
+ // Take SCRUB_INTERVAL and mix in a random interval of 10 days to attempt to
+ // balance scrub load across different cluster nodes.
+
+ let next_run_timestamp = timestamp
+ + SCRUB_INTERVAL
+ .saturating_add(Duration::from_secs(
+ rand::thread_rng().gen_range(0..3600 * 24 * 10),
+ ))
+ .as_millis() as u64;
+
+ next_run_timestamp
}
-#[derive(Serialize, Deserialize)]
-struct ScrubWorkerPersisted {
- tranquility: u32,
- time_last_complete_scrub: u64,
- corruptions_detected: u64,
+impl Default for ScrubWorkerPersisted {
+ fn default() -> Self {
+ ScrubWorkerPersisted {
+ time_last_complete_scrub: 0,
+ time_next_run_scrub: randomize_next_scrub_run_time(now_msec()),
+ tranquility: INITIAL_SCRUB_TRANQUILITY,
+ corruptions_detected: 0,
+ }
+ }
}
-impl garage_util::migrate::InitialFormat for ScrubWorkerPersisted {}
enum ScrubWorkerState {
Running(BlockStoreIterator),
@@ -198,27 +259,20 @@ pub enum ScrubWorkerCommand {
Pause(Duration),
Resume,
Cancel,
- SetTranquility(u32),
}
impl ScrubWorker {
- pub fn new(manager: Arc<BlockManager>, rx_cmd: mpsc::Receiver<ScrubWorkerCommand>) -> Self {
- let persister = Persister::new(&manager.system.metadata_dir, "scrub_info");
- let persisted = match persister.load() {
- Ok(v) => v,
- Err(_) => ScrubWorkerPersisted {
- time_last_complete_scrub: 0,
- tranquility: INITIAL_SCRUB_TRANQUILITY,
- corruptions_detected: 0,
- },
- };
+ pub(crate) fn new(
+ manager: Arc<BlockManager>,
+ rx_cmd: mpsc::Receiver<ScrubWorkerCommand>,
+ persister: PersisterShared<ScrubWorkerPersisted>,
+ ) -> Self {
Self {
manager,
rx_cmd,
work: ScrubWorkerState::Finished,
tranquilizer: Tranquilizer::new(30),
persister,
- persisted,
}
}
@@ -227,6 +281,7 @@ impl ScrubWorker {
ScrubWorkerCommand::Start => {
self.work = match std::mem::take(&mut self.work) {
ScrubWorkerState::Finished => {
+ info!("Scrub worker initializing, now performing datastore scrub");
let iterator = BlockStoreIterator::new(&self.manager);
ScrubWorkerState::Running(iterator)
}
@@ -267,12 +322,6 @@ impl ScrubWorker {
}
}
}
- ScrubWorkerCommand::SetTranquility(t) => {
- self.persisted.tranquility = t;
- if let Err(e) = self.persister.save_async(&self.persisted).await {
- error!("Could not save new tranquilitiy value: {}", e);
- }
- }
}
}
}
@@ -284,9 +333,19 @@ impl Worker for ScrubWorker {
}
fn status(&self) -> WorkerStatus {
+ let (corruptions_detected, tranquility, time_last_complete_scrub, time_next_run_scrub) =
+ self.persister.get_with(|p| {
+ (
+ p.corruptions_detected,
+ p.tranquility,
+ p.time_last_complete_scrub,
+ p.time_next_run_scrub,
+ )
+ });
+
let mut s = WorkerStatus {
- persistent_errors: Some(self.persisted.corruptions_detected),
- tranquility: Some(self.persisted.tranquility),
+ persistent_errors: Some(corruptions_detected),
+ tranquility: Some(tranquility),
..Default::default()
};
match &self.work {
@@ -298,10 +357,16 @@ impl Worker for ScrubWorker {
s.freeform = vec![format!("Scrub paused, resumes at {}", msec_to_rfc3339(*rt))];
}
ScrubWorkerState::Finished => {
- s.freeform = vec![format!(
- "Last scrub completed at {}",
- msec_to_rfc3339(self.persisted.time_last_complete_scrub)
- )];
+ s.freeform = vec![
+ format!(
+ "Last scrub completed at {}",
+ msec_to_rfc3339(time_last_complete_scrub),
+ ),
+ format!(
+ "Next scrub scheduled for {}",
+ msec_to_rfc3339(time_next_run_scrub)
+ ),
+ ];
}
}
s
@@ -321,20 +386,30 @@ impl Worker for ScrubWorker {
match self.manager.read_block(&hash).await {
Err(Error::CorruptData(_)) => {
error!("Found corrupt data block during scrub: {:?}", hash);
- self.persisted.corruptions_detected += 1;
- self.persister.save_async(&self.persisted).await?;
+ self.persister.set_with(|p| p.corruptions_detected += 1)?;
}
Err(e) => return Err(e),
_ => (),
};
Ok(self
.tranquilizer
- .tranquilize_worker(self.persisted.tranquility))
+ .tranquilize_worker(self.persister.get_with(|p| p.tranquility)))
} else {
- self.persisted.time_last_complete_scrub = now_msec();
- self.persister.save_async(&self.persisted).await?;
+ let now = now_msec();
+ let next_scrub_timestamp = randomize_next_scrub_run_time(now);
+
+ self.persister.set_with(|p| {
+ p.time_last_complete_scrub = now;
+ p.time_next_run_scrub = next_scrub_timestamp;
+ })?;
self.work = ScrubWorkerState::Finished;
self.tranquilizer.clear();
+
+ info!(
+ "Datastore scrub completed, next scrub scheduled for {}",
+ msec_to_rfc3339(next_scrub_timestamp)
+ );
+
Ok(WorkerState::Idle)
}
}
@@ -347,7 +422,7 @@ impl Worker for ScrubWorker {
ScrubWorkerState::Running(_) => return WorkerState::Busy,
ScrubWorkerState::Paused(_, resume_time) => (*resume_time, ScrubWorkerCommand::Resume),
ScrubWorkerState::Finished => (
- self.persisted.time_last_complete_scrub + SCRUB_INTERVAL.as_millis() as u64,
+ self.persister.get_with(|p| p.time_next_run_scrub),
ScrubWorkerCommand::Start,
),
};
@@ -462,11 +537,11 @@ impl BlockStoreIterator {
let ent_type = data_dir_ent.file_type().await?;
let name = name.strip_suffix(".zst").unwrap_or(&name);
- if name.len() == 2 && hex::decode(&name).is_ok() && ent_type.is_dir() {
+ if name.len() == 2 && hex::decode(name).is_ok() && ent_type.is_dir() {
let path = data_dir_ent.path();
self.path.push(ReadingDir::Pending(path));
} else if name.len() == 64 {
- if let Ok(h) = hex::decode(&name) {
+ if let Ok(h) = hex::decode(name) {
let mut hash = [0u8; 32];
hash.copy_from_slice(&h);
return Ok(Some(hash.into()));
diff --git a/src/block/resync.rs b/src/block/resync.rs
index 9c7b3b0e..ea280ad4 100644
--- a/src/block/resync.rs
+++ b/src/block/resync.rs
@@ -3,7 +3,6 @@ use std::convert::TryInto;
use std::sync::{Arc, Mutex};
use std::time::Duration;
-use arc_swap::ArcSwap;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
@@ -22,7 +21,7 @@ use garage_util::background::*;
use garage_util::data::*;
use garage_util::error::*;
use garage_util::metrics::RecordDuration;
-use garage_util::persister::Persister;
+use garage_util::persister::PersisterShared;
use garage_util::time::*;
use garage_util::tranquilizer::Tranquilizer;
@@ -49,13 +48,12 @@ const INITIAL_RESYNC_TRANQUILITY: u32 = 2;
pub struct BlockResyncManager {
pub(crate) queue: CountedTree,
- pub(crate) notify: Notify,
+ pub(crate) notify: Arc<Notify>,
pub(crate) errors: CountedTree,
busy_set: BusySet,
- persister: Persister<ResyncPersistedConfig>,
- persisted: ArcSwap<ResyncPersistedConfig>,
+ persister: PersisterShared<ResyncPersistedConfig>,
}
#[derive(Serialize, Deserialize, Clone, Copy)]
@@ -64,6 +62,14 @@ struct ResyncPersistedConfig {
tranquility: u32,
}
impl garage_util::migrate::InitialFormat for ResyncPersistedConfig {}
+impl Default for ResyncPersistedConfig {
+ fn default() -> Self {
+ ResyncPersistedConfig {
+ n_workers: 1,
+ tranquility: INITIAL_RESYNC_TRANQUILITY,
+ }
+ }
+}
enum ResyncIterResult {
BusyDidSomething,
@@ -91,22 +97,14 @@ impl BlockResyncManager {
.expect("Unable to open block_local_resync_errors tree");
let errors = CountedTree::new(errors).expect("Could not count block_local_resync_errors");
- let persister = Persister::new(&system.metadata_dir, "resync_cfg");
- let persisted = match persister.load() {
- Ok(v) => v,
- Err(_) => ResyncPersistedConfig {
- n_workers: 1,
- tranquility: INITIAL_RESYNC_TRANQUILITY,
- },
- };
+ let persister = PersisterShared::new(&system.metadata_dir, "resync_cfg");
Self {
queue,
- notify: Notify::new(),
+ notify: Arc::new(Notify::new()),
errors,
busy_set: Arc::new(Mutex::new(HashSet::new())),
persister,
- persisted: ArcSwap::new(Arc::new(persisted)),
}
}
@@ -142,6 +140,38 @@ impl BlockResyncManager {
)))
}
+ pub fn register_bg_vars(&self, vars: &mut vars::BgVars) {
+ let notify = self.notify.clone();
+ vars.register_rw(
+ &self.persister,
+ "resync-worker-count",
+ |p| p.get_with(|x| x.n_workers),
+ move |p, n_workers| {
+ if !(1..=MAX_RESYNC_WORKERS).contains(&n_workers) {
+ return Err(Error::Message(format!(
+ "Invalid number of resync workers, must be between 1 and {}",
+ MAX_RESYNC_WORKERS
+ )));
+ }
+ p.set_with(|x| x.n_workers = n_workers)?;
+ notify.notify_waiters();
+ Ok(())
+ },
+ );
+
+ let notify = self.notify.clone();
+ vars.register_rw(
+ &self.persister,
+ "resync-tranquility",
+ |p| p.get_with(|x| x.tranquility),
+ move |p, tranquility| {
+ p.set_with(|x| x.tranquility = tranquility)?;
+ notify.notify_waiters();
+ Ok(())
+ },
+ );
+ }
+
// ---- Resync loop ----
// This part manages a queue of blocks that need to be
@@ -436,33 +466,6 @@ impl BlockResyncManager {
Ok(())
}
-
- async fn update_persisted(
- &self,
- update: impl Fn(&mut ResyncPersistedConfig),
- ) -> Result<(), Error> {
- let mut cfg: ResyncPersistedConfig = *self.persisted.load().as_ref();
- update(&mut cfg);
- self.persister.save_async(&cfg).await?;
- self.persisted.store(Arc::new(cfg));
- self.notify.notify_waiters();
- Ok(())
- }
-
- pub async fn set_n_workers(&self, n_workers: usize) -> Result<(), Error> {
- if !(1..=MAX_RESYNC_WORKERS).contains(&n_workers) {
- return Err(Error::Message(format!(
- "Invalid number of resync workers, must be between 1 and {}",
- MAX_RESYNC_WORKERS
- )));
- }
- self.update_persisted(|cfg| cfg.n_workers = n_workers).await
- }
-
- pub async fn set_tranquility(&self, tranquility: u32) -> Result<(), Error> {
- self.update_persisted(|cfg| cfg.tranquility = tranquility)
- .await
- }
}
impl Drop for BusyBlock {
@@ -477,15 +480,18 @@ pub(crate) struct ResyncWorker {
manager: Arc<BlockManager>,
tranquilizer: Tranquilizer,
next_delay: Duration,
+ persister: PersisterShared<ResyncPersistedConfig>,
}
impl ResyncWorker {
pub(crate) fn new(index: usize, manager: Arc<BlockManager>) -> Self {
+ let persister = manager.resync.persister.clone();
Self {
index,
manager,
tranquilizer: Tranquilizer::new(30),
next_delay: Duration::from_secs(10),
+ persister,
}
}
}
@@ -497,9 +503,9 @@ impl Worker for ResyncWorker {
}
fn status(&self) -> WorkerStatus {
- let persisted = self.manager.resync.persisted.load();
+ let (n_workers, tranquility) = self.persister.get_with(|x| (x.n_workers, x.tranquility));
- if self.index >= persisted.n_workers {
+ if self.index >= n_workers {
return WorkerStatus {
freeform: vec!["This worker is currently disabled".into()],
..Default::default()
@@ -508,22 +514,24 @@ impl Worker for ResyncWorker {
WorkerStatus {
queue_length: Some(self.manager.resync.queue_len().unwrap_or(0) as u64),
- tranquility: Some(persisted.tranquility),
+ tranquility: Some(tranquility),
persistent_errors: Some(self.manager.resync.errors_len().unwrap_or(0) as u64),
..Default::default()
}
}
async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
- if self.index >= self.manager.resync.persisted.load().n_workers {
+ let (n_workers, tranquility) = self.persister.get_with(|x| (x.n_workers, x.tranquility));
+
+ if self.index >= n_workers {
return Ok(WorkerState::Idle);
}
self.tranquilizer.reset();
match self.manager.resync.resync_iter(&self.manager).await {
- Ok(ResyncIterResult::BusyDidSomething) => Ok(self
- .tranquilizer
- .tranquilize_worker(self.manager.resync.persisted.load().tranquility)),
+ Ok(ResyncIterResult::BusyDidSomething) => {
+ Ok(self.tranquilizer.tranquilize_worker(tranquility))
+ }
Ok(ResyncIterResult::BusyDidNothing) => Ok(WorkerState::Busy),
Ok(ResyncIterResult::IdleFor(delay)) => {
self.next_delay = delay;
@@ -542,7 +550,7 @@ impl Worker for ResyncWorker {
}
async fn wait_for_work(&mut self) -> WorkerState {
- while self.index >= self.manager.resync.persisted.load().n_workers {
+ while self.index >= self.persister.get_with(|x| x.n_workers) {
self.manager.resync.notify.notified().await
}
diff --git a/src/db/Cargo.toml b/src/db/Cargo.toml
index c479d9d1..e3a65857 100644
--- a/src/db/Cargo.toml
+++ b/src/db/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "garage_db"
-version = "0.8.1"
+version = "0.8.2"
authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license = "AGPL-3.0"
@@ -19,18 +19,18 @@ required-features = ["cli"]
[dependencies]
err-derive = "0.3"
hexdump = "0.1"
-tracing = "0.1.30"
+tracing = "0.1"
heed = { version = "0.11", default-features = false, features = ["lmdb"], optional = true }
-rusqlite = { version = "0.27", optional = true }
+rusqlite = { version = "0.28", optional = true }
sled = { version = "0.34", optional = true }
# cli deps
-clap = { version = "3.1.18", optional = true, features = ["derive", "env"] }
+clap = { version = "4.1", optional = true, features = ["derive", "env"] }
pretty_env_logger = { version = "0.4", optional = true }
[dev-dependencies]
-mktemp = "0.4"
+mktemp = "0.5"
[features]
default = [ "sled" ]
diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml
index b43b0242..0cbdf890 100644
--- a/src/garage/Cargo.toml
+++ b/src/garage/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "garage"
-version = "0.8.1"
+version = "0.8.2"
authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license = "AGPL-3.0"
@@ -21,22 +21,22 @@ path = "tests/lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
-garage_db = { version = "0.8.1", path = "../db" }
-garage_api = { version = "0.8.1", path = "../api" }
-garage_block = { version = "0.8.1", path = "../block" }
-garage_model = { version = "0.8.1", path = "../model" }
-garage_rpc = { version = "0.8.1", path = "../rpc" }
-garage_table = { version = "0.8.1", path = "../table" }
-garage_util = { version = "0.8.1", path = "../util" }
-garage_web = { version = "0.8.1", path = "../web" }
+garage_db = { version = "0.8.2", path = "../db" }
+garage_api = { version = "0.8.2", path = "../api" }
+garage_block = { version = "0.8.2", path = "../block" }
+garage_model = { version = "0.8.2", path = "../model" }
+garage_rpc = { version = "0.8.2", path = "../rpc" }
+garage_table = { version = "0.8.2", path = "../table" }
+garage_util = { version = "0.8.2", path = "../util" }
+garage_web = { version = "0.8.2", path = "../web" }
backtrace = "0.3"
bytes = "1.0"
bytesize = "1.1"
-timeago = "0.3"
+timeago = "0.4"
parse_duration = "2.1"
hex = "0.4"
-tracing = { version = "0.1.30" }
+tracing = { version = "0.1" }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
rand = "0.8"
async-trait = "0.1.7"
@@ -45,7 +45,7 @@ sodiumoxide = { version = "0.2.5-0", package = "kuska-sodiumoxide" }
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
serde_bytes = "0.11"
structopt = { version = "0.3", default-features = false }
-toml = "0.5"
+toml = "0.6"
futures = "0.3"
futures-util = "0.3"
@@ -69,7 +69,7 @@ sha2 = "0.10"
static_init = "1.0"
assert-json-diff = "2.0"
serde_json = "1.0"
-base64 = "0.13"
+base64 = "0.21"
[features]
diff --git a/src/garage/admin.rs b/src/garage/admin.rs
index 58d645ac..e4e50520 100644
--- a/src/garage/admin.rs
+++ b/src/garage/admin.rs
@@ -15,10 +15,10 @@ use garage_util::time::*;
use garage_table::replication::*;
use garage_table::*;
+use garage_rpc::ring::PARTITION_BITS;
use garage_rpc::*;
use garage_block::manager::BlockResyncErrorInfo;
-use garage_block::repair::ScrubWorkerCommand;
use garage_model::bucket_alias_table::*;
use garage_model::bucket_table::*;
@@ -60,6 +60,7 @@ pub enum AdminRpc {
HashMap<usize, garage_util::background::WorkerInfo>,
WorkerListOpt,
),
+ WorkerVars(Vec<(Uuid, String, String)>),
WorkerInfo(usize, garage_util::background::WorkerInfo),
BlockErrorList(Vec<BlockResyncErrorInfo>),
BlockInfo {
@@ -783,6 +784,7 @@ impl AdminRpcHandler {
for node in ring.layout.node_ids().iter() {
let mut opt = opt.clone();
opt.all_nodes = false;
+ opt.skip_global = true;
writeln!(&mut ret, "\n======================").unwrap();
writeln!(&mut ret, "Stats for node {:?}:", node).unwrap();
@@ -799,6 +801,15 @@ impl AdminRpcHandler {
Err(e) => writeln!(&mut ret, "Network error: {}", e).unwrap(),
}
}
+
+ writeln!(&mut ret, "\n======================").unwrap();
+ write!(
+ &mut ret,
+ "Cluster statistics:\n\n{}",
+ self.gather_cluster_stats()
+ )
+ .unwrap();
+
Ok(AdminRpc::Ok(ret))
} else {
Ok(AdminRpc::Ok(self.gather_stats_local(opt)?))
@@ -809,32 +820,17 @@ impl AdminRpcHandler {
let mut ret = String::new();
writeln!(
&mut ret,
- "\nGarage version: {} [features: {}]",
+ "\nGarage version: {} [features: {}]\nRust compiler version: {}",
garage_util::version::garage_version(),
garage_util::version::garage_features()
.map(|list| list.join(", "))
.unwrap_or_else(|| "(unknown)".into()),
+ garage_util::version::rust_version(),
)
.unwrap();
writeln!(&mut ret, "\nDatabase engine: {}", self.garage.db.engine()).unwrap();
- // Gather ring statistics
- let ring = self.garage.system.ring.borrow().clone();
- let mut ring_nodes = HashMap::new();
- for (_i, loc) in ring.partitions().iter() {
- for n in ring.get_nodes(loc, ring.replication_factor).iter() {
- if !ring_nodes.contains_key(n) {
- ring_nodes.insert(*n, 0usize);
- }
- *ring_nodes.get_mut(n).unwrap() += 1;
- }
- }
- writeln!(&mut ret, "\nRing nodes & partition count:").unwrap();
- for (n, c) in ring_nodes.iter() {
- writeln!(&mut ret, " {:?} {}", n, c).unwrap();
- }
-
// Gather table statistics
let mut table = vec![" Table\tItems\tMklItems\tMklTodo\tGcTodo".into()];
table.push(self.gather_table_stats(&self.garage.bucket_table, opt.detailed)?);
@@ -881,12 +877,110 @@ impl AdminRpcHandler {
.unwrap();
if !opt.detailed {
- writeln!(&mut ret, "\nIf values are missing (marked as NC), consider adding the --detailed flag - this will be slow.").unwrap();
+ writeln!(&mut ret, "\nIf values are missing above (marked as NC), consider adding the --detailed flag (this will be slow).").unwrap();
+ }
+
+ if !opt.skip_global {
+ write!(&mut ret, "\n{}", self.gather_cluster_stats()).unwrap();
}
Ok(ret)
}
+ fn gather_cluster_stats(&self) -> String {
+ let mut ret = String::new();
+
+ // Gather storage node and free space statistics
+ let layout = &self.garage.system.ring.borrow().layout;
+ let mut node_partition_count = HashMap::<Uuid, u64>::new();
+ for short_id in layout.ring_assignment_data.iter() {
+ let id = layout.node_id_vec[*short_id as usize];
+ *node_partition_count.entry(id).or_default() += 1;
+ }
+ let node_info = self
+ .garage
+ .system
+ .get_known_nodes()
+ .into_iter()
+ .map(|n| (n.id, n))
+ .collect::<HashMap<_, _>>();
+
+ let mut table = vec![" ID\tHostname\tZone\tCapacity\tPart.\tDataAvail\tMetaAvail".into()];
+ for (id, parts) in node_partition_count.iter() {
+ let info = node_info.get(id);
+ let status = info.map(|x| &x.status);
+ let role = layout.roles.get(id).and_then(|x| x.0.as_ref());
+ let hostname = status.map(|x| x.hostname.as_str()).unwrap_or("?");
+ let zone = role.map(|x| x.zone.as_str()).unwrap_or("?");
+ let capacity = role
+ .map(|x| x.capacity_string())
+ .unwrap_or_else(|| "?".into());
+ let avail_str = |x| match x {
+ Some((avail, total)) => {
+ let pct = (avail as f64) / (total as f64) * 100.;
+ let avail = bytesize::ByteSize::b(avail);
+ let total = bytesize::ByteSize::b(total);
+ format!("{}/{} ({:.1}%)", avail, total, pct)
+ }
+ None => "?".into(),
+ };
+ let data_avail = avail_str(status.and_then(|x| x.data_disk_avail));
+ let meta_avail = avail_str(status.and_then(|x| x.meta_disk_avail));
+ table.push(format!(
+ " {:?}\t{}\t{}\t{}\t{}\t{}\t{}",
+ id, hostname, zone, capacity, parts, data_avail, meta_avail
+ ));
+ }
+ write!(
+ &mut ret,
+ "Storage nodes:\n{}",
+ format_table_to_string(table)
+ )
+ .unwrap();
+
+ let meta_part_avail = node_partition_count
+ .iter()
+ .filter_map(|(id, parts)| {
+ node_info
+ .get(id)
+ .and_then(|x| x.status.meta_disk_avail)
+ .map(|c| c.0 / *parts)
+ })
+ .collect::<Vec<_>>();
+ let data_part_avail = node_partition_count
+ .iter()
+ .filter_map(|(id, parts)| {
+ node_info
+ .get(id)
+ .and_then(|x| x.status.data_disk_avail)
+ .map(|c| c.0 / *parts)
+ })
+ .collect::<Vec<_>>();
+ if !meta_part_avail.is_empty() && !data_part_avail.is_empty() {
+ let meta_avail =
+ bytesize::ByteSize(meta_part_avail.iter().min().unwrap() * (1 << PARTITION_BITS));
+ let data_avail =
+ bytesize::ByteSize(data_part_avail.iter().min().unwrap() * (1 << PARTITION_BITS));
+ writeln!(
+ &mut ret,
+ "\nEstimated available storage space cluster-wide (might be lower in practice):"
+ )
+ .unwrap();
+ if meta_part_avail.len() < node_partition_count.len()
+ || data_part_avail.len() < node_partition_count.len()
+ {
+ writeln!(&mut ret, " data: < {}", data_avail).unwrap();
+ writeln!(&mut ret, " metadata: < {}", meta_avail).unwrap();
+ writeln!(&mut ret, "A precise estimate could not be given as information is missing for some storage nodes.").unwrap();
+ } else {
+ writeln!(&mut ret, " data: {}", data_avail).unwrap();
+ writeln!(&mut ret, " metadata: {}", meta_avail).unwrap();
+ }
+ }
+
+ ret
+ }
+
fn gather_table_stats<F, R>(
&self,
t: &Arc<Table<F, R>>,
@@ -943,32 +1037,101 @@ impl AdminRpcHandler {
.clone();
Ok(AdminRpc::WorkerInfo(*tid, info))
}
- WorkerOperation::Set { opt } => match opt {
- WorkerSetCmd::ScrubTranquility { tranquility } => {
- let scrub_command = ScrubWorkerCommand::SetTranquility(*tranquility);
- self.garage
- .block_manager
- .send_scrub_command(scrub_command)
- .await?;
- Ok(AdminRpc::Ok("Scrub tranquility updated".into()))
- }
- WorkerSetCmd::ResyncWorkerCount { worker_count } => {
- self.garage
- .block_manager
- .resync
- .set_n_workers(*worker_count)
- .await?;
- Ok(AdminRpc::Ok("Number of resync workers updated".into()))
+ WorkerOperation::Get {
+ all_nodes,
+ variable,
+ } => self.handle_get_var(*all_nodes, variable).await,
+ WorkerOperation::Set {
+ all_nodes,
+ variable,
+ value,
+ } => self.handle_set_var(*all_nodes, variable, value).await,
+ }
+ }
+
+ async fn handle_get_var(
+ &self,
+ all_nodes: bool,
+ variable: &Option<String>,
+ ) -> Result<AdminRpc, Error> {
+ if all_nodes {
+ let mut ret = vec![];
+ let ring = self.garage.system.ring.borrow().clone();
+ for node in ring.layout.node_ids().iter() {
+ let node = (*node).into();
+ match self
+ .endpoint
+ .call(
+ &node,
+ AdminRpc::Worker(WorkerOperation::Get {
+ all_nodes: false,
+ variable: variable.clone(),
+ }),
+ PRIO_NORMAL,
+ )
+ .await??
+ {
+ AdminRpc::WorkerVars(v) => ret.extend(v),
+ m => return Err(GarageError::unexpected_rpc_message(m).into()),
}
- WorkerSetCmd::ResyncTranquility { tranquility } => {
- self.garage
- .block_manager
- .resync
- .set_tranquility(*tranquility)
- .await?;
- Ok(AdminRpc::Ok("Resync tranquility updated".into()))
+ }
+ Ok(AdminRpc::WorkerVars(ret))
+ } else {
+ #[allow(clippy::collapsible_else_if)]
+ if let Some(v) = variable {
+ Ok(AdminRpc::WorkerVars(vec![(
+ self.garage.system.id,
+ v.clone(),
+ self.garage.bg_vars.get(v)?,
+ )]))
+ } else {
+ let mut vars = self.garage.bg_vars.get_all();
+ vars.sort();
+ Ok(AdminRpc::WorkerVars(
+ vars.into_iter()
+ .map(|(k, v)| (self.garage.system.id, k.to_string(), v))
+ .collect(),
+ ))
+ }
+ }
+ }
+
+ async fn handle_set_var(
+ &self,
+ all_nodes: bool,
+ variable: &str,
+ value: &str,
+ ) -> Result<AdminRpc, Error> {
+ if all_nodes {
+ let mut ret = vec![];
+ let ring = self.garage.system.ring.borrow().clone();
+ for node in ring.layout.node_ids().iter() {
+ let node = (*node).into();
+ match self
+ .endpoint
+ .call(
+ &node,
+ AdminRpc::Worker(WorkerOperation::Set {
+ all_nodes: false,
+ variable: variable.to_string(),
+ value: value.to_string(),
+ }),
+ PRIO_NORMAL,
+ )
+ .await??
+ {
+ AdminRpc::WorkerVars(v) => ret.extend(v),
+ m => return Err(GarageError::unexpected_rpc_message(m).into()),
}
- },
+ }
+ Ok(AdminRpc::WorkerVars(ret))
+ } else {
+ self.garage.bg_vars.set(variable, value)?;
+ Ok(AdminRpc::WorkerVars(vec![(
+ self.garage.system.id,
+ variable.to_string(),
+ value.to_string(),
+ )]))
}
}
diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs
index 0d180ecd..905b14d3 100644
--- a/src/garage/cli/cmd.rs
+++ b/src/garage/cli/cmd.rs
@@ -59,18 +59,29 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) ->
let layout = fetch_layout(rpc_cli, rpc_host).await?;
println!("==== HEALTHY NODES ====");
- let mut healthy_nodes = vec!["ID\tHostname\tAddress\tTags\tZone\tCapacity".to_string()];
+ let mut healthy_nodes =
+ vec!["ID\tHostname\tAddress\tTags\tZone\tCapacity\tDataAvail".to_string()];
for adv in status.iter().filter(|adv| adv.is_up) {
match layout.roles.get(&adv.id) {
Some(NodeRoleV(Some(cfg))) => {
+ let data_avail = match &adv.status.data_disk_avail {
+ _ if cfg.capacity.is_none() => "N/A".into(),
+ Some((avail, total)) => {
+ let pct = (*avail as f64) / (*total as f64) * 100.;
+ let avail = bytesize::ByteSize::b(*avail);
+ format!("{} ({:.1}%)", avail, pct)
+ }
+ None => "?".into(),
+ };
healthy_nodes.push(format!(
- "{id:?}\t{host}\t{addr}\t[{tags}]\t{zone}\t{capacity}",
+ "{id:?}\t{host}\t{addr}\t[{tags}]\t{zone}\t{capacity}\t{data_avail}",
id = adv.id,
host = adv.status.hostname,
addr = adv.addr,
tags = cfg.tags.join(","),
zone = cfg.zone,
capacity = cfg.capacity_string(),
+ data_avail = data_avail,
));
}
_ => {
@@ -191,6 +202,9 @@ pub async fn cmd_admin(
AdminRpc::WorkerList(wi, wlo) => {
print_worker_list(wi, wlo);
}
+ AdminRpc::WorkerVars(wv) => {
+ print_worker_vars(wv);
+ }
AdminRpc::WorkerInfo(tid, wi) => {
print_worker_info(tid, wi);
}
diff --git a/src/garage/cli/init.rs b/src/garage/cli/init.rs
index 511b53a6..20813f1c 100644
--- a/src/garage/cli/init.rs
+++ b/src/garage/cli/init.rs
@@ -14,11 +14,11 @@ pub fn node_id_command(config_file: PathBuf, quiet: bool) -> Result<(), Error> {
garage_rpc::system::read_node_id(&config.metadata_dir).err_context(READ_KEY_ERROR)?;
let idstr = if let Some(addr) = config.rpc_public_addr {
- let idstr = format!("{}@{}", hex::encode(&node_id), addr);
+ let idstr = format!("{}@{}", hex::encode(node_id), addr);
println!("{}", idstr);
idstr
} else {
- let idstr = hex::encode(&node_id);
+ let idstr = hex::encode(node_id);
println!("{}", idstr);
if !quiet {
diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs
index dcb9fef9..986592ae 100644
--- a/src/garage/cli/structs.rs
+++ b/src/garage/cli/structs.rs
@@ -515,6 +515,11 @@ pub struct StatsOpt {
/// Gather detailed statistics (this can be long)
#[structopt(short = "d", long = "detailed")]
pub detailed: bool,
+
+ /// Don't show global cluster stats (internal use in RPC)
+ #[structopt(skip)]
+ #[serde(default)]
+ pub skip_global: bool,
}
#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
@@ -528,11 +533,25 @@ pub enum WorkerOperation {
/// Get detailed information about a worker
#[structopt(name = "info", version = garage_version())]
Info { tid: usize },
+ /// Get worker parameter
+ #[structopt(name = "get", version = garage_version())]
+ Get {
+ /// Gather variable values from all nodes
+ #[structopt(short = "a", long = "all-nodes")]
+ all_nodes: bool,
+ /// Variable name to get, or none to get all variables
+ variable: Option<String>,
+ },
/// Set worker parameter
#[structopt(name = "set", version = garage_version())]
Set {
- #[structopt(subcommand)]
- opt: WorkerSetCmd,
+ /// Set variable values on all nodes
+ #[structopt(short = "a", long = "all-nodes")]
+ all_nodes: bool,
+ /// Variable node to set
+ variable: String,
+ /// Value to set the variable to
+ value: String,
},
}
@@ -547,19 +566,6 @@ pub struct WorkerListOpt {
}
#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
-pub enum WorkerSetCmd {
- /// Set tranquility of scrub operations
- #[structopt(name = "scrub-tranquility", version = garage_version())]
- ScrubTranquility { tranquility: u32 },
- /// Set number of concurrent block resync workers
- #[structopt(name = "resync-worker-count", version = garage_version())]
- ResyncWorkerCount { worker_count: usize },
- /// Set tranquility of block resync operations
- #[structopt(name = "resync-tranquility", version = garage_version())]
- ResyncTranquility { tranquility: u32 },
-}
-
-#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
pub enum BlockOperation {
/// List all blocks that currently have a resync error
#[structopt(name = "list-errors", version = garage_version())]
diff --git a/src/garage/cli/util.rs b/src/garage/cli/util.rs
index 63fd9eba..2c6be2f4 100644
--- a/src/garage/cli/util.rs
+++ b/src/garage/cli/util.rs
@@ -229,7 +229,7 @@ pub fn find_matching_node(
) -> Result<Uuid, Error> {
let mut candidates = vec![];
for c in cand {
- if hex::encode(&c).starts_with(&pattern) && !candidates.contains(&c) {
+ if hex::encode(c).starts_with(pattern) && !candidates.contains(&c) {
candidates.push(c);
}
}
@@ -357,6 +357,14 @@ pub fn print_worker_info(tid: usize, info: WorkerInfo) {
format_table(table);
}
+pub fn print_worker_vars(wv: Vec<(Uuid, String, String)>) {
+ let table = wv
+ .into_iter()
+ .map(|(n, k, v)| format!("{:?}\t{}\t{}", n, k, v))
+ .collect::<Vec<_>>();
+ format_table(table);
+}
+
pub fn print_block_error_list(el: Vec<BlockResyncErrorInfo>) {
let now = now_msec();
let tf = timeago::Formatter::new();
diff --git a/src/garage/main.rs b/src/garage/main.rs
index cd1d6228..9b069d82 100644
--- a/src/garage/main.rs
+++ b/src/garage/main.rs
@@ -28,6 +28,7 @@ use structopt::StructOpt;
use netapp::util::parse_and_resolve_peer_addr;
use netapp::NetworkKey;
+use garage_util::config::Config;
use garage_util::error::*;
use garage_rpc::system::*;
@@ -49,11 +50,10 @@ struct Opt {
#[structopt(short = "h", long = "rpc-host", env = "GARAGE_RPC_HOST")]
pub rpc_host: Option<String>,
- /// RPC secret network key for admin operations
- #[structopt(short = "s", long = "rpc-secret", env = "GARAGE_RPC_SECRET")]
- pub rpc_secret: Option<String>,
+ #[structopt(flatten)]
+ pub secrets: Secrets,
- /// Configuration file (garage.toml)
+ /// Path to configuration file
#[structopt(
short = "c",
long = "config",
@@ -66,6 +66,24 @@ struct Opt {
cmd: Command,
}
+#[derive(StructOpt, Debug)]
+pub struct Secrets {
+ /// RPC secret network key, used to replace rpc_secret in config.toml when running the
+ /// daemon or doing admin operations
+ #[structopt(short = "s", long = "rpc-secret", env = "GARAGE_RPC_SECRET")]
+ pub rpc_secret: Option<String>,
+
+ /// Metrics API authentication token, replaces admin.metrics_token in config.toml when
+ /// running the Garage daemon
+ #[structopt(long = "admin-token", env = "GARAGE_ADMIN_TOKEN")]
+ pub admin_token: Option<String>,
+
+ /// Metrics API authentication token, replaces admin.metrics_token in config.toml when
+ /// running the Garage daemon
+ #[structopt(long = "metrics-token", env = "GARAGE_METRICS_TOKEN")]
+ pub metrics_token: Option<String>,
+}
+
#[tokio::main]
async fn main() {
// Initialize version and features info
@@ -148,9 +166,9 @@ async fn main() {
sodiumoxide::init().expect("Unable to init sodiumoxide");
let res = match opt.cmd {
- Command::Server => server::run_server(opt.config_file).await,
+ Command::Server => server::run_server(opt.config_file, opt.secrets).await,
Command::OfflineRepair(repair_opt) => {
- repair::offline::offline_repair(opt.config_file, repair_opt).await
+ repair::offline::offline_repair(opt.config_file, opt.secrets, repair_opt).await
}
Command::Node(NodeOperation::NodeId(node_id_opt)) => {
node_id_command(opt.config_file, node_id_opt.quiet)
@@ -165,7 +183,7 @@ async fn main() {
}
async fn cli_command(opt: Opt) -> Result<(), Error> {
- let config = if opt.rpc_secret.is_none() || opt.rpc_host.is_none() {
+ let config = if opt.secrets.rpc_secret.is_none() || opt.rpc_host.is_none() {
Some(garage_util::config::read_config(opt.config_file.clone())
.err_context(format!("Unable to read configuration file {}. Configuration file is needed because -h or -s is not provided on the command line.", opt.config_file.to_string_lossy()))?)
} else {
@@ -174,9 +192,10 @@ async fn cli_command(opt: Opt) -> Result<(), Error> {
// Find and parse network RPC secret
let net_key_hex_str = opt
+ .secrets
.rpc_secret
.as_ref()
- .or_else(|| config.as_ref().map(|c| &c.rpc_secret))
+ .or_else(|| config.as_ref().and_then(|c| c.rpc_secret.as_ref()))
.ok_or("No RPC secret provided")?;
let network_key = NetworkKey::from_slice(
&hex::decode(net_key_hex_str).err_context("Invalid RPC secret key (bad hex)")?[..],
@@ -233,3 +252,16 @@ async fn cli_command(opt: Opt) -> Result<(), Error> {
Ok(x) => Ok(x),
}
}
+
+fn fill_secrets(mut config: Config, secrets: Secrets) -> Config {
+ if secrets.rpc_secret.is_some() {
+ config.rpc_secret = secrets.rpc_secret;
+ }
+ if secrets.admin_token.is_some() {
+ config.admin.admin_token = secrets.admin_token;
+ }
+ if secrets.metrics_token.is_some() {
+ config.admin.metrics_token = secrets.metrics_token;
+ }
+ config
+}
diff --git a/src/garage/repair/offline.rs b/src/garage/repair/offline.rs
index 25193e4a..f4edcf03 100644
--- a/src/garage/repair/offline.rs
+++ b/src/garage/repair/offline.rs
@@ -6,8 +6,13 @@ use garage_util::error::*;
use garage_model::garage::Garage;
use crate::cli::structs::*;
+use crate::{fill_secrets, Secrets};
-pub async fn offline_repair(config_file: PathBuf, opt: OfflineRepairOpt) -> Result<(), Error> {
+pub async fn offline_repair(
+ config_file: PathBuf,
+ secrets: Secrets,
+ opt: OfflineRepairOpt,
+) -> Result<(), Error> {
if !opt.yes {
return Err(Error::Message(
"Please add the --yes flag to launch repair operation".into(),
@@ -15,7 +20,7 @@ pub async fn offline_repair(config_file: PathBuf, opt: OfflineRepairOpt) -> Resu
}
info!("Loading configuration...");
- let config = read_config(config_file)?;
+ let config = fill_secrets(read_config(config_file)?, secrets);
info!("Initializing Garage main data store...");
let garage = Garage::new(config)?;
diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs
index 627e3bf3..0e14ed51 100644
--- a/src/garage/repair/online.rs
+++ b/src/garage/repair/online.rs
@@ -51,7 +51,11 @@ pub async fn launch_online_repair(
ScrubCmd::Resume => ScrubWorkerCommand::Resume,
ScrubCmd::Cancel => ScrubWorkerCommand::Cancel,
ScrubCmd::SetTranquility { tranquility } => {
- ScrubWorkerCommand::SetTranquility(tranquility)
+ garage
+ .block_manager
+ .scrub_persister
+ .set_with(|x| x.tranquility = tranquility)?;
+ return Ok(());
}
};
info!("Sending command to scrub worker: {:?}", cmd);
diff --git a/src/garage/server.rs b/src/garage/server.rs
index 16f1b625..958089c6 100644
--- a/src/garage/server.rs
+++ b/src/garage/server.rs
@@ -17,6 +17,7 @@ use garage_api::k2v::api_server::K2VApiServer;
use crate::admin::*;
#[cfg(feature = "telemetry-otlp")]
use crate::tracing_setup::*;
+use crate::{fill_secrets, Secrets};
async fn wait_from(mut chan: watch::Receiver<bool>) {
while !*chan.borrow() {
@@ -26,9 +27,9 @@ async fn wait_from(mut chan: watch::Receiver<bool>) {
}
}
-pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
+pub async fn run_server(config_file: PathBuf, secrets: Secrets) -> Result<(), Error> {
info!("Loading configuration...");
- let config = read_config(config_file)?;
+ let config = fill_secrets(read_config(config_file)?, secrets);
// ---- Initialize Garage internals ----
diff --git a/src/garage/tests/admin.rs b/src/garage/tests/admin.rs
index 37aefe38..d3becf0a 100644
--- a/src/garage/tests/admin.rs
+++ b/src/garage/tests/admin.rs
@@ -21,14 +21,7 @@ async fn test_admin_bucket_perms() {
ctx.garage
.command()
- .args([
- "bucket",
- "allow",
- "--read",
- "--key",
- &ctx.garage.key.id,
- BCKT_NAME,
- ])
+ .args(["bucket", "allow", "--read", "--key", &ctx.key.id, BCKT_NAME])
.quiet()
.expect_success_status("Could not create bucket");
@@ -36,14 +29,7 @@ async fn test_admin_bucket_perms() {
ctx.garage
.command()
- .args([
- "bucket",
- "deny",
- "--read",
- "--key",
- &ctx.garage.key.name,
- BCKT_NAME,
- ])
+ .args(["bucket", "deny", "--read", "--key", &ctx.key.id, BCKT_NAME])
.quiet()
.expect_success_status("Could not create bucket");
@@ -51,14 +37,7 @@ async fn test_admin_bucket_perms() {
ctx.garage
.command()
- .args([
- "bucket",
- "allow",
- "--read",
- "--key",
- &ctx.garage.key.name,
- BCKT_NAME,
- ])
+ .args(["bucket", "allow", "--read", "--key", &ctx.key.id, BCKT_NAME])
.quiet()
.expect_success_status("Could not create bucket");
diff --git a/src/garage/tests/bucket.rs b/src/garage/tests/bucket.rs
index 9c363013..0dec3cfa 100644
--- a/src/garage/tests/bucket.rs
+++ b/src/garage/tests/bucket.rs
@@ -13,7 +13,7 @@ async fn test_bucket_all() {
ctx.garage
.command()
.args(["key", "deny"])
- .args(["--create-bucket", &ctx.garage.key.id])
+ .args(["--create-bucket", &ctx.key.id])
.quiet()
.expect_success_output("Could not deny key to create buckets");
@@ -26,7 +26,7 @@ async fn test_bucket_all() {
ctx.garage
.command()
.args(["key", "allow"])
- .args(["--create-bucket", &ctx.garage.key.id])
+ .args(["--create-bucket", &ctx.key.id])
.quiet()
.expect_success_output("Could not deny key to create buckets");
diff --git a/src/garage/tests/common/client.rs b/src/garage/tests/common/client.rs
index 212588b5..e9d4849a 100644
--- a/src/garage/tests/common/client.rs
+++ b/src/garage/tests/common/client.rs
@@ -1,15 +1,9 @@
use aws_sdk_s3::{Client, Config, Credentials, Endpoint};
-use super::garage::Instance;
+use super::garage::{Instance, Key};
-pub fn build_client(instance: &Instance) -> Client {
- let credentials = Credentials::new(
- &instance.key.id,
- &instance.key.secret,
- None,
- None,
- "garage-integ-test",
- );
+pub fn build_client(instance: &Instance, key: &Key) -> Client {
+ let credentials = Credentials::new(&key.id, &key.secret, None, None, "garage-integ-test");
let endpoint = Endpoint::immutable(instance.s3_uri());
let config = Config::builder()
diff --git a/src/garage/tests/common/custom_requester.rs b/src/garage/tests/common/custom_requester.rs
index 1700cc90..609eda97 100644
--- a/src/garage/tests/common/custom_requester.rs
+++ b/src/garage/tests/common/custom_requester.rs
@@ -14,6 +14,7 @@ use garage_api::signature;
/// You should ever only use this to send requests AWS sdk won't send,
/// like to reproduce behavior of unusual implementations found to be
/// problematic.
+#[derive(Clone)]
pub struct CustomRequester {
key: Key,
uri: Uri,
@@ -22,18 +23,18 @@ pub struct CustomRequester {
}
impl CustomRequester {
- pub fn new_s3(instance: &Instance) -> Self {
+ pub fn new_s3(instance: &Instance, key: &Key) -> Self {
CustomRequester {
- key: instance.key.clone(),
+ key: key.clone(),
uri: instance.s3_uri(),
service: "s3",
client: Client::new(),
}
}
- pub fn new_k2v(instance: &Instance) -> Self {
+ pub fn new_k2v(instance: &Instance, key: &Key) -> Self {
CustomRequester {
- key: instance.key.clone(),
+ key: key.clone(),
uri: instance.k2v_uri(),
service: "k2v",
client: Client::new(),
diff --git a/src/garage/tests/common/garage.rs b/src/garage/tests/common/garage.rs
index dbebe5b1..3beed7c4 100644
--- a/src/garage/tests/common/garage.rs
+++ b/src/garage/tests/common/garage.rs
@@ -13,7 +13,7 @@ static GARAGE_TEST_SECRET: &str =
#[derive(Debug, Default, Clone)]
pub struct Key {
- pub name: String,
+ pub name: Option<String>,
pub id: String,
pub secret: String,
}
@@ -21,10 +21,11 @@ pub struct Key {
pub struct Instance {
process: process::Child,
pub path: PathBuf,
- pub key: Key,
+ pub default_key: Key,
pub s3_port: u16,
pub k2v_port: u16,
pub web_port: u16,
+ pub admin_port: u16,
}
impl Instance {
@@ -101,22 +102,36 @@ api_bind_addr = "127.0.0.1:{admin_port}"
Instance {
process: child,
path,
- key: Key::default(),
+ default_key: Key::default(),
s3_port: port,
k2v_port: port + 1,
web_port: port + 3,
+ admin_port: port + 4,
}
}
fn setup(&mut self) {
- use std::{thread, time::Duration};
-
- // Wait for node to be ready
- thread::sleep(Duration::from_secs(2));
-
+ self.wait_for_boot();
self.setup_layout();
+ self.default_key = self.key(Some("garage_test"));
+ }
+
+ fn wait_for_boot(&mut self) {
+ use std::{thread, time::Duration};
- self.key = self.new_key("garage_test");
+ // 60 * 2 seconds = 120 seconds = 2min
+ for _ in 0..60 {
+ let termination = self
+ .command()
+ .args(["status"])
+ .quiet()
+ .status()
+ .expect("Unable to run command");
+ if termination.success() {
+ break;
+ }
+ thread::sleep(Duration::from_secs(2));
+ }
}
fn setup_layout(&self) {
@@ -167,13 +182,17 @@ api_bind_addr = "127.0.0.1:{admin_port}"
.expect("Could not build garage endpoint URI")
}
- pub fn new_key(&self, name: &str) -> Key {
+ pub fn key(&self, maybe_name: Option<&str>) -> Key {
let mut key = Key::default();
- let output = self
- .command()
- .args(["key", "create", name])
- .expect_success_output("Could not create key");
+ let mut cmd = self.command();
+ let base = cmd.args(["key", "create"]);
+ let with_name = match maybe_name {
+ Some(name) => base.args([name]),
+ None => base,
+ };
+
+ let output = with_name.expect_success_output("Could not create key");
let stdout = String::from_utf8(output.stdout).unwrap();
for line in stdout.lines() {
@@ -190,7 +209,7 @@ api_bind_addr = "127.0.0.1:{admin_port}"
assert!(!key.secret.is_empty(), "Invalid key: Key secret is empty");
Key {
- name: name.to_owned(),
+ name: maybe_name.map(String::from),
..key
}
}
diff --git a/src/garage/tests/common/mod.rs b/src/garage/tests/common/mod.rs
index 28874b02..eca3e42b 100644
--- a/src/garage/tests/common/mod.rs
+++ b/src/garage/tests/common/mod.rs
@@ -13,13 +13,16 @@ use custom_requester::CustomRequester;
const REGION: Region = Region::from_static("garage-integ-test");
+#[derive(Clone)]
pub struct Context {
pub garage: &'static garage::Instance,
+ pub key: garage::Key,
pub client: Client,
pub custom_request: CustomRequester,
pub k2v: K2VContext,
}
+#[derive(Clone)]
pub struct K2VContext {
pub request: CustomRequester,
}
@@ -27,13 +30,15 @@ pub struct K2VContext {
impl Context {
fn new() -> Self {
let garage = garage::instance();
- let client = client::build_client(garage);
- let custom_request = CustomRequester::new_s3(garage);
- let k2v_request = CustomRequester::new_k2v(garage);
+ let key = garage.key(None);
+ let client = client::build_client(garage, &key);
+ let custom_request = CustomRequester::new_s3(garage, &key);
+ let k2v_request = CustomRequester::new_k2v(garage, &key);
Context {
garage,
client,
+ key,
custom_request,
k2v: K2VContext {
request: k2v_request,
@@ -57,7 +62,7 @@ impl Context {
.args(["bucket", "allow"])
.args(["--owner", "--read", "--write"])
.arg(&bucket_name)
- .args(["--key", &self.garage.key.name])
+ .args(["--key", &self.key.id])
.quiet()
.expect_success_status("Could not allow key for bucket");
diff --git a/src/garage/tests/k2v/batch.rs b/src/garage/tests/k2v/batch.rs
index 6abba1c5..595d0ba8 100644
--- a/src/garage/tests/k2v/batch.rs
+++ b/src/garage/tests/k2v/batch.rs
@@ -3,6 +3,7 @@ use std::collections::HashMap;
use crate::common;
use assert_json_diff::assert_json_eq;
+use base64::prelude::*;
use serde_json::json;
use super::json_body;
@@ -36,12 +37,12 @@ async fn test_batch() {
{{"pk": "root", "sk": "d.2", "ct": null, "v": "{}"}},
{{"pk": "root", "sk": "e", "ct": null, "v": "{}"}}
]"#,
- base64::encode(values.get(&"a").unwrap()),
- base64::encode(values.get(&"b").unwrap()),
- base64::encode(values.get(&"c").unwrap()),
- base64::encode(values.get(&"d.1").unwrap()),
- base64::encode(values.get(&"d.2").unwrap()),
- base64::encode(values.get(&"e").unwrap()),
+ BASE64_STANDARD.encode(values.get(&"a").unwrap()),
+ BASE64_STANDARD.encode(values.get(&"b").unwrap()),
+ BASE64_STANDARD.encode(values.get(&"c").unwrap()),
+ BASE64_STANDARD.encode(values.get(&"d.1").unwrap()),
+ BASE64_STANDARD.encode(values.get(&"d.2").unwrap()),
+ BASE64_STANDARD.encode(values.get(&"e").unwrap()),
)
.into_bytes(),
)
@@ -120,12 +121,12 @@ async fn test_batch() {
"tombstones": false,
"singleItem": false,
"items": [
- {"sk": "a", "ct": ct.get("a").unwrap(), "v": [base64::encode(values.get("a").unwrap())]},
- {"sk": "b", "ct": ct.get("b").unwrap(), "v": [base64::encode(values.get("b").unwrap())]},
- {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap())]},
- {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1").unwrap())]},
- {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap())]},
- {"sk": "e", "ct": ct.get("e").unwrap(), "v": [base64::encode(values.get("e").unwrap())]}
+ {"sk": "a", "ct": ct.get("a").unwrap(), "v": [BASE64_STANDARD.encode(values.get("a").unwrap())]},
+ {"sk": "b", "ct": ct.get("b").unwrap(), "v": [BASE64_STANDARD.encode(values.get("b").unwrap())]},
+ {"sk": "c", "ct": ct.get("c").unwrap(), "v": [BASE64_STANDARD.encode(values.get("c").unwrap())]},
+ {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.1").unwrap())]},
+ {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.2").unwrap())]},
+ {"sk": "e", "ct": ct.get("e").unwrap(), "v": [BASE64_STANDARD.encode(values.get("e").unwrap())]}
],
"more": false,
"nextStart": null,
@@ -141,10 +142,10 @@ async fn test_batch() {
"tombstones": false,
"singleItem": false,
"items": [
- {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap())]},
- {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1").unwrap())]},
- {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap())]},
- {"sk": "e", "ct": ct.get("e").unwrap(), "v": [base64::encode(values.get("e").unwrap())]}
+ {"sk": "c", "ct": ct.get("c").unwrap(), "v": [BASE64_STANDARD.encode(values.get("c").unwrap())]},
+ {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.1").unwrap())]},
+ {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.2").unwrap())]},
+ {"sk": "e", "ct": ct.get("e").unwrap(), "v": [BASE64_STANDARD.encode(values.get("e").unwrap())]}
],
"more": false,
"nextStart": null,
@@ -160,9 +161,9 @@ async fn test_batch() {
"tombstones": false,
"singleItem": false,
"items": [
- {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap())]},
- {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1").unwrap())]},
- {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap())]},
+ {"sk": "c", "ct": ct.get("c").unwrap(), "v": [BASE64_STANDARD.encode(values.get("c").unwrap())]},
+ {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.1").unwrap())]},
+ {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.2").unwrap())]},
],
"more": false,
"nextStart": null,
@@ -178,8 +179,8 @@ async fn test_batch() {
"tombstones": false,
"singleItem": false,
"items": [
- {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap())]},
- {"sk": "b", "ct": ct.get("b").unwrap(), "v": [base64::encode(values.get("b").unwrap())]},
+ {"sk": "c", "ct": ct.get("c").unwrap(), "v": [BASE64_STANDARD.encode(values.get("c").unwrap())]},
+ {"sk": "b", "ct": ct.get("b").unwrap(), "v": [BASE64_STANDARD.encode(values.get("b").unwrap())]},
],
"more": false,
"nextStart": null,
@@ -195,8 +196,8 @@ async fn test_batch() {
"tombstones": false,
"singleItem": false,
"items": [
- {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap())]},
- {"sk": "b", "ct": ct.get("b").unwrap(), "v": [base64::encode(values.get("b").unwrap())]},
+ {"sk": "c", "ct": ct.get("c").unwrap(), "v": [BASE64_STANDARD.encode(values.get("c").unwrap())]},
+ {"sk": "b", "ct": ct.get("b").unwrap(), "v": [BASE64_STANDARD.encode(values.get("b").unwrap())]},
],
"more": false,
"nextStart": null,
@@ -212,7 +213,7 @@ async fn test_batch() {
"tombstones": false,
"singleItem": false,
"items": [
- {"sk": "a", "ct": ct.get("a").unwrap(), "v": [base64::encode(values.get("a").unwrap())]}
+ {"sk": "a", "ct": ct.get("a").unwrap(), "v": [BASE64_STANDARD.encode(values.get("a").unwrap())]}
],
"more": true,
"nextStart": "b",
@@ -228,8 +229,8 @@ async fn test_batch() {
"tombstones": false,
"singleItem": false,
"items": [
- {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1").unwrap())]},
- {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap())]}
+ {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.1").unwrap())]},
+ {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.2").unwrap())]}
],
"more": false,
"nextStart": null,
@@ -255,10 +256,10 @@ async fn test_batch() {
{{"pk": "root", "sk": "d.2", "ct": null, "v": "{}"}}
]"#,
ct.get(&"b").unwrap(),
- base64::encode(values.get(&"c'").unwrap()),
+ BASE64_STANDARD.encode(values.get(&"c'").unwrap()),
ct.get(&"d.1").unwrap(),
- base64::encode(values.get(&"d.1'").unwrap()),
- base64::encode(values.get(&"d.2'").unwrap()),
+ BASE64_STANDARD.encode(values.get(&"d.1'").unwrap()),
+ BASE64_STANDARD.encode(values.get(&"d.2'").unwrap()),
)
.into_bytes(),
)
@@ -333,11 +334,11 @@ async fn test_batch() {
"tombstones": false,
"singleItem": false,
"items": [
- {"sk": "a", "ct": ct.get("a").unwrap(), "v": [base64::encode(values.get("a").unwrap())]},
- {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap()), base64::encode(values.get("c'").unwrap())]},
- {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]},
- {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap()), base64::encode(values.get("d.2'").unwrap())]},
- {"sk": "e", "ct": ct.get("e").unwrap(), "v": [base64::encode(values.get("e").unwrap())]}
+ {"sk": "a", "ct": ct.get("a").unwrap(), "v": [BASE64_STANDARD.encode(values.get("a").unwrap())]},
+ {"sk": "c", "ct": ct.get("c").unwrap(), "v": [BASE64_STANDARD.encode(values.get("c").unwrap()), BASE64_STANDARD.encode(values.get("c'").unwrap())]},
+ {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.1'").unwrap())]},
+ {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.2").unwrap()), BASE64_STANDARD.encode(values.get("d.2'").unwrap())]},
+ {"sk": "e", "ct": ct.get("e").unwrap(), "v": [BASE64_STANDARD.encode(values.get("e").unwrap())]}
],
"more": false,
"nextStart": null,
@@ -353,8 +354,8 @@ async fn test_batch() {
"tombstones": false,
"singleItem": false,
"items": [
- {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]},
- {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap()), base64::encode(values.get("d.2'").unwrap())]},
+ {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.1'").unwrap())]},
+ {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.2").unwrap()), BASE64_STANDARD.encode(values.get("d.2'").unwrap())]},
],
"more": false,
"nextStart": null,
@@ -370,7 +371,7 @@ async fn test_batch() {
"tombstones": false,
"singleItem": false,
"items": [
- {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]},
+ {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.1'").unwrap())]},
],
"more": false,
"nextStart": null,
@@ -386,7 +387,7 @@ async fn test_batch() {
"tombstones": false,
"singleItem": false,
"items": [
- {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]},
+ {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.1'").unwrap())]},
],
"more": true,
"nextStart": "d.2",
@@ -402,7 +403,7 @@ async fn test_batch() {
"tombstones": false,
"singleItem": false,
"items": [
- {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap()), base64::encode(values.get("d.2'").unwrap())]},
+ {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.2").unwrap()), BASE64_STANDARD.encode(values.get("d.2'").unwrap())]},
],
"more": false,
"nextStart": null,
@@ -418,8 +419,8 @@ async fn test_batch() {
"tombstones": false,
"singleItem": false,
"items": [
- {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap()), base64::encode(values.get("d.2'").unwrap())]},
- {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]},
+ {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.2").unwrap()), BASE64_STANDARD.encode(values.get("d.2'").unwrap())]},
+ {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.1'").unwrap())]},
],
"more": false,
"nextStart": null,
@@ -435,8 +436,8 @@ async fn test_batch() {
"tombstones": false,
"singleItem": false,
"items": [
- {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap()), base64::encode(values.get("d.2'").unwrap())]},
- {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]},
+ {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.2").unwrap()), BASE64_STANDARD.encode(values.get("d.2'").unwrap())]},
+ {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.1'").unwrap())]},
],
"more": false,
"nextStart": null,
@@ -452,8 +453,8 @@ async fn test_batch() {
"tombstones": false,
"singleItem": false,
"items": [
- {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]},
- {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap()), base64::encode(values.get("d.2'").unwrap())]},
+ {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.1'").unwrap())]},
+ {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.2").unwrap()), BASE64_STANDARD.encode(values.get("d.2'").unwrap())]},
],
"more": false,
"nextStart": null,
@@ -563,8 +564,8 @@ async fn test_batch() {
"tombstones": false,
"singleItem": false,
"items": [
- {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap()), base64::encode(values.get("c'").unwrap())]},
- {"sk": "e", "ct": ct.get("e").unwrap(), "v": [base64::encode(values.get("e").unwrap())]}
+ {"sk": "c", "ct": ct.get("c").unwrap(), "v": [BASE64_STANDARD.encode(values.get("c").unwrap()), BASE64_STANDARD.encode(values.get("c'").unwrap())]},
+ {"sk": "e", "ct": ct.get("e").unwrap(), "v": [BASE64_STANDARD.encode(values.get("e").unwrap())]}
],
"more": false,
"nextStart": null,
@@ -580,8 +581,8 @@ async fn test_batch() {
"tombstones": false,
"singleItem": false,
"items": [
- {"sk": "e", "ct": ct.get("e").unwrap(), "v": [base64::encode(values.get("e").unwrap())]},
- {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap()), base64::encode(values.get("c'").unwrap())]},
+ {"sk": "e", "ct": ct.get("e").unwrap(), "v": [BASE64_STANDARD.encode(values.get("e").unwrap())]},
+ {"sk": "c", "ct": ct.get("c").unwrap(), "v": [BASE64_STANDARD.encode(values.get("c").unwrap()), BASE64_STANDARD.encode(values.get("c'").unwrap())]},
],
"more": false,
"nextStart": null,
@@ -599,10 +600,10 @@ async fn test_batch() {
"items": [
{"sk": "a", "ct": ct.get("a").unwrap(), "v": [null]},
{"sk": "b", "ct": ct.get("b").unwrap(), "v": [null]},
- {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap()), base64::encode(values.get("c'").unwrap())]},
+ {"sk": "c", "ct": ct.get("c").unwrap(), "v": [BASE64_STANDARD.encode(values.get("c").unwrap()), BASE64_STANDARD.encode(values.get("c'").unwrap())]},
{"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [null]},
{"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [null]},
- {"sk": "e", "ct": ct.get("e").unwrap(), "v": [base64::encode(values.get("e").unwrap())]},
+ {"sk": "e", "ct": ct.get("e").unwrap(), "v": [BASE64_STANDARD.encode(values.get("e").unwrap())]},
],
"more": false,
"nextStart": null,
diff --git a/src/garage/tests/k2v/item.rs b/src/garage/tests/k2v/item.rs
index 2641386f..588836c7 100644
--- a/src/garage/tests/k2v/item.rs
+++ b/src/garage/tests/k2v/item.rs
@@ -3,6 +3,7 @@ use std::time::Duration;
use crate::common;
use assert_json_diff::assert_json_eq;
+use base64::prelude::*;
use serde_json::json;
use super::json_body;
@@ -222,7 +223,10 @@ async fn test_items_and_indices() {
let res_json = json_body(res).await;
assert_json_eq!(
res_json,
- [base64::encode(&content2), base64::encode(&content3)]
+ [
+ BASE64_STANDARD.encode(&content2),
+ BASE64_STANDARD.encode(&content3)
+ ]
);
// ReadIndex -- now there should be some stuff
@@ -411,7 +415,7 @@ async fn test_item_return_format() {
"application/json"
);
let res_body = json_body(res).await;
- assert_json_eq!(res_body, json!([base64::encode(&single_value)]));
+ assert_json_eq!(res_body, json!([BASE64_STANDARD.encode(&single_value)]));
// f2: binary
let res = ctx
@@ -452,7 +456,7 @@ async fn test_item_return_format() {
"application/json"
);
let res_body = json_body(res).await;
- assert_json_eq!(res_body, json!([base64::encode(&single_value)]));
+ assert_json_eq!(res_body, json!([BASE64_STANDARD.encode(&single_value)]));
// -- Test with a second, concurrent value --
let res = ctx
@@ -488,8 +492,8 @@ async fn test_item_return_format() {
assert_json_eq!(
res_body,
json!([
- base64::encode(&single_value),
- base64::encode(&concurrent_value)
+ BASE64_STANDARD.encode(&single_value),
+ BASE64_STANDARD.encode(&concurrent_value)
])
);
@@ -512,8 +516,8 @@ async fn test_item_return_format() {
assert_json_eq!(
res_body,
json!([
- base64::encode(&single_value),
- base64::encode(&concurrent_value)
+ BASE64_STANDARD.encode(&single_value),
+ BASE64_STANDARD.encode(&concurrent_value)
])
);
@@ -550,8 +554,8 @@ async fn test_item_return_format() {
assert_json_eq!(
res_body,
json!([
- base64::encode(&single_value),
- base64::encode(&concurrent_value)
+ BASE64_STANDARD.encode(&single_value),
+ BASE64_STANDARD.encode(&concurrent_value)
])
);
@@ -587,7 +591,10 @@ async fn test_item_return_format() {
"application/json"
);
let res_body = json_body(res).await;
- assert_json_eq!(res_body, json!([base64::encode(&concurrent_value), null]));
+ assert_json_eq!(
+ res_body,
+ json!([BASE64_STANDARD.encode(&concurrent_value), null])
+ );
// f1: not specified
let res = ctx
@@ -612,7 +619,10 @@ async fn test_item_return_format() {
.unwrap()
.to_string();
let res_body = json_body(res).await;
- assert_json_eq!(res_body, json!([base64::encode(&concurrent_value), null]));
+ assert_json_eq!(
+ res_body,
+ json!([BASE64_STANDARD.encode(&concurrent_value), null])
+ );
// f2: binary
let res = ctx
@@ -644,7 +654,10 @@ async fn test_item_return_format() {
"application/json"
);
let res_body = json_body(res).await;
- assert_json_eq!(res_body, json!([base64::encode(&concurrent_value), null]));
+ assert_json_eq!(
+ res_body,
+ json!([BASE64_STANDARD.encode(&concurrent_value), null])
+ );
// -- Delete everything --
let res = ctx
diff --git a/src/garage/tests/k2v/poll.rs b/src/garage/tests/k2v/poll.rs
index e56705ae..dd44aed9 100644
--- a/src/garage/tests/k2v/poll.rs
+++ b/src/garage/tests/k2v/poll.rs
@@ -1,12 +1,17 @@
+use base64::prelude::*;
use hyper::{Method, StatusCode};
use std::time::Duration;
+use assert_json_diff::assert_json_eq;
+use serde_json::json;
+
+use super::json_body;
use crate::common;
#[tokio::test]
-async fn test_poll() {
+async fn test_poll_item() {
let ctx = common::context();
- let bucket = ctx.create_bucket("test-k2v-poll");
+ let bucket = ctx.create_bucket("test-k2v-poll-item");
// Write initial value
let res = ctx
@@ -52,8 +57,8 @@ async fn test_poll() {
let poll = {
let bucket = bucket.clone();
let ct = ct.clone();
+ let ctx = ctx.clone();
tokio::spawn(async move {
- let ctx = common::context();
ctx.k2v
.request
.builder(bucket.clone())
@@ -96,3 +101,165 @@ async fn test_poll() {
.to_vec();
assert_eq!(poll_res_body, b"New value");
}
+
+#[tokio::test]
+async fn test_poll_range() {
+ let ctx = common::context();
+ let bucket = ctx.create_bucket("test-k2v-poll-range");
+
+ // Write initial value
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .method(Method::PUT)
+ .path("root")
+ .query_param("sort_key", Some("test1"))
+ .body(b"Initial value".to_vec())
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), StatusCode::NO_CONTENT);
+
+ // Retrieve initial value to get its causality token
+ let res2 = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("test1"))
+ .signed_header("accept", "application/octet-stream")
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res2.status(), StatusCode::OK);
+ let ct = res2
+ .headers()
+ .get("x-garage-causality-token")
+ .unwrap()
+ .to_str()
+ .unwrap()
+ .to_string();
+
+ // Initial poll range, retrieve single item and first seen_marker
+ let res2 = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .method(Method::POST)
+ .path("root")
+ .query_param("poll_range", None::<String>)
+ .body(b"{}".to_vec())
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res2.status(), StatusCode::OK);
+ let json_res = json_body(res2).await;
+ let seen_marker = json_res["seenMarker"].as_str().unwrap().to_string();
+ assert_json_eq!(
+ json_res,
+ json!(
+ {
+ "items": [
+ {"sk": "test1", "ct": ct, "v": [BASE64_STANDARD.encode(b"Initial value")]},
+ ],
+ "seenMarker": seen_marker,
+ }
+ )
+ );
+
+ // Second poll range, which will complete later
+ let poll = {
+ let bucket = bucket.clone();
+ let ctx = ctx.clone();
+ tokio::spawn(async move {
+ ctx.k2v
+ .request
+ .builder(bucket.clone())
+ .method(Method::POST)
+ .path("root")
+ .query_param("poll_range", None::<String>)
+ .body(format!(r#"{{"seenMarker": "{}"}}"#, seen_marker).into_bytes())
+ .send()
+ .await
+ })
+ };
+
+ // Write new value that supersedes initial one
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .method(Method::PUT)
+ .path("root")
+ .query_param("sort_key", Some("test1"))
+ .signed_header("x-garage-causality-token", ct)
+ .body(b"New value".to_vec())
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), StatusCode::NO_CONTENT);
+
+ // Check poll finishes with correct value
+ let poll_res = tokio::select! {
+ _ = tokio::time::sleep(Duration::from_secs(10)) => panic!("poll did not terminate in time"),
+ res = poll => res.unwrap().unwrap(),
+ };
+
+ assert_eq!(poll_res.status(), StatusCode::OK);
+ let json_res = json_body(poll_res).await;
+ let seen_marker = json_res["seenMarker"].as_str().unwrap().to_string();
+ assert_eq!(json_res["items"].as_array().unwrap().len(), 1);
+ assert_json_eq!(&json_res["items"][0]["sk"], json!("test1"));
+ assert_json_eq!(
+ &json_res["items"][0]["v"],
+ json!([BASE64_STANDARD.encode(b"New value")])
+ );
+
+ // Now we will add a value on a different key
+ // Start a new poll operation
+ let poll = {
+ let bucket = bucket.clone();
+ let ctx = ctx.clone();
+ tokio::spawn(async move {
+ ctx.k2v
+ .request
+ .builder(bucket.clone())
+ .method(Method::POST)
+ .path("root")
+ .query_param("poll_range", None::<String>)
+ .body(format!(r#"{{"seenMarker": "{}"}}"#, seen_marker).into_bytes())
+ .send()
+ .await
+ })
+ };
+
+ // Write value on different key
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .method(Method::PUT)
+ .path("root")
+ .query_param("sort_key", Some("test2"))
+ .body(b"Other value".to_vec())
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), StatusCode::NO_CONTENT);
+
+ // Check poll finishes with correct value
+ let poll_res = tokio::select! {
+ _ = tokio::time::sleep(Duration::from_secs(10)) => panic!("poll did not terminate in time"),
+ res = poll => res.unwrap().unwrap(),
+ };
+
+ assert_eq!(poll_res.status(), StatusCode::OK);
+ let json_res = json_body(poll_res).await;
+ assert_eq!(json_res["items"].as_array().unwrap().len(), 1);
+ assert_json_eq!(&json_res["items"][0]["sk"], json!("test2"));
+ assert_json_eq!(
+ &json_res["items"][0]["v"],
+ json!([BASE64_STANDARD.encode(b"Other value")])
+ );
+}
diff --git a/src/garage/tests/s3/streaming_signature.rs b/src/garage/tests/s3/streaming_signature.rs
index 48da7607..b7a1acae 100644
--- a/src/garage/tests/s3/streaming_signature.rs
+++ b/src/garage/tests/s3/streaming_signature.rs
@@ -109,7 +109,7 @@ async fn test_create_bucket_streaming() {
ctx.garage
.command()
.args(["key", "allow"])
- .args(["--create-bucket", &ctx.garage.key.id])
+ .args(["--create-bucket", &ctx.key.id])
.quiet()
.expect_success_output("Could not allow key to create buckets");
diff --git a/src/garage/tests/s3/website.rs b/src/garage/tests/s3/website.rs
index 244a2fa0..f61838e4 100644
--- a/src/garage/tests/s3/website.rs
+++ b/src/garage/tests/s3/website.rs
@@ -1,5 +1,8 @@
use crate::common;
use crate::common::ext::*;
+use crate::k2v::json_body;
+
+use assert_json_diff::assert_json_eq;
use aws_sdk_s3::{
model::{CorsConfiguration, CorsRule, ErrorDocument, IndexDocument, WebsiteConfiguration},
types::ByteStream,
@@ -9,6 +12,7 @@ use hyper::{
body::{to_bytes, Body},
Client,
};
+use serde_json::json;
const BODY: &[u8; 16] = b"<h1>bonjour</h1>";
const BODY_ERR: &[u8; 6] = b"erreur";
@@ -49,6 +53,31 @@ async fn test_website() {
BODY.as_ref()
); /* check that we do not leak body */
+ let admin_req = || {
+ Request::builder()
+ .method("GET")
+ .uri(format!(
+ "http://127.0.0.1:{0}/check?domain={1}",
+ ctx.garage.admin_port,
+ BCKT_NAME.to_string()
+ ))
+ .body(Body::empty())
+ .unwrap()
+ };
+
+ let admin_resp = client.request(admin_req()).await.unwrap();
+ assert_eq!(admin_resp.status(), StatusCode::BAD_REQUEST);
+ let res_body = json_body(admin_resp).await;
+ assert_json_eq!(
+ res_body,
+ json!({
+ "code": "InvalidRequest",
+ "message": "Bad request: Bucket 'my-website' is not authorized for website hosting",
+ "region": "garage-integ-test",
+ "path": "/check",
+ })
+ );
+
ctx.garage
.command()
.args(["bucket", "website", "--allow", BCKT_NAME])
@@ -62,6 +91,25 @@ async fn test_website() {
BODY.as_ref()
);
+ let admin_req = || {
+ Request::builder()
+ .method("GET")
+ .uri(format!(
+ "http://127.0.0.1:{0}/check?domain={1}",
+ ctx.garage.admin_port,
+ BCKT_NAME.to_string()
+ ))
+ .body(Body::empty())
+ .unwrap()
+ };
+
+ let mut admin_resp = client.request(admin_req()).await.unwrap();
+ assert_eq!(admin_resp.status(), StatusCode::OK);
+ assert_eq!(
+ to_bytes(admin_resp.body_mut()).await.unwrap().as_ref(),
+ format!("Bucket '{BCKT_NAME}' is authorized for website hosting").as_bytes()
+ );
+
ctx.garage
.command()
.args(["bucket", "website", "--deny", BCKT_NAME])
@@ -74,6 +122,31 @@ async fn test_website() {
to_bytes(resp.body_mut()).await.unwrap().as_ref(),
BODY.as_ref()
); /* check that we do not leak body */
+
+ let admin_req = || {
+ Request::builder()
+ .method("GET")
+ .uri(format!(
+ "http://127.0.0.1:{0}/check?domain={1}",
+ ctx.garage.admin_port,
+ BCKT_NAME.to_string()
+ ))
+ .body(Body::empty())
+ .unwrap()
+ };
+
+ let admin_resp = client.request(admin_req()).await.unwrap();
+ assert_eq!(admin_resp.status(), StatusCode::BAD_REQUEST);
+ let res_body = json_body(admin_resp).await;
+ assert_json_eq!(
+ res_body,
+ json!({
+ "code": "InvalidRequest",
+ "message": "Bad request: Bucket 'my-website' is not authorized for website hosting",
+ "region": "garage-integ-test",
+ "path": "/check",
+ })
+ );
}
#[tokio::test]
@@ -322,3 +395,103 @@ async fn test_website_s3_api() {
);
}
}
+
+#[tokio::test]
+async fn test_website_check_website_enabled() {
+ let ctx = common::context();
+
+ let client = Client::new();
+
+ let admin_req = || {
+ Request::builder()
+ .method("GET")
+ .uri(format!("http://127.0.0.1:{}/check", ctx.garage.admin_port))
+ .body(Body::empty())
+ .unwrap()
+ };
+
+ let admin_resp = client.request(admin_req()).await.unwrap();
+ assert_eq!(admin_resp.status(), StatusCode::BAD_REQUEST);
+ let res_body = json_body(admin_resp).await;
+ assert_json_eq!(
+ res_body,
+ json!({
+ "code": "InvalidRequest",
+ "message": "Bad request: No domain query string found",
+ "region": "garage-integ-test",
+ "path": "/check",
+ })
+ );
+
+ let admin_req = || {
+ Request::builder()
+ .method("GET")
+ .uri(format!(
+ "http://127.0.0.1:{}/check?domain=",
+ ctx.garage.admin_port
+ ))
+ .body(Body::empty())
+ .unwrap()
+ };
+
+ let admin_resp = client.request(admin_req()).await.unwrap();
+ assert_eq!(admin_resp.status(), StatusCode::NOT_FOUND);
+ let res_body = json_body(admin_resp).await;
+ assert_json_eq!(
+ res_body,
+ json!({
+ "code": "NoSuchBucket",
+ "message": "Bucket not found: ",
+ "region": "garage-integ-test",
+ "path": "/check",
+ })
+ );
+
+ let admin_req = || {
+ Request::builder()
+ .method("GET")
+ .uri(format!(
+ "http://127.0.0.1:{}/check?domain=foobar",
+ ctx.garage.admin_port
+ ))
+ .body(Body::empty())
+ .unwrap()
+ };
+
+ let admin_resp = client.request(admin_req()).await.unwrap();
+ assert_eq!(admin_resp.status(), StatusCode::NOT_FOUND);
+ let res_body = json_body(admin_resp).await;
+ assert_json_eq!(
+ res_body,
+ json!({
+ "code": "NoSuchBucket",
+ "message": "Bucket not found: foobar",
+ "region": "garage-integ-test",
+ "path": "/check",
+ })
+ );
+
+ let admin_req = || {
+ Request::builder()
+ .method("GET")
+ .uri(format!(
+ "http://127.0.0.1:{}/check?domain=%E2%98%B9",
+ ctx.garage.admin_port
+ ))
+ .body(Body::empty())
+ .unwrap()
+ };
+
+ let admin_resp = client.request(admin_req()).await.unwrap();
+ assert_eq!(admin_resp.status(), StatusCode::NOT_FOUND);
+ let res_body = json_body(admin_resp).await;
+ assert_json_eq!(
+ res_body,
+ json!({
+ "code": "NoSuchBucket",
+ "message": "Bucket not found: ☹",
+ "region": "garage-integ-test",
+ "path": "/check",
+ })
+ );
+}
diff --git a/src/k2v-client/Cargo.toml b/src/k2v-client/Cargo.toml
index f57ce849..52c16d89 100644
--- a/src/k2v-client/Cargo.toml
+++ b/src/k2v-client/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "k2v-client"
-version = "0.0.1"
+version = "0.1.1"
authors = ["Trinity Pointard <trinity.pointard@gmail.com>", "Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license = "AGPL-3.0"
@@ -9,20 +9,21 @@ repository = "https://git.deuxfleurs.fr/Deuxfleurs/garage"
readme = "../../README.md"
[dependencies]
-base64 = "0.13.0"
-http = "0.2.6"
+base64 = "0.21"
+http = "0.2"
log = "0.4"
rusoto_core = { version = "0.48.0", default-features = false, features = ["rustls"] }
rusoto_credential = "0.48.0"
rusoto_signature = "0.48.0"
-serde = "1.0.137"
-serde_json = "1.0.81"
-thiserror = "1.0.31"
-tokio = "1.17.0"
+hyper-rustls = { version = "0.23", default-features = false, features = [ "http1", "http2", "tls12" ] }
+serde = "1.0"
+serde_json = "1.0"
+thiserror = "1.0"
+tokio = "1.24"
# cli deps
-clap = { version = "3.1.18", optional = true, features = ["derive", "env"] }
-garage_util = { version = "0.8.1", path = "../util", optional = true }
+clap = { version = "4.1", optional = true, features = ["derive", "env"] }
+garage_util = { version = "0.8.2", path = "../util", optional = true }
[features]
diff --git a/src/k2v-client/bin/k2v-cli.rs b/src/k2v-client/bin/k2v-cli.rs
index 925ebeb8..cdd63cce 100644
--- a/src/k2v-client/bin/k2v-cli.rs
+++ b/src/k2v-client/bin/k2v-cli.rs
@@ -1,3 +1,5 @@
+use std::collections::BTreeMap;
+use std::process::exit;
use std::time::Duration;
use k2v_client::*;
@@ -57,22 +59,39 @@ enum Command {
#[clap(flatten)]
output_kind: ReadOutputKind,
},
- /// Watch changes on a single value
- Poll {
- /// Partition key to delete from
+ /// Watch changes on a single value
+ PollItem {
+ /// Partition key of item to watch
partition_key: String,
- /// Sort key to delete from
+ /// Sort key of item to watch
sort_key: String,
/// Causality information
#[clap(short, long)]
causality: String,
/// Timeout, in seconds
- #[clap(short, long)]
+ #[clap(short = 'T', long)]
timeout: Option<u64>,
/// Output formating
#[clap(flatten)]
output_kind: ReadOutputKind,
},
+ /// Watch changes on a range of values
+ PollRange {
+ /// Partition key to poll from
+ partition_key: String,
+ /// Output only sort keys matching this filter
+ #[clap(flatten)]
+ filter: Filter,
+ /// Marker of data that had previously been seen by a PollRange
+ #[clap(short = 'S', long)]
+ seen_marker: Option<String>,
+ /// Timeout, in seconds
+ #[clap(short = 'T', long)]
+ timeout: Option<u64>,
+ /// Output formating
+ #[clap(flatten)]
+ output_kind: BatchOutputKind,
+ },
/// Delete a single value
Delete {
/// Partition key to delete from
@@ -176,7 +195,6 @@ struct ReadOutputKind {
impl ReadOutputKind {
fn display_output(&self, val: CausalValue) -> ! {
use std::io::Write;
- use std::process::exit;
if self.json {
let stdout = std::io::stdout();
@@ -254,6 +272,83 @@ struct BatchOutputKind {
json: bool,
}
+impl BatchOutputKind {
+ fn display_human_output(&self, values: BTreeMap<String, CausalValue>) -> ! {
+ for (key, values) in values {
+ println!("key: {}", key);
+ let causality: String = values.causality.into();
+ println!("causality: {}", causality);
+ for value in values.value {
+ match value {
+ K2vValue::Value(v) => {
+ if let Ok(string) = std::str::from_utf8(&v) {
+ println!(" value(utf-8): {}", string);
+ } else {
+ println!(" value(base64): {}", base64::encode(&v));
+ }
+ }
+ K2vValue::Tombstone => {
+ println!(" tombstone");
+ }
+ }
+ }
+ }
+ exit(0);
+ }
+
+ fn values_json(&self, values: BTreeMap<String, CausalValue>) -> Vec<serde_json::Value> {
+ values
+ .into_iter()
+ .map(|(k, v)| {
+ let mut value = serde_json::to_value(v).unwrap();
+ value
+ .as_object_mut()
+ .unwrap()
+ .insert("sort_key".to_owned(), k.into());
+ value
+ })
+ .collect::<Vec<_>>()
+ }
+
+ fn display_poll_range_output(
+ &self,
+ seen_marker: String,
+ values: BTreeMap<String, CausalValue>,
+ ) -> ! {
+ if self.json {
+ let json = serde_json::json!({
+ "values": self.values_json(values),
+ "seen_marker": seen_marker,
+ });
+
+ let stdout = std::io::stdout();
+ serde_json::to_writer_pretty(stdout, &json).unwrap();
+ exit(0)
+ } else {
+ println!("seen marker: {}", seen_marker);
+ self.display_human_output(values)
+ }
+ }
+
+ fn display_read_range_output(&self, res: PaginatedRange<CausalValue>) -> ! {
+ if self.json {
+ let json = serde_json::json!({
+ "next_key": res.next_start,
+ "values": self.values_json(res.items),
+ });
+
+ let stdout = std::io::stdout();
+ serde_json::to_writer_pretty(stdout, &json).unwrap();
+ exit(0)
+ } else {
+ if let Some(next) = res.next_start {
+ println!("next key: {}", next);
+ }
+ self.display_human_output(res.items)
+ }
+ }
+}
+
/// Filter for batch operations
#[derive(Parser, Debug)]
#[clap(group = clap::ArgGroup::new("filter").multiple(true).required(true))]
@@ -342,7 +437,7 @@ async fn main() -> Result<(), Error> {
let res = client.read_item(&partition_key, &sort_key).await?;
output_kind.display_output(res);
}
- Command::Poll {
+ Command::PollItem {
partition_key,
sort_key,
causality,
@@ -356,7 +451,54 @@ async fn main() -> Result<(), Error> {
if let Some(res) = res_opt {
output_kind.display_output(res);
} else {
- println!("Delay expired and value didn't change.");
+ if output_kind.json {
+ println!("null");
+ } else {
+ println!("Delay expired and value didn't change.");
+ }
+ }
+ }
+ Command::PollRange {
+ partition_key,
+ filter,
+ seen_marker,
+ timeout,
+ output_kind,
+ } => {
+ if filter.conflicts_only
+ || filter.tombstones
+ || filter.reverse
+ || filter.limit.is_some()
+ {
+ return Err(Error::Message(
+ "limit, reverse, conlicts-only, tombstones are invalid for poll-range".into(),
+ ));
+ }
+
+ let timeout = timeout.map(Duration::from_secs);
+ let res = client
+ .poll_range(
+ &partition_key,
+ Some(PollRangeFilter {
+ start: filter.start.as_deref(),
+ end: filter.end.as_deref(),
+ prefix: filter.prefix.as_deref(),
+ }),
+ seen_marker.as_deref(),
+ timeout,
+ )
+ .await?;
+ match res {
+ Some((items, seen_marker)) => {
+ output_kind.display_poll_range_output(seen_marker, items);
+ }
+ None => {
+ if output_kind.json {
+ println!("null");
+ } else {
+ println!("Delay expired and value didn't change.");
+ }
+ }
}
}
Command::ReadIndex {
@@ -419,50 +561,7 @@ async fn main() -> Result<(), Error> {
};
let mut res = client.read_batch(&[op]).await?;
let res = res.pop().unwrap();
- if output_kind.json {
- let values = res
- .items
- .into_iter()
- .map(|(k, v)| {
- let mut value = serde_json::to_value(v).unwrap();
- value
- .as_object_mut()
- .unwrap()
- .insert("sort_key".to_owned(), k.into());
- value
- })
- .collect::<Vec<_>>();
- let json = serde_json::json!({
- "next_key": res.next_start,
- "values": values,
- });
-
- let stdout = std::io::stdout();
- serde_json::to_writer_pretty(stdout, &json).unwrap();
- } else {
- if let Some(next) = res.next_start {
- println!("next key: {}", next);
- }
- for (key, values) in res.items {
- println!("key: {}", key);
- let causality: String = values.causality.into();
- println!("causality: {}", causality);
- for value in values.value {
- match value {
- K2vValue::Value(v) => {
- if let Ok(string) = std::str::from_utf8(&v) {
- println!(" value(utf-8): {}", string);
- } else {
- println!(" value(base64): {}", base64::encode(&v));
- }
- }
- K2vValue::Tombstone => {
- println!(" tombstone");
- }
- }
- }
- }
- }
+ output_kind.display_read_range_output(res);
}
Command::DeleteRange {
partition_key,
diff --git a/src/k2v-client/lib.rs b/src/k2v-client/lib.rs
index c2606af4..ca52d0cf 100644
--- a/src/k2v-client/lib.rs
+++ b/src/k2v-client/lib.rs
@@ -40,7 +40,13 @@ impl K2vClient {
creds: AwsCredentials,
user_agent: Option<String>,
) -> Result<Self, Error> {
- let mut client = HttpClient::new()?;
+ let connector = hyper_rustls::HttpsConnectorBuilder::new()
+ .with_native_roots()
+ .https_or_http()
+ .enable_http1()
+ .enable_http2()
+ .build();
+ let mut client = HttpClient::from_connector(connector);
if let Some(ua) = user_agent {
client.local_agent_prepend(ua);
} else {
@@ -153,6 +159,58 @@ impl K2vClient {
}
}
+ /// Perform a PollRange request, waiting for any change in a given range of keys
+ /// to occur
+ pub async fn poll_range(
+ &self,
+ partition_key: &str,
+ filter: Option<PollRangeFilter<'_>>,
+ seen_marker: Option<&str>,
+ timeout: Option<Duration>,
+ ) -> Result<Option<(BTreeMap<String, CausalValue>, String)>, Error> {
+ let timeout = timeout.unwrap_or(DEFAULT_POLL_TIMEOUT);
+
+ let request = PollRangeRequest {
+ filter: filter.unwrap_or_default(),
+ seen_marker,
+ timeout: timeout.as_secs(),
+ };
+
+ let mut req = SignedRequest::new(
+ "POST",
+ SERVICE,
+ &self.region,
+ &format!("/{}/{}", self.bucket, partition_key),
+ );
+ req.add_param("poll_range", "");
+
+ let payload = serde_json::to_vec(&request)?;
+ req.set_payload(Some(payload));
+ let res = self.dispatch(req, Some(timeout + DEFAULT_TIMEOUT)).await?;
+
+ if res.status == StatusCode::NOT_MODIFIED {
+ return Ok(None);
+ }
+
+ let resp: PollRangeResponse = serde_json::from_slice(&res.body)?;
+
+ let items = resp
+ .items
+ .into_iter()
+ .map(|BatchReadItem { sk, ct, v }| {
+ (
+ sk,
+ CausalValue {
+ causality: ct,
+ value: v,
+ },
+ )
+ })
+ .collect::<BTreeMap<_, _>>();
+
+ Ok(Some((items, resp.seen_marker)))
+ }
+
/// Perform an InsertItem request, inserting a value for a single pk+sk.
pub async fn insert_item(
&self,
@@ -389,6 +447,12 @@ impl From<CausalityToken> for String {
}
}
+impl AsRef<str> for CausalityToken {
+ fn as_ref(&self) -> &str {
+ &self.0
+ }
+}
+
/// A value in K2V. can be either a binary value, or a tombstone.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum K2vValue {
@@ -466,6 +530,29 @@ pub struct Filter<'a> {
pub reverse: bool,
}
+#[derive(Debug, Default, Clone, Serialize)]
+pub struct PollRangeFilter<'a> {
+ pub start: Option<&'a str>,
+ pub end: Option<&'a str>,
+ pub prefix: Option<&'a str>,
+}
+
+#[derive(Debug, Clone, Serialize)]
+#[serde(rename_all = "camelCase")]
+struct PollRangeRequest<'a> {
+ #[serde(flatten)]
+ filter: PollRangeFilter<'a>,
+ seen_marker: Option<&'a str>,
+ timeout: u64,
+}
+
+#[derive(Debug, Clone, Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct PollRangeResponse {
+ items: Vec<BatchReadItem>,
+ seen_marker: String,
+}
+
impl<'a> Filter<'a> {
fn insert_params(&self, req: &mut SignedRequest) {
if let Some(start) = &self.start {
diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml
index 323c2d64..2b525a42 100644
--- a/src/model/Cargo.toml
+++ b/src/model/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "garage_model"
-version = "0.8.1"
+version = "0.8.2"
authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license = "AGPL-3.0"
@@ -14,21 +14,21 @@ path = "lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
-garage_db = { version = "0.8.1", default-features = false, path = "../db" }
-garage_rpc = { version = "0.8.1", path = "../rpc" }
-garage_table = { version = "0.8.1", path = "../table" }
-garage_block = { version = "0.8.1", path = "../block" }
-garage_util = { version = "0.8.1", path = "../util" }
+garage_db = { version = "0.8.2", default-features = false, path = "../db" }
+garage_rpc = { version = "0.8.2", path = "../rpc" }
+garage_table = { version = "0.8.2", path = "../table" }
+garage_block = { version = "0.8.2", path = "../block" }
+garage_util = { version = "0.8.2", path = "../util" }
async-trait = "0.1.7"
arc-swap = "1.0"
-blake2 = "0.9"
+blake2 = "0.10"
err-derive = "0.3"
hex = "0.4"
-base64 = "0.13"
-tracing = "0.1.30"
+base64 = "0.21"
+tracing = "0.1"
rand = "0.8"
-zstd = { version = "0.9", default-features = false }
+zstd = { version = "0.12", default-features = false }
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
serde_bytes = "0.11"
diff --git a/src/model/garage.rs b/src/model/garage.rs
index 5bea6b4f..3daa1b33 100644
--- a/src/model/garage.rs
+++ b/src/model/garage.rs
@@ -27,12 +27,14 @@ use crate::index_counter::*;
use crate::key_table::*;
#[cfg(feature = "k2v")]
-use crate::k2v::{item_table::*, poll::*, rpc::*};
+use crate::k2v::{item_table::*, rpc::*, sub::*};
/// An entire Garage full of data
pub struct Garage {
/// The parsed configuration Garage is running
pub config: Config,
+ /// The set of background variables that can be viewed/modified at runtime
+ pub bg_vars: vars::BgVars,
/// The replication mode of this cluster
pub replication_mode: ReplicationMode,
@@ -96,7 +98,7 @@ impl Garage {
.cache_capacity(config.sled_cache_capacity)
.flush_every_ms(Some(config.sled_flush_every_ms))
.open()
- .expect("Unable to open sled DB");
+ .ok_or_message("Unable to open sled DB")?;
db::sled_adapter::SledDb::init(db)
}
#[cfg(not(feature = "sled"))]
@@ -107,7 +109,7 @@ impl Garage {
db_path.push("db.sqlite");
info!("Opening Sqlite database at: {}", db_path.display());
let db = db::sqlite_adapter::rusqlite::Connection::open(db_path)
- .expect("Unable to open sqlite DB");
+ .ok_or_message("Unable to open sqlite DB")?;
db::sqlite_adapter::SqliteDb::init(db)
}
#[cfg(not(feature = "sqlite"))]
@@ -121,7 +123,8 @@ impl Garage {
"lmdb" | "heed" => {
db_path.push("db.lmdb");
info!("Opening LMDB database at: {}", db_path.display());
- std::fs::create_dir_all(&db_path).expect("Unable to create LMDB data directory");
+ std::fs::create_dir_all(&db_path)
+ .ok_or_message("Unable to create LMDB data directory")?;
let map_size = garage_db::lmdb_adapter::recommended_map_size();
use db::lmdb_adapter::heed;
@@ -133,7 +136,16 @@ impl Garage {
env_builder.flag(heed::flags::Flags::MdbNoSync);
env_builder.flag(heed::flags::Flags::MdbNoMetaSync);
}
- let db = env_builder.open(&db_path).expect("Unable to open LMDB DB");
+ let db = match env_builder.open(&db_path) {
+ Err(heed::Error::Io(e)) if e.kind() == std::io::ErrorKind::OutOfMemory => {
+ return Err(Error::Message(
+ "OutOfMemory error while trying to open LMDB database. This can happen \
+ if your operating system is not allowing you to use sufficient virtual \
+ memory address space. Please check that no limit is set (ulimit -v). \
+ On 32-bit machines, you should probably switch to another database engine.".into()))
+ }
+ x => x.ok_or_message("Unable to open LMDB DB")?,
+ };
db::lmdb_adapter::LmdbDb::init(db)
}
#[cfg(not(feature = "lmdb"))]
@@ -156,13 +168,15 @@ impl Garage {
}
};
- let network_key = NetworkKey::from_slice(
- &hex::decode(&config.rpc_secret).expect("Invalid RPC secret key")[..],
- )
- .expect("Invalid RPC secret key");
+ let network_key = hex::decode(config.rpc_secret.as_ref().ok_or_message(
+ "rpc_secret value is missing, not present in config file or in environment",
+ )?)
+ .ok()
+ .and_then(|x| NetworkKey::from_slice(&x))
+ .ok_or_message("Invalid RPC secret key")?;
let replication_mode = ReplicationMode::parse(&config.replication_mode)
- .expect("Invalid replication_mode in config file.");
+ .ok_or_message("Invalid replication_mode in config file.")?;
info!("Initialize membership management system...");
let system = System::new(network_key, replication_mode, &config)?;
@@ -249,9 +263,14 @@ impl Garage {
#[cfg(feature = "k2v")]
let k2v = GarageK2V::new(system.clone(), &db, meta_rep_param);
+ // Initialize bg vars
+ let mut bg_vars = vars::BgVars::new();
+ block_manager.register_bg_vars(&mut bg_vars);
+
// -- done --
Ok(Arc::new(Self {
config,
+ bg_vars,
replication_mode,
db,
system,
@@ -298,8 +317,10 @@ impl GarageK2V {
fn new(system: Arc<System>, db: &db::Db, meta_rep_param: TableShardedReplication) -> Self {
info!("Initialize K2V counter table...");
let counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), db);
+
info!("Initialize K2V subscription manager...");
let subscriptions = Arc::new(SubscriptionManager::new());
+
info!("Initialize K2V item table...");
let item_table = Table::new(
K2VItemTable {
@@ -310,7 +331,9 @@ impl GarageK2V {
system.clone(),
db,
);
- let rpc = K2VRpcHandler::new(system, item_table.clone(), subscriptions);
+
+ info!("Initialize K2V RPC handler...");
+ let rpc = K2VRpcHandler::new(system, db, item_table.clone(), subscriptions);
Self {
item_table,
diff --git a/src/model/k2v/causality.rs b/src/model/k2v/causality.rs
index 9a692870..c80ebd39 100644
--- a/src/model/k2v/causality.rs
+++ b/src/model/k2v/causality.rs
@@ -1,3 +1,14 @@
+//! Implements a CausalContext, which is a set of timestamps for each
+//! node -- a vector clock --, indicating that the versions with
+//! timestamps <= these numbers have been seen and can be
+//! overwritten by a subsequent write.
+//!
+//! The textual representation of a CausalContext, which we call a
+//! "causality token", is used in the API and must be sent along with
+//! each write or delete operation to indicate the previously seen
+//! versions that we want to overwrite or delete.
+use base64::prelude::*;
+
use std::collections::BTreeMap;
use std::convert::TryInto;
@@ -5,28 +16,44 @@ use serde::{Deserialize, Serialize};
use garage_util::data::*;
+use crate::helper::error::{Error as HelperError, OkOrBadRequest};
+
/// Node IDs used in K2V are u64 integers that are the abbreviation
/// of full Garage node IDs which are 256-bit UUIDs.
pub type K2VNodeId = u64;
+pub type VectorClock = BTreeMap<K2VNodeId, u64>;
+
pub fn make_node_id(node_id: Uuid) -> K2VNodeId {
let mut tmp = [0u8; 8];
tmp.copy_from_slice(&node_id.as_slice()[..8]);
u64::from_be_bytes(tmp)
}
-#[derive(PartialEq, Eq, Debug, Serialize, Deserialize)]
+pub fn vclock_gt(a: &VectorClock, b: &VectorClock) -> bool {
+ a.iter().any(|(n, ts)| ts > b.get(n).unwrap_or(&0))
+}
+
+pub fn vclock_max(a: &VectorClock, b: &VectorClock) -> VectorClock {
+ let mut ret = a.clone();
+ for (n, ts) in b.iter() {
+ let ent = ret.entry(*n).or_insert(0);
+ *ent = std::cmp::max(*ts, *ent);
+ }
+ ret
+}
+
+#[derive(PartialEq, Eq, Debug, Serialize, Deserialize, Default)]
pub struct CausalContext {
- pub vector_clock: BTreeMap<K2VNodeId, u64>,
+ pub vector_clock: VectorClock,
}
impl CausalContext {
/// Empty causality context
- pub fn new_empty() -> Self {
- Self {
- vector_clock: BTreeMap::new(),
- }
+ pub fn new() -> Self {
+ Self::default()
}
+
/// Make binary representation and encode in base64
pub fn serialize(&self) -> String {
let mut ints = Vec::with_capacity(2 * self.vector_clock.len());
@@ -41,14 +68,15 @@ impl CausalContext {
bytes.extend(u64::to_be_bytes(i));
}
- base64::encode_config(bytes, base64::URL_SAFE_NO_PAD)
+ BASE64_URL_SAFE_NO_PAD.encode(bytes)
}
- /// Parse from base64-encoded binary representation
- pub fn parse(s: &str) -> Result<Self, String> {
- let bytes = base64::decode_config(s, base64::URL_SAFE_NO_PAD)
- .map_err(|e| format!("bad causality token base64: {}", e))?;
+
+ /// Parse from base64-encoded binary representation.
+ /// Returns None on error.
+ pub fn parse(s: &str) -> Option<Self> {
+ let bytes = BASE64_URL_SAFE_NO_PAD.decode(s).ok()?;
if bytes.len() % 16 != 8 || bytes.len() < 8 {
- return Err("bad causality token length".into());
+ return None;
}
let checksum = u64::from_be_bytes(bytes[..8].try_into().unwrap());
@@ -65,16 +93,19 @@ impl CausalContext {
let check = ret.vector_clock.iter().fold(0, |acc, (n, t)| acc ^ *n ^ *t);
if check != checksum {
- return Err("bad causality token checksum".into());
+ return None;
}
- Ok(ret)
+ Some(ret)
}
+
+ pub fn parse_helper(s: &str) -> Result<Self, HelperError> {
+ Self::parse(s).ok_or_bad_request("Invalid causality token")
+ }
+
/// Check if this causal context contains newer items than another one
pub fn is_newer_than(&self, other: &Self) -> bool {
- self.vector_clock
- .iter()
- .any(|(k, v)| v > other.vector_clock.get(k).unwrap_or(&0))
+ vclock_gt(&self.vector_clock, &other.vector_clock)
}
}
diff --git a/src/model/k2v/item_table.rs b/src/model/k2v/item_table.rs
index ce3e4129..9e3ba5a5 100644
--- a/src/model/k2v/item_table.rs
+++ b/src/model/k2v/item_table.rs
@@ -11,7 +11,7 @@ use garage_table::*;
use crate::index_counter::*;
use crate::k2v::causality::*;
-use crate::k2v::poll::*;
+use crate::k2v::sub::*;
pub const ENTRIES: &str = "entries";
pub const CONFLICTS: &str = "conflicts";
@@ -73,7 +73,8 @@ impl K2VItem {
this_node: Uuid,
context: &Option<CausalContext>,
new_value: DvvsValue,
- ) {
+ node_ts: u64,
+ ) -> u64 {
if let Some(context) = context {
for (node, t_discard) in context.vector_clock.iter() {
if let Some(e) = self.items.get_mut(node) {
@@ -98,12 +99,14 @@ impl K2VItem {
values: vec![],
});
let t_prev = e.max_time();
- e.values.push((t_prev + 1, new_value));
+ let t_new = std::cmp::max(t_prev + 1, node_ts + 1);
+ e.values.push((t_new, new_value));
+ t_new
}
/// Extract the causality context of a K2V Item
pub fn causal_context(&self) -> CausalContext {
- let mut cc = CausalContext::new_empty();
+ let mut cc = CausalContext::new();
for (node, ent) in self.items.iter() {
cc.vector_clock.insert(*node, ent.max_time());
}
@@ -173,9 +176,9 @@ impl Crdt for DvvsEntry {
impl PartitionKey for K2VItemPartition {
fn hash(&self) -> Hash {
- use blake2::{Blake2b, Digest};
+ use blake2::{Blake2b512, Digest};
- let mut hasher = Blake2b::new();
+ let mut hasher = Blake2b512::new();
hasher.update(self.bucket_id.as_slice());
hasher.update(self.partition_key.as_bytes());
let mut hash = [0u8; 32];
@@ -310,7 +313,7 @@ mod tests {
values: vec![(6, DvvsValue::Value(vec![16])), (7, DvvsValue::Deleted)],
};
- let mut e3 = e1.clone();
+ let mut e3 = e1;
e3.merge(&e2);
assert_eq!(e2, e3);
}
diff --git a/src/model/k2v/mod.rs b/src/model/k2v/mod.rs
index f6a96151..acc1fcdc 100644
--- a/src/model/k2v/mod.rs
+++ b/src/model/k2v/mod.rs
@@ -1,6 +1,8 @@
pub mod causality;
+pub mod seen;
pub mod item_table;
-pub mod poll;
pub mod rpc;
+
+pub mod sub;
diff --git a/src/model/k2v/poll.rs b/src/model/k2v/poll.rs
deleted file mode 100644
index 93105207..00000000
--- a/src/model/k2v/poll.rs
+++ /dev/null
@@ -1,50 +0,0 @@
-use std::collections::HashMap;
-use std::sync::Mutex;
-
-use serde::{Deserialize, Serialize};
-use tokio::sync::broadcast;
-
-use crate::k2v::item_table::*;
-
-#[derive(Debug, Hash, Clone, PartialEq, Eq, Serialize, Deserialize)]
-pub struct PollKey {
- pub partition: K2VItemPartition,
- pub sort_key: String,
-}
-
-#[derive(Default)]
-pub struct SubscriptionManager {
- subscriptions: Mutex<HashMap<PollKey, broadcast::Sender<K2VItem>>>,
-}
-
-impl SubscriptionManager {
- pub fn new() -> Self {
- Self::default()
- }
-
- pub fn subscribe(&self, key: &PollKey) -> broadcast::Receiver<K2VItem> {
- let mut subs = self.subscriptions.lock().unwrap();
- if let Some(s) = subs.get(key) {
- s.subscribe()
- } else {
- let (tx, rx) = broadcast::channel(8);
- subs.insert(key.clone(), tx);
- rx
- }
- }
-
- pub fn notify(&self, item: &K2VItem) {
- let key = PollKey {
- partition: item.partition.clone(),
- sort_key: item.sort_key.clone(),
- };
- let mut subs = self.subscriptions.lock().unwrap();
- if let Some(s) = subs.get(&key) {
- if s.send(item.clone()).is_err() {
- // no more subscribers, remove channel from here
- // (we will re-create it later if we need to subscribe again)
- subs.remove(&key);
- }
- }
- }
-}
diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs
index f64a7984..37e142f6 100644
--- a/src/model/k2v/rpc.rs
+++ b/src/model/k2v/rpc.rs
@@ -5,9 +5,10 @@
//! node does not process the entry directly, as this would
//! mean the vector clock gets much larger than needed).
-use std::collections::HashMap;
-use std::sync::Arc;
-use std::time::Duration;
+use std::collections::{BTreeMap, HashMap};
+use std::convert::TryInto;
+use std::sync::{Arc, Mutex, MutexGuard};
+use std::time::{Duration, Instant};
use async_trait::async_trait;
use futures::stream::FuturesUnordered;
@@ -15,9 +16,12 @@ use futures::StreamExt;
use serde::{Deserialize, Serialize};
use tokio::select;
+use garage_db as db;
+
use garage_util::crdt::*;
use garage_util::data::*;
use garage_util::error::*;
+use garage_util::time::now_msec;
use garage_rpc::system::System;
use garage_rpc::*;
@@ -25,9 +29,15 @@ use garage_rpc::*;
use garage_table::replication::{TableReplication, TableShardedReplication};
use garage_table::{PartitionKey, Table};
+use crate::helper::error::Error as HelperError;
use crate::k2v::causality::*;
use crate::k2v::item_table::*;
-use crate::k2v::poll::*;
+use crate::k2v::seen::*;
+use crate::k2v::sub::*;
+
+const POLL_RANGE_EXTRA_DELAY: Duration = Duration::from_millis(200);
+
+const TIMESTAMP_KEY: &[u8] = b"timestamp";
/// RPC messages for K2V
#[derive(Debug, Serialize, Deserialize)]
@@ -40,7 +50,13 @@ enum K2VRpc {
causal_context: CausalContext,
timeout_msec: u64,
},
+ PollRange {
+ range: PollRange,
+ seen_str: Option<String>,
+ timeout_msec: u64,
+ },
PollItemResponse(Option<K2VItem>),
+ PollRangeResponse(Uuid, Vec<K2VItem>),
}
#[derive(Debug, Serialize, Deserialize)]
@@ -59,6 +75,12 @@ impl Rpc for K2VRpc {
pub struct K2VRpcHandler {
system: Arc<System>,
item_table: Arc<Table<K2VItemTable, TableShardedReplication>>,
+
+ // Using a mutex on the local_timestamp_tree is not strictly necessary,
+ // but it helps to not try to do several inserts at the same time,
+ // which would create transaction conflicts and force many useless retries.
+ local_timestamp_tree: Mutex<db::Tree>,
+
endpoint: Arc<Endpoint<K2VRpc, Self>>,
subscriptions: Arc<SubscriptionManager>,
}
@@ -66,14 +88,19 @@ pub struct K2VRpcHandler {
impl K2VRpcHandler {
pub fn new(
system: Arc<System>,
+ db: &db::Db,
item_table: Arc<Table<K2VItemTable, TableShardedReplication>>,
subscriptions: Arc<SubscriptionManager>,
) -> Arc<Self> {
+ let local_timestamp_tree = db
+ .open_tree("k2v_local_timestamp")
+ .expect("Unable to open DB tree for k2v local timestamp");
let endpoint = system.netapp.endpoint("garage_model/k2v/Rpc".to_string());
let rpc_handler = Arc::new(Self {
system,
item_table,
+ local_timestamp_tree: Mutex::new(local_timestamp_tree),
endpoint,
subscriptions,
});
@@ -181,7 +208,7 @@ impl K2VRpcHandler {
Ok(())
}
- pub async fn poll(
+ pub async fn poll_item(
&self,
bucket_id: Uuid,
partition_key: String,
@@ -230,9 +257,7 @@ impl K2VRpcHandler {
resp = Some(x);
}
}
- K2VRpc::PollItemResponse(None) => {
- return Ok(None);
- }
+ K2VRpc::PollItemResponse(None) => (),
v => return Err(Error::unexpected_rpc_message(v)),
}
}
@@ -240,10 +265,117 @@ impl K2VRpcHandler {
Ok(resp)
}
+ pub async fn poll_range(
+ &self,
+ range: PollRange,
+ seen_str: Option<String>,
+ timeout_msec: u64,
+ ) -> Result<Option<(BTreeMap<String, K2VItem>, String)>, HelperError> {
+ let has_seen_marker = seen_str.is_some();
+
+ // Parse seen marker, we will use it below. This is also the first check
+ // that it is valid, which returns a bad request error if not.
+ let mut seen = seen_str
+ .as_deref()
+ .map(RangeSeenMarker::decode_helper)
+ .transpose()?
+ .unwrap_or_default();
+ seen.restrict(&range);
+
+ // Prepare PollRange RPC to send to the storage nodes responsible for the parititon
+ let nodes = self
+ .item_table
+ .data
+ .replication
+ .write_nodes(&range.partition.hash());
+ let quorum = self.item_table.data.replication.read_quorum();
+ let msg = K2VRpc::PollRange {
+ range,
+ seen_str,
+ timeout_msec,
+ };
+
+ // Send the request to all nodes, use FuturesUnordered to get the responses in any order
+ let msg = msg.into_req().map_err(netapp::error::Error::from)?;
+ let rs = RequestStrategy::with_priority(PRIO_NORMAL).without_timeout();
+ let mut requests = nodes
+ .iter()
+ .map(|node| self.system.rpc.call(&self.endpoint, *node, msg.clone(), rs))
+ .collect::<FuturesUnordered<_>>();
+
+ // Fetch responses. This procedure stops fetching responses when any of the following
+ // conditions arise:
+ // - we have a response to all requests
+ // - we have a response to a read quorum of requests (e.g. 2/3), and an extra delay
+ // has passed since the quorum was achieved
+ // - a global RPC timeout expired
+ // The extra delay after a quorum was received is usefull if the third response was to
+ // arrive during this short interval: this would allow us to consider all the data seen
+ // by that last node in the response we produce, and would likely help reduce the
+ // size of the seen marker that we will return (because we would have an info of the
+ // kind: all items produced by that node until time ts have been returned, so we can
+ // bump the entry in the global vector clock and possibly remove some item-specific
+ // vector clocks)
+ let mut deadline =
+ Instant::now() + Duration::from_millis(timeout_msec) + self.system.rpc.rpc_timeout();
+ let mut resps = vec![];
+ let mut errors = vec![];
+ loop {
+ select! {
+ _ = tokio::time::sleep_until(deadline.into()) => {
+ break;
+ }
+ res = requests.next() => match res {
+ None => break,
+ Some(Err(e)) => errors.push(e),
+ Some(Ok(r)) => {
+ resps.push(r);
+ if resps.len() >= quorum {
+ deadline = std::cmp::min(deadline, Instant::now() + POLL_RANGE_EXTRA_DELAY);
+ }
+ }
+ }
+ }
+ }
+ if errors.len() > nodes.len() - quorum {
+ let errors = errors.iter().map(|e| format!("{}", e)).collect::<Vec<_>>();
+ return Err(Error::Quorum(quorum, resps.len(), nodes.len(), errors).into());
+ }
+
+ // Take all returned items into account to produce the response.
+ let mut new_items = BTreeMap::<String, K2VItem>::new();
+ for v in resps {
+ if let K2VRpc::PollRangeResponse(node, items) = v {
+ seen.mark_seen_node_items(node, items.iter());
+ for item in items.into_iter() {
+ match new_items.get_mut(&item.sort_key) {
+ Some(ent) => {
+ ent.merge(&item);
+ }
+ None => {
+ new_items.insert(item.sort_key.clone(), item);
+ }
+ }
+ }
+ } else {
+ return Err(Error::unexpected_rpc_message(v).into());
+ }
+ }
+
+ if new_items.is_empty() && has_seen_marker {
+ Ok(None)
+ } else {
+ Ok(Some((new_items, seen.encode()?)))
+ }
+ }
+
// ---- internal handlers ----
async fn handle_insert(&self, item: &InsertedItem) -> Result<K2VRpc, Error> {
- let new = self.local_insert(item)?;
+ let new = {
+ let local_timestamp_tree = self.local_timestamp_tree.lock().unwrap();
+ self.local_insert(&local_timestamp_tree, item)?
+ };
// Propagate to rest of network
if let Some(updated) = new {
@@ -256,11 +388,14 @@ impl K2VRpcHandler {
async fn handle_insert_many(&self, items: &[InsertedItem]) -> Result<K2VRpc, Error> {
let mut updated_vec = vec![];
- for item in items {
- let new = self.local_insert(item)?;
+ {
+ let local_timestamp_tree = self.local_timestamp_tree.lock().unwrap();
+ for item in items {
+ let new = self.local_insert(&local_timestamp_tree, item)?;
- if let Some(updated) = new {
- updated_vec.push(updated);
+ if let Some(updated) = new {
+ updated_vec.push(updated);
+ }
}
}
@@ -272,10 +407,22 @@ impl K2VRpcHandler {
Ok(K2VRpc::Ok)
}
- fn local_insert(&self, item: &InsertedItem) -> Result<Option<K2VItem>, Error> {
+ fn local_insert(
+ &self,
+ local_timestamp_tree: &MutexGuard<'_, db::Tree>,
+ item: &InsertedItem,
+ ) -> Result<Option<K2VItem>, Error> {
+ let now = now_msec();
+
self.item_table
.data
- .update_entry_with(&item.partition, &item.sort_key, |ent| {
+ .update_entry_with(&item.partition, &item.sort_key, |tx, ent| {
+ let old_local_timestamp = tx
+ .get(local_timestamp_tree, TIMESTAMP_KEY)?
+ .and_then(|x| x.try_into().ok())
+ .map(u64::from_be_bytes)
+ .unwrap_or_default();
+
let mut ent = ent.unwrap_or_else(|| {
K2VItem::new(
item.partition.bucket_id,
@@ -283,13 +430,25 @@ impl K2VRpcHandler {
item.sort_key.clone(),
)
});
- ent.update(self.system.id, &item.causal_context, item.value.clone());
- ent
+ let new_local_timestamp = ent.update(
+ self.system.id,
+ &item.causal_context,
+ item.value.clone(),
+ std::cmp::max(old_local_timestamp, now),
+ );
+
+ tx.insert(
+ local_timestamp_tree,
+ TIMESTAMP_KEY,
+ u64::to_be_bytes(new_local_timestamp),
+ )?;
+
+ Ok(ent)
})
}
- async fn handle_poll(&self, key: &PollKey, ct: &CausalContext) -> Result<K2VItem, Error> {
- let mut chan = self.subscriptions.subscribe(key);
+ async fn handle_poll_item(&self, key: &PollKey, ct: &CausalContext) -> Result<K2VItem, Error> {
+ let mut chan = self.subscriptions.subscribe_item(key);
let mut value = self
.item_table
@@ -311,6 +470,71 @@ impl K2VRpcHandler {
Ok(value)
}
+
+ async fn handle_poll_range(
+ &self,
+ range: &PollRange,
+ seen_str: &Option<String>,
+ ) -> Result<Vec<K2VItem>, Error> {
+ if let Some(seen_str) = seen_str {
+ let seen = RangeSeenMarker::decode(seen_str).ok_or_message("Invalid seenMarker")?;
+
+ // Subscribe now to all changes on that partition,
+ // so that new items that are inserted while we are reading the range
+ // will be seen in the loop below
+ let mut chan = self.subscriptions.subscribe_partition(&range.partition);
+
+ // Check for the presence of any new items already stored in the item table
+ let mut new_items = self.poll_range_read_range(range, &seen)?;
+
+ // If we found no new items, wait for a matching item to arrive
+ // on the channel
+ while new_items.is_empty() {
+ let item = chan.recv().await?;
+ if range.matches(&item) && seen.is_new_item(&item) {
+ new_items.push(item);
+ }
+ }
+
+ Ok(new_items)
+ } else {
+ // If no seen marker was specified, we do not poll for anything.
+ // We return immediately with the set of known items (even if
+ // it is empty), which will give the client an inital view of
+ // the dataset and an initial seen marker for further
+ // PollRange calls.
+ self.poll_range_read_range(range, &RangeSeenMarker::default())
+ }
+ }
+
+ fn poll_range_read_range(
+ &self,
+ range: &PollRange,
+ seen: &RangeSeenMarker,
+ ) -> Result<Vec<K2VItem>, Error> {
+ let mut new_items = vec![];
+
+ let partition_hash = range.partition.hash();
+ let first_key = match &range.start {
+ None => partition_hash.to_vec(),
+ Some(sk) => self.item_table.data.tree_key(&range.partition, sk),
+ };
+ for item in self.item_table.data.store.range(first_key..)? {
+ let (key, value) = item?;
+ if &key[..32] != partition_hash.as_slice() {
+ break;
+ }
+ let item = self.item_table.data.decode_entry(&value)?;
+ if !range.matches(&item) {
+ break;
+ }
+ if seen.is_new_item(&item) {
+ new_items.push(item);
+ }
+ }
+
+ Ok(new_items)
+ }
}
#[async_trait]
@@ -326,10 +550,21 @@ impl EndpointHandler<K2VRpc> for K2VRpcHandler {
} => {
let delay = tokio::time::sleep(Duration::from_millis(*timeout_msec));
select! {
- ret = self.handle_poll(key, causal_context) => ret.map(Some).map(K2VRpc::PollItemResponse),
+ ret = self.handle_poll_item(key, causal_context) => ret.map(Some).map(K2VRpc::PollItemResponse),
_ = delay => Ok(K2VRpc::PollItemResponse(None)),
}
}
+ K2VRpc::PollRange {
+ range,
+ seen_str,
+ timeout_msec,
+ } => {
+ let delay = tokio::time::sleep(Duration::from_millis(*timeout_msec));
+ select! {
+ ret = self.handle_poll_range(range, seen_str) => ret.map(|items| K2VRpc::PollRangeResponse(self.system.id, items)),
+ _ = delay => Ok(K2VRpc::PollRangeResponse(self.system.id, vec![])),
+ }
+ }
m => Err(Error::unexpected_rpc_message(m)),
}
}
diff --git a/src/model/k2v/seen.rs b/src/model/k2v/seen.rs
new file mode 100644
index 00000000..8fe3a582
--- /dev/null
+++ b/src/model/k2v/seen.rs
@@ -0,0 +1,105 @@
+//! Implements a RangeSeenMarker, a data type used in the PollRange API
+//! to indicate which items in the range have already been seen
+//! and which have not been seen yet.
+//!
+//! It consists of a vector clock that indicates that for each node,
+//! all items produced by that node with timestamps <= the value in the
+//! vector clock has been seen, as well as a set of causal contexts for
+//! individual items.
+
+use std::collections::BTreeMap;
+
+use base64::prelude::*;
+use serde::{Deserialize, Serialize};
+
+use garage_util::data::Uuid;
+use garage_util::encode::{nonversioned_decode, nonversioned_encode};
+use garage_util::error::Error;
+
+use crate::helper::error::{Error as HelperError, OkOrBadRequest};
+use crate::k2v::causality::*;
+use crate::k2v::item_table::*;
+use crate::k2v::sub::*;
+
+#[derive(Debug, Serialize, Deserialize, Default)]
+pub struct RangeSeenMarker {
+ vector_clock: VectorClock,
+ items: BTreeMap<String, VectorClock>,
+}
+
+impl RangeSeenMarker {
+ pub fn new() -> Self {
+ Self::default()
+ }
+
+ pub fn restrict(&mut self, range: &PollRange) {
+ if let Some(start) = &range.start {
+ self.items = self.items.split_off(start);
+ }
+ if let Some(end) = &range.end {
+ self.items.split_off(end);
+ }
+ if let Some(pfx) = &range.prefix {
+ self.items.retain(|k, _v| k.starts_with(pfx));
+ }
+ }
+
+ pub fn mark_seen_node_items<'a, I: IntoIterator<Item = &'a K2VItem>>(
+ &mut self,
+ node: Uuid,
+ items: I,
+ ) {
+ let node = make_node_id(node);
+ for item in items.into_iter() {
+ let cc = item.causal_context();
+
+ if let Some(ts) = cc.vector_clock.get(&node) {
+ let ent = self.vector_clock.entry(node).or_insert(0);
+ *ent = std::cmp::max(*ent, *ts);
+ }
+
+ if vclock_gt(&cc.vector_clock, &self.vector_clock) {
+ match self.items.get_mut(&item.sort_key) {
+ None => {
+ self.items.insert(item.sort_key.clone(), cc.vector_clock);
+ }
+ Some(ent) => *ent = vclock_max(ent, &cc.vector_clock),
+ }
+ }
+ }
+ }
+
+ pub fn canonicalize(&mut self) {
+ let self_vc = &self.vector_clock;
+ self.items.retain(|_sk, vc| vclock_gt(vc, self_vc))
+ }
+
+ pub fn encode(&mut self) -> Result<String, Error> {
+ self.canonicalize();
+
+ let bytes = nonversioned_encode(&self)?;
+ let bytes = zstd::stream::encode_all(&mut &bytes[..], zstd::DEFAULT_COMPRESSION_LEVEL)?;
+ Ok(BASE64_STANDARD.encode(&bytes))
+ }
+
+ /// Decode from msgpack+zstd+b64 representation, returns None on error.
+ pub fn decode(s: &str) -> Option<Self> {
+ let bytes = BASE64_STANDARD.decode(s).ok()?;
+ let bytes = zstd::stream::decode_all(&mut &bytes[..]).ok()?;
+ nonversioned_decode(&bytes).ok()
+ }
+
+ pub fn decode_helper(s: &str) -> Result<Self, HelperError> {
+ Self::decode(s).ok_or_bad_request("Invalid causality token")
+ }
+
+ pub fn is_new_item(&self, item: &K2VItem) -> bool {
+ let cc = item.causal_context();
+ vclock_gt(&cc.vector_clock, &self.vector_clock)
+ && self
+ .items
+ .get(&item.sort_key)
+ .map(|vc| vclock_gt(&cc.vector_clock, vc))
+ .unwrap_or(true)
+ }
+}
diff --git a/src/model/k2v/sub.rs b/src/model/k2v/sub.rs
new file mode 100644
index 00000000..b1daa271
--- /dev/null
+++ b/src/model/k2v/sub.rs
@@ -0,0 +1,110 @@
+use std::collections::HashMap;
+use std::sync::Mutex;
+
+use serde::{Deserialize, Serialize};
+use tokio::sync::broadcast;
+
+use crate::k2v::item_table::*;
+
+#[derive(Debug, Hash, Clone, PartialEq, Eq, Serialize, Deserialize)]
+pub struct PollKey {
+ pub partition: K2VItemPartition,
+ pub sort_key: String,
+}
+
+#[derive(Debug, Hash, Clone, PartialEq, Eq, Serialize, Deserialize)]
+pub struct PollRange {
+ pub partition: K2VItemPartition,
+ pub prefix: Option<String>,
+ pub start: Option<String>,
+ pub end: Option<String>,
+}
+
+#[derive(Default)]
+pub struct SubscriptionManager(Mutex<SubscriptionManagerInner>);
+
+#[derive(Default)]
+pub struct SubscriptionManagerInner {
+ item_subscriptions: HashMap<PollKey, broadcast::Sender<K2VItem>>,
+ part_subscriptions: HashMap<K2VItemPartition, broadcast::Sender<K2VItem>>,
+}
+
+impl SubscriptionManager {
+ pub fn new() -> Self {
+ Self::default()
+ }
+
+ pub(crate) fn subscribe_item(&self, key: &PollKey) -> broadcast::Receiver<K2VItem> {
+ let mut inner = self.0.lock().unwrap();
+ if let Some(s) = inner.item_subscriptions.get(key) {
+ s.subscribe()
+ } else {
+ let (tx, rx) = broadcast::channel(8);
+ inner.item_subscriptions.insert(key.clone(), tx);
+ rx
+ }
+ }
+
+ pub(crate) fn subscribe_partition(
+ &self,
+ part: &K2VItemPartition,
+ ) -> broadcast::Receiver<K2VItem> {
+ let mut inner = self.0.lock().unwrap();
+ if let Some(s) = inner.part_subscriptions.get(part) {
+ s.subscribe()
+ } else {
+ let (tx, rx) = broadcast::channel(8);
+ inner.part_subscriptions.insert(part.clone(), tx);
+ rx
+ }
+ }
+
+ pub(crate) fn notify(&self, item: &K2VItem) {
+ let mut inner = self.0.lock().unwrap();
+
+ // 1. Notify single item subscribers,
+ // removing subscriptions with no more listeners if any
+ let key = PollKey {
+ partition: item.partition.clone(),
+ sort_key: item.sort_key.clone(),
+ };
+ if let Some(s) = inner.item_subscriptions.get(&key) {
+ if s.send(item.clone()).is_err() {
+ // no more subscribers, remove channel from here
+ // (we will re-create it later if we need to subscribe again)
+ inner.item_subscriptions.remove(&key);
+ }
+ }
+
+ // 2. Notify partition subscribers,
+ // removing subscriptions with no more listeners if any
+ if let Some(s) = inner.part_subscriptions.get(&item.partition) {
+ if s.send(item.clone()).is_err() {
+ // no more subscribers, remove channel from here
+ // (we will re-create it later if we need to subscribe again)
+ inner.part_subscriptions.remove(&item.partition);
+ }
+ }
+ }
+}
+
+impl PollRange {
+ pub fn matches(&self, item: &K2VItem) -> bool {
+ item.partition == self.partition
+ && self
+ .prefix
+ .as_ref()
+ .map(|x| item.sort_key.starts_with(x))
+ .unwrap_or(true)
+ && self
+ .start
+ .as_ref()
+ .map(|x| item.sort_key >= *x)
+ .unwrap_or(true)
+ && self
+ .end
+ .as_ref()
+ .map(|x| item.sort_key < *x)
+ .unwrap_or(true)
+ }
+}
diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml
index 3d4d3ff5..57c157d0 100644
--- a/src/rpc/Cargo.toml
+++ b/src/rpc/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "garage_rpc"
-version = "0.8.1"
+version = "0.8.2"
authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license = "AGPL-3.0"
@@ -14,17 +14,18 @@ path = "lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
-garage_util = { version = "0.8.1", path = "../util" }
+garage_util = { version = "0.8.2", path = "../util" }
arc-swap = "1.0"
bytes = "1.0"
bytesize = "1.1"
gethostname = "0.2"
hex = "0.4"
-tracing = "0.1.30"
+tracing = "0.1"
rand = "0.8"
itertools="0.10"
sodiumoxide = { version = "0.2.5-0", package = "kuska-sodiumoxide" }
+systemstat = "0.2.3"
async-trait = "0.1.7"
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
@@ -38,8 +39,7 @@ k8s-openapi = { version = "0.16", features = ["v1_22"], optional = true }
schemars = { version = "0.8", optional = true }
reqwest = { version = "0.11", optional = true, default-features = false, features = ["rustls-tls-manual-roots", "json"] }
-# newer version requires rust edition 2021
-pnet_datalink = "0.28"
+pnet_datalink = "0.33"
futures = "0.3"
futures-util = "0.3"
diff --git a/src/rpc/consul.rs b/src/rpc/consul.rs
index b1772a1a..f85f789c 100644
--- a/src/rpc/consul.rs
+++ b/src/rpc/consul.rs
@@ -113,7 +113,7 @@ impl ConsulDiscovery {
let pubkey = ent
.node_meta
.get("pubkey")
- .and_then(|k| hex::decode(&k).ok())
+ .and_then(|k| hex::decode(k).ok())
.and_then(|k| NodeID::from_slice(&k[..]));
if let (Some(ip), Some(pubkey)) = (ip, pubkey) {
ret.push((pubkey, SocketAddr::new(ip, ent.service_port)));
diff --git a/src/rpc/lib.rs b/src/rpc/lib.rs
index f734942d..a5f8fc6e 100644
--- a/src/rpc/lib.rs
+++ b/src/rpc/lib.rs
@@ -3,6 +3,9 @@
#[macro_use]
extern crate tracing;
+mod metrics;
+mod system_metrics;
+
#[cfg(feature = "consul-discovery")]
mod consul;
#[cfg(feature = "kubernetes-discovery")]
@@ -14,7 +17,6 @@ pub mod replication_mode;
pub mod ring;
pub mod system;
-mod metrics;
pub mod rpc_helper;
pub use rpc_helper::*;
diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs
index 1ec250c3..e59c372a 100644
--- a/src/rpc/rpc_helper.rs
+++ b/src/rpc/rpc_helper.rs
@@ -15,10 +15,9 @@ use opentelemetry::{
};
pub use netapp::endpoint::{Endpoint, EndpointHandler, StreamingEndpointHandler};
-use netapp::message::IntoReq;
pub use netapp::message::{
- Message as Rpc, OrderTag, Req, RequestPriority, Resp, PRIO_BACKGROUND, PRIO_HIGH, PRIO_NORMAL,
- PRIO_SECONDARY,
+ IntoReq, Message as Rpc, OrderTag, Req, RequestPriority, Resp, PRIO_BACKGROUND, PRIO_HIGH,
+ PRIO_NORMAL, PRIO_SECONDARY,
};
use netapp::peering::fullmesh::FullMeshPeeringStrategy;
pub use netapp::{self, NetApp, NodeID};
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index 1f4d86e7..c549d8fc 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -3,6 +3,7 @@ use std::collections::HashMap;
use std::io::{Read, Write};
use std::net::{IpAddr, SocketAddr};
use std::path::{Path, PathBuf};
+use std::sync::atomic::Ordering;
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
@@ -38,6 +39,8 @@ use crate::replication_mode::*;
use crate::ring::*;
use crate::rpc_helper::*;
+use crate::system_metrics::*;
+
const DISCOVERY_INTERVAL: Duration = Duration::from_secs(60);
const STATUS_EXCHANGE_INTERVAL: Duration = Duration::from_secs(10);
@@ -104,6 +107,8 @@ pub struct System {
#[cfg(feature = "kubernetes-discovery")]
kubernetes_discovery: Option<KubernetesDiscoveryConfig>,
+ metrics: SystemMetrics,
+
replication_mode: ReplicationMode,
replication_factor: usize,
@@ -113,18 +118,28 @@ pub struct System {
/// Path to metadata directory
pub metadata_dir: PathBuf,
+ /// Path to data directory
+ pub data_dir: PathBuf,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeStatus {
/// Hostname of the node
pub hostname: String,
+
/// Replication factor configured on the node
pub replication_factor: usize,
/// Cluster layout version
pub cluster_layout_version: u64,
/// Hash of cluster layout staging data
pub cluster_layout_staging_hash: Hash,
+
+ /// Disk usage on partition containing metadata directory (tuple: `(avail, total)`)
+ #[serde(default)]
+ pub meta_disk_avail: Option<(u64, u64)>,
+ /// Disk usage on partition containing data directory (tuple: `(avail, total)`)
+ #[serde(default)]
+ pub data_disk_avail: Option<(u64, u64)>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -200,7 +215,7 @@ pub fn gen_node_key(metadata_dir: &Path) -> Result<NodeKey, Error> {
} else {
if !metadata_dir.exists() {
info!("Metadata directory does not exist, creating it.");
- std::fs::create_dir(&metadata_dir)?;
+ std::fs::create_dir(metadata_dir)?;
}
info!("Generating new node key pair.");
@@ -266,14 +281,10 @@ impl System {
}
};
- let local_status = NodeStatus {
- hostname: gethostname::gethostname()
- .into_string()
- .unwrap_or_else(|_| "<invalid utf-8>".to_string()),
- replication_factor,
- cluster_layout_version: cluster_layout.version,
- cluster_layout_staging_hash: cluster_layout.staging_hash,
- };
+ let metrics = SystemMetrics::new(replication_factor);
+
+ let mut local_status = NodeStatus::initial(replication_factor, &cluster_layout);
+ local_status.update_disk_usage(&config.metadata_dir, &config.data_dir, &metrics);
let ring = Ring::new(cluster_layout, replication_factor);
let (update_ring, ring) = watch::channel(Arc::new(ring));
@@ -365,10 +376,12 @@ impl System {
consul_discovery,
#[cfg(feature = "kubernetes-discovery")]
kubernetes_discovery: config.kubernetes_discovery.clone(),
+ metrics,
ring,
update_ring: Mutex::new(update_ring),
metadata_dir: config.metadata_dir.clone(),
+ data_dir: config.data_dir.clone(),
});
sys.system_endpoint.set_handler(sys.clone());
Ok(sys)
@@ -406,12 +419,7 @@ impl System {
.get(&n.id.into())
.cloned()
.map(|(_, st)| st)
- .unwrap_or(NodeStatus {
- hostname: "?".to_string(),
- replication_factor: 0,
- cluster_layout_version: 0,
- cluster_layout_staging_hash: Hash::from([0u8; 32]),
- }),
+ .unwrap_or_else(NodeStatus::unknown),
})
.collect::<Vec<_>>();
known_nodes
@@ -590,6 +598,9 @@ impl System {
let ring = self.ring.borrow();
new_si.cluster_layout_version = ring.layout.version;
new_si.cluster_layout_staging_hash = ring.layout.staging_hash;
+
+ new_si.update_disk_usage(&self.metadata_dir, &self.data_dir, &self.metrics);
+
self.local_status.swap(Arc::new(new_si));
}
@@ -854,6 +865,69 @@ impl EndpointHandler<SystemRpc> for System {
}
}
+impl NodeStatus {
+ fn initial(replication_factor: usize, layout: &ClusterLayout) -> Self {
+ NodeStatus {
+ hostname: gethostname::gethostname()
+ .into_string()
+ .unwrap_or_else(|_| "<invalid utf-8>".to_string()),
+ replication_factor,
+ cluster_layout_version: layout.version,
+ cluster_layout_staging_hash: layout.staging_hash,
+ meta_disk_avail: None,
+ data_disk_avail: None,
+ }
+ }
+
+ fn unknown() -> Self {
+ NodeStatus {
+ hostname: "?".to_string(),
+ replication_factor: 0,
+ cluster_layout_version: 0,
+ cluster_layout_staging_hash: Hash::from([0u8; 32]),
+ meta_disk_avail: None,
+ data_disk_avail: None,
+ }
+ }
+
+ fn update_disk_usage(&mut self, meta_dir: &Path, data_dir: &Path, metrics: &SystemMetrics) {
+ use systemstat::{Platform, System};
+ let mounts = System::new().mounts().unwrap_or_default();
+
+ let mount_avail = |path: &Path| {
+ mounts
+ .iter()
+ .filter(|x| path.starts_with(&x.fs_mounted_on))
+ .max_by_key(|x| x.fs_mounted_on.len())
+ .map(|x| (x.avail.as_u64(), x.total.as_u64()))
+ };
+
+ self.meta_disk_avail = mount_avail(meta_dir);
+ self.data_disk_avail = mount_avail(data_dir);
+
+ if let Some((avail, total)) = self.meta_disk_avail {
+ metrics
+ .values
+ .meta_disk_avail
+ .store(avail, Ordering::Relaxed);
+ metrics
+ .values
+ .meta_disk_total
+ .store(total, Ordering::Relaxed);
+ }
+ if let Some((avail, total)) = self.data_disk_avail {
+ metrics
+ .values
+ .data_disk_avail
+ .store(avail, Ordering::Relaxed);
+ metrics
+ .values
+ .data_disk_total
+ .store(total, Ordering::Relaxed);
+ }
+ }
+}
+
fn get_default_ip() -> Option<IpAddr> {
pnet_datalink::interfaces()
.iter()
diff --git a/src/rpc/system_metrics.rs b/src/rpc/system_metrics.rs
new file mode 100644
index 00000000..af81b71f
--- /dev/null
+++ b/src/rpc/system_metrics.rs
@@ -0,0 +1,77 @@
+use std::sync::atomic::{AtomicU64, Ordering};
+use std::sync::Arc;
+
+use opentelemetry::{global, metrics::*, KeyValue};
+
+/// TableMetrics reference all counter used for metrics
+pub struct SystemMetrics {
+ pub(crate) _garage_build_info: ValueObserver<u64>,
+ pub(crate) _replication_factor: ValueObserver<u64>,
+ pub(crate) _disk_avail: ValueObserver<u64>,
+ pub(crate) _disk_total: ValueObserver<u64>,
+ pub(crate) values: Arc<SystemMetricsValues>,
+}
+
+#[derive(Default)]
+pub struct SystemMetricsValues {
+ pub(crate) data_disk_total: AtomicU64,
+ pub(crate) data_disk_avail: AtomicU64,
+ pub(crate) meta_disk_total: AtomicU64,
+ pub(crate) meta_disk_avail: AtomicU64,
+}
+
+impl SystemMetrics {
+ pub fn new(replication_factor: usize) -> Self {
+ let meter = global::meter("garage_system");
+ let values = Arc::new(SystemMetricsValues::default());
+ let values1 = values.clone();
+ let values2 = values.clone();
+ Self {
+ _garage_build_info: meter
+ .u64_value_observer("garage_build_info", move |observer| {
+ observer.observe(
+ 1,
+ &[
+ KeyValue::new("rustversion", garage_util::version::rust_version()),
+ KeyValue::new("version", garage_util::version::garage_version()),
+ ],
+ )
+ })
+ .with_description("Garage build info")
+ .init(),
+ _replication_factor: meter
+ .u64_value_observer("garage_replication_factor", move |observer| {
+ observer.observe(replication_factor as u64, &[])
+ })
+ .with_description("Garage replication factor setting")
+ .init(),
+ _disk_avail: meter
+ .u64_value_observer("garage_local_disk_avail", move |observer| {
+ match values1.data_disk_avail.load(Ordering::Relaxed) {
+ 0 => (),
+ x => observer.observe(x, &[KeyValue::new("volume", "data")]),
+ };
+ match values1.meta_disk_avail.load(Ordering::Relaxed) {
+ 0 => (),
+ x => observer.observe(x, &[KeyValue::new("volume", "metadata")]),
+ };
+ })
+ .with_description("Garage available disk space on each node")
+ .init(),
+ _disk_total: meter
+ .u64_value_observer("garage_local_disk_total", move |observer| {
+ match values2.data_disk_total.load(Ordering::Relaxed) {
+ 0 => (),
+ x => observer.observe(x, &[KeyValue::new("volume", "data")]),
+ };
+ match values2.meta_disk_total.load(Ordering::Relaxed) {
+ 0 => (),
+ x => observer.observe(x, &[KeyValue::new("volume", "metadata")]),
+ };
+ })
+ .with_description("Garage total disk space on each node")
+ .init(),
+ values,
+ }
+ }
+}
diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml
index 3911c945..c794c924 100644
--- a/src/table/Cargo.toml
+++ b/src/table/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "garage_table"
-version = "0.8.1"
+version = "0.8.2"
authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license = "AGPL-3.0"
@@ -14,9 +14,9 @@ path = "lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
-garage_db = { version = "0.8.1", path = "../db" }
-garage_rpc = { version = "0.8.1", path = "../rpc" }
-garage_util = { version = "0.8.1", path = "../util" }
+garage_db = { version = "0.8.2", path = "../db" }
+garage_rpc = { version = "0.8.2", path = "../rpc" }
+garage_util = { version = "0.8.2", path = "../util" }
opentelemetry = "0.17"
@@ -25,7 +25,7 @@ arc-swap = "1.0"
bytes = "1.0"
hex = "0.4"
hexdump = "0.1"
-tracing = "0.1.30"
+tracing = "0.1"
rand = "0.8"
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
diff --git a/src/table/data.rs b/src/table/data.rs
index 5c792f1f..26cc3a5a 100644
--- a/src/table/data.rs
+++ b/src/table/data.rs
@@ -181,13 +181,17 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
pub(crate) fn update_entry(&self, update_bytes: &[u8]) -> Result<(), Error> {
let update = self.decode_entry(update_bytes)?;
- self.update_entry_with(update.partition_key(), update.sort_key(), |ent| match ent {
- Some(mut ent) => {
- ent.merge(&update);
- ent
- }
- None => update.clone(),
- })?;
+ self.update_entry_with(
+ update.partition_key(),
+ update.sort_key(),
+ |_tx, ent| match ent {
+ Some(mut ent) => {
+ ent.merge(&update);
+ Ok(ent)
+ }
+ None => Ok(update.clone()),
+ },
+ )?;
Ok(())
}
@@ -195,7 +199,7 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
&self,
partition_key: &F::P,
sort_key: &F::S,
- f: impl Fn(Option<F::E>) -> F::E,
+ update_fn: impl Fn(&mut db::Transaction, Option<F::E>) -> db::TxOpResult<F::E>,
) -> Result<Option<F::E>, Error> {
let tree_key = self.tree_key(partition_key, sort_key);
@@ -203,10 +207,10 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
let (old_entry, old_bytes, new_entry) = match tx.get(&self.store, &tree_key)? {
Some(old_bytes) => {
let old_entry = self.decode_entry(&old_bytes).map_err(db::TxError::Abort)?;
- let new_entry = f(Some(old_entry.clone()));
+ let new_entry = update_fn(&mut tx, Some(old_entry.clone()))?;
(Some(old_entry), Some(old_bytes), new_entry)
}
- None => (None, None, f(None)),
+ None => (None, None, update_fn(&mut tx, None)?),
};
// Changed can be true in two scenarios
diff --git a/src/util/Cargo.toml b/src/util/Cargo.toml
index 0468b7f4..387471ed 100644
--- a/src/util/Cargo.toml
+++ b/src/util/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "garage_util"
-version = "0.8.1"
+version = "0.8.2"
authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license = "AGPL-3.0"
@@ -14,11 +14,11 @@ path = "lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
-garage_db = { version = "0.8.1", path = "../db" }
+garage_db = { version = "0.8.2", path = "../db" }
arc-swap = "1.0"
async-trait = "0.1"
-blake2 = "0.9"
+blake2 = "0.10"
bytes = "1.0"
digest = "0.10"
err-derive = "0.3"
@@ -27,7 +27,7 @@ hexdump = "0.1"
xxhash-rust = { version = "0.8", default-features = false, features = ["xxh3"] }
hex = "0.4"
lazy_static = "1.4"
-tracing = "0.1.30"
+tracing = "0.1"
rand = "0.8"
sha2 = "0.10"
@@ -35,7 +35,7 @@ chrono = "0.4"
rmp-serde = "1.1"
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
serde_json = "1.0"
-toml = "0.5"
+toml = "0.6"
futures = "0.3"
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
@@ -47,6 +47,11 @@ hyper = "0.14"
opentelemetry = { version = "0.17", features = [ "rt-tokio", "metrics", "trace" ] }
+[build-dependencies]
+rustc_version = "0.4.0"
+
+[dev-dependencies]
+mktemp = "0.5"
[features]
k2v = []
diff --git a/src/util/background/mod.rs b/src/util/background/mod.rs
index 41b48e93..607cd7a3 100644
--- a/src/util/background/mod.rs
+++ b/src/util/background/mod.rs
@@ -1,5 +1,6 @@
//! Job runner for futures and async functions
+pub mod vars;
pub mod worker;
use std::collections::HashMap;
diff --git a/src/util/background/vars.rs b/src/util/background/vars.rs
new file mode 100644
index 00000000..7a449c95
--- /dev/null
+++ b/src/util/background/vars.rs
@@ -0,0 +1,113 @@
+use std::collections::HashMap;
+use std::str::FromStr;
+
+use crate::error::{Error, OkOrMessage};
+use crate::migrate::Migrate;
+use crate::persister::PersisterShared;
+
+pub struct BgVars {
+ vars: HashMap<&'static str, Box<dyn BgVarTrait>>,
+}
+
+impl BgVars {
+ pub fn new() -> Self {
+ Self {
+ vars: HashMap::new(),
+ }
+ }
+
+ pub fn register_rw<V, T, GF, SF>(
+ &mut self,
+ p: &PersisterShared<V>,
+ name: &'static str,
+ get_fn: GF,
+ set_fn: SF,
+ ) where
+ V: Migrate + Default + Send + Sync,
+ T: FromStr + ToString + Send + Sync + 'static,
+ GF: Fn(&PersisterShared<V>) -> T + Send + Sync + 'static,
+ SF: Fn(&PersisterShared<V>, T) -> Result<(), Error> + Send + Sync + 'static,
+ {
+ let p1 = p.clone();
+ let get_fn = move || get_fn(&p1);
+
+ let p2 = p.clone();
+ let set_fn = move |v| set_fn(&p2, v);
+
+ self.vars.insert(name, Box::new(BgVar { get_fn, set_fn }));
+ }
+
+ pub fn register_ro<V, T, GF>(&mut self, p: &PersisterShared<V>, name: &'static str, get_fn: GF)
+ where
+ V: Migrate + Default + Send + Sync,
+ T: FromStr + ToString + Send + Sync + 'static,
+ GF: Fn(&PersisterShared<V>) -> T + Send + Sync + 'static,
+ {
+ let p1 = p.clone();
+ let get_fn = move || get_fn(&p1);
+
+ let set_fn = move |_| Err(Error::Message(format!("Cannot set value of {}", name)));
+
+ self.vars.insert(name, Box::new(BgVar { get_fn, set_fn }));
+ }
+
+ pub fn get(&self, var: &str) -> Result<String, Error> {
+ Ok(self
+ .vars
+ .get(var)
+ .ok_or_message("variable does not exist")?
+ .get())
+ }
+
+ pub fn get_all(&self) -> Vec<(&'static str, String)> {
+ self.vars.iter().map(|(k, v)| (*k, v.get())).collect()
+ }
+
+ pub fn set(&self, var: &str, val: &str) -> Result<(), Error> {
+ self.vars
+ .get(var)
+ .ok_or_message("variable does not exist")?
+ .set(val)
+ }
+}
+
+impl Default for BgVars {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+// ----
+
+trait BgVarTrait: Send + Sync + 'static {
+ fn get(&self) -> String;
+ fn set(&self, v: &str) -> Result<(), Error>;
+}
+
+struct BgVar<T, GF, SF>
+where
+ T: FromStr + ToString + Send + Sync + 'static,
+ GF: Fn() -> T + Send + Sync + 'static,
+ SF: Fn(T) -> Result<(), Error> + Sync + Send + 'static,
+{
+ get_fn: GF,
+ set_fn: SF,
+}
+
+impl<T, GF, SF> BgVarTrait for BgVar<T, GF, SF>
+where
+ T: FromStr + ToString + Sync + Send + 'static,
+ GF: Fn() -> T + Sync + Send + 'static,
+ SF: Fn(T) -> Result<(), Error> + Sync + Send + 'static,
+{
+ fn get(&self) -> String {
+ (self.get_fn)().to_string()
+ }
+
+ fn set(&self, vstr: &str) -> Result<(), Error> {
+ let value = vstr
+ .parse()
+ .map_err(|_| Error::Message(format!("invalid value: {}", vstr)))?;
+ (self.set_fn)(value)
+ }
+}
diff --git a/src/util/build.rs b/src/util/build.rs
new file mode 100644
index 00000000..a4e955b8
--- /dev/null
+++ b/src/util/build.rs
@@ -0,0 +1,8 @@
+use rustc_version::version;
+
+fn main() {
+ // Acquire the version of Rust used to compile, this is added as a label to
+ // the garage_build_info metric.
+ let v = version().unwrap();
+ println!("cargo:rustc-env=RUSTC_VERSION={v}");
+}
diff --git a/src/util/config.rs b/src/util/config.rs
index 04f8375a..2176353e 100644
--- a/src/util/config.rs
+++ b/src/util/config.rs
@@ -34,7 +34,9 @@ pub struct Config {
pub compression_level: Option<i32>,
/// RPC secret key: 32 bytes hex encoded
- pub rpc_secret: String,
+ pub rpc_secret: Option<String>,
+ /// Optional file where RPC secret key is read from
+ pub rpc_secret_file: Option<String>,
/// Address to bind for RPC
pub rpc_bind_addr: SocketAddr,
@@ -118,10 +120,17 @@ pub struct WebConfig {
pub struct AdminConfig {
/// Address and port to bind for admin API serving
pub api_bind_addr: Option<SocketAddr>,
+
/// Bearer token to use to scrape metrics
pub metrics_token: Option<String>,
+ /// File to read metrics token from
+ pub metrics_token_file: Option<String>,
+
/// Bearer token to use to access Admin API endpoints
pub admin_token: Option<String>,
+ /// File to read admin token from
+ pub admin_token_file: Option<String>,
+
/// OTLP server to where to export traces
pub trace_sink: Option<String>,
}
@@ -177,7 +186,57 @@ pub fn read_config(config_file: PathBuf) -> Result<Config, Error> {
let mut config = String::new();
file.read_to_string(&mut config)?;
- Ok(toml::from_str(&config)?)
+ let mut parsed_config: Config = toml::from_str(&config)?;
+
+ secret_from_file(
+ &mut parsed_config.rpc_secret,
+ &parsed_config.rpc_secret_file,
+ "rpc_secret",
+ )?;
+ secret_from_file(
+ &mut parsed_config.admin.metrics_token,
+ &parsed_config.admin.metrics_token_file,
+ "admin.metrics_token",
+ )?;
+ secret_from_file(
+ &mut parsed_config.admin.admin_token,
+ &parsed_config.admin.admin_token_file,
+ "admin.admin_token",
+ )?;
+
+ Ok(parsed_config)
+}
+
+fn secret_from_file(
+ secret: &mut Option<String>,
+ secret_file: &Option<String>,
+ name: &'static str,
+) -> Result<(), Error> {
+ match (&secret, &secret_file) {
+ (_, None) => {
+ // no-op
+ }
+ (Some(_), Some(_)) => {
+ return Err(format!("only one of `{}` and `{}_file` can be set", name, name).into());
+ }
+ (None, Some(file_path)) => {
+ #[cfg(unix)]
+ if std::env::var("GARAGE_ALLOW_WORLD_READABLE_SECRETS").as_deref() != Ok("true") {
+ use std::os::unix::fs::MetadataExt;
+ let metadata = std::fs::metadata(&file_path)?;
+ if metadata.mode() & 0o077 != 0 {
+ return Err(format!("File {} is world-readable! (mode: 0{:o}, expected 0600)\nRefusing to start until this is fixed, or environment variable GARAGE_ALLOW_WORLD_READABLE_SECRETS is set to true.", file_path, metadata.mode()).into());
+ }
+ }
+ let mut file = std::fs::OpenOptions::new().read(true).open(file_path)?;
+ let mut secret_buf = String::new();
+ file.read_to_string(&mut secret_buf)?;
+ // trim_end: allows for use case such as `echo "$(openssl rand -hex 32)" > somefile`.
+ // also editors sometimes add a trailing newline
+ *secret = Some(String::from(secret_buf.trim_end()));
+ }
+ }
+ Ok(())
}
fn default_compression() -> Option<i32> {
@@ -233,3 +292,116 @@ where
deserializer.deserialize_any(OptionVisitor)
}
+
+#[cfg(test)]
+mod tests {
+ use crate::error::Error;
+ use std::fs::File;
+ use std::io::Write;
+
+ #[test]
+ fn test_rpc_secret() -> Result<(), Error> {
+ let path2 = mktemp::Temp::new_file()?;
+ let mut file2 = File::create(path2.as_path())?;
+ writeln!(
+ file2,
+ r#"
+ metadata_dir = "/tmp/garage/meta"
+ data_dir = "/tmp/garage/data"
+ replication_mode = "3"
+ rpc_bind_addr = "[::]:3901"
+ rpc_secret = "foo"
+
+ [s3_api]
+ s3_region = "garage"
+ api_bind_addr = "[::]:3900"
+ "#
+ )?;
+
+ let config = super::read_config(path2.to_path_buf())?;
+ assert_eq!("foo", config.rpc_secret.unwrap());
+ drop(path2);
+ drop(file2);
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_rpc_secret_file_works() -> Result<(), Error> {
+ let path_secret = mktemp::Temp::new_file()?;
+ let mut file_secret = File::create(path_secret.as_path())?;
+ writeln!(file_secret, "foo")?;
+ drop(file_secret);
+
+ let path_config = mktemp::Temp::new_file()?;
+ let mut file_config = File::create(path_config.as_path())?;
+ let path_secret_path = path_secret.as_path();
+ writeln!(
+ file_config,
+ r#"
+ metadata_dir = "/tmp/garage/meta"
+ data_dir = "/tmp/garage/data"
+ replication_mode = "3"
+ rpc_bind_addr = "[::]:3901"
+ rpc_secret_file = "{}"
+
+ [s3_api]
+ s3_region = "garage"
+ api_bind_addr = "[::]:3900"
+ "#,
+ path_secret_path.display()
+ )?;
+ let config = super::read_config(path_config.to_path_buf())?;
+ assert_eq!("foo", config.rpc_secret.unwrap());
+
+ #[cfg(unix)]
+ {
+ use std::os::unix::fs::PermissionsExt;
+ let metadata = std::fs::metadata(&path_secret_path)?;
+ let mut perm = metadata.permissions();
+ perm.set_mode(0o660);
+ std::fs::set_permissions(&path_secret_path, perm)?;
+
+ std::env::set_var("GARAGE_ALLOW_WORLD_READABLE_SECRETS", "false");
+ assert!(super::read_config(path_config.to_path_buf()).is_err());
+
+ std::env::set_var("GARAGE_ALLOW_WORLD_READABLE_SECRETS", "true");
+ assert!(super::read_config(path_config.to_path_buf()).is_ok());
+ }
+
+ drop(path_config);
+ drop(path_secret);
+ drop(file_config);
+ Ok(())
+ }
+
+ #[test]
+ fn test_rcp_secret_and_rpc_secret_file_cannot_be_set_both() -> Result<(), Error> {
+ let path_config = mktemp::Temp::new_file()?;
+ let mut file_config = File::create(path_config.as_path())?;
+ writeln!(
+ file_config,
+ r#"
+ metadata_dir = "/tmp/garage/meta"
+ data_dir = "/tmp/garage/data"
+ replication_mode = "3"
+ rpc_bind_addr = "[::]:3901"
+ rpc_secret= "dummy"
+ rpc_secret_file = "dummy"
+
+ [s3_api]
+ s3_region = "garage"
+ api_bind_addr = "[::]:3900"
+ "#
+ )?;
+ assert_eq!(
+ "only one of `rpc_secret` and `rpc_secret_file` can be set",
+ super::read_config(path_config.to_path_buf())
+ .unwrap_err()
+ .to_string()
+ );
+ drop(path_config);
+ drop(file_config);
+ Ok(())
+ }
+}
diff --git a/src/util/data.rs b/src/util/data.rs
index 3f61e301..bdd8daee 100644
--- a/src/util/data.rs
+++ b/src/util/data.rs
@@ -115,9 +115,9 @@ pub fn sha256sum(data: &[u8]) -> Hash {
/// Compute the blake2 of a slice
pub fn blake2sum(data: &[u8]) -> Hash {
- use blake2::{Blake2b, Digest};
+ use blake2::{Blake2b512, Digest};
- let mut hasher = Blake2b::new();
+ let mut hasher = Blake2b512::new();
hasher.update(data);
let mut hash = [0u8; 32];
hash.copy_from_slice(&hasher.finalize()[..32]);
diff --git a/src/util/forwarded_headers.rs b/src/util/forwarded_headers.rs
new file mode 100644
index 00000000..6ae275aa
--- /dev/null
+++ b/src/util/forwarded_headers.rs
@@ -0,0 +1,63 @@
+use http::{HeaderMap, HeaderValue};
+use std::net::IpAddr;
+use std::str::FromStr;
+
+use crate::error::{Error, OkOrMessage};
+
+pub fn handle_forwarded_for_headers(headers: &HeaderMap<HeaderValue>) -> Result<String, Error> {
+ let forwarded_for_header = headers
+ .get("x-forwarded-for")
+ .ok_or_message("X-Forwarded-For header not provided")?;
+
+ let forwarded_for_ip_str = forwarded_for_header
+ .to_str()
+ .ok_or_message("Error parsing X-Forwarded-For header")?;
+
+ let client_ip = IpAddr::from_str(&forwarded_for_ip_str)
+ .ok_or_message("Valid IP address not found in X-Forwarded-For header")?;
+
+ Ok(client_ip.to_string())
+}
+
+#[cfg(test)]
+mod test {
+ use super::*;
+
+ #[test]
+ fn test_handle_forwarded_for_headers_ipv4_client() {
+ let mut test_headers = HeaderMap::new();
+ test_headers.insert("X-Forwarded-For", "192.0.2.100".parse().unwrap());
+
+ if let Ok(forwarded_ip) = handle_forwarded_for_headers(&test_headers) {
+ assert_eq!(forwarded_ip, "192.0.2.100");
+ }
+ }
+
+ #[test]
+ fn test_handle_forwarded_for_headers_ipv6_client() {
+ let mut test_headers = HeaderMap::new();
+ test_headers.insert("X-Forwarded-For", "2001:db8::f00d:cafe".parse().unwrap());
+
+ if let Ok(forwarded_ip) = handle_forwarded_for_headers(&test_headers) {
+ assert_eq!(forwarded_ip, "2001:db8::f00d:cafe");
+ }
+ }
+
+ #[test]
+ fn test_handle_forwarded_for_headers_invalid_ip() {
+ let mut test_headers = HeaderMap::new();
+ test_headers.insert("X-Forwarded-For", "www.example.com".parse().unwrap());
+
+ let result = handle_forwarded_for_headers(&test_headers);
+ assert!(result.is_err());
+ }
+
+ #[test]
+ fn test_handle_forwarded_for_headers_missing() {
+ let mut test_headers = HeaderMap::new();
+ test_headers.insert("Host", "www.deuxfleurs.fr".parse().unwrap());
+
+ let result = handle_forwarded_for_headers(&test_headers);
+ assert!(result.is_err());
+ }
+}
diff --git a/src/util/lib.rs b/src/util/lib.rs
index be82061f..c9110fb2 100644
--- a/src/util/lib.rs
+++ b/src/util/lib.rs
@@ -11,10 +11,10 @@ pub mod data;
pub mod encode;
pub mod error;
pub mod formater;
+pub mod forwarded_headers;
pub mod metrics;
pub mod migrate;
pub mod persister;
pub mod time;
-pub mod token_bucket;
pub mod tranquilizer;
pub mod version;
diff --git a/src/util/persister.rs b/src/util/persister.rs
index 4b9adf51..5c66bbed 100644
--- a/src/util/persister.rs
+++ b/src/util/persister.rs
@@ -1,5 +1,6 @@
use std::io::{Read, Write};
use std::path::{Path, PathBuf};
+use std::sync::{Arc, RwLock};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
@@ -84,3 +85,36 @@ impl<T: Migrate> Persister<T> {
Ok(())
}
}
+
+pub struct PersisterShared<V: Migrate + Default>(Arc<(Persister<V>, RwLock<V>)>);
+
+impl<V: Migrate + Default> Clone for PersisterShared<V> {
+ fn clone(&self) -> PersisterShared<V> {
+ PersisterShared(self.0.clone())
+ }
+}
+
+impl<V: Migrate + Default> PersisterShared<V> {
+ pub fn new(base_dir: &Path, file_name: &str) -> Self {
+ let persister = Persister::new(base_dir, file_name);
+ let value = persister.load().unwrap_or_default();
+ Self(Arc::new((persister, RwLock::new(value))))
+ }
+
+ pub fn get_with<F, R>(&self, f: F) -> R
+ where
+ F: FnOnce(&V) -> R,
+ {
+ let value = self.0 .1.read().unwrap();
+ f(&value)
+ }
+
+ pub fn set_with<F>(&self, f: F) -> Result<(), Error>
+ where
+ F: FnOnce(&mut V),
+ {
+ let mut value = self.0 .1.write().unwrap();
+ f(&mut value);
+ self.0 .0.save(&value)
+ }
+}
diff --git a/src/util/time.rs b/src/util/time.rs
index 257b4d2a..42f41a44 100644
--- a/src/util/time.rs
+++ b/src/util/time.rs
@@ -25,6 +25,6 @@ pub fn increment_logical_clock_2(prev: u64, prev2: u64) -> u64 {
pub fn msec_to_rfc3339(msecs: u64) -> String {
let secs = msecs as i64 / 1000;
let nanos = (msecs as i64 % 1000) as u32 * 1_000_000;
- let timestamp = Utc.timestamp(secs, nanos);
+ let timestamp = Utc.timestamp_opt(secs, nanos).unwrap();
timestamp.to_rfc3339_opts(SecondsFormat::Millis, true)
}
diff --git a/src/util/token_bucket.rs b/src/util/token_bucket.rs
deleted file mode 100644
index cc0dfa1f..00000000
--- a/src/util/token_bucket.rs
+++ /dev/null
@@ -1,40 +0,0 @@
-use std::time::{Duration, Instant};
-
-use tokio::time::sleep;
-
-pub struct TokenBucket {
- // Replenish rate: number of tokens per second
- replenish_rate: u64,
- // Current number of tokens
- tokens: u64,
- // Last replenish time
- last_replenish: Instant,
-}
-
-impl TokenBucket {
- pub fn new(replenish_rate: u64) -> Self {
- Self {
- replenish_rate,
- tokens: 0,
- last_replenish: Instant::now(),
- }
- }
-
- pub async fn take(&mut self, tokens: u64) {
- while self.tokens < tokens {
- let needed = tokens - self.tokens;
- let delay = (needed as f64) / (self.replenish_rate as f64);
- sleep(Duration::from_secs_f64(delay)).await;
- self.replenish();
- }
- self.tokens -= tokens;
- }
-
- pub fn replenish(&mut self) {
- let now = Instant::now();
- let new_tokens =
- ((now - self.last_replenish).as_secs_f64() * (self.replenish_rate as f64)) as u64;
- self.tokens += new_tokens;
- self.last_replenish = now;
- }
-}
diff --git a/src/util/version.rs b/src/util/version.rs
index b515dccc..2b2ea271 100644
--- a/src/util/version.rs
+++ b/src/util/version.rs
@@ -26,3 +26,7 @@ pub fn init_version(version: &'static str) {
pub fn init_features(features: &'static [&'static str]) {
FEATURES.store(Some(Arc::new(features)));
}
+
+pub fn rust_version() -> &'static str {
+ env!("RUSTC_VERSION")
+}
diff --git a/src/web/Cargo.toml b/src/web/Cargo.toml
index dbc5e5fb..d0a23af4 100644
--- a/src/web/Cargo.toml
+++ b/src/web/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "garage_web"
-version = "0.8.1"
+version = "0.8.2"
authors = ["Alex Auvolat <alex@adnab.me>", "Quentin Dufour <quentin@dufour.io>"]
edition = "2018"
license = "AGPL-3.0"
@@ -14,13 +14,13 @@ path = "lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
-garage_api = { version = "0.8.1", path = "../api" }
-garage_model = { version = "0.8.1", path = "../model" }
-garage_util = { version = "0.8.1", path = "../util" }
-garage_table = { version = "0.8.1", path = "../table" }
+garage_api = { version = "0.8.2", path = "../api" }
+garage_model = { version = "0.8.2", path = "../model" }
+garage_util = { version = "0.8.2", path = "../util" }
+garage_table = { version = "0.8.2", path = "../table" }
err-derive = "0.3"
-tracing = "0.1.30"
+tracing = "0.1"
percent-encoding = "2.1.0"
futures = "0.3"
diff --git a/src/web/web_server.rs b/src/web/web_server.rs
index 1541c297..0c7edf23 100644
--- a/src/web/web_server.rs
+++ b/src/web/web_server.rs
@@ -29,6 +29,7 @@ use garage_model::garage::Garage;
use garage_table::*;
use garage_util::error::Error as GarageError;
+use garage_util::forwarded_headers;
use garage_util::metrics::{gen_trace_id, RecordDuration};
struct WebMetrics {
@@ -104,7 +105,19 @@ impl WebServer {
req: Request<Body>,
addr: SocketAddr,
) -> Result<Response<Body>, Infallible> {
- info!("{} {} {}", addr, req.method(), req.uri());
+ if let Ok(forwarded_for_ip_addr) =
+ forwarded_headers::handle_forwarded_for_headers(&req.headers())
+ {
+ info!(
+ "{} (via {}) {} {}",
+ forwarded_for_ip_addr,
+ addr,
+ req.method(),
+ req.uri()
+ );
+ } else {
+ info!("{} {} {}", addr, req.method(), req.uri());
+ }
// Lots of instrumentation
let tracer = opentelemetry::global::tracer("garage");
@@ -249,7 +262,6 @@ impl WebServer {
);
*error_doc.status_mut() = error.http_status_code();
- error.add_headers(error_doc.headers_mut());
// Preserve error message in a special header
for error_line in error.to_string().split('\n') {