diff options
Diffstat (limited to 'src')
80 files changed, 2801 insertions, 707 deletions
diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml index dba0bbef..9babec02 100644 --- a/src/api/Cargo.toml +++ b/src/api/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "garage_api" -version = "0.8.1" +version = "0.8.2" authors = ["Alex Auvolat <alex@adnab.me>"] edition = "2018" license = "AGPL-3.0" @@ -14,35 +14,35 @@ path = "lib.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -garage_model = { version = "0.8.1", path = "../model" } -garage_table = { version = "0.8.1", path = "../table" } -garage_block = { version = "0.8.1", path = "../block" } -garage_util = { version = "0.8.1", path = "../util" } -garage_rpc = { version = "0.8.1", path = "../rpc" } +garage_model = { version = "0.8.2", path = "../model" } +garage_table = { version = "0.8.2", path = "../table" } +garage_block = { version = "0.8.2", path = "../block" } +garage_util = { version = "0.8.2", path = "../util" } +garage_rpc = { version = "0.8.2", path = "../rpc" } async-trait = "0.1.7" -base64 = "0.13" +base64 = "0.21" bytes = "1.0" chrono = "0.4" crypto-common = "0.1" err-derive = "0.3" hex = "0.4" hmac = "0.12" -idna = "0.2" -tracing = "0.1.30" +idna = "0.3" +tracing = "0.1" md-5 = "0.10" nom = "7.1" sha2 = "0.10" futures = "0.3" futures-util = "0.3" -pin-project = "1.0.11" +pin-project = "1.0.12" tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } tokio-stream = "0.1" form_urlencoded = "1.0.0" http = "0.2" -httpdate = "0.3" +httpdate = "1.0" http-range = "0.1" hyper = { version = "0.14", features = ["server", "http1", "runtime", "tcp", "stream"] } multer = "2.0" @@ -51,8 +51,8 @@ roxmltree = "0.14" serde = { version = "1.0", features = ["derive"] } serde_bytes = "0.11" serde_json = "1.0" -quick-xml = { version = "0.21", features = [ "serialize" ] } -url = "2.1" +quick-xml = { version = "0.26", features = [ "serialize" ] } +url = "2.3" opentelemetry = "0.17" opentelemetry-prometheus = { version = "0.10", optional = true } diff --git a/src/api/admin/api_server.rs b/src/api/admin/api_server.rs index 2d325fb1..58dd38d8 100644 --- a/src/api/admin/api_server.rs +++ b/src/api/admin/api_server.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use std::net::SocketAddr; use std::sync::Arc; @@ -77,6 +78,60 @@ impl AdminApiServer { .body(Body::empty())?) } + async fn handle_check_website_enabled( + &self, + req: Request<Body>, + ) -> Result<Response<Body>, Error> { + let query_params: HashMap<String, String> = req + .uri() + .query() + .map(|v| { + url::form_urlencoded::parse(v.as_bytes()) + .into_owned() + .collect() + }) + .unwrap_or_else(HashMap::new); + + let has_domain_key = query_params.contains_key("domain"); + + if !has_domain_key { + return Err(Error::bad_request("No domain query string found")); + } + + let domain = query_params + .get("domain") + .ok_or_internal_error("Could not parse domain query string")?; + + let bucket_id = self + .garage + .bucket_helper() + .resolve_global_bucket_name(&domain) + .await? + .ok_or(HelperError::NoSuchBucket(domain.to_string()))?; + + let bucket = self + .garage + .bucket_helper() + .get_existing_bucket(bucket_id) + .await?; + + let bucket_state = bucket.state.as_option().unwrap(); + let bucket_website_config = bucket_state.website_config.get(); + + match bucket_website_config { + Some(_v) => { + Ok(Response::builder() + .status(StatusCode::OK) + .body(Body::from(format!( + "Bucket '{domain}' is authorized for website hosting" + )))?) + } + None => Err(Error::bad_request(format!( + "Bucket '{domain}' is not authorized for website hosting" + ))), + } + } + fn handle_health(&self) -> Result<Response<Body>, Error> { let health = self.garage.system.health(); @@ -174,6 +229,7 @@ impl ApiHandler for AdminApiServer { match endpoint { Endpoint::Options => self.handle_options(&req), + Endpoint::CheckWebsiteEnabled => self.handle_check_website_enabled(req).await, Endpoint::Health => self.handle_health(), Endpoint::Metrics => self.handle_metrics(), Endpoint::GetClusterStatus => handle_get_cluster_status(&self.garage).await, diff --git a/src/api/admin/bucket.rs b/src/api/admin/bucket.rs index 65034852..e60f07ca 100644 --- a/src/api/admin/bucket.rs +++ b/src/api/admin/bucket.rs @@ -167,7 +167,7 @@ async fn bucket_info_results( let quotas = state.quotas.get(); let res = GetBucketInfoResult { - id: hex::encode(&bucket.id), + id: hex::encode(bucket.id), global_aliases: state .aliases .items() @@ -575,6 +575,6 @@ pub async fn handle_local_unalias_bucket( // ---- HELPER ---- fn parse_bucket_id(id: &str) -> Result<Uuid, Error> { - let id_hex = hex::decode(&id).ok_or_bad_request("Invalid bucket id")?; + let id_hex = hex::decode(id).ok_or_bad_request("Invalid bucket id")?; Ok(Uuid::try_from(&id_hex).ok_or_bad_request("Invalid bucket id")?) } diff --git a/src/api/admin/cluster.rs b/src/api/admin/cluster.rs index 540c6009..b2508d2e 100644 --- a/src/api/admin/cluster.rs +++ b/src/api/admin/cluster.rs @@ -20,6 +20,7 @@ pub async fn handle_get_cluster_status(garage: &Arc<Garage>) -> Result<Response< node: hex::encode(garage.system.id), garage_version: garage_util::version::garage_version(), garage_features: garage_util::version::garage_features(), + rust_version: garage_util::version::rust_version(), db_engine: garage.db.engine(), known_nodes: garage .system @@ -106,6 +107,7 @@ struct GetClusterStatusResponse { node: String, garage_version: &'static str, garage_features: Option<&'static [&'static str]>, + rust_version: &'static str, db_engine: String, known_nodes: HashMap<String, KnownNodeResp>, layout: GetClusterLayoutResponse, diff --git a/src/api/admin/router.rs b/src/api/admin/router.rs index 62e6abc3..0dcb1546 100644 --- a/src/api/admin/router.rs +++ b/src/api/admin/router.rs @@ -17,6 +17,7 @@ router_match! {@func #[derive(Debug, Clone, PartialEq, Eq)] pub enum Endpoint { Options, + CheckWebsiteEnabled, Health, Metrics, GetClusterStatus, @@ -91,6 +92,7 @@ impl Endpoint { let res = router_match!(@gen_path_parser (req.method(), path, query) [ OPTIONS _ => Options, + GET "/check" => CheckWebsiteEnabled, GET "/health" => Health, GET "/metrics" => Metrics, GET "/v0/status" => GetClusterStatus, @@ -136,6 +138,7 @@ impl Endpoint { pub fn authorization_type(&self) -> Authorization { match self { Self::Health => Authorization::None, + Self::CheckWebsiteEnabled => Authorization::None, Self::Metrics => Authorization::MetricsToken, _ => Authorization::AdminToken, } diff --git a/src/api/generic_server.rs b/src/api/generic_server.rs index 62fe4e5a..d0354d28 100644 --- a/src/api/generic_server.rs +++ b/src/api/generic_server.rs @@ -19,6 +19,7 @@ use opentelemetry::{ }; use garage_util::error::Error as GarageError; +use garage_util::forwarded_headers; use garage_util::metrics::{gen_trace_id, RecordDuration}; pub(crate) trait ApiEndpoint: Send + Sync + 'static { @@ -125,7 +126,20 @@ impl<A: ApiHandler> ApiServer<A> { addr: SocketAddr, ) -> Result<Response<Body>, GarageError> { let uri = req.uri().clone(); - info!("{} {} {}", addr, req.method(), uri); + + if let Ok(forwarded_for_ip_addr) = + forwarded_headers::handle_forwarded_for_headers(&req.headers()) + { + info!( + "{} (via {}) {} {}", + forwarded_for_ip_addr, + addr, + req.method(), + uri + ); + } else { + info!("{} {} {}", addr, req.method(), uri); + } debug!("{:?}", req); let tracer = opentelemetry::global::tracer("garage"); diff --git a/src/api/k2v/api_server.rs b/src/api/k2v/api_server.rs index 084867b5..bb85b2e7 100644 --- a/src/api/k2v/api_server.rs +++ b/src/api/k2v/api_server.rs @@ -164,6 +164,9 @@ impl ApiHandler for K2VApiServer { Endpoint::InsertBatch {} => handle_insert_batch(garage, bucket_id, req).await, Endpoint::ReadBatch {} => handle_read_batch(garage, bucket_id, req).await, Endpoint::DeleteBatch {} => handle_delete_batch(garage, bucket_id, req).await, + Endpoint::PollRange { partition_key } => { + handle_poll_range(garage, bucket_id, &partition_key, req).await + } Endpoint::Options => unreachable!(), }; diff --git a/src/api/k2v/batch.rs b/src/api/k2v/batch.rs index 78035362..26d678da 100644 --- a/src/api/k2v/batch.rs +++ b/src/api/k2v/batch.rs @@ -1,10 +1,10 @@ use std::sync::Arc; +use base64::prelude::*; use hyper::{Body, Request, Response, StatusCode}; use serde::{Deserialize, Serialize}; use garage_util::data::*; -use garage_util::error::Error as GarageError; use garage_table::{EnumerationOrder, TableSchema}; @@ -25,15 +25,13 @@ pub async fn handle_insert_batch( let mut items2 = vec![]; for it in items { - let ct = it - .ct - .map(|s| CausalContext::parse(&s)) - .transpose() - .ok_or_bad_request("Invalid causality token")?; + let ct = it.ct.map(|s| CausalContext::parse_helper(&s)).transpose()?; let v = match it.v { - Some(vs) => { - DvvsValue::Value(base64::decode(vs).ok_or_bad_request("Invalid base64 value")?) - } + Some(vs) => DvvsValue::Value( + BASE64_STANDARD + .decode(vs) + .ok_or_bad_request("Invalid base64 value")?, + ), None => DvvsValue::Deleted, }; items2.push((it.pk, it.sk, ct, v)); @@ -65,10 +63,7 @@ pub async fn handle_read_batch( resps.push(resp?); } - let resp_json = serde_json::to_string_pretty(&resps).map_err(GarageError::from)?; - Ok(Response::builder() - .status(StatusCode::OK) - .body(Body::from(resp_json))?) + Ok(json_ok_response(&resps)?) } async fn handle_read_batch_query( @@ -160,10 +155,7 @@ pub async fn handle_delete_batch( resps.push(resp?); } - let resp_json = serde_json::to_string_pretty(&resps).map_err(GarageError::from)?; - Ok(Response::builder() - .status(StatusCode::OK) - .body(Body::from(resp_json))?) + Ok(json_ok_response(&resps)?) } async fn handle_delete_batch_query( @@ -257,6 +249,53 @@ async fn handle_delete_batch_query( }) } +pub(crate) async fn handle_poll_range( + garage: Arc<Garage>, + bucket_id: Uuid, + partition_key: &str, + req: Request<Body>, +) -> Result<Response<Body>, Error> { + use garage_model::k2v::sub::PollRange; + + let query = parse_json_body::<PollRangeQuery>(req).await?; + + let timeout_msec = query.timeout.unwrap_or(300).clamp(1, 600) * 1000; + + let resp = garage + .k2v + .rpc + .poll_range( + PollRange { + partition: K2VItemPartition { + bucket_id, + partition_key: partition_key.to_string(), + }, + start: query.start, + end: query.end, + prefix: query.prefix, + }, + query.seen_marker, + timeout_msec, + ) + .await?; + + if let Some((items, seen_marker)) = resp { + let resp = PollRangeResponse { + items: items + .into_iter() + .map(|(_k, i)| ReadBatchResponseItem::from(i)) + .collect::<Vec<_>>(), + seen_marker, + }; + + Ok(json_ok_response(&resp)?) + } else { + Ok(Response::builder() + .status(StatusCode::NOT_MODIFIED) + .body(Body::empty())?) + } +} + #[derive(Deserialize)] struct InsertBatchItem { pk: String, @@ -322,7 +361,7 @@ impl ReadBatchResponseItem { .values() .iter() .map(|v| match v { - DvvsValue::Value(x) => Some(base64::encode(x)), + DvvsValue::Value(x) => Some(BASE64_STANDARD.encode(x)), DvvsValue::Deleted => None, }) .collect::<Vec<_>>(); @@ -361,3 +400,24 @@ struct DeleteBatchResponse { #[serde(rename = "deletedItems")] deleted_items: usize, } + +#[derive(Deserialize)] +struct PollRangeQuery { + #[serde(default)] + prefix: Option<String>, + #[serde(default)] + start: Option<String>, + #[serde(default)] + end: Option<String>, + #[serde(default)] + timeout: Option<u64>, + #[serde(default, rename = "seenMarker")] + seen_marker: Option<String>, +} + +#[derive(Serialize)] +struct PollRangeResponse { + items: Vec<ReadBatchResponseItem>, + #[serde(rename = "seenMarker")] + seen_marker: String, +} diff --git a/src/api/k2v/error.rs b/src/api/k2v/error.rs index 42491466..4eb017ab 100644 --- a/src/api/k2v/error.rs +++ b/src/api/k2v/error.rs @@ -19,7 +19,7 @@ pub enum Error { // Category: cannot process /// Authorization Header Malformed - #[error(display = "Authorization header malformed, expected scope: {}", _0)] + #[error(display = "Authorization header malformed, unexpected scope: {}", _0)] AuthorizationHeaderMalformed(String), /// The object requested don't exists diff --git a/src/api/k2v/index.rs b/src/api/k2v/index.rs index 210950bf..6c1d4a91 100644 --- a/src/api/k2v/index.rs +++ b/src/api/k2v/index.rs @@ -1,10 +1,9 @@ use std::sync::Arc; -use hyper::{Body, Response, StatusCode}; +use hyper::{Body, Response}; use serde::Serialize; use garage_util::data::*; -use garage_util::error::Error as GarageError; use garage_rpc::ring::Ring; use garage_table::util::*; @@ -12,6 +11,7 @@ use garage_table::util::*; use garage_model::garage::Garage; use garage_model::k2v::item_table::{BYTES, CONFLICTS, ENTRIES, VALUES}; +use crate::helpers::*; use crate::k2v::error::*; use crate::k2v::range::read_range; @@ -68,10 +68,7 @@ pub async fn handle_read_index( next_start, }; - let resp_json = serde_json::to_string_pretty(&resp).map_err(GarageError::from)?; - Ok(Response::builder() - .status(StatusCode::OK) - .body(Body::from(resp_json))?) + Ok(json_ok_response(&resp)?) } #[derive(Serialize)] diff --git a/src/api/k2v/item.rs b/src/api/k2v/item.rs index f85138c7..e13a0f30 100644 --- a/src/api/k2v/item.rs +++ b/src/api/k2v/item.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use base64::prelude::*; use http::header; use hyper::{Body, Request, Response, StatusCode}; @@ -81,7 +82,7 @@ impl ReturnFormat { .iter() .map(|v| match v { DvvsValue::Deleted => serde_json::Value::Null, - DvvsValue::Value(v) => serde_json::Value::String(base64::encode(v)), + DvvsValue::Value(v) => serde_json::Value::String(BASE64_STANDARD.encode(v)), }) .collect::<Vec<_>>(); let json_body = @@ -133,9 +134,8 @@ pub async fn handle_insert_item( .get(X_GARAGE_CAUSALITY_TOKEN) .map(|s| s.to_str()) .transpose()? - .map(CausalContext::parse) - .transpose() - .ok_or_bad_request("Invalid causality token")?; + .map(CausalContext::parse_helper) + .transpose()?; let body = hyper::body::to_bytes(req.into_body()).await?; let value = DvvsValue::Value(body.to_vec()); @@ -169,9 +169,8 @@ pub async fn handle_delete_item( .get(X_GARAGE_CAUSALITY_TOKEN) .map(|s| s.to_str()) .transpose()? - .map(CausalContext::parse) - .transpose() - .ok_or_bad_request("Invalid causality token")?; + .map(CausalContext::parse_helper) + .transpose()?; let value = DvvsValue::Deleted; @@ -208,15 +207,17 @@ pub async fn handle_poll_item( let causal_context = CausalContext::parse(&causality_token).ok_or_bad_request("Invalid causality token")?; + let timeout_msec = timeout_secs.unwrap_or(300).clamp(1, 600) * 1000; + let item = garage .k2v .rpc - .poll( + .poll_item( bucket_id, partition_key, sort_key, causal_context, - timeout_secs.unwrap_or(300) * 1000, + timeout_msec, ) .await?; diff --git a/src/api/k2v/router.rs b/src/api/k2v/router.rs index e7a3dd69..1cc58be5 100644 --- a/src/api/k2v/router.rs +++ b/src/api/k2v/router.rs @@ -32,6 +32,9 @@ pub enum Endpoint { causality_token: String, timeout: Option<u64>, }, + PollRange { + partition_key: String, + }, ReadBatch { }, ReadIndex { @@ -113,6 +116,7 @@ impl Endpoint { @gen_parser (query.keyword.take().unwrap_or_default(), partition_key, query, None), key: [ + POLL_RANGE => PollRange, ], no_key: [ EMPTY => ReadBatch, @@ -142,6 +146,7 @@ impl Endpoint { @gen_parser (query.keyword.take().unwrap_or_default(), partition_key, query, None), key: [ + POLL_RANGE => PollRange, ], no_key: [ EMPTY => InsertBatch, @@ -234,7 +239,8 @@ impl Endpoint { generateQueryParameters! { keywords: [ "delete" => DELETE, - "search" => SEARCH + "search" => SEARCH, + "poll_range" => POLL_RANGE ], fields: [ "prefix" => prefix, diff --git a/src/api/s3/bucket.rs b/src/api/s3/bucket.rs index 8471385f..733981e1 100644 --- a/src/api/s3/bucket.rs +++ b/src/api/s3/bucket.rs @@ -305,7 +305,7 @@ fn parse_create_bucket_xml(xml_bytes: &[u8]) -> Option<Option<String>> { let mut ret = None; for item in cbc.children() { if item.has_tag_name("LocationConstraint") { - if ret != None { + if ret.is_some() { return None; } ret = Some(item.text()?.to_string()); diff --git a/src/api/s3/error.rs b/src/api/s3/error.rs index 67009d63..c50cff9f 100644 --- a/src/api/s3/error.rs +++ b/src/api/s3/error.rs @@ -21,7 +21,7 @@ pub enum Error { // Category: cannot process /// Authorization Header Malformed - #[error(display = "Authorization header malformed, expected scope: {}", _0)] + #[error(display = "Authorization header malformed, unexpected scope: {}", _0)] AuthorizationHeaderMalformed(String), /// The object requested don't exists diff --git a/src/api/s3/list.rs b/src/api/s3/list.rs index e5f486c8..5cb0d65a 100644 --- a/src/api/s3/list.rs +++ b/src/api/s3/list.rs @@ -3,6 +3,7 @@ use std::collections::{BTreeMap, BTreeSet}; use std::iter::{Iterator, Peekable}; use std::sync::Arc; +use base64::prelude::*; use hyper::{Body, Response}; use garage_util::data::*; @@ -129,11 +130,11 @@ pub async fn handle_list( next_continuation_token: match (query.is_v2, &pagination) { (true, Some(RangeBegin::AfterKey { key })) => Some(s3_xml::Value(format!( "]{}", - base64::encode(key.as_bytes()) + BASE64_STANDARD.encode(key.as_bytes()) ))), (true, Some(RangeBegin::IncludingKey { key, .. })) => Some(s3_xml::Value(format!( "[{}", - base64::encode(key.as_bytes()) + BASE64_STANDARD.encode(key.as_bytes()) ))), _ => None, }, @@ -583,14 +584,16 @@ impl ListObjectsQuery { (Some(token), _) => match &token[..1] { "[" => Ok(RangeBegin::IncludingKey { key: String::from_utf8( - base64::decode(token[1..].as_bytes()) + BASE64_STANDARD + .decode(token[1..].as_bytes()) .ok_or_bad_request("Invalid continuation token")?, )?, fallback_key: None, }), "]" => Ok(RangeBegin::AfterKey { key: String::from_utf8( - base64::decode(token[1..].as_bytes()) + BASE64_STANDARD + .decode(token[1..].as_bytes()) .ok_or_bad_request("Invalid continuation token")?, )?, }), diff --git a/src/api/s3/post_object.rs b/src/api/s3/post_object.rs index d063faa4..f2098ab0 100644 --- a/src/api/s3/post_object.rs +++ b/src/api/s3/post_object.rs @@ -4,6 +4,7 @@ use std::ops::RangeInclusive; use std::sync::Arc; use std::task::{Context, Poll}; +use base64::prelude::*; use bytes::Bytes; use chrono::{DateTime, Duration, Utc}; use futures::{Stream, StreamExt}; @@ -138,7 +139,9 @@ pub async fn handle_post_object( .get_existing_bucket(bucket_id) .await?; - let decoded_policy = base64::decode(&policy).ok_or_bad_request("Invalid policy")?; + let decoded_policy = BASE64_STANDARD + .decode(policy) + .ok_or_bad_request("Invalid policy")?; let decoded_policy: Policy = serde_json::from_slice(&decoded_policy).ok_or_bad_request("Invalid policy")?; diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs index 97b8e4e3..350ab884 100644 --- a/src/api/s3/put.rs +++ b/src/api/s3/put.rs @@ -1,6 +1,7 @@ use std::collections::{BTreeMap, BTreeSet, HashMap}; use std::sync::Arc; +use base64::prelude::*; use futures::prelude::*; use hyper::body::{Body, Bytes}; use hyper::header::{HeaderMap, HeaderValue}; @@ -119,6 +120,17 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( return Ok((version_uuid, data_md5sum_hex)); } + // The following consists in many steps that can each fail. + // Keep track that some cleanup will be needed if things fail + // before everything is finished (cleanup is done using the Drop trait). + let mut interrupted_cleanup = InterruptedCleanup(Some(( + garage.clone(), + bucket.id, + key.into(), + version_uuid, + version_timestamp, + ))); + // Write version identifier in object table so that we have a trace // that we are uploading something let mut object_version = ObjectVersion { @@ -139,44 +151,27 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( // Transfer data and verify checksum let first_block_hash = async_blake2sum(first_block.clone()).await; - let tx_result = (|| async { - let (total_size, data_md5sum, data_sha256sum) = read_and_put_blocks( - &garage, - &version, - 1, - first_block, - first_block_hash, - &mut chunker, - ) - .await?; - - ensure_checksum_matches( - data_md5sum.as_slice(), - data_sha256sum, - content_md5.as_deref(), - content_sha256, - )?; - - check_quotas(&garage, bucket, key, total_size).await?; + let (total_size, data_md5sum, data_sha256sum) = read_and_put_blocks( + &garage, + &version, + 1, + first_block, + first_block_hash, + &mut chunker, + ) + .await?; - Ok((total_size, data_md5sum)) - })() - .await; + ensure_checksum_matches( + data_md5sum.as_slice(), + data_sha256sum, + content_md5.as_deref(), + content_sha256, + )?; - // If something went wrong, clean up - let (total_size, md5sum_arr) = match tx_result { - Ok(rv) => rv, - Err(e) => { - // Mark object as aborted, this will free the blocks further down - object_version.state = ObjectVersionState::Aborted; - let object = Object::new(bucket.id, key.into(), vec![object_version.clone()]); - garage.object_table.insert(&object).await?; - return Err(e); - } - }; + check_quotas(&garage, bucket, key, total_size).await?; // Save final object state, marked as Complete - let md5sum_hex = hex::encode(md5sum_arr); + let md5sum_hex = hex::encode(data_md5sum); object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock( ObjectVersionMeta { headers, @@ -188,6 +183,10 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( let object = Object::new(bucket.id, key.into(), vec![object_version]); garage.object_table.insert(&object).await?; + // We were not interrupted, everything went fine. + // We won't have to clean up on drop. + interrupted_cleanup.cancel(); + Ok((version_uuid, md5sum_hex)) } @@ -209,7 +208,7 @@ fn ensure_checksum_matches( } } if let Some(expected_md5) = content_md5 { - if expected_md5.trim_matches('"') != base64::encode(data_md5sum) { + if expected_md5.trim_matches('"') != BASE64_STANDARD.encode(data_md5sum) { return Err(Error::bad_request("Unable to validate content-md5")); } else { trace!("Successfully validated content-md5"); @@ -426,6 +425,33 @@ pub fn put_response(version_uuid: Uuid, md5sum_hex: String) -> Response<Body> { .unwrap() } +struct InterruptedCleanup(Option<(Arc<Garage>, Uuid, String, Uuid, u64)>); + +impl InterruptedCleanup { + fn cancel(&mut self) { + drop(self.0.take()); + } +} +impl Drop for InterruptedCleanup { + fn drop(&mut self) { + if let Some((garage, bucket_id, key, version_uuid, version_ts)) = self.0.take() { + tokio::spawn(async move { + let object_version = ObjectVersion { + uuid: version_uuid, + timestamp: version_ts, + state: ObjectVersionState::Aborted, + }; + let object = Object::new(bucket_id, key, vec![object_version]); + if let Err(e) = garage.object_table.insert(&object).await { + warn!("Cannot cleanup after aborted PutObject: {}", e); + } + }); + } + } +} + +// ---- + pub async fn handle_create_multipart_upload( garage: Arc<Garage>, req: &Request<Body>, diff --git a/src/api/signature/error.rs b/src/api/signature/error.rs index f5a067bd..f0d7c816 100644 --- a/src/api/signature/error.rs +++ b/src/api/signature/error.rs @@ -11,7 +11,7 @@ pub enum Error { Common(CommonError), /// Authorization Header Malformed - #[error(display = "Authorization header malformed, expected scope: {}", _0)] + #[error(display = "Authorization header malformed, unexpected scope: {}", _0)] AuthorizationHeaderMalformed(String), // Category: bad request diff --git a/src/block/Cargo.toml b/src/block/Cargo.toml index 1e4eb64e..c6985754 100644 --- a/src/block/Cargo.toml +++ b/src/block/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "garage_block" -version = "0.8.1" +version = "0.8.2" authors = ["Alex Auvolat <alex@adnab.me>"] edition = "2018" license = "AGPL-3.0" @@ -14,10 +14,10 @@ path = "lib.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -garage_db = { version = "0.8.1", path = "../db" } -garage_rpc = { version = "0.8.1", path = "../rpc" } -garage_util = { version = "0.8.1", path = "../util" } -garage_table = { version = "0.8.1", path = "../table" } +garage_db = { version = "0.8.2", path = "../db" } +garage_rpc = { version = "0.8.2", path = "../rpc" } +garage_util = { version = "0.8.2", path = "../util" } +garage_table = { version = "0.8.2", path = "../table" } opentelemetry = "0.17" @@ -25,11 +25,11 @@ arc-swap = "1.5" async-trait = "0.1.7" bytes = "1.0" hex = "0.4" -tracing = "0.1.30" +tracing = "0.1" rand = "0.8" async-compression = { version = "0.3", features = ["tokio", "zstd"] } -zstd = { version = "0.9", default-features = false } +zstd = { version = "0.12", default-features = false } serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } serde_bytes = "0.11" diff --git a/src/block/manager.rs b/src/block/manager.rs index 1b5a5df0..26278974 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -6,6 +6,7 @@ use std::time::Duration; use arc_swap::ArcSwapOption; use async_trait::async_trait; use bytes::Bytes; +use rand::prelude::*; use serde::{Deserialize, Serialize}; use futures::Stream; @@ -23,10 +24,12 @@ use garage_rpc::rpc_helper::netapp::stream::{stream_asyncread, ByteStream}; use garage_db as db; -use garage_util::background::BackgroundRunner; +use garage_util::background::{vars, BackgroundRunner}; use garage_util::data::*; use garage_util::error::*; use garage_util::metrics::RecordDuration; +use garage_util::persister::PersisterShared; +use garage_util::time::msec_to_rfc3339; use garage_rpc::rpc_helper::OrderTag; use garage_rpc::system::System; @@ -89,6 +92,7 @@ pub struct BlockManager { pub(crate) metrics: BlockManagerMetrics, + pub scrub_persister: PersisterShared<ScrubWorkerPersisted>, tx_scrub_command: ArcSwapOption<mpsc::Sender<ScrubWorkerCommand>>, } @@ -125,8 +129,14 @@ impl BlockManager { .netapp .endpoint("garage_block/manager.rs/Rpc".to_string()); - let metrics = - BlockManagerMetrics::new(rc.rc.clone(), resync.queue.clone(), resync.errors.clone()); + let metrics = BlockManagerMetrics::new( + compression_level, + rc.rc.clone(), + resync.queue.clone(), + resync.errors.clone(), + ); + + let scrub_persister = PersisterShared::new(&system.metadata_dir, "scrub_info"); let block_manager = Arc::new(Self { replication, @@ -138,9 +148,11 @@ impl BlockManager { system, endpoint, metrics, + scrub_persister, tx_scrub_command: ArcSwapOption::new(None), }); block_manager.endpoint.set_handler(block_manager.clone()); + block_manager.scrub_persister.set_with(|_| ()).unwrap(); block_manager } @@ -155,7 +167,31 @@ impl BlockManager { // Spawn scrub worker let (scrub_tx, scrub_rx) = mpsc::channel(1); self.tx_scrub_command.store(Some(Arc::new(scrub_tx))); - bg.spawn_worker(ScrubWorker::new(self.clone(), scrub_rx)); + bg.spawn_worker(ScrubWorker::new( + self.clone(), + scrub_rx, + self.scrub_persister.clone(), + )); + } + + pub fn register_bg_vars(&self, vars: &mut vars::BgVars) { + self.resync.register_bg_vars(vars); + + vars.register_rw( + &self.scrub_persister, + "scrub-tranquility", + |p| p.get_with(|x| x.tranquility), + |p, tranquility| p.set_with(|x| x.tranquility = tranquility), + ); + vars.register_ro(&self.scrub_persister, "scrub-last-completed", |p| { + p.get_with(|x| msec_to_rfc3339(x.time_last_complete_scrub)) + }); + vars.register_ro(&self.scrub_persister, "scrub-next-run", |p| { + p.get_with(|x| msec_to_rfc3339(x.time_next_run_scrub)) + }); + vars.register_ro(&self.scrub_persister, "scrub-corruptions_detected", |p| { + p.get_with(|x| x.corruptions_detected) + }); } /// Ask nodes that might have a (possibly compressed) block for it @@ -649,14 +685,21 @@ impl BlockManagerLocked { } }; - let mut path2 = path.clone(); - path2.set_extension("tmp"); - let mut f = fs::File::create(&path2).await?; + let mut path_tmp = path.clone(); + let tmp_extension = format!("tmp{}", hex::encode(thread_rng().gen::<[u8; 4]>())); + path_tmp.set_extension(tmp_extension); + + let mut delete_on_drop = DeleteOnDrop(Some(path_tmp.clone())); + + let mut f = fs::File::create(&path_tmp).await?; f.write_all(data).await?; f.sync_all().await?; drop(f); - fs::rename(path2, path).await?; + fs::rename(path_tmp, path).await?; + + delete_on_drop.cancel(); + if let Some(to_delete) = to_delete { fs::remove_file(to_delete).await?; } @@ -722,3 +765,23 @@ async fn read_stream_to_end(mut stream: ByteStream) -> Result<Bytes, Error> { .concat() .into()) } + +struct DeleteOnDrop(Option<PathBuf>); + +impl DeleteOnDrop { + fn cancel(&mut self) { + drop(self.0.take()); + } +} + +impl Drop for DeleteOnDrop { + fn drop(&mut self) { + if let Some(path) = self.0.take() { + tokio::spawn(async move { + if let Err(e) = fs::remove_file(&path).await { + debug!("DeleteOnDrop failed for {}: {}", path.display(), e); + } + }); + } + } +} diff --git a/src/block/metrics.rs b/src/block/metrics.rs index fbef95af..6659df32 100644 --- a/src/block/metrics.rs +++ b/src/block/metrics.rs @@ -5,6 +5,7 @@ use garage_db::counted_tree_hack::CountedTree; /// TableMetrics reference all counter used for metrics pub struct BlockManagerMetrics { + pub(crate) _compression_level: ValueObserver<u64>, pub(crate) _rc_size: ValueObserver<u64>, pub(crate) _resync_queue_len: ValueObserver<u64>, pub(crate) _resync_errored_blocks: ValueObserver<u64>, @@ -25,9 +26,23 @@ pub struct BlockManagerMetrics { } impl BlockManagerMetrics { - pub fn new(rc_tree: db::Tree, resync_queue: CountedTree, resync_errors: CountedTree) -> Self { + pub fn new( + compression_level: Option<i32>, + rc_tree: db::Tree, + resync_queue: CountedTree, + resync_errors: CountedTree, + ) -> Self { let meter = global::meter("garage_model/block"); Self { + _compression_level: meter + .u64_value_observer("block.compression_level", move |observer| { + match compression_level { + Some(v) => observer.observe(v as u64, &[]), + None => observer.observe(0_u64, &[]), + } + }) + .with_description("Garage compression level for node") + .init(), _rc_size: meter .u64_value_observer("block.rc_size", move |observer| { if let Ok(Some(v)) = rc_tree.fast_len() { diff --git a/src/block/rc.rs b/src/block/rc.rs index 8dae3960..94cb5eea 100644 --- a/src/block/rc.rs +++ b/src/block/rc.rs @@ -24,9 +24,9 @@ impl BlockRc { tx: &mut db::Transaction, hash: &Hash, ) -> db::TxOpResult<bool> { - let old_rc = RcEntry::parse_opt(tx.get(&self.rc, &hash)?); + let old_rc = RcEntry::parse_opt(tx.get(&self.rc, hash)?); match old_rc.increment().serialize() { - Some(x) => tx.insert(&self.rc, &hash, x)?, + Some(x) => tx.insert(&self.rc, hash, x)?, None => unreachable!(), }; Ok(old_rc.is_zero()) @@ -39,10 +39,10 @@ impl BlockRc { tx: &mut db::Transaction, hash: &Hash, ) -> db::TxOpResult<bool> { - let new_rc = RcEntry::parse_opt(tx.get(&self.rc, &hash)?).decrement(); + let new_rc = RcEntry::parse_opt(tx.get(&self.rc, hash)?).decrement(); match new_rc.serialize() { - Some(x) => tx.insert(&self.rc, &hash, x)?, - None => tx.remove(&self.rc, &hash)?, + Some(x) => tx.insert(&self.rc, hash, x)?, + None => tx.remove(&self.rc, hash)?, }; Ok(matches!(new_rc, RcEntry::Deletable { .. })) } @@ -57,10 +57,10 @@ impl BlockRc { pub(crate) fn clear_deleted_block_rc(&self, hash: &Hash) -> Result<(), Error> { let now = now_msec(); self.rc.db().transaction(|mut tx| { - let rcval = RcEntry::parse_opt(tx.get(&self.rc, &hash)?); + let rcval = RcEntry::parse_opt(tx.get(&self.rc, hash)?); match rcval { RcEntry::Deletable { at_time } if now > at_time => { - tx.remove(&self.rc, &hash)?; + tx.remove(&self.rc, hash)?; } _ => (), }; diff --git a/src/block/repair.rs b/src/block/repair.rs index a6ded65a..c89484d9 100644 --- a/src/block/repair.rs +++ b/src/block/repair.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use std::time::Duration; use async_trait::async_trait; -use serde::{Deserialize, Serialize}; +use rand::Rng; use tokio::fs; use tokio::select; use tokio::sync::mpsc; @@ -13,14 +13,14 @@ use tokio::sync::watch; use garage_util::background::*; use garage_util::data::*; use garage_util::error::*; -use garage_util::persister::Persister; +use garage_util::persister::PersisterShared; use garage_util::time::*; use garage_util::tranquilizer::Tranquilizer; use crate::manager::*; -// Full scrub every 30 days -const SCRUB_INTERVAL: Duration = Duration::from_secs(3600 * 24 * 30); +// Full scrub every 25 days with a random element of 10 days mixed in below +const SCRUB_INTERVAL: Duration = Duration::from_secs(3600 * 24 * 25); // Scrub tranquility is initially set to 4, but can be changed in the CLI // and the updated version is persisted over Garage restarts const INITIAL_SCRUB_TRANQUILITY: u32 = 4; @@ -161,6 +161,51 @@ impl Worker for RepairWorker { // and whose parameter (esp. speed) can be controlled at runtime. // ---- ---- ---- +mod v081 { + use serde::{Deserialize, Serialize}; + + #[derive(Serialize, Deserialize)] + pub struct ScrubWorkerPersisted { + pub tranquility: u32, + pub(crate) time_last_complete_scrub: u64, + pub(crate) corruptions_detected: u64, + } + + impl garage_util::migrate::InitialFormat for ScrubWorkerPersisted {} +} + +mod v082 { + use serde::{Deserialize, Serialize}; + + use super::v081; + + #[derive(Serialize, Deserialize)] + pub struct ScrubWorkerPersisted { + pub tranquility: u32, + pub(crate) time_last_complete_scrub: u64, + pub(crate) time_next_run_scrub: u64, + pub(crate) corruptions_detected: u64, + } + + impl garage_util::migrate::Migrate for ScrubWorkerPersisted { + type Previous = v081::ScrubWorkerPersisted; + const VERSION_MARKER: &'static [u8] = b"G082bswp"; + + fn migrate(old: v081::ScrubWorkerPersisted) -> ScrubWorkerPersisted { + use crate::repair::randomize_next_scrub_run_time; + + ScrubWorkerPersisted { + tranquility: old.tranquility, + time_last_complete_scrub: old.time_last_complete_scrub, + time_next_run_scrub: randomize_next_scrub_run_time(old.time_last_complete_scrub), + corruptions_detected: old.corruptions_detected, + } + } + } +} + +pub use v082::*; + pub struct ScrubWorker { manager: Arc<BlockManager>, rx_cmd: mpsc::Receiver<ScrubWorkerCommand>, @@ -168,17 +213,33 @@ pub struct ScrubWorker { work: ScrubWorkerState, tranquilizer: Tranquilizer, - persister: Persister<ScrubWorkerPersisted>, - persisted: ScrubWorkerPersisted, + persister: PersisterShared<ScrubWorkerPersisted>, +} + +fn randomize_next_scrub_run_time(timestamp: u64) -> u64 { + // Take SCRUB_INTERVAL and mix in a random interval of 10 days to attempt to + // balance scrub load across different cluster nodes. + + let next_run_timestamp = timestamp + + SCRUB_INTERVAL + .saturating_add(Duration::from_secs( + rand::thread_rng().gen_range(0..3600 * 24 * 10), + )) + .as_millis() as u64; + + next_run_timestamp } -#[derive(Serialize, Deserialize)] -struct ScrubWorkerPersisted { - tranquility: u32, - time_last_complete_scrub: u64, - corruptions_detected: u64, +impl Default for ScrubWorkerPersisted { + fn default() -> Self { + ScrubWorkerPersisted { + time_last_complete_scrub: 0, + time_next_run_scrub: randomize_next_scrub_run_time(now_msec()), + tranquility: INITIAL_SCRUB_TRANQUILITY, + corruptions_detected: 0, + } + } } -impl garage_util::migrate::InitialFormat for ScrubWorkerPersisted {} enum ScrubWorkerState { Running(BlockStoreIterator), @@ -198,27 +259,20 @@ pub enum ScrubWorkerCommand { Pause(Duration), Resume, Cancel, - SetTranquility(u32), } impl ScrubWorker { - pub fn new(manager: Arc<BlockManager>, rx_cmd: mpsc::Receiver<ScrubWorkerCommand>) -> Self { - let persister = Persister::new(&manager.system.metadata_dir, "scrub_info"); - let persisted = match persister.load() { - Ok(v) => v, - Err(_) => ScrubWorkerPersisted { - time_last_complete_scrub: 0, - tranquility: INITIAL_SCRUB_TRANQUILITY, - corruptions_detected: 0, - }, - }; + pub(crate) fn new( + manager: Arc<BlockManager>, + rx_cmd: mpsc::Receiver<ScrubWorkerCommand>, + persister: PersisterShared<ScrubWorkerPersisted>, + ) -> Self { Self { manager, rx_cmd, work: ScrubWorkerState::Finished, tranquilizer: Tranquilizer::new(30), persister, - persisted, } } @@ -227,6 +281,7 @@ impl ScrubWorker { ScrubWorkerCommand::Start => { self.work = match std::mem::take(&mut self.work) { ScrubWorkerState::Finished => { + info!("Scrub worker initializing, now performing datastore scrub"); let iterator = BlockStoreIterator::new(&self.manager); ScrubWorkerState::Running(iterator) } @@ -267,12 +322,6 @@ impl ScrubWorker { } } } - ScrubWorkerCommand::SetTranquility(t) => { - self.persisted.tranquility = t; - if let Err(e) = self.persister.save_async(&self.persisted).await { - error!("Could not save new tranquilitiy value: {}", e); - } - } } } } @@ -284,9 +333,19 @@ impl Worker for ScrubWorker { } fn status(&self) -> WorkerStatus { + let (corruptions_detected, tranquility, time_last_complete_scrub, time_next_run_scrub) = + self.persister.get_with(|p| { + ( + p.corruptions_detected, + p.tranquility, + p.time_last_complete_scrub, + p.time_next_run_scrub, + ) + }); + let mut s = WorkerStatus { - persistent_errors: Some(self.persisted.corruptions_detected), - tranquility: Some(self.persisted.tranquility), + persistent_errors: Some(corruptions_detected), + tranquility: Some(tranquility), ..Default::default() }; match &self.work { @@ -298,10 +357,16 @@ impl Worker for ScrubWorker { s.freeform = vec![format!("Scrub paused, resumes at {}", msec_to_rfc3339(*rt))]; } ScrubWorkerState::Finished => { - s.freeform = vec![format!( - "Last scrub completed at {}", - msec_to_rfc3339(self.persisted.time_last_complete_scrub) - )]; + s.freeform = vec![ + format!( + "Last scrub completed at {}", + msec_to_rfc3339(time_last_complete_scrub), + ), + format!( + "Next scrub scheduled for {}", + msec_to_rfc3339(time_next_run_scrub) + ), + ]; } } s @@ -321,20 +386,30 @@ impl Worker for ScrubWorker { match self.manager.read_block(&hash).await { Err(Error::CorruptData(_)) => { error!("Found corrupt data block during scrub: {:?}", hash); - self.persisted.corruptions_detected += 1; - self.persister.save_async(&self.persisted).await?; + self.persister.set_with(|p| p.corruptions_detected += 1)?; } Err(e) => return Err(e), _ => (), }; Ok(self .tranquilizer - .tranquilize_worker(self.persisted.tranquility)) + .tranquilize_worker(self.persister.get_with(|p| p.tranquility))) } else { - self.persisted.time_last_complete_scrub = now_msec(); - self.persister.save_async(&self.persisted).await?; + let now = now_msec(); + let next_scrub_timestamp = randomize_next_scrub_run_time(now); + + self.persister.set_with(|p| { + p.time_last_complete_scrub = now; + p.time_next_run_scrub = next_scrub_timestamp; + })?; self.work = ScrubWorkerState::Finished; self.tranquilizer.clear(); + + info!( + "Datastore scrub completed, next scrub scheduled for {}", + msec_to_rfc3339(next_scrub_timestamp) + ); + Ok(WorkerState::Idle) } } @@ -347,7 +422,7 @@ impl Worker for ScrubWorker { ScrubWorkerState::Running(_) => return WorkerState::Busy, ScrubWorkerState::Paused(_, resume_time) => (*resume_time, ScrubWorkerCommand::Resume), ScrubWorkerState::Finished => ( - self.persisted.time_last_complete_scrub + SCRUB_INTERVAL.as_millis() as u64, + self.persister.get_with(|p| p.time_next_run_scrub), ScrubWorkerCommand::Start, ), }; @@ -462,11 +537,11 @@ impl BlockStoreIterator { let ent_type = data_dir_ent.file_type().await?; let name = name.strip_suffix(".zst").unwrap_or(&name); - if name.len() == 2 && hex::decode(&name).is_ok() && ent_type.is_dir() { + if name.len() == 2 && hex::decode(name).is_ok() && ent_type.is_dir() { let path = data_dir_ent.path(); self.path.push(ReadingDir::Pending(path)); } else if name.len() == 64 { - if let Ok(h) = hex::decode(&name) { + if let Ok(h) = hex::decode(name) { let mut hash = [0u8; 32]; hash.copy_from_slice(&h); return Ok(Some(hash.into())); diff --git a/src/block/resync.rs b/src/block/resync.rs index 9c7b3b0e..ea280ad4 100644 --- a/src/block/resync.rs +++ b/src/block/resync.rs @@ -3,7 +3,6 @@ use std::convert::TryInto; use std::sync::{Arc, Mutex}; use std::time::Duration; -use arc_swap::ArcSwap; use async_trait::async_trait; use serde::{Deserialize, Serialize}; @@ -22,7 +21,7 @@ use garage_util::background::*; use garage_util::data::*; use garage_util::error::*; use garage_util::metrics::RecordDuration; -use garage_util::persister::Persister; +use garage_util::persister::PersisterShared; use garage_util::time::*; use garage_util::tranquilizer::Tranquilizer; @@ -49,13 +48,12 @@ const INITIAL_RESYNC_TRANQUILITY: u32 = 2; pub struct BlockResyncManager { pub(crate) queue: CountedTree, - pub(crate) notify: Notify, + pub(crate) notify: Arc<Notify>, pub(crate) errors: CountedTree, busy_set: BusySet, - persister: Persister<ResyncPersistedConfig>, - persisted: ArcSwap<ResyncPersistedConfig>, + persister: PersisterShared<ResyncPersistedConfig>, } #[derive(Serialize, Deserialize, Clone, Copy)] @@ -64,6 +62,14 @@ struct ResyncPersistedConfig { tranquility: u32, } impl garage_util::migrate::InitialFormat for ResyncPersistedConfig {} +impl Default for ResyncPersistedConfig { + fn default() -> Self { + ResyncPersistedConfig { + n_workers: 1, + tranquility: INITIAL_RESYNC_TRANQUILITY, + } + } +} enum ResyncIterResult { BusyDidSomething, @@ -91,22 +97,14 @@ impl BlockResyncManager { .expect("Unable to open block_local_resync_errors tree"); let errors = CountedTree::new(errors).expect("Could not count block_local_resync_errors"); - let persister = Persister::new(&system.metadata_dir, "resync_cfg"); - let persisted = match persister.load() { - Ok(v) => v, - Err(_) => ResyncPersistedConfig { - n_workers: 1, - tranquility: INITIAL_RESYNC_TRANQUILITY, - }, - }; + let persister = PersisterShared::new(&system.metadata_dir, "resync_cfg"); Self { queue, - notify: Notify::new(), + notify: Arc::new(Notify::new()), errors, busy_set: Arc::new(Mutex::new(HashSet::new())), persister, - persisted: ArcSwap::new(Arc::new(persisted)), } } @@ -142,6 +140,38 @@ impl BlockResyncManager { ))) } + pub fn register_bg_vars(&self, vars: &mut vars::BgVars) { + let notify = self.notify.clone(); + vars.register_rw( + &self.persister, + "resync-worker-count", + |p| p.get_with(|x| x.n_workers), + move |p, n_workers| { + if !(1..=MAX_RESYNC_WORKERS).contains(&n_workers) { + return Err(Error::Message(format!( + "Invalid number of resync workers, must be between 1 and {}", + MAX_RESYNC_WORKERS + ))); + } + p.set_with(|x| x.n_workers = n_workers)?; + notify.notify_waiters(); + Ok(()) + }, + ); + + let notify = self.notify.clone(); + vars.register_rw( + &self.persister, + "resync-tranquility", + |p| p.get_with(|x| x.tranquility), + move |p, tranquility| { + p.set_with(|x| x.tranquility = tranquility)?; + notify.notify_waiters(); + Ok(()) + }, + ); + } + // ---- Resync loop ---- // This part manages a queue of blocks that need to be @@ -436,33 +466,6 @@ impl BlockResyncManager { Ok(()) } - - async fn update_persisted( - &self, - update: impl Fn(&mut ResyncPersistedConfig), - ) -> Result<(), Error> { - let mut cfg: ResyncPersistedConfig = *self.persisted.load().as_ref(); - update(&mut cfg); - self.persister.save_async(&cfg).await?; - self.persisted.store(Arc::new(cfg)); - self.notify.notify_waiters(); - Ok(()) - } - - pub async fn set_n_workers(&self, n_workers: usize) -> Result<(), Error> { - if !(1..=MAX_RESYNC_WORKERS).contains(&n_workers) { - return Err(Error::Message(format!( - "Invalid number of resync workers, must be between 1 and {}", - MAX_RESYNC_WORKERS - ))); - } - self.update_persisted(|cfg| cfg.n_workers = n_workers).await - } - - pub async fn set_tranquility(&self, tranquility: u32) -> Result<(), Error> { - self.update_persisted(|cfg| cfg.tranquility = tranquility) - .await - } } impl Drop for BusyBlock { @@ -477,15 +480,18 @@ pub(crate) struct ResyncWorker { manager: Arc<BlockManager>, tranquilizer: Tranquilizer, next_delay: Duration, + persister: PersisterShared<ResyncPersistedConfig>, } impl ResyncWorker { pub(crate) fn new(index: usize, manager: Arc<BlockManager>) -> Self { + let persister = manager.resync.persister.clone(); Self { index, manager, tranquilizer: Tranquilizer::new(30), next_delay: Duration::from_secs(10), + persister, } } } @@ -497,9 +503,9 @@ impl Worker for ResyncWorker { } fn status(&self) -> WorkerStatus { - let persisted = self.manager.resync.persisted.load(); + let (n_workers, tranquility) = self.persister.get_with(|x| (x.n_workers, x.tranquility)); - if self.index >= persisted.n_workers { + if self.index >= n_workers { return WorkerStatus { freeform: vec!["This worker is currently disabled".into()], ..Default::default() @@ -508,22 +514,24 @@ impl Worker for ResyncWorker { WorkerStatus { queue_length: Some(self.manager.resync.queue_len().unwrap_or(0) as u64), - tranquility: Some(persisted.tranquility), + tranquility: Some(tranquility), persistent_errors: Some(self.manager.resync.errors_len().unwrap_or(0) as u64), ..Default::default() } } async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> { - if self.index >= self.manager.resync.persisted.load().n_workers { + let (n_workers, tranquility) = self.persister.get_with(|x| (x.n_workers, x.tranquility)); + + if self.index >= n_workers { return Ok(WorkerState::Idle); } self.tranquilizer.reset(); match self.manager.resync.resync_iter(&self.manager).await { - Ok(ResyncIterResult::BusyDidSomething) => Ok(self - .tranquilizer - .tranquilize_worker(self.manager.resync.persisted.load().tranquility)), + Ok(ResyncIterResult::BusyDidSomething) => { + Ok(self.tranquilizer.tranquilize_worker(tranquility)) + } Ok(ResyncIterResult::BusyDidNothing) => Ok(WorkerState::Busy), Ok(ResyncIterResult::IdleFor(delay)) => { self.next_delay = delay; @@ -542,7 +550,7 @@ impl Worker for ResyncWorker { } async fn wait_for_work(&mut self) -> WorkerState { - while self.index >= self.manager.resync.persisted.load().n_workers { + while self.index >= self.persister.get_with(|x| x.n_workers) { self.manager.resync.notify.notified().await } diff --git a/src/db/Cargo.toml b/src/db/Cargo.toml index c479d9d1..e3a65857 100644 --- a/src/db/Cargo.toml +++ b/src/db/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "garage_db" -version = "0.8.1" +version = "0.8.2" authors = ["Alex Auvolat <alex@adnab.me>"] edition = "2018" license = "AGPL-3.0" @@ -19,18 +19,18 @@ required-features = ["cli"] [dependencies] err-derive = "0.3" hexdump = "0.1" -tracing = "0.1.30" +tracing = "0.1" heed = { version = "0.11", default-features = false, features = ["lmdb"], optional = true } -rusqlite = { version = "0.27", optional = true } +rusqlite = { version = "0.28", optional = true } sled = { version = "0.34", optional = true } # cli deps -clap = { version = "3.1.18", optional = true, features = ["derive", "env"] } +clap = { version = "4.1", optional = true, features = ["derive", "env"] } pretty_env_logger = { version = "0.4", optional = true } [dev-dependencies] -mktemp = "0.4" +mktemp = "0.5" [features] default = [ "sled" ] diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml index b43b0242..0cbdf890 100644 --- a/src/garage/Cargo.toml +++ b/src/garage/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "garage" -version = "0.8.1" +version = "0.8.2" authors = ["Alex Auvolat <alex@adnab.me>"] edition = "2018" license = "AGPL-3.0" @@ -21,22 +21,22 @@ path = "tests/lib.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -garage_db = { version = "0.8.1", path = "../db" } -garage_api = { version = "0.8.1", path = "../api" } -garage_block = { version = "0.8.1", path = "../block" } -garage_model = { version = "0.8.1", path = "../model" } -garage_rpc = { version = "0.8.1", path = "../rpc" } -garage_table = { version = "0.8.1", path = "../table" } -garage_util = { version = "0.8.1", path = "../util" } -garage_web = { version = "0.8.1", path = "../web" } +garage_db = { version = "0.8.2", path = "../db" } +garage_api = { version = "0.8.2", path = "../api" } +garage_block = { version = "0.8.2", path = "../block" } +garage_model = { version = "0.8.2", path = "../model" } +garage_rpc = { version = "0.8.2", path = "../rpc" } +garage_table = { version = "0.8.2", path = "../table" } +garage_util = { version = "0.8.2", path = "../util" } +garage_web = { version = "0.8.2", path = "../web" } backtrace = "0.3" bytes = "1.0" bytesize = "1.1" -timeago = "0.3" +timeago = "0.4" parse_duration = "2.1" hex = "0.4" -tracing = { version = "0.1.30" } +tracing = { version = "0.1" } tracing-subscriber = { version = "0.3", features = ["env-filter"] } rand = "0.8" async-trait = "0.1.7" @@ -45,7 +45,7 @@ sodiumoxide = { version = "0.2.5-0", package = "kuska-sodiumoxide" } serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } serde_bytes = "0.11" structopt = { version = "0.3", default-features = false } -toml = "0.5" +toml = "0.6" futures = "0.3" futures-util = "0.3" @@ -69,7 +69,7 @@ sha2 = "0.10" static_init = "1.0" assert-json-diff = "2.0" serde_json = "1.0" -base64 = "0.13" +base64 = "0.21" [features] diff --git a/src/garage/admin.rs b/src/garage/admin.rs index 58d645ac..e4e50520 100644 --- a/src/garage/admin.rs +++ b/src/garage/admin.rs @@ -15,10 +15,10 @@ use garage_util::time::*; use garage_table::replication::*; use garage_table::*; +use garage_rpc::ring::PARTITION_BITS; use garage_rpc::*; use garage_block::manager::BlockResyncErrorInfo; -use garage_block::repair::ScrubWorkerCommand; use garage_model::bucket_alias_table::*; use garage_model::bucket_table::*; @@ -60,6 +60,7 @@ pub enum AdminRpc { HashMap<usize, garage_util::background::WorkerInfo>, WorkerListOpt, ), + WorkerVars(Vec<(Uuid, String, String)>), WorkerInfo(usize, garage_util::background::WorkerInfo), BlockErrorList(Vec<BlockResyncErrorInfo>), BlockInfo { @@ -783,6 +784,7 @@ impl AdminRpcHandler { for node in ring.layout.node_ids().iter() { let mut opt = opt.clone(); opt.all_nodes = false; + opt.skip_global = true; writeln!(&mut ret, "\n======================").unwrap(); writeln!(&mut ret, "Stats for node {:?}:", node).unwrap(); @@ -799,6 +801,15 @@ impl AdminRpcHandler { Err(e) => writeln!(&mut ret, "Network error: {}", e).unwrap(), } } + + writeln!(&mut ret, "\n======================").unwrap(); + write!( + &mut ret, + "Cluster statistics:\n\n{}", + self.gather_cluster_stats() + ) + .unwrap(); + Ok(AdminRpc::Ok(ret)) } else { Ok(AdminRpc::Ok(self.gather_stats_local(opt)?)) @@ -809,32 +820,17 @@ impl AdminRpcHandler { let mut ret = String::new(); writeln!( &mut ret, - "\nGarage version: {} [features: {}]", + "\nGarage version: {} [features: {}]\nRust compiler version: {}", garage_util::version::garage_version(), garage_util::version::garage_features() .map(|list| list.join(", ")) .unwrap_or_else(|| "(unknown)".into()), + garage_util::version::rust_version(), ) .unwrap(); writeln!(&mut ret, "\nDatabase engine: {}", self.garage.db.engine()).unwrap(); - // Gather ring statistics - let ring = self.garage.system.ring.borrow().clone(); - let mut ring_nodes = HashMap::new(); - for (_i, loc) in ring.partitions().iter() { - for n in ring.get_nodes(loc, ring.replication_factor).iter() { - if !ring_nodes.contains_key(n) { - ring_nodes.insert(*n, 0usize); - } - *ring_nodes.get_mut(n).unwrap() += 1; - } - } - writeln!(&mut ret, "\nRing nodes & partition count:").unwrap(); - for (n, c) in ring_nodes.iter() { - writeln!(&mut ret, " {:?} {}", n, c).unwrap(); - } - // Gather table statistics let mut table = vec![" Table\tItems\tMklItems\tMklTodo\tGcTodo".into()]; table.push(self.gather_table_stats(&self.garage.bucket_table, opt.detailed)?); @@ -881,12 +877,110 @@ impl AdminRpcHandler { .unwrap(); if !opt.detailed { - writeln!(&mut ret, "\nIf values are missing (marked as NC), consider adding the --detailed flag - this will be slow.").unwrap(); + writeln!(&mut ret, "\nIf values are missing above (marked as NC), consider adding the --detailed flag (this will be slow).").unwrap(); + } + + if !opt.skip_global { + write!(&mut ret, "\n{}", self.gather_cluster_stats()).unwrap(); } Ok(ret) } + fn gather_cluster_stats(&self) -> String { + let mut ret = String::new(); + + // Gather storage node and free space statistics + let layout = &self.garage.system.ring.borrow().layout; + let mut node_partition_count = HashMap::<Uuid, u64>::new(); + for short_id in layout.ring_assignment_data.iter() { + let id = layout.node_id_vec[*short_id as usize]; + *node_partition_count.entry(id).or_default() += 1; + } + let node_info = self + .garage + .system + .get_known_nodes() + .into_iter() + .map(|n| (n.id, n)) + .collect::<HashMap<_, _>>(); + + let mut table = vec![" ID\tHostname\tZone\tCapacity\tPart.\tDataAvail\tMetaAvail".into()]; + for (id, parts) in node_partition_count.iter() { + let info = node_info.get(id); + let status = info.map(|x| &x.status); + let role = layout.roles.get(id).and_then(|x| x.0.as_ref()); + let hostname = status.map(|x| x.hostname.as_str()).unwrap_or("?"); + let zone = role.map(|x| x.zone.as_str()).unwrap_or("?"); + let capacity = role + .map(|x| x.capacity_string()) + .unwrap_or_else(|| "?".into()); + let avail_str = |x| match x { + Some((avail, total)) => { + let pct = (avail as f64) / (total as f64) * 100.; + let avail = bytesize::ByteSize::b(avail); + let total = bytesize::ByteSize::b(total); + format!("{}/{} ({:.1}%)", avail, total, pct) + } + None => "?".into(), + }; + let data_avail = avail_str(status.and_then(|x| x.data_disk_avail)); + let meta_avail = avail_str(status.and_then(|x| x.meta_disk_avail)); + table.push(format!( + " {:?}\t{}\t{}\t{}\t{}\t{}\t{}", + id, hostname, zone, capacity, parts, data_avail, meta_avail + )); + } + write!( + &mut ret, + "Storage nodes:\n{}", + format_table_to_string(table) + ) + .unwrap(); + + let meta_part_avail = node_partition_count + .iter() + .filter_map(|(id, parts)| { + node_info + .get(id) + .and_then(|x| x.status.meta_disk_avail) + .map(|c| c.0 / *parts) + }) + .collect::<Vec<_>>(); + let data_part_avail = node_partition_count + .iter() + .filter_map(|(id, parts)| { + node_info + .get(id) + .and_then(|x| x.status.data_disk_avail) + .map(|c| c.0 / *parts) + }) + .collect::<Vec<_>>(); + if !meta_part_avail.is_empty() && !data_part_avail.is_empty() { + let meta_avail = + bytesize::ByteSize(meta_part_avail.iter().min().unwrap() * (1 << PARTITION_BITS)); + let data_avail = + bytesize::ByteSize(data_part_avail.iter().min().unwrap() * (1 << PARTITION_BITS)); + writeln!( + &mut ret, + "\nEstimated available storage space cluster-wide (might be lower in practice):" + ) + .unwrap(); + if meta_part_avail.len() < node_partition_count.len() + || data_part_avail.len() < node_partition_count.len() + { + writeln!(&mut ret, " data: < {}", data_avail).unwrap(); + writeln!(&mut ret, " metadata: < {}", meta_avail).unwrap(); + writeln!(&mut ret, "A precise estimate could not be given as information is missing for some storage nodes.").unwrap(); + } else { + writeln!(&mut ret, " data: {}", data_avail).unwrap(); + writeln!(&mut ret, " metadata: {}", meta_avail).unwrap(); + } + } + + ret + } + fn gather_table_stats<F, R>( &self, t: &Arc<Table<F, R>>, @@ -943,32 +1037,101 @@ impl AdminRpcHandler { .clone(); Ok(AdminRpc::WorkerInfo(*tid, info)) } - WorkerOperation::Set { opt } => match opt { - WorkerSetCmd::ScrubTranquility { tranquility } => { - let scrub_command = ScrubWorkerCommand::SetTranquility(*tranquility); - self.garage - .block_manager - .send_scrub_command(scrub_command) - .await?; - Ok(AdminRpc::Ok("Scrub tranquility updated".into())) - } - WorkerSetCmd::ResyncWorkerCount { worker_count } => { - self.garage - .block_manager - .resync - .set_n_workers(*worker_count) - .await?; - Ok(AdminRpc::Ok("Number of resync workers updated".into())) + WorkerOperation::Get { + all_nodes, + variable, + } => self.handle_get_var(*all_nodes, variable).await, + WorkerOperation::Set { + all_nodes, + variable, + value, + } => self.handle_set_var(*all_nodes, variable, value).await, + } + } + + async fn handle_get_var( + &self, + all_nodes: bool, + variable: &Option<String>, + ) -> Result<AdminRpc, Error> { + if all_nodes { + let mut ret = vec![]; + let ring = self.garage.system.ring.borrow().clone(); + for node in ring.layout.node_ids().iter() { + let node = (*node).into(); + match self + .endpoint + .call( + &node, + AdminRpc::Worker(WorkerOperation::Get { + all_nodes: false, + variable: variable.clone(), + }), + PRIO_NORMAL, + ) + .await?? + { + AdminRpc::WorkerVars(v) => ret.extend(v), + m => return Err(GarageError::unexpected_rpc_message(m).into()), } - WorkerSetCmd::ResyncTranquility { tranquility } => { - self.garage - .block_manager - .resync - .set_tranquility(*tranquility) - .await?; - Ok(AdminRpc::Ok("Resync tranquility updated".into())) + } + Ok(AdminRpc::WorkerVars(ret)) + } else { + #[allow(clippy::collapsible_else_if)] + if let Some(v) = variable { + Ok(AdminRpc::WorkerVars(vec![( + self.garage.system.id, + v.clone(), + self.garage.bg_vars.get(v)?, + )])) + } else { + let mut vars = self.garage.bg_vars.get_all(); + vars.sort(); + Ok(AdminRpc::WorkerVars( + vars.into_iter() + .map(|(k, v)| (self.garage.system.id, k.to_string(), v)) + .collect(), + )) + } + } + } + + async fn handle_set_var( + &self, + all_nodes: bool, + variable: &str, + value: &str, + ) -> Result<AdminRpc, Error> { + if all_nodes { + let mut ret = vec![]; + let ring = self.garage.system.ring.borrow().clone(); + for node in ring.layout.node_ids().iter() { + let node = (*node).into(); + match self + .endpoint + .call( + &node, + AdminRpc::Worker(WorkerOperation::Set { + all_nodes: false, + variable: variable.to_string(), + value: value.to_string(), + }), + PRIO_NORMAL, + ) + .await?? + { + AdminRpc::WorkerVars(v) => ret.extend(v), + m => return Err(GarageError::unexpected_rpc_message(m).into()), } - }, + } + Ok(AdminRpc::WorkerVars(ret)) + } else { + self.garage.bg_vars.set(variable, value)?; + Ok(AdminRpc::WorkerVars(vec![( + self.garage.system.id, + variable.to_string(), + value.to_string(), + )])) } } diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs index 0d180ecd..905b14d3 100644 --- a/src/garage/cli/cmd.rs +++ b/src/garage/cli/cmd.rs @@ -59,18 +59,29 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) -> let layout = fetch_layout(rpc_cli, rpc_host).await?; println!("==== HEALTHY NODES ===="); - let mut healthy_nodes = vec!["ID\tHostname\tAddress\tTags\tZone\tCapacity".to_string()]; + let mut healthy_nodes = + vec!["ID\tHostname\tAddress\tTags\tZone\tCapacity\tDataAvail".to_string()]; for adv in status.iter().filter(|adv| adv.is_up) { match layout.roles.get(&adv.id) { Some(NodeRoleV(Some(cfg))) => { + let data_avail = match &adv.status.data_disk_avail { + _ if cfg.capacity.is_none() => "N/A".into(), + Some((avail, total)) => { + let pct = (*avail as f64) / (*total as f64) * 100.; + let avail = bytesize::ByteSize::b(*avail); + format!("{} ({:.1}%)", avail, pct) + } + None => "?".into(), + }; healthy_nodes.push(format!( - "{id:?}\t{host}\t{addr}\t[{tags}]\t{zone}\t{capacity}", + "{id:?}\t{host}\t{addr}\t[{tags}]\t{zone}\t{capacity}\t{data_avail}", id = adv.id, host = adv.status.hostname, addr = adv.addr, tags = cfg.tags.join(","), zone = cfg.zone, capacity = cfg.capacity_string(), + data_avail = data_avail, )); } _ => { @@ -191,6 +202,9 @@ pub async fn cmd_admin( AdminRpc::WorkerList(wi, wlo) => { print_worker_list(wi, wlo); } + AdminRpc::WorkerVars(wv) => { + print_worker_vars(wv); + } AdminRpc::WorkerInfo(tid, wi) => { print_worker_info(tid, wi); } diff --git a/src/garage/cli/init.rs b/src/garage/cli/init.rs index 511b53a6..20813f1c 100644 --- a/src/garage/cli/init.rs +++ b/src/garage/cli/init.rs @@ -14,11 +14,11 @@ pub fn node_id_command(config_file: PathBuf, quiet: bool) -> Result<(), Error> { garage_rpc::system::read_node_id(&config.metadata_dir).err_context(READ_KEY_ERROR)?; let idstr = if let Some(addr) = config.rpc_public_addr { - let idstr = format!("{}@{}", hex::encode(&node_id), addr); + let idstr = format!("{}@{}", hex::encode(node_id), addr); println!("{}", idstr); idstr } else { - let idstr = hex::encode(&node_id); + let idstr = hex::encode(node_id); println!("{}", idstr); if !quiet { diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs index dcb9fef9..986592ae 100644 --- a/src/garage/cli/structs.rs +++ b/src/garage/cli/structs.rs @@ -515,6 +515,11 @@ pub struct StatsOpt { /// Gather detailed statistics (this can be long) #[structopt(short = "d", long = "detailed")] pub detailed: bool, + + /// Don't show global cluster stats (internal use in RPC) + #[structopt(skip)] + #[serde(default)] + pub skip_global: bool, } #[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)] @@ -528,11 +533,25 @@ pub enum WorkerOperation { /// Get detailed information about a worker #[structopt(name = "info", version = garage_version())] Info { tid: usize }, + /// Get worker parameter + #[structopt(name = "get", version = garage_version())] + Get { + /// Gather variable values from all nodes + #[structopt(short = "a", long = "all-nodes")] + all_nodes: bool, + /// Variable name to get, or none to get all variables + variable: Option<String>, + }, /// Set worker parameter #[structopt(name = "set", version = garage_version())] Set { - #[structopt(subcommand)] - opt: WorkerSetCmd, + /// Set variable values on all nodes + #[structopt(short = "a", long = "all-nodes")] + all_nodes: bool, + /// Variable node to set + variable: String, + /// Value to set the variable to + value: String, }, } @@ -547,19 +566,6 @@ pub struct WorkerListOpt { } #[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)] -pub enum WorkerSetCmd { - /// Set tranquility of scrub operations - #[structopt(name = "scrub-tranquility", version = garage_version())] - ScrubTranquility { tranquility: u32 }, - /// Set number of concurrent block resync workers - #[structopt(name = "resync-worker-count", version = garage_version())] - ResyncWorkerCount { worker_count: usize }, - /// Set tranquility of block resync operations - #[structopt(name = "resync-tranquility", version = garage_version())] - ResyncTranquility { tranquility: u32 }, -} - -#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)] pub enum BlockOperation { /// List all blocks that currently have a resync error #[structopt(name = "list-errors", version = garage_version())] diff --git a/src/garage/cli/util.rs b/src/garage/cli/util.rs index 63fd9eba..2c6be2f4 100644 --- a/src/garage/cli/util.rs +++ b/src/garage/cli/util.rs @@ -229,7 +229,7 @@ pub fn find_matching_node( ) -> Result<Uuid, Error> { let mut candidates = vec![]; for c in cand { - if hex::encode(&c).starts_with(&pattern) && !candidates.contains(&c) { + if hex::encode(c).starts_with(pattern) && !candidates.contains(&c) { candidates.push(c); } } @@ -357,6 +357,14 @@ pub fn print_worker_info(tid: usize, info: WorkerInfo) { format_table(table); } +pub fn print_worker_vars(wv: Vec<(Uuid, String, String)>) { + let table = wv + .into_iter() + .map(|(n, k, v)| format!("{:?}\t{}\t{}", n, k, v)) + .collect::<Vec<_>>(); + format_table(table); +} + pub fn print_block_error_list(el: Vec<BlockResyncErrorInfo>) { let now = now_msec(); let tf = timeago::Formatter::new(); diff --git a/src/garage/main.rs b/src/garage/main.rs index cd1d6228..9b069d82 100644 --- a/src/garage/main.rs +++ b/src/garage/main.rs @@ -28,6 +28,7 @@ use structopt::StructOpt; use netapp::util::parse_and_resolve_peer_addr; use netapp::NetworkKey; +use garage_util::config::Config; use garage_util::error::*; use garage_rpc::system::*; @@ -49,11 +50,10 @@ struct Opt { #[structopt(short = "h", long = "rpc-host", env = "GARAGE_RPC_HOST")] pub rpc_host: Option<String>, - /// RPC secret network key for admin operations - #[structopt(short = "s", long = "rpc-secret", env = "GARAGE_RPC_SECRET")] - pub rpc_secret: Option<String>, + #[structopt(flatten)] + pub secrets: Secrets, - /// Configuration file (garage.toml) + /// Path to configuration file #[structopt( short = "c", long = "config", @@ -66,6 +66,24 @@ struct Opt { cmd: Command, } +#[derive(StructOpt, Debug)] +pub struct Secrets { + /// RPC secret network key, used to replace rpc_secret in config.toml when running the + /// daemon or doing admin operations + #[structopt(short = "s", long = "rpc-secret", env = "GARAGE_RPC_SECRET")] + pub rpc_secret: Option<String>, + + /// Metrics API authentication token, replaces admin.metrics_token in config.toml when + /// running the Garage daemon + #[structopt(long = "admin-token", env = "GARAGE_ADMIN_TOKEN")] + pub admin_token: Option<String>, + + /// Metrics API authentication token, replaces admin.metrics_token in config.toml when + /// running the Garage daemon + #[structopt(long = "metrics-token", env = "GARAGE_METRICS_TOKEN")] + pub metrics_token: Option<String>, +} + #[tokio::main] async fn main() { // Initialize version and features info @@ -148,9 +166,9 @@ async fn main() { sodiumoxide::init().expect("Unable to init sodiumoxide"); let res = match opt.cmd { - Command::Server => server::run_server(opt.config_file).await, + Command::Server => server::run_server(opt.config_file, opt.secrets).await, Command::OfflineRepair(repair_opt) => { - repair::offline::offline_repair(opt.config_file, repair_opt).await + repair::offline::offline_repair(opt.config_file, opt.secrets, repair_opt).await } Command::Node(NodeOperation::NodeId(node_id_opt)) => { node_id_command(opt.config_file, node_id_opt.quiet) @@ -165,7 +183,7 @@ async fn main() { } async fn cli_command(opt: Opt) -> Result<(), Error> { - let config = if opt.rpc_secret.is_none() || opt.rpc_host.is_none() { + let config = if opt.secrets.rpc_secret.is_none() || opt.rpc_host.is_none() { Some(garage_util::config::read_config(opt.config_file.clone()) .err_context(format!("Unable to read configuration file {}. Configuration file is needed because -h or -s is not provided on the command line.", opt.config_file.to_string_lossy()))?) } else { @@ -174,9 +192,10 @@ async fn cli_command(opt: Opt) -> Result<(), Error> { // Find and parse network RPC secret let net_key_hex_str = opt + .secrets .rpc_secret .as_ref() - .or_else(|| config.as_ref().map(|c| &c.rpc_secret)) + .or_else(|| config.as_ref().and_then(|c| c.rpc_secret.as_ref())) .ok_or("No RPC secret provided")?; let network_key = NetworkKey::from_slice( &hex::decode(net_key_hex_str).err_context("Invalid RPC secret key (bad hex)")?[..], @@ -233,3 +252,16 @@ async fn cli_command(opt: Opt) -> Result<(), Error> { Ok(x) => Ok(x), } } + +fn fill_secrets(mut config: Config, secrets: Secrets) -> Config { + if secrets.rpc_secret.is_some() { + config.rpc_secret = secrets.rpc_secret; + } + if secrets.admin_token.is_some() { + config.admin.admin_token = secrets.admin_token; + } + if secrets.metrics_token.is_some() { + config.admin.metrics_token = secrets.metrics_token; + } + config +} diff --git a/src/garage/repair/offline.rs b/src/garage/repair/offline.rs index 25193e4a..f4edcf03 100644 --- a/src/garage/repair/offline.rs +++ b/src/garage/repair/offline.rs @@ -6,8 +6,13 @@ use garage_util::error::*; use garage_model::garage::Garage; use crate::cli::structs::*; +use crate::{fill_secrets, Secrets}; -pub async fn offline_repair(config_file: PathBuf, opt: OfflineRepairOpt) -> Result<(), Error> { +pub async fn offline_repair( + config_file: PathBuf, + secrets: Secrets, + opt: OfflineRepairOpt, +) -> Result<(), Error> { if !opt.yes { return Err(Error::Message( "Please add the --yes flag to launch repair operation".into(), @@ -15,7 +20,7 @@ pub async fn offline_repair(config_file: PathBuf, opt: OfflineRepairOpt) -> Resu } info!("Loading configuration..."); - let config = read_config(config_file)?; + let config = fill_secrets(read_config(config_file)?, secrets); info!("Initializing Garage main data store..."); let garage = Garage::new(config)?; diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs index 627e3bf3..0e14ed51 100644 --- a/src/garage/repair/online.rs +++ b/src/garage/repair/online.rs @@ -51,7 +51,11 @@ pub async fn launch_online_repair( ScrubCmd::Resume => ScrubWorkerCommand::Resume, ScrubCmd::Cancel => ScrubWorkerCommand::Cancel, ScrubCmd::SetTranquility { tranquility } => { - ScrubWorkerCommand::SetTranquility(tranquility) + garage + .block_manager + .scrub_persister + .set_with(|x| x.tranquility = tranquility)?; + return Ok(()); } }; info!("Sending command to scrub worker: {:?}", cmd); diff --git a/src/garage/server.rs b/src/garage/server.rs index 16f1b625..958089c6 100644 --- a/src/garage/server.rs +++ b/src/garage/server.rs @@ -17,6 +17,7 @@ use garage_api::k2v::api_server::K2VApiServer; use crate::admin::*; #[cfg(feature = "telemetry-otlp")] use crate::tracing_setup::*; +use crate::{fill_secrets, Secrets}; async fn wait_from(mut chan: watch::Receiver<bool>) { while !*chan.borrow() { @@ -26,9 +27,9 @@ async fn wait_from(mut chan: watch::Receiver<bool>) { } } -pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { +pub async fn run_server(config_file: PathBuf, secrets: Secrets) -> Result<(), Error> { info!("Loading configuration..."); - let config = read_config(config_file)?; + let config = fill_secrets(read_config(config_file)?, secrets); // ---- Initialize Garage internals ---- diff --git a/src/garage/tests/admin.rs b/src/garage/tests/admin.rs index 37aefe38..d3becf0a 100644 --- a/src/garage/tests/admin.rs +++ b/src/garage/tests/admin.rs @@ -21,14 +21,7 @@ async fn test_admin_bucket_perms() { ctx.garage .command() - .args([ - "bucket", - "allow", - "--read", - "--key", - &ctx.garage.key.id, - BCKT_NAME, - ]) + .args(["bucket", "allow", "--read", "--key", &ctx.key.id, BCKT_NAME]) .quiet() .expect_success_status("Could not create bucket"); @@ -36,14 +29,7 @@ async fn test_admin_bucket_perms() { ctx.garage .command() - .args([ - "bucket", - "deny", - "--read", - "--key", - &ctx.garage.key.name, - BCKT_NAME, - ]) + .args(["bucket", "deny", "--read", "--key", &ctx.key.id, BCKT_NAME]) .quiet() .expect_success_status("Could not create bucket"); @@ -51,14 +37,7 @@ async fn test_admin_bucket_perms() { ctx.garage .command() - .args([ - "bucket", - "allow", - "--read", - "--key", - &ctx.garage.key.name, - BCKT_NAME, - ]) + .args(["bucket", "allow", "--read", "--key", &ctx.key.id, BCKT_NAME]) .quiet() .expect_success_status("Could not create bucket"); diff --git a/src/garage/tests/bucket.rs b/src/garage/tests/bucket.rs index 9c363013..0dec3cfa 100644 --- a/src/garage/tests/bucket.rs +++ b/src/garage/tests/bucket.rs @@ -13,7 +13,7 @@ async fn test_bucket_all() { ctx.garage .command() .args(["key", "deny"]) - .args(["--create-bucket", &ctx.garage.key.id]) + .args(["--create-bucket", &ctx.key.id]) .quiet() .expect_success_output("Could not deny key to create buckets"); @@ -26,7 +26,7 @@ async fn test_bucket_all() { ctx.garage .command() .args(["key", "allow"]) - .args(["--create-bucket", &ctx.garage.key.id]) + .args(["--create-bucket", &ctx.key.id]) .quiet() .expect_success_output("Could not deny key to create buckets"); diff --git a/src/garage/tests/common/client.rs b/src/garage/tests/common/client.rs index 212588b5..e9d4849a 100644 --- a/src/garage/tests/common/client.rs +++ b/src/garage/tests/common/client.rs @@ -1,15 +1,9 @@ use aws_sdk_s3::{Client, Config, Credentials, Endpoint}; -use super::garage::Instance; +use super::garage::{Instance, Key}; -pub fn build_client(instance: &Instance) -> Client { - let credentials = Credentials::new( - &instance.key.id, - &instance.key.secret, - None, - None, - "garage-integ-test", - ); +pub fn build_client(instance: &Instance, key: &Key) -> Client { + let credentials = Credentials::new(&key.id, &key.secret, None, None, "garage-integ-test"); let endpoint = Endpoint::immutable(instance.s3_uri()); let config = Config::builder() diff --git a/src/garage/tests/common/custom_requester.rs b/src/garage/tests/common/custom_requester.rs index 1700cc90..609eda97 100644 --- a/src/garage/tests/common/custom_requester.rs +++ b/src/garage/tests/common/custom_requester.rs @@ -14,6 +14,7 @@ use garage_api::signature; /// You should ever only use this to send requests AWS sdk won't send, /// like to reproduce behavior of unusual implementations found to be /// problematic. +#[derive(Clone)] pub struct CustomRequester { key: Key, uri: Uri, @@ -22,18 +23,18 @@ pub struct CustomRequester { } impl CustomRequester { - pub fn new_s3(instance: &Instance) -> Self { + pub fn new_s3(instance: &Instance, key: &Key) -> Self { CustomRequester { - key: instance.key.clone(), + key: key.clone(), uri: instance.s3_uri(), service: "s3", client: Client::new(), } } - pub fn new_k2v(instance: &Instance) -> Self { + pub fn new_k2v(instance: &Instance, key: &Key) -> Self { CustomRequester { - key: instance.key.clone(), + key: key.clone(), uri: instance.k2v_uri(), service: "k2v", client: Client::new(), diff --git a/src/garage/tests/common/garage.rs b/src/garage/tests/common/garage.rs index dbebe5b1..3beed7c4 100644 --- a/src/garage/tests/common/garage.rs +++ b/src/garage/tests/common/garage.rs @@ -13,7 +13,7 @@ static GARAGE_TEST_SECRET: &str = #[derive(Debug, Default, Clone)] pub struct Key { - pub name: String, + pub name: Option<String>, pub id: String, pub secret: String, } @@ -21,10 +21,11 @@ pub struct Key { pub struct Instance { process: process::Child, pub path: PathBuf, - pub key: Key, + pub default_key: Key, pub s3_port: u16, pub k2v_port: u16, pub web_port: u16, + pub admin_port: u16, } impl Instance { @@ -101,22 +102,36 @@ api_bind_addr = "127.0.0.1:{admin_port}" Instance { process: child, path, - key: Key::default(), + default_key: Key::default(), s3_port: port, k2v_port: port + 1, web_port: port + 3, + admin_port: port + 4, } } fn setup(&mut self) { - use std::{thread, time::Duration}; - - // Wait for node to be ready - thread::sleep(Duration::from_secs(2)); - + self.wait_for_boot(); self.setup_layout(); + self.default_key = self.key(Some("garage_test")); + } + + fn wait_for_boot(&mut self) { + use std::{thread, time::Duration}; - self.key = self.new_key("garage_test"); + // 60 * 2 seconds = 120 seconds = 2min + for _ in 0..60 { + let termination = self + .command() + .args(["status"]) + .quiet() + .status() + .expect("Unable to run command"); + if termination.success() { + break; + } + thread::sleep(Duration::from_secs(2)); + } } fn setup_layout(&self) { @@ -167,13 +182,17 @@ api_bind_addr = "127.0.0.1:{admin_port}" .expect("Could not build garage endpoint URI") } - pub fn new_key(&self, name: &str) -> Key { + pub fn key(&self, maybe_name: Option<&str>) -> Key { let mut key = Key::default(); - let output = self - .command() - .args(["key", "create", name]) - .expect_success_output("Could not create key"); + let mut cmd = self.command(); + let base = cmd.args(["key", "create"]); + let with_name = match maybe_name { + Some(name) => base.args([name]), + None => base, + }; + + let output = with_name.expect_success_output("Could not create key"); let stdout = String::from_utf8(output.stdout).unwrap(); for line in stdout.lines() { @@ -190,7 +209,7 @@ api_bind_addr = "127.0.0.1:{admin_port}" assert!(!key.secret.is_empty(), "Invalid key: Key secret is empty"); Key { - name: name.to_owned(), + name: maybe_name.map(String::from), ..key } } diff --git a/src/garage/tests/common/mod.rs b/src/garage/tests/common/mod.rs index 28874b02..eca3e42b 100644 --- a/src/garage/tests/common/mod.rs +++ b/src/garage/tests/common/mod.rs @@ -13,13 +13,16 @@ use custom_requester::CustomRequester; const REGION: Region = Region::from_static("garage-integ-test"); +#[derive(Clone)] pub struct Context { pub garage: &'static garage::Instance, + pub key: garage::Key, pub client: Client, pub custom_request: CustomRequester, pub k2v: K2VContext, } +#[derive(Clone)] pub struct K2VContext { pub request: CustomRequester, } @@ -27,13 +30,15 @@ pub struct K2VContext { impl Context { fn new() -> Self { let garage = garage::instance(); - let client = client::build_client(garage); - let custom_request = CustomRequester::new_s3(garage); - let k2v_request = CustomRequester::new_k2v(garage); + let key = garage.key(None); + let client = client::build_client(garage, &key); + let custom_request = CustomRequester::new_s3(garage, &key); + let k2v_request = CustomRequester::new_k2v(garage, &key); Context { garage, client, + key, custom_request, k2v: K2VContext { request: k2v_request, @@ -57,7 +62,7 @@ impl Context { .args(["bucket", "allow"]) .args(["--owner", "--read", "--write"]) .arg(&bucket_name) - .args(["--key", &self.garage.key.name]) + .args(["--key", &self.key.id]) .quiet() .expect_success_status("Could not allow key for bucket"); diff --git a/src/garage/tests/k2v/batch.rs b/src/garage/tests/k2v/batch.rs index 6abba1c5..595d0ba8 100644 --- a/src/garage/tests/k2v/batch.rs +++ b/src/garage/tests/k2v/batch.rs @@ -3,6 +3,7 @@ use std::collections::HashMap; use crate::common; use assert_json_diff::assert_json_eq; +use base64::prelude::*; use serde_json::json; use super::json_body; @@ -36,12 +37,12 @@ async fn test_batch() { {{"pk": "root", "sk": "d.2", "ct": null, "v": "{}"}}, {{"pk": "root", "sk": "e", "ct": null, "v": "{}"}} ]"#, - base64::encode(values.get(&"a").unwrap()), - base64::encode(values.get(&"b").unwrap()), - base64::encode(values.get(&"c").unwrap()), - base64::encode(values.get(&"d.1").unwrap()), - base64::encode(values.get(&"d.2").unwrap()), - base64::encode(values.get(&"e").unwrap()), + BASE64_STANDARD.encode(values.get(&"a").unwrap()), + BASE64_STANDARD.encode(values.get(&"b").unwrap()), + BASE64_STANDARD.encode(values.get(&"c").unwrap()), + BASE64_STANDARD.encode(values.get(&"d.1").unwrap()), + BASE64_STANDARD.encode(values.get(&"d.2").unwrap()), + BASE64_STANDARD.encode(values.get(&"e").unwrap()), ) .into_bytes(), ) @@ -120,12 +121,12 @@ async fn test_batch() { "tombstones": false, "singleItem": false, "items": [ - {"sk": "a", "ct": ct.get("a").unwrap(), "v": [base64::encode(values.get("a").unwrap())]}, - {"sk": "b", "ct": ct.get("b").unwrap(), "v": [base64::encode(values.get("b").unwrap())]}, - {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap())]}, - {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1").unwrap())]}, - {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap())]}, - {"sk": "e", "ct": ct.get("e").unwrap(), "v": [base64::encode(values.get("e").unwrap())]} + {"sk": "a", "ct": ct.get("a").unwrap(), "v": [BASE64_STANDARD.encode(values.get("a").unwrap())]}, + {"sk": "b", "ct": ct.get("b").unwrap(), "v": [BASE64_STANDARD.encode(values.get("b").unwrap())]}, + {"sk": "c", "ct": ct.get("c").unwrap(), "v": [BASE64_STANDARD.encode(values.get("c").unwrap())]}, + {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.1").unwrap())]}, + {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.2").unwrap())]}, + {"sk": "e", "ct": ct.get("e").unwrap(), "v": [BASE64_STANDARD.encode(values.get("e").unwrap())]} ], "more": false, "nextStart": null, @@ -141,10 +142,10 @@ async fn test_batch() { "tombstones": false, "singleItem": false, "items": [ - {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap())]}, - {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1").unwrap())]}, - {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap())]}, - {"sk": "e", "ct": ct.get("e").unwrap(), "v": [base64::encode(values.get("e").unwrap())]} + {"sk": "c", "ct": ct.get("c").unwrap(), "v": [BASE64_STANDARD.encode(values.get("c").unwrap())]}, + {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.1").unwrap())]}, + {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.2").unwrap())]}, + {"sk": "e", "ct": ct.get("e").unwrap(), "v": [BASE64_STANDARD.encode(values.get("e").unwrap())]} ], "more": false, "nextStart": null, @@ -160,9 +161,9 @@ async fn test_batch() { "tombstones": false, "singleItem": false, "items": [ - {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap())]}, - {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1").unwrap())]}, - {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap())]}, + {"sk": "c", "ct": ct.get("c").unwrap(), "v": [BASE64_STANDARD.encode(values.get("c").unwrap())]}, + {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.1").unwrap())]}, + {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.2").unwrap())]}, ], "more": false, "nextStart": null, @@ -178,8 +179,8 @@ async fn test_batch() { "tombstones": false, "singleItem": false, "items": [ - {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap())]}, - {"sk": "b", "ct": ct.get("b").unwrap(), "v": [base64::encode(values.get("b").unwrap())]}, + {"sk": "c", "ct": ct.get("c").unwrap(), "v": [BASE64_STANDARD.encode(values.get("c").unwrap())]}, + {"sk": "b", "ct": ct.get("b").unwrap(), "v": [BASE64_STANDARD.encode(values.get("b").unwrap())]}, ], "more": false, "nextStart": null, @@ -195,8 +196,8 @@ async fn test_batch() { "tombstones": false, "singleItem": false, "items": [ - {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap())]}, - {"sk": "b", "ct": ct.get("b").unwrap(), "v": [base64::encode(values.get("b").unwrap())]}, + {"sk": "c", "ct": ct.get("c").unwrap(), "v": [BASE64_STANDARD.encode(values.get("c").unwrap())]}, + {"sk": "b", "ct": ct.get("b").unwrap(), "v": [BASE64_STANDARD.encode(values.get("b").unwrap())]}, ], "more": false, "nextStart": null, @@ -212,7 +213,7 @@ async fn test_batch() { "tombstones": false, "singleItem": false, "items": [ - {"sk": "a", "ct": ct.get("a").unwrap(), "v": [base64::encode(values.get("a").unwrap())]} + {"sk": "a", "ct": ct.get("a").unwrap(), "v": [BASE64_STANDARD.encode(values.get("a").unwrap())]} ], "more": true, "nextStart": "b", @@ -228,8 +229,8 @@ async fn test_batch() { "tombstones": false, "singleItem": false, "items": [ - {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1").unwrap())]}, - {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap())]} + {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.1").unwrap())]}, + {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.2").unwrap())]} ], "more": false, "nextStart": null, @@ -255,10 +256,10 @@ async fn test_batch() { {{"pk": "root", "sk": "d.2", "ct": null, "v": "{}"}} ]"#, ct.get(&"b").unwrap(), - base64::encode(values.get(&"c'").unwrap()), + BASE64_STANDARD.encode(values.get(&"c'").unwrap()), ct.get(&"d.1").unwrap(), - base64::encode(values.get(&"d.1'").unwrap()), - base64::encode(values.get(&"d.2'").unwrap()), + BASE64_STANDARD.encode(values.get(&"d.1'").unwrap()), + BASE64_STANDARD.encode(values.get(&"d.2'").unwrap()), ) .into_bytes(), ) @@ -333,11 +334,11 @@ async fn test_batch() { "tombstones": false, "singleItem": false, "items": [ - {"sk": "a", "ct": ct.get("a").unwrap(), "v": [base64::encode(values.get("a").unwrap())]}, - {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap()), base64::encode(values.get("c'").unwrap())]}, - {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]}, - {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap()), base64::encode(values.get("d.2'").unwrap())]}, - {"sk": "e", "ct": ct.get("e").unwrap(), "v": [base64::encode(values.get("e").unwrap())]} + {"sk": "a", "ct": ct.get("a").unwrap(), "v": [BASE64_STANDARD.encode(values.get("a").unwrap())]}, + {"sk": "c", "ct": ct.get("c").unwrap(), "v": [BASE64_STANDARD.encode(values.get("c").unwrap()), BASE64_STANDARD.encode(values.get("c'").unwrap())]}, + {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.1'").unwrap())]}, + {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.2").unwrap()), BASE64_STANDARD.encode(values.get("d.2'").unwrap())]}, + {"sk": "e", "ct": ct.get("e").unwrap(), "v": [BASE64_STANDARD.encode(values.get("e").unwrap())]} ], "more": false, "nextStart": null, @@ -353,8 +354,8 @@ async fn test_batch() { "tombstones": false, "singleItem": false, "items": [ - {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]}, - {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap()), base64::encode(values.get("d.2'").unwrap())]}, + {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.1'").unwrap())]}, + {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.2").unwrap()), BASE64_STANDARD.encode(values.get("d.2'").unwrap())]}, ], "more": false, "nextStart": null, @@ -370,7 +371,7 @@ async fn test_batch() { "tombstones": false, "singleItem": false, "items": [ - {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]}, + {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.1'").unwrap())]}, ], "more": false, "nextStart": null, @@ -386,7 +387,7 @@ async fn test_batch() { "tombstones": false, "singleItem": false, "items": [ - {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]}, + {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.1'").unwrap())]}, ], "more": true, "nextStart": "d.2", @@ -402,7 +403,7 @@ async fn test_batch() { "tombstones": false, "singleItem": false, "items": [ - {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap()), base64::encode(values.get("d.2'").unwrap())]}, + {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.2").unwrap()), BASE64_STANDARD.encode(values.get("d.2'").unwrap())]}, ], "more": false, "nextStart": null, @@ -418,8 +419,8 @@ async fn test_batch() { "tombstones": false, "singleItem": false, "items": [ - {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap()), base64::encode(values.get("d.2'").unwrap())]}, - {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]}, + {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.2").unwrap()), BASE64_STANDARD.encode(values.get("d.2'").unwrap())]}, + {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.1'").unwrap())]}, ], "more": false, "nextStart": null, @@ -435,8 +436,8 @@ async fn test_batch() { "tombstones": false, "singleItem": false, "items": [ - {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap()), base64::encode(values.get("d.2'").unwrap())]}, - {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]}, + {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.2").unwrap()), BASE64_STANDARD.encode(values.get("d.2'").unwrap())]}, + {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.1'").unwrap())]}, ], "more": false, "nextStart": null, @@ -452,8 +453,8 @@ async fn test_batch() { "tombstones": false, "singleItem": false, "items": [ - {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]}, - {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap()), base64::encode(values.get("d.2'").unwrap())]}, + {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.1'").unwrap())]}, + {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.2").unwrap()), BASE64_STANDARD.encode(values.get("d.2'").unwrap())]}, ], "more": false, "nextStart": null, @@ -563,8 +564,8 @@ async fn test_batch() { "tombstones": false, "singleItem": false, "items": [ - {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap()), base64::encode(values.get("c'").unwrap())]}, - {"sk": "e", "ct": ct.get("e").unwrap(), "v": [base64::encode(values.get("e").unwrap())]} + {"sk": "c", "ct": ct.get("c").unwrap(), "v": [BASE64_STANDARD.encode(values.get("c").unwrap()), BASE64_STANDARD.encode(values.get("c'").unwrap())]}, + {"sk": "e", "ct": ct.get("e").unwrap(), "v": [BASE64_STANDARD.encode(values.get("e").unwrap())]} ], "more": false, "nextStart": null, @@ -580,8 +581,8 @@ async fn test_batch() { "tombstones": false, "singleItem": false, "items": [ - {"sk": "e", "ct": ct.get("e").unwrap(), "v": [base64::encode(values.get("e").unwrap())]}, - {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap()), base64::encode(values.get("c'").unwrap())]}, + {"sk": "e", "ct": ct.get("e").unwrap(), "v": [BASE64_STANDARD.encode(values.get("e").unwrap())]}, + {"sk": "c", "ct": ct.get("c").unwrap(), "v": [BASE64_STANDARD.encode(values.get("c").unwrap()), BASE64_STANDARD.encode(values.get("c'").unwrap())]}, ], "more": false, "nextStart": null, @@ -599,10 +600,10 @@ async fn test_batch() { "items": [ {"sk": "a", "ct": ct.get("a").unwrap(), "v": [null]}, {"sk": "b", "ct": ct.get("b").unwrap(), "v": [null]}, - {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap()), base64::encode(values.get("c'").unwrap())]}, + {"sk": "c", "ct": ct.get("c").unwrap(), "v": [BASE64_STANDARD.encode(values.get("c").unwrap()), BASE64_STANDARD.encode(values.get("c'").unwrap())]}, {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [null]}, {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [null]}, - {"sk": "e", "ct": ct.get("e").unwrap(), "v": [base64::encode(values.get("e").unwrap())]}, + {"sk": "e", "ct": ct.get("e").unwrap(), "v": [BASE64_STANDARD.encode(values.get("e").unwrap())]}, ], "more": false, "nextStart": null, diff --git a/src/garage/tests/k2v/item.rs b/src/garage/tests/k2v/item.rs index 2641386f..588836c7 100644 --- a/src/garage/tests/k2v/item.rs +++ b/src/garage/tests/k2v/item.rs @@ -3,6 +3,7 @@ use std::time::Duration; use crate::common; use assert_json_diff::assert_json_eq; +use base64::prelude::*; use serde_json::json; use super::json_body; @@ -222,7 +223,10 @@ async fn test_items_and_indices() { let res_json = json_body(res).await; assert_json_eq!( res_json, - [base64::encode(&content2), base64::encode(&content3)] + [ + BASE64_STANDARD.encode(&content2), + BASE64_STANDARD.encode(&content3) + ] ); // ReadIndex -- now there should be some stuff @@ -411,7 +415,7 @@ async fn test_item_return_format() { "application/json" ); let res_body = json_body(res).await; - assert_json_eq!(res_body, json!([base64::encode(&single_value)])); + assert_json_eq!(res_body, json!([BASE64_STANDARD.encode(&single_value)])); // f2: binary let res = ctx @@ -452,7 +456,7 @@ async fn test_item_return_format() { "application/json" ); let res_body = json_body(res).await; - assert_json_eq!(res_body, json!([base64::encode(&single_value)])); + assert_json_eq!(res_body, json!([BASE64_STANDARD.encode(&single_value)])); // -- Test with a second, concurrent value -- let res = ctx @@ -488,8 +492,8 @@ async fn test_item_return_format() { assert_json_eq!( res_body, json!([ - base64::encode(&single_value), - base64::encode(&concurrent_value) + BASE64_STANDARD.encode(&single_value), + BASE64_STANDARD.encode(&concurrent_value) ]) ); @@ -512,8 +516,8 @@ async fn test_item_return_format() { assert_json_eq!( res_body, json!([ - base64::encode(&single_value), - base64::encode(&concurrent_value) + BASE64_STANDARD.encode(&single_value), + BASE64_STANDARD.encode(&concurrent_value) ]) ); @@ -550,8 +554,8 @@ async fn test_item_return_format() { assert_json_eq!( res_body, json!([ - base64::encode(&single_value), - base64::encode(&concurrent_value) + BASE64_STANDARD.encode(&single_value), + BASE64_STANDARD.encode(&concurrent_value) ]) ); @@ -587,7 +591,10 @@ async fn test_item_return_format() { "application/json" ); let res_body = json_body(res).await; - assert_json_eq!(res_body, json!([base64::encode(&concurrent_value), null])); + assert_json_eq!( + res_body, + json!([BASE64_STANDARD.encode(&concurrent_value), null]) + ); // f1: not specified let res = ctx @@ -612,7 +619,10 @@ async fn test_item_return_format() { .unwrap() .to_string(); let res_body = json_body(res).await; - assert_json_eq!(res_body, json!([base64::encode(&concurrent_value), null])); + assert_json_eq!( + res_body, + json!([BASE64_STANDARD.encode(&concurrent_value), null]) + ); // f2: binary let res = ctx @@ -644,7 +654,10 @@ async fn test_item_return_format() { "application/json" ); let res_body = json_body(res).await; - assert_json_eq!(res_body, json!([base64::encode(&concurrent_value), null])); + assert_json_eq!( + res_body, + json!([BASE64_STANDARD.encode(&concurrent_value), null]) + ); // -- Delete everything -- let res = ctx diff --git a/src/garage/tests/k2v/poll.rs b/src/garage/tests/k2v/poll.rs index e56705ae..dd44aed9 100644 --- a/src/garage/tests/k2v/poll.rs +++ b/src/garage/tests/k2v/poll.rs @@ -1,12 +1,17 @@ +use base64::prelude::*; use hyper::{Method, StatusCode}; use std::time::Duration; +use assert_json_diff::assert_json_eq; +use serde_json::json; + +use super::json_body; use crate::common; #[tokio::test] -async fn test_poll() { +async fn test_poll_item() { let ctx = common::context(); - let bucket = ctx.create_bucket("test-k2v-poll"); + let bucket = ctx.create_bucket("test-k2v-poll-item"); // Write initial value let res = ctx @@ -52,8 +57,8 @@ async fn test_poll() { let poll = { let bucket = bucket.clone(); let ct = ct.clone(); + let ctx = ctx.clone(); tokio::spawn(async move { - let ctx = common::context(); ctx.k2v .request .builder(bucket.clone()) @@ -96,3 +101,165 @@ async fn test_poll() { .to_vec(); assert_eq!(poll_res_body, b"New value"); } + +#[tokio::test] +async fn test_poll_range() { + let ctx = common::context(); + let bucket = ctx.create_bucket("test-k2v-poll-range"); + + // Write initial value + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .method(Method::PUT) + .path("root") + .query_param("sort_key", Some("test1")) + .body(b"Initial value".to_vec()) + .send() + .await + .unwrap(); + assert_eq!(res.status(), StatusCode::NO_CONTENT); + + // Retrieve initial value to get its causality token + let res2 = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some("test1")) + .signed_header("accept", "application/octet-stream") + .send() + .await + .unwrap(); + assert_eq!(res2.status(), StatusCode::OK); + let ct = res2 + .headers() + .get("x-garage-causality-token") + .unwrap() + .to_str() + .unwrap() + .to_string(); + + // Initial poll range, retrieve single item and first seen_marker + let res2 = ctx + .k2v + .request + .builder(bucket.clone()) + .method(Method::POST) + .path("root") + .query_param("poll_range", None::<String>) + .body(b"{}".to_vec()) + .send() + .await + .unwrap(); + assert_eq!(res2.status(), StatusCode::OK); + let json_res = json_body(res2).await; + let seen_marker = json_res["seenMarker"].as_str().unwrap().to_string(); + assert_json_eq!( + json_res, + json!( + { + "items": [ + {"sk": "test1", "ct": ct, "v": [BASE64_STANDARD.encode(b"Initial value")]}, + ], + "seenMarker": seen_marker, + } + ) + ); + + // Second poll range, which will complete later + let poll = { + let bucket = bucket.clone(); + let ctx = ctx.clone(); + tokio::spawn(async move { + ctx.k2v + .request + .builder(bucket.clone()) + .method(Method::POST) + .path("root") + .query_param("poll_range", None::<String>) + .body(format!(r#"{{"seenMarker": "{}"}}"#, seen_marker).into_bytes()) + .send() + .await + }) + }; + + // Write new value that supersedes initial one + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .method(Method::PUT) + .path("root") + .query_param("sort_key", Some("test1")) + .signed_header("x-garage-causality-token", ct) + .body(b"New value".to_vec()) + .send() + .await + .unwrap(); + assert_eq!(res.status(), StatusCode::NO_CONTENT); + + // Check poll finishes with correct value + let poll_res = tokio::select! { + _ = tokio::time::sleep(Duration::from_secs(10)) => panic!("poll did not terminate in time"), + res = poll => res.unwrap().unwrap(), + }; + + assert_eq!(poll_res.status(), StatusCode::OK); + let json_res = json_body(poll_res).await; + let seen_marker = json_res["seenMarker"].as_str().unwrap().to_string(); + assert_eq!(json_res["items"].as_array().unwrap().len(), 1); + assert_json_eq!(&json_res["items"][0]["sk"], json!("test1")); + assert_json_eq!( + &json_res["items"][0]["v"], + json!([BASE64_STANDARD.encode(b"New value")]) + ); + + // Now we will add a value on a different key + // Start a new poll operation + let poll = { + let bucket = bucket.clone(); + let ctx = ctx.clone(); + tokio::spawn(async move { + ctx.k2v + .request + .builder(bucket.clone()) + .method(Method::POST) + .path("root") + .query_param("poll_range", None::<String>) + .body(format!(r#"{{"seenMarker": "{}"}}"#, seen_marker).into_bytes()) + .send() + .await + }) + }; + + // Write value on different key + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .method(Method::PUT) + .path("root") + .query_param("sort_key", Some("test2")) + .body(b"Other value".to_vec()) + .send() + .await + .unwrap(); + assert_eq!(res.status(), StatusCode::NO_CONTENT); + + // Check poll finishes with correct value + let poll_res = tokio::select! { + _ = tokio::time::sleep(Duration::from_secs(10)) => panic!("poll did not terminate in time"), + res = poll => res.unwrap().unwrap(), + }; + + assert_eq!(poll_res.status(), StatusCode::OK); + let json_res = json_body(poll_res).await; + assert_eq!(json_res["items"].as_array().unwrap().len(), 1); + assert_json_eq!(&json_res["items"][0]["sk"], json!("test2")); + assert_json_eq!( + &json_res["items"][0]["v"], + json!([BASE64_STANDARD.encode(b"Other value")]) + ); +} diff --git a/src/garage/tests/s3/streaming_signature.rs b/src/garage/tests/s3/streaming_signature.rs index 48da7607..b7a1acae 100644 --- a/src/garage/tests/s3/streaming_signature.rs +++ b/src/garage/tests/s3/streaming_signature.rs @@ -109,7 +109,7 @@ async fn test_create_bucket_streaming() { ctx.garage .command() .args(["key", "allow"]) - .args(["--create-bucket", &ctx.garage.key.id]) + .args(["--create-bucket", &ctx.key.id]) .quiet() .expect_success_output("Could not allow key to create buckets"); diff --git a/src/garage/tests/s3/website.rs b/src/garage/tests/s3/website.rs index 244a2fa0..f61838e4 100644 --- a/src/garage/tests/s3/website.rs +++ b/src/garage/tests/s3/website.rs @@ -1,5 +1,8 @@ use crate::common; use crate::common::ext::*; +use crate::k2v::json_body; + +use assert_json_diff::assert_json_eq; use aws_sdk_s3::{ model::{CorsConfiguration, CorsRule, ErrorDocument, IndexDocument, WebsiteConfiguration}, types::ByteStream, @@ -9,6 +12,7 @@ use hyper::{ body::{to_bytes, Body}, Client, }; +use serde_json::json; const BODY: &[u8; 16] = b"<h1>bonjour</h1>"; const BODY_ERR: &[u8; 6] = b"erreur"; @@ -49,6 +53,31 @@ async fn test_website() { BODY.as_ref() ); /* check that we do not leak body */ + let admin_req = || { + Request::builder() + .method("GET") + .uri(format!( + "http://127.0.0.1:{0}/check?domain={1}", + ctx.garage.admin_port, + BCKT_NAME.to_string() + )) + .body(Body::empty()) + .unwrap() + }; + + let admin_resp = client.request(admin_req()).await.unwrap(); + assert_eq!(admin_resp.status(), StatusCode::BAD_REQUEST); + let res_body = json_body(admin_resp).await; + assert_json_eq!( + res_body, + json!({ + "code": "InvalidRequest", + "message": "Bad request: Bucket 'my-website' is not authorized for website hosting", + "region": "garage-integ-test", + "path": "/check", + }) + ); + ctx.garage .command() .args(["bucket", "website", "--allow", BCKT_NAME]) @@ -62,6 +91,25 @@ async fn test_website() { BODY.as_ref() ); + let admin_req = || { + Request::builder() + .method("GET") + .uri(format!( + "http://127.0.0.1:{0}/check?domain={1}", + ctx.garage.admin_port, + BCKT_NAME.to_string() + )) + .body(Body::empty()) + .unwrap() + }; + + let mut admin_resp = client.request(admin_req()).await.unwrap(); + assert_eq!(admin_resp.status(), StatusCode::OK); + assert_eq!( + to_bytes(admin_resp.body_mut()).await.unwrap().as_ref(), + format!("Bucket '{BCKT_NAME}' is authorized for website hosting").as_bytes() + ); + ctx.garage .command() .args(["bucket", "website", "--deny", BCKT_NAME]) @@ -74,6 +122,31 @@ async fn test_website() { to_bytes(resp.body_mut()).await.unwrap().as_ref(), BODY.as_ref() ); /* check that we do not leak body */ + + let admin_req = || { + Request::builder() + .method("GET") + .uri(format!( + "http://127.0.0.1:{0}/check?domain={1}", + ctx.garage.admin_port, + BCKT_NAME.to_string() + )) + .body(Body::empty()) + .unwrap() + }; + + let admin_resp = client.request(admin_req()).await.unwrap(); + assert_eq!(admin_resp.status(), StatusCode::BAD_REQUEST); + let res_body = json_body(admin_resp).await; + assert_json_eq!( + res_body, + json!({ + "code": "InvalidRequest", + "message": "Bad request: Bucket 'my-website' is not authorized for website hosting", + "region": "garage-integ-test", + "path": "/check", + }) + ); } #[tokio::test] @@ -322,3 +395,103 @@ async fn test_website_s3_api() { ); } } + +#[tokio::test] +async fn test_website_check_website_enabled() { + let ctx = common::context(); + + let client = Client::new(); + + let admin_req = || { + Request::builder() + .method("GET") + .uri(format!("http://127.0.0.1:{}/check", ctx.garage.admin_port)) + .body(Body::empty()) + .unwrap() + }; + + let admin_resp = client.request(admin_req()).await.unwrap(); + assert_eq!(admin_resp.status(), StatusCode::BAD_REQUEST); + let res_body = json_body(admin_resp).await; + assert_json_eq!( + res_body, + json!({ + "code": "InvalidRequest", + "message": "Bad request: No domain query string found", + "region": "garage-integ-test", + "path": "/check", + }) + ); + + let admin_req = || { + Request::builder() + .method("GET") + .uri(format!( + "http://127.0.0.1:{}/check?domain=", + ctx.garage.admin_port + )) + .body(Body::empty()) + .unwrap() + }; + + let admin_resp = client.request(admin_req()).await.unwrap(); + assert_eq!(admin_resp.status(), StatusCode::NOT_FOUND); + let res_body = json_body(admin_resp).await; + assert_json_eq!( + res_body, + json!({ + "code": "NoSuchBucket", + "message": "Bucket not found: ", + "region": "garage-integ-test", + "path": "/check", + }) + ); + + let admin_req = || { + Request::builder() + .method("GET") + .uri(format!( + "http://127.0.0.1:{}/check?domain=foobar", + ctx.garage.admin_port + )) + .body(Body::empty()) + .unwrap() + }; + + let admin_resp = client.request(admin_req()).await.unwrap(); + assert_eq!(admin_resp.status(), StatusCode::NOT_FOUND); + let res_body = json_body(admin_resp).await; + assert_json_eq!( + res_body, + json!({ + "code": "NoSuchBucket", + "message": "Bucket not found: foobar", + "region": "garage-integ-test", + "path": "/check", + }) + ); + + let admin_req = || { + Request::builder() + .method("GET") + .uri(format!( + "http://127.0.0.1:{}/check?domain=%E2%98%B9", + ctx.garage.admin_port + )) + .body(Body::empty()) + .unwrap() + }; + + let admin_resp = client.request(admin_req()).await.unwrap(); + assert_eq!(admin_resp.status(), StatusCode::NOT_FOUND); + let res_body = json_body(admin_resp).await; + assert_json_eq!( + res_body, + json!({ + "code": "NoSuchBucket", + "message": "Bucket not found: ☹", + "region": "garage-integ-test", + "path": "/check", + }) + ); +} diff --git a/src/k2v-client/Cargo.toml b/src/k2v-client/Cargo.toml index f57ce849..52c16d89 100644 --- a/src/k2v-client/Cargo.toml +++ b/src/k2v-client/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "k2v-client" -version = "0.0.1" +version = "0.1.1" authors = ["Trinity Pointard <trinity.pointard@gmail.com>", "Alex Auvolat <alex@adnab.me>"] edition = "2018" license = "AGPL-3.0" @@ -9,20 +9,21 @@ repository = "https://git.deuxfleurs.fr/Deuxfleurs/garage" readme = "../../README.md" [dependencies] -base64 = "0.13.0" -http = "0.2.6" +base64 = "0.21" +http = "0.2" log = "0.4" rusoto_core = { version = "0.48.0", default-features = false, features = ["rustls"] } rusoto_credential = "0.48.0" rusoto_signature = "0.48.0" -serde = "1.0.137" -serde_json = "1.0.81" -thiserror = "1.0.31" -tokio = "1.17.0" +hyper-rustls = { version = "0.23", default-features = false, features = [ "http1", "http2", "tls12" ] } +serde = "1.0" +serde_json = "1.0" +thiserror = "1.0" +tokio = "1.24" # cli deps -clap = { version = "3.1.18", optional = true, features = ["derive", "env"] } -garage_util = { version = "0.8.1", path = "../util", optional = true } +clap = { version = "4.1", optional = true, features = ["derive", "env"] } +garage_util = { version = "0.8.2", path = "../util", optional = true } [features] diff --git a/src/k2v-client/bin/k2v-cli.rs b/src/k2v-client/bin/k2v-cli.rs index 925ebeb8..cdd63cce 100644 --- a/src/k2v-client/bin/k2v-cli.rs +++ b/src/k2v-client/bin/k2v-cli.rs @@ -1,3 +1,5 @@ +use std::collections::BTreeMap; +use std::process::exit; use std::time::Duration; use k2v_client::*; @@ -57,22 +59,39 @@ enum Command { #[clap(flatten)] output_kind: ReadOutputKind, }, - /// Watch changes on a single value - Poll { - /// Partition key to delete from + /// Watch changes on a single value + PollItem { + /// Partition key of item to watch partition_key: String, - /// Sort key to delete from + /// Sort key of item to watch sort_key: String, /// Causality information #[clap(short, long)] causality: String, /// Timeout, in seconds - #[clap(short, long)] + #[clap(short = 'T', long)] timeout: Option<u64>, /// Output formating #[clap(flatten)] output_kind: ReadOutputKind, }, + /// Watch changes on a range of values + PollRange { + /// Partition key to poll from + partition_key: String, + /// Output only sort keys matching this filter + #[clap(flatten)] + filter: Filter, + /// Marker of data that had previously been seen by a PollRange + #[clap(short = 'S', long)] + seen_marker: Option<String>, + /// Timeout, in seconds + #[clap(short = 'T', long)] + timeout: Option<u64>, + /// Output formating + #[clap(flatten)] + output_kind: BatchOutputKind, + }, /// Delete a single value Delete { /// Partition key to delete from @@ -176,7 +195,6 @@ struct ReadOutputKind { impl ReadOutputKind { fn display_output(&self, val: CausalValue) -> ! { use std::io::Write; - use std::process::exit; if self.json { let stdout = std::io::stdout(); @@ -254,6 +272,83 @@ struct BatchOutputKind { json: bool, } +impl BatchOutputKind { + fn display_human_output(&self, values: BTreeMap<String, CausalValue>) -> ! { + for (key, values) in values { + println!("key: {}", key); + let causality: String = values.causality.into(); + println!("causality: {}", causality); + for value in values.value { + match value { + K2vValue::Value(v) => { + if let Ok(string) = std::str::from_utf8(&v) { + println!(" value(utf-8): {}", string); + } else { + println!(" value(base64): {}", base64::encode(&v)); + } + } + K2vValue::Tombstone => { + println!(" tombstone"); + } + } + } + } + exit(0); + } + + fn values_json(&self, values: BTreeMap<String, CausalValue>) -> Vec<serde_json::Value> { + values + .into_iter() + .map(|(k, v)| { + let mut value = serde_json::to_value(v).unwrap(); + value + .as_object_mut() + .unwrap() + .insert("sort_key".to_owned(), k.into()); + value + }) + .collect::<Vec<_>>() + } + + fn display_poll_range_output( + &self, + seen_marker: String, + values: BTreeMap<String, CausalValue>, + ) -> ! { + if self.json { + let json = serde_json::json!({ + "values": self.values_json(values), + "seen_marker": seen_marker, + }); + + let stdout = std::io::stdout(); + serde_json::to_writer_pretty(stdout, &json).unwrap(); + exit(0) + } else { + println!("seen marker: {}", seen_marker); + self.display_human_output(values) + } + } + + fn display_read_range_output(&self, res: PaginatedRange<CausalValue>) -> ! { + if self.json { + let json = serde_json::json!({ + "next_key": res.next_start, + "values": self.values_json(res.items), + }); + + let stdout = std::io::stdout(); + serde_json::to_writer_pretty(stdout, &json).unwrap(); + exit(0) + } else { + if let Some(next) = res.next_start { + println!("next key: {}", next); + } + self.display_human_output(res.items) + } + } +} + /// Filter for batch operations #[derive(Parser, Debug)] #[clap(group = clap::ArgGroup::new("filter").multiple(true).required(true))] @@ -342,7 +437,7 @@ async fn main() -> Result<(), Error> { let res = client.read_item(&partition_key, &sort_key).await?; output_kind.display_output(res); } - Command::Poll { + Command::PollItem { partition_key, sort_key, causality, @@ -356,7 +451,54 @@ async fn main() -> Result<(), Error> { if let Some(res) = res_opt { output_kind.display_output(res); } else { - println!("Delay expired and value didn't change."); + if output_kind.json { + println!("null"); + } else { + println!("Delay expired and value didn't change."); + } + } + } + Command::PollRange { + partition_key, + filter, + seen_marker, + timeout, + output_kind, + } => { + if filter.conflicts_only + || filter.tombstones + || filter.reverse + || filter.limit.is_some() + { + return Err(Error::Message( + "limit, reverse, conlicts-only, tombstones are invalid for poll-range".into(), + )); + } + + let timeout = timeout.map(Duration::from_secs); + let res = client + .poll_range( + &partition_key, + Some(PollRangeFilter { + start: filter.start.as_deref(), + end: filter.end.as_deref(), + prefix: filter.prefix.as_deref(), + }), + seen_marker.as_deref(), + timeout, + ) + .await?; + match res { + Some((items, seen_marker)) => { + output_kind.display_poll_range_output(seen_marker, items); + } + None => { + if output_kind.json { + println!("null"); + } else { + println!("Delay expired and value didn't change."); + } + } } } Command::ReadIndex { @@ -419,50 +561,7 @@ async fn main() -> Result<(), Error> { }; let mut res = client.read_batch(&[op]).await?; let res = res.pop().unwrap(); - if output_kind.json { - let values = res - .items - .into_iter() - .map(|(k, v)| { - let mut value = serde_json::to_value(v).unwrap(); - value - .as_object_mut() - .unwrap() - .insert("sort_key".to_owned(), k.into()); - value - }) - .collect::<Vec<_>>(); - let json = serde_json::json!({ - "next_key": res.next_start, - "values": values, - }); - - let stdout = std::io::stdout(); - serde_json::to_writer_pretty(stdout, &json).unwrap(); - } else { - if let Some(next) = res.next_start { - println!("next key: {}", next); - } - for (key, values) in res.items { - println!("key: {}", key); - let causality: String = values.causality.into(); - println!("causality: {}", causality); - for value in values.value { - match value { - K2vValue::Value(v) => { - if let Ok(string) = std::str::from_utf8(&v) { - println!(" value(utf-8): {}", string); - } else { - println!(" value(base64): {}", base64::encode(&v)); - } - } - K2vValue::Tombstone => { - println!(" tombstone"); - } - } - } - } - } + output_kind.display_read_range_output(res); } Command::DeleteRange { partition_key, diff --git a/src/k2v-client/lib.rs b/src/k2v-client/lib.rs index c2606af4..ca52d0cf 100644 --- a/src/k2v-client/lib.rs +++ b/src/k2v-client/lib.rs @@ -40,7 +40,13 @@ impl K2vClient { creds: AwsCredentials, user_agent: Option<String>, ) -> Result<Self, Error> { - let mut client = HttpClient::new()?; + let connector = hyper_rustls::HttpsConnectorBuilder::new() + .with_native_roots() + .https_or_http() + .enable_http1() + .enable_http2() + .build(); + let mut client = HttpClient::from_connector(connector); if let Some(ua) = user_agent { client.local_agent_prepend(ua); } else { @@ -153,6 +159,58 @@ impl K2vClient { } } + /// Perform a PollRange request, waiting for any change in a given range of keys + /// to occur + pub async fn poll_range( + &self, + partition_key: &str, + filter: Option<PollRangeFilter<'_>>, + seen_marker: Option<&str>, + timeout: Option<Duration>, + ) -> Result<Option<(BTreeMap<String, CausalValue>, String)>, Error> { + let timeout = timeout.unwrap_or(DEFAULT_POLL_TIMEOUT); + + let request = PollRangeRequest { + filter: filter.unwrap_or_default(), + seen_marker, + timeout: timeout.as_secs(), + }; + + let mut req = SignedRequest::new( + "POST", + SERVICE, + &self.region, + &format!("/{}/{}", self.bucket, partition_key), + ); + req.add_param("poll_range", ""); + + let payload = serde_json::to_vec(&request)?; + req.set_payload(Some(payload)); + let res = self.dispatch(req, Some(timeout + DEFAULT_TIMEOUT)).await?; + + if res.status == StatusCode::NOT_MODIFIED { + return Ok(None); + } + + let resp: PollRangeResponse = serde_json::from_slice(&res.body)?; + + let items = resp + .items + .into_iter() + .map(|BatchReadItem { sk, ct, v }| { + ( + sk, + CausalValue { + causality: ct, + value: v, + }, + ) + }) + .collect::<BTreeMap<_, _>>(); + + Ok(Some((items, resp.seen_marker))) + } + /// Perform an InsertItem request, inserting a value for a single pk+sk. pub async fn insert_item( &self, @@ -389,6 +447,12 @@ impl From<CausalityToken> for String { } } +impl AsRef<str> for CausalityToken { + fn as_ref(&self) -> &str { + &self.0 + } +} + /// A value in K2V. can be either a binary value, or a tombstone. #[derive(Debug, Clone, PartialEq, Eq)] pub enum K2vValue { @@ -466,6 +530,29 @@ pub struct Filter<'a> { pub reverse: bool, } +#[derive(Debug, Default, Clone, Serialize)] +pub struct PollRangeFilter<'a> { + pub start: Option<&'a str>, + pub end: Option<&'a str>, + pub prefix: Option<&'a str>, +} + +#[derive(Debug, Clone, Serialize)] +#[serde(rename_all = "camelCase")] +struct PollRangeRequest<'a> { + #[serde(flatten)] + filter: PollRangeFilter<'a>, + seen_marker: Option<&'a str>, + timeout: u64, +} + +#[derive(Debug, Clone, Deserialize)] +#[serde(rename_all = "camelCase")] +struct PollRangeResponse { + items: Vec<BatchReadItem>, + seen_marker: String, +} + impl<'a> Filter<'a> { fn insert_params(&self, req: &mut SignedRequest) { if let Some(start) = &self.start { diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml index 323c2d64..2b525a42 100644 --- a/src/model/Cargo.toml +++ b/src/model/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "garage_model" -version = "0.8.1" +version = "0.8.2" authors = ["Alex Auvolat <alex@adnab.me>"] edition = "2018" license = "AGPL-3.0" @@ -14,21 +14,21 @@ path = "lib.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -garage_db = { version = "0.8.1", default-features = false, path = "../db" } -garage_rpc = { version = "0.8.1", path = "../rpc" } -garage_table = { version = "0.8.1", path = "../table" } -garage_block = { version = "0.8.1", path = "../block" } -garage_util = { version = "0.8.1", path = "../util" } +garage_db = { version = "0.8.2", default-features = false, path = "../db" } +garage_rpc = { version = "0.8.2", path = "../rpc" } +garage_table = { version = "0.8.2", path = "../table" } +garage_block = { version = "0.8.2", path = "../block" } +garage_util = { version = "0.8.2", path = "../util" } async-trait = "0.1.7" arc-swap = "1.0" -blake2 = "0.9" +blake2 = "0.10" err-derive = "0.3" hex = "0.4" -base64 = "0.13" -tracing = "0.1.30" +base64 = "0.21" +tracing = "0.1" rand = "0.8" -zstd = { version = "0.9", default-features = false } +zstd = { version = "0.12", default-features = false } serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } serde_bytes = "0.11" diff --git a/src/model/garage.rs b/src/model/garage.rs index 5bea6b4f..3daa1b33 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -27,12 +27,14 @@ use crate::index_counter::*; use crate::key_table::*; #[cfg(feature = "k2v")] -use crate::k2v::{item_table::*, poll::*, rpc::*}; +use crate::k2v::{item_table::*, rpc::*, sub::*}; /// An entire Garage full of data pub struct Garage { /// The parsed configuration Garage is running pub config: Config, + /// The set of background variables that can be viewed/modified at runtime + pub bg_vars: vars::BgVars, /// The replication mode of this cluster pub replication_mode: ReplicationMode, @@ -96,7 +98,7 @@ impl Garage { .cache_capacity(config.sled_cache_capacity) .flush_every_ms(Some(config.sled_flush_every_ms)) .open() - .expect("Unable to open sled DB"); + .ok_or_message("Unable to open sled DB")?; db::sled_adapter::SledDb::init(db) } #[cfg(not(feature = "sled"))] @@ -107,7 +109,7 @@ impl Garage { db_path.push("db.sqlite"); info!("Opening Sqlite database at: {}", db_path.display()); let db = db::sqlite_adapter::rusqlite::Connection::open(db_path) - .expect("Unable to open sqlite DB"); + .ok_or_message("Unable to open sqlite DB")?; db::sqlite_adapter::SqliteDb::init(db) } #[cfg(not(feature = "sqlite"))] @@ -121,7 +123,8 @@ impl Garage { "lmdb" | "heed" => { db_path.push("db.lmdb"); info!("Opening LMDB database at: {}", db_path.display()); - std::fs::create_dir_all(&db_path).expect("Unable to create LMDB data directory"); + std::fs::create_dir_all(&db_path) + .ok_or_message("Unable to create LMDB data directory")?; let map_size = garage_db::lmdb_adapter::recommended_map_size(); use db::lmdb_adapter::heed; @@ -133,7 +136,16 @@ impl Garage { env_builder.flag(heed::flags::Flags::MdbNoSync); env_builder.flag(heed::flags::Flags::MdbNoMetaSync); } - let db = env_builder.open(&db_path).expect("Unable to open LMDB DB"); + let db = match env_builder.open(&db_path) { + Err(heed::Error::Io(e)) if e.kind() == std::io::ErrorKind::OutOfMemory => { + return Err(Error::Message( + "OutOfMemory error while trying to open LMDB database. This can happen \ + if your operating system is not allowing you to use sufficient virtual \ + memory address space. Please check that no limit is set (ulimit -v). \ + On 32-bit machines, you should probably switch to another database engine.".into())) + } + x => x.ok_or_message("Unable to open LMDB DB")?, + }; db::lmdb_adapter::LmdbDb::init(db) } #[cfg(not(feature = "lmdb"))] @@ -156,13 +168,15 @@ impl Garage { } }; - let network_key = NetworkKey::from_slice( - &hex::decode(&config.rpc_secret).expect("Invalid RPC secret key")[..], - ) - .expect("Invalid RPC secret key"); + let network_key = hex::decode(config.rpc_secret.as_ref().ok_or_message( + "rpc_secret value is missing, not present in config file or in environment", + )?) + .ok() + .and_then(|x| NetworkKey::from_slice(&x)) + .ok_or_message("Invalid RPC secret key")?; let replication_mode = ReplicationMode::parse(&config.replication_mode) - .expect("Invalid replication_mode in config file."); + .ok_or_message("Invalid replication_mode in config file.")?; info!("Initialize membership management system..."); let system = System::new(network_key, replication_mode, &config)?; @@ -249,9 +263,14 @@ impl Garage { #[cfg(feature = "k2v")] let k2v = GarageK2V::new(system.clone(), &db, meta_rep_param); + // Initialize bg vars + let mut bg_vars = vars::BgVars::new(); + block_manager.register_bg_vars(&mut bg_vars); + // -- done -- Ok(Arc::new(Self { config, + bg_vars, replication_mode, db, system, @@ -298,8 +317,10 @@ impl GarageK2V { fn new(system: Arc<System>, db: &db::Db, meta_rep_param: TableShardedReplication) -> Self { info!("Initialize K2V counter table..."); let counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), db); + info!("Initialize K2V subscription manager..."); let subscriptions = Arc::new(SubscriptionManager::new()); + info!("Initialize K2V item table..."); let item_table = Table::new( K2VItemTable { @@ -310,7 +331,9 @@ impl GarageK2V { system.clone(), db, ); - let rpc = K2VRpcHandler::new(system, item_table.clone(), subscriptions); + + info!("Initialize K2V RPC handler..."); + let rpc = K2VRpcHandler::new(system, db, item_table.clone(), subscriptions); Self { item_table, diff --git a/src/model/k2v/causality.rs b/src/model/k2v/causality.rs index 9a692870..c80ebd39 100644 --- a/src/model/k2v/causality.rs +++ b/src/model/k2v/causality.rs @@ -1,3 +1,14 @@ +//! Implements a CausalContext, which is a set of timestamps for each +//! node -- a vector clock --, indicating that the versions with +//! timestamps <= these numbers have been seen and can be +//! overwritten by a subsequent write. +//! +//! The textual representation of a CausalContext, which we call a +//! "causality token", is used in the API and must be sent along with +//! each write or delete operation to indicate the previously seen +//! versions that we want to overwrite or delete. +use base64::prelude::*; + use std::collections::BTreeMap; use std::convert::TryInto; @@ -5,28 +16,44 @@ use serde::{Deserialize, Serialize}; use garage_util::data::*; +use crate::helper::error::{Error as HelperError, OkOrBadRequest}; + /// Node IDs used in K2V are u64 integers that are the abbreviation /// of full Garage node IDs which are 256-bit UUIDs. pub type K2VNodeId = u64; +pub type VectorClock = BTreeMap<K2VNodeId, u64>; + pub fn make_node_id(node_id: Uuid) -> K2VNodeId { let mut tmp = [0u8; 8]; tmp.copy_from_slice(&node_id.as_slice()[..8]); u64::from_be_bytes(tmp) } -#[derive(PartialEq, Eq, Debug, Serialize, Deserialize)] +pub fn vclock_gt(a: &VectorClock, b: &VectorClock) -> bool { + a.iter().any(|(n, ts)| ts > b.get(n).unwrap_or(&0)) +} + +pub fn vclock_max(a: &VectorClock, b: &VectorClock) -> VectorClock { + let mut ret = a.clone(); + for (n, ts) in b.iter() { + let ent = ret.entry(*n).or_insert(0); + *ent = std::cmp::max(*ts, *ent); + } + ret +} + +#[derive(PartialEq, Eq, Debug, Serialize, Deserialize, Default)] pub struct CausalContext { - pub vector_clock: BTreeMap<K2VNodeId, u64>, + pub vector_clock: VectorClock, } impl CausalContext { /// Empty causality context - pub fn new_empty() -> Self { - Self { - vector_clock: BTreeMap::new(), - } + pub fn new() -> Self { + Self::default() } + /// Make binary representation and encode in base64 pub fn serialize(&self) -> String { let mut ints = Vec::with_capacity(2 * self.vector_clock.len()); @@ -41,14 +68,15 @@ impl CausalContext { bytes.extend(u64::to_be_bytes(i)); } - base64::encode_config(bytes, base64::URL_SAFE_NO_PAD) + BASE64_URL_SAFE_NO_PAD.encode(bytes) } - /// Parse from base64-encoded binary representation - pub fn parse(s: &str) -> Result<Self, String> { - let bytes = base64::decode_config(s, base64::URL_SAFE_NO_PAD) - .map_err(|e| format!("bad causality token base64: {}", e))?; + + /// Parse from base64-encoded binary representation. + /// Returns None on error. + pub fn parse(s: &str) -> Option<Self> { + let bytes = BASE64_URL_SAFE_NO_PAD.decode(s).ok()?; if bytes.len() % 16 != 8 || bytes.len() < 8 { - return Err("bad causality token length".into()); + return None; } let checksum = u64::from_be_bytes(bytes[..8].try_into().unwrap()); @@ -65,16 +93,19 @@ impl CausalContext { let check = ret.vector_clock.iter().fold(0, |acc, (n, t)| acc ^ *n ^ *t); if check != checksum { - return Err("bad causality token checksum".into()); + return None; } - Ok(ret) + Some(ret) } + + pub fn parse_helper(s: &str) -> Result<Self, HelperError> { + Self::parse(s).ok_or_bad_request("Invalid causality token") + } + /// Check if this causal context contains newer items than another one pub fn is_newer_than(&self, other: &Self) -> bool { - self.vector_clock - .iter() - .any(|(k, v)| v > other.vector_clock.get(k).unwrap_or(&0)) + vclock_gt(&self.vector_clock, &other.vector_clock) } } diff --git a/src/model/k2v/item_table.rs b/src/model/k2v/item_table.rs index ce3e4129..9e3ba5a5 100644 --- a/src/model/k2v/item_table.rs +++ b/src/model/k2v/item_table.rs @@ -11,7 +11,7 @@ use garage_table::*; use crate::index_counter::*; use crate::k2v::causality::*; -use crate::k2v::poll::*; +use crate::k2v::sub::*; pub const ENTRIES: &str = "entries"; pub const CONFLICTS: &str = "conflicts"; @@ -73,7 +73,8 @@ impl K2VItem { this_node: Uuid, context: &Option<CausalContext>, new_value: DvvsValue, - ) { + node_ts: u64, + ) -> u64 { if let Some(context) = context { for (node, t_discard) in context.vector_clock.iter() { if let Some(e) = self.items.get_mut(node) { @@ -98,12 +99,14 @@ impl K2VItem { values: vec![], }); let t_prev = e.max_time(); - e.values.push((t_prev + 1, new_value)); + let t_new = std::cmp::max(t_prev + 1, node_ts + 1); + e.values.push((t_new, new_value)); + t_new } /// Extract the causality context of a K2V Item pub fn causal_context(&self) -> CausalContext { - let mut cc = CausalContext::new_empty(); + let mut cc = CausalContext::new(); for (node, ent) in self.items.iter() { cc.vector_clock.insert(*node, ent.max_time()); } @@ -173,9 +176,9 @@ impl Crdt for DvvsEntry { impl PartitionKey for K2VItemPartition { fn hash(&self) -> Hash { - use blake2::{Blake2b, Digest}; + use blake2::{Blake2b512, Digest}; - let mut hasher = Blake2b::new(); + let mut hasher = Blake2b512::new(); hasher.update(self.bucket_id.as_slice()); hasher.update(self.partition_key.as_bytes()); let mut hash = [0u8; 32]; @@ -310,7 +313,7 @@ mod tests { values: vec![(6, DvvsValue::Value(vec![16])), (7, DvvsValue::Deleted)], }; - let mut e3 = e1.clone(); + let mut e3 = e1; e3.merge(&e2); assert_eq!(e2, e3); } diff --git a/src/model/k2v/mod.rs b/src/model/k2v/mod.rs index f6a96151..acc1fcdc 100644 --- a/src/model/k2v/mod.rs +++ b/src/model/k2v/mod.rs @@ -1,6 +1,8 @@ pub mod causality; +pub mod seen; pub mod item_table; -pub mod poll; pub mod rpc; + +pub mod sub; diff --git a/src/model/k2v/poll.rs b/src/model/k2v/poll.rs deleted file mode 100644 index 93105207..00000000 --- a/src/model/k2v/poll.rs +++ /dev/null @@ -1,50 +0,0 @@ -use std::collections::HashMap; -use std::sync::Mutex; - -use serde::{Deserialize, Serialize}; -use tokio::sync::broadcast; - -use crate::k2v::item_table::*; - -#[derive(Debug, Hash, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub struct PollKey { - pub partition: K2VItemPartition, - pub sort_key: String, -} - -#[derive(Default)] -pub struct SubscriptionManager { - subscriptions: Mutex<HashMap<PollKey, broadcast::Sender<K2VItem>>>, -} - -impl SubscriptionManager { - pub fn new() -> Self { - Self::default() - } - - pub fn subscribe(&self, key: &PollKey) -> broadcast::Receiver<K2VItem> { - let mut subs = self.subscriptions.lock().unwrap(); - if let Some(s) = subs.get(key) { - s.subscribe() - } else { - let (tx, rx) = broadcast::channel(8); - subs.insert(key.clone(), tx); - rx - } - } - - pub fn notify(&self, item: &K2VItem) { - let key = PollKey { - partition: item.partition.clone(), - sort_key: item.sort_key.clone(), - }; - let mut subs = self.subscriptions.lock().unwrap(); - if let Some(s) = subs.get(&key) { - if s.send(item.clone()).is_err() { - // no more subscribers, remove channel from here - // (we will re-create it later if we need to subscribe again) - subs.remove(&key); - } - } - } -} diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs index f64a7984..37e142f6 100644 --- a/src/model/k2v/rpc.rs +++ b/src/model/k2v/rpc.rs @@ -5,9 +5,10 @@ //! node does not process the entry directly, as this would //! mean the vector clock gets much larger than needed). -use std::collections::HashMap; -use std::sync::Arc; -use std::time::Duration; +use std::collections::{BTreeMap, HashMap}; +use std::convert::TryInto; +use std::sync::{Arc, Mutex, MutexGuard}; +use std::time::{Duration, Instant}; use async_trait::async_trait; use futures::stream::FuturesUnordered; @@ -15,9 +16,12 @@ use futures::StreamExt; use serde::{Deserialize, Serialize}; use tokio::select; +use garage_db as db; + use garage_util::crdt::*; use garage_util::data::*; use garage_util::error::*; +use garage_util::time::now_msec; use garage_rpc::system::System; use garage_rpc::*; @@ -25,9 +29,15 @@ use garage_rpc::*; use garage_table::replication::{TableReplication, TableShardedReplication}; use garage_table::{PartitionKey, Table}; +use crate::helper::error::Error as HelperError; use crate::k2v::causality::*; use crate::k2v::item_table::*; -use crate::k2v::poll::*; +use crate::k2v::seen::*; +use crate::k2v::sub::*; + +const POLL_RANGE_EXTRA_DELAY: Duration = Duration::from_millis(200); + +const TIMESTAMP_KEY: &[u8] = b"timestamp"; /// RPC messages for K2V #[derive(Debug, Serialize, Deserialize)] @@ -40,7 +50,13 @@ enum K2VRpc { causal_context: CausalContext, timeout_msec: u64, }, + PollRange { + range: PollRange, + seen_str: Option<String>, + timeout_msec: u64, + }, PollItemResponse(Option<K2VItem>), + PollRangeResponse(Uuid, Vec<K2VItem>), } #[derive(Debug, Serialize, Deserialize)] @@ -59,6 +75,12 @@ impl Rpc for K2VRpc { pub struct K2VRpcHandler { system: Arc<System>, item_table: Arc<Table<K2VItemTable, TableShardedReplication>>, + + // Using a mutex on the local_timestamp_tree is not strictly necessary, + // but it helps to not try to do several inserts at the same time, + // which would create transaction conflicts and force many useless retries. + local_timestamp_tree: Mutex<db::Tree>, + endpoint: Arc<Endpoint<K2VRpc, Self>>, subscriptions: Arc<SubscriptionManager>, } @@ -66,14 +88,19 @@ pub struct K2VRpcHandler { impl K2VRpcHandler { pub fn new( system: Arc<System>, + db: &db::Db, item_table: Arc<Table<K2VItemTable, TableShardedReplication>>, subscriptions: Arc<SubscriptionManager>, ) -> Arc<Self> { + let local_timestamp_tree = db + .open_tree("k2v_local_timestamp") + .expect("Unable to open DB tree for k2v local timestamp"); let endpoint = system.netapp.endpoint("garage_model/k2v/Rpc".to_string()); let rpc_handler = Arc::new(Self { system, item_table, + local_timestamp_tree: Mutex::new(local_timestamp_tree), endpoint, subscriptions, }); @@ -181,7 +208,7 @@ impl K2VRpcHandler { Ok(()) } - pub async fn poll( + pub async fn poll_item( &self, bucket_id: Uuid, partition_key: String, @@ -230,9 +257,7 @@ impl K2VRpcHandler { resp = Some(x); } } - K2VRpc::PollItemResponse(None) => { - return Ok(None); - } + K2VRpc::PollItemResponse(None) => (), v => return Err(Error::unexpected_rpc_message(v)), } } @@ -240,10 +265,117 @@ impl K2VRpcHandler { Ok(resp) } + pub async fn poll_range( + &self, + range: PollRange, + seen_str: Option<String>, + timeout_msec: u64, + ) -> Result<Option<(BTreeMap<String, K2VItem>, String)>, HelperError> { + let has_seen_marker = seen_str.is_some(); + + // Parse seen marker, we will use it below. This is also the first check + // that it is valid, which returns a bad request error if not. + let mut seen = seen_str + .as_deref() + .map(RangeSeenMarker::decode_helper) + .transpose()? + .unwrap_or_default(); + seen.restrict(&range); + + // Prepare PollRange RPC to send to the storage nodes responsible for the parititon + let nodes = self + .item_table + .data + .replication + .write_nodes(&range.partition.hash()); + let quorum = self.item_table.data.replication.read_quorum(); + let msg = K2VRpc::PollRange { + range, + seen_str, + timeout_msec, + }; + + // Send the request to all nodes, use FuturesUnordered to get the responses in any order + let msg = msg.into_req().map_err(netapp::error::Error::from)?; + let rs = RequestStrategy::with_priority(PRIO_NORMAL).without_timeout(); + let mut requests = nodes + .iter() + .map(|node| self.system.rpc.call(&self.endpoint, *node, msg.clone(), rs)) + .collect::<FuturesUnordered<_>>(); + + // Fetch responses. This procedure stops fetching responses when any of the following + // conditions arise: + // - we have a response to all requests + // - we have a response to a read quorum of requests (e.g. 2/3), and an extra delay + // has passed since the quorum was achieved + // - a global RPC timeout expired + // The extra delay after a quorum was received is usefull if the third response was to + // arrive during this short interval: this would allow us to consider all the data seen + // by that last node in the response we produce, and would likely help reduce the + // size of the seen marker that we will return (because we would have an info of the + // kind: all items produced by that node until time ts have been returned, so we can + // bump the entry in the global vector clock and possibly remove some item-specific + // vector clocks) + let mut deadline = + Instant::now() + Duration::from_millis(timeout_msec) + self.system.rpc.rpc_timeout(); + let mut resps = vec![]; + let mut errors = vec![]; + loop { + select! { + _ = tokio::time::sleep_until(deadline.into()) => { + break; + } + res = requests.next() => match res { + None => break, + Some(Err(e)) => errors.push(e), + Some(Ok(r)) => { + resps.push(r); + if resps.len() >= quorum { + deadline = std::cmp::min(deadline, Instant::now() + POLL_RANGE_EXTRA_DELAY); + } + } + } + } + } + if errors.len() > nodes.len() - quorum { + let errors = errors.iter().map(|e| format!("{}", e)).collect::<Vec<_>>(); + return Err(Error::Quorum(quorum, resps.len(), nodes.len(), errors).into()); + } + + // Take all returned items into account to produce the response. + let mut new_items = BTreeMap::<String, K2VItem>::new(); + for v in resps { + if let K2VRpc::PollRangeResponse(node, items) = v { + seen.mark_seen_node_items(node, items.iter()); + for item in items.into_iter() { + match new_items.get_mut(&item.sort_key) { + Some(ent) => { + ent.merge(&item); + } + None => { + new_items.insert(item.sort_key.clone(), item); + } + } + } + } else { + return Err(Error::unexpected_rpc_message(v).into()); + } + } + + if new_items.is_empty() && has_seen_marker { + Ok(None) + } else { + Ok(Some((new_items, seen.encode()?))) + } + } + // ---- internal handlers ---- async fn handle_insert(&self, item: &InsertedItem) -> Result<K2VRpc, Error> { - let new = self.local_insert(item)?; + let new = { + let local_timestamp_tree = self.local_timestamp_tree.lock().unwrap(); + self.local_insert(&local_timestamp_tree, item)? + }; // Propagate to rest of network if let Some(updated) = new { @@ -256,11 +388,14 @@ impl K2VRpcHandler { async fn handle_insert_many(&self, items: &[InsertedItem]) -> Result<K2VRpc, Error> { let mut updated_vec = vec![]; - for item in items { - let new = self.local_insert(item)?; + { + let local_timestamp_tree = self.local_timestamp_tree.lock().unwrap(); + for item in items { + let new = self.local_insert(&local_timestamp_tree, item)?; - if let Some(updated) = new { - updated_vec.push(updated); + if let Some(updated) = new { + updated_vec.push(updated); + } } } @@ -272,10 +407,22 @@ impl K2VRpcHandler { Ok(K2VRpc::Ok) } - fn local_insert(&self, item: &InsertedItem) -> Result<Option<K2VItem>, Error> { + fn local_insert( + &self, + local_timestamp_tree: &MutexGuard<'_, db::Tree>, + item: &InsertedItem, + ) -> Result<Option<K2VItem>, Error> { + let now = now_msec(); + self.item_table .data - .update_entry_with(&item.partition, &item.sort_key, |ent| { + .update_entry_with(&item.partition, &item.sort_key, |tx, ent| { + let old_local_timestamp = tx + .get(local_timestamp_tree, TIMESTAMP_KEY)? + .and_then(|x| x.try_into().ok()) + .map(u64::from_be_bytes) + .unwrap_or_default(); + let mut ent = ent.unwrap_or_else(|| { K2VItem::new( item.partition.bucket_id, @@ -283,13 +430,25 @@ impl K2VRpcHandler { item.sort_key.clone(), ) }); - ent.update(self.system.id, &item.causal_context, item.value.clone()); - ent + let new_local_timestamp = ent.update( + self.system.id, + &item.causal_context, + item.value.clone(), + std::cmp::max(old_local_timestamp, now), + ); + + tx.insert( + local_timestamp_tree, + TIMESTAMP_KEY, + u64::to_be_bytes(new_local_timestamp), + )?; + + Ok(ent) }) } - async fn handle_poll(&self, key: &PollKey, ct: &CausalContext) -> Result<K2VItem, Error> { - let mut chan = self.subscriptions.subscribe(key); + async fn handle_poll_item(&self, key: &PollKey, ct: &CausalContext) -> Result<K2VItem, Error> { + let mut chan = self.subscriptions.subscribe_item(key); let mut value = self .item_table @@ -311,6 +470,71 @@ impl K2VRpcHandler { Ok(value) } + + async fn handle_poll_range( + &self, + range: &PollRange, + seen_str: &Option<String>, + ) -> Result<Vec<K2VItem>, Error> { + if let Some(seen_str) = seen_str { + let seen = RangeSeenMarker::decode(seen_str).ok_or_message("Invalid seenMarker")?; + + // Subscribe now to all changes on that partition, + // so that new items that are inserted while we are reading the range + // will be seen in the loop below + let mut chan = self.subscriptions.subscribe_partition(&range.partition); + + // Check for the presence of any new items already stored in the item table + let mut new_items = self.poll_range_read_range(range, &seen)?; + + // If we found no new items, wait for a matching item to arrive + // on the channel + while new_items.is_empty() { + let item = chan.recv().await?; + if range.matches(&item) && seen.is_new_item(&item) { + new_items.push(item); + } + } + + Ok(new_items) + } else { + // If no seen marker was specified, we do not poll for anything. + // We return immediately with the set of known items (even if + // it is empty), which will give the client an inital view of + // the dataset and an initial seen marker for further + // PollRange calls. + self.poll_range_read_range(range, &RangeSeenMarker::default()) + } + } + + fn poll_range_read_range( + &self, + range: &PollRange, + seen: &RangeSeenMarker, + ) -> Result<Vec<K2VItem>, Error> { + let mut new_items = vec![]; + + let partition_hash = range.partition.hash(); + let first_key = match &range.start { + None => partition_hash.to_vec(), + Some(sk) => self.item_table.data.tree_key(&range.partition, sk), + }; + for item in self.item_table.data.store.range(first_key..)? { + let (key, value) = item?; + if &key[..32] != partition_hash.as_slice() { + break; + } + let item = self.item_table.data.decode_entry(&value)?; + if !range.matches(&item) { + break; + } + if seen.is_new_item(&item) { + new_items.push(item); + } + } + + Ok(new_items) + } } #[async_trait] @@ -326,10 +550,21 @@ impl EndpointHandler<K2VRpc> for K2VRpcHandler { } => { let delay = tokio::time::sleep(Duration::from_millis(*timeout_msec)); select! { - ret = self.handle_poll(key, causal_context) => ret.map(Some).map(K2VRpc::PollItemResponse), + ret = self.handle_poll_item(key, causal_context) => ret.map(Some).map(K2VRpc::PollItemResponse), _ = delay => Ok(K2VRpc::PollItemResponse(None)), } } + K2VRpc::PollRange { + range, + seen_str, + timeout_msec, + } => { + let delay = tokio::time::sleep(Duration::from_millis(*timeout_msec)); + select! { + ret = self.handle_poll_range(range, seen_str) => ret.map(|items| K2VRpc::PollRangeResponse(self.system.id, items)), + _ = delay => Ok(K2VRpc::PollRangeResponse(self.system.id, vec![])), + } + } m => Err(Error::unexpected_rpc_message(m)), } } diff --git a/src/model/k2v/seen.rs b/src/model/k2v/seen.rs new file mode 100644 index 00000000..8fe3a582 --- /dev/null +++ b/src/model/k2v/seen.rs @@ -0,0 +1,105 @@ +//! Implements a RangeSeenMarker, a data type used in the PollRange API +//! to indicate which items in the range have already been seen +//! and which have not been seen yet. +//! +//! It consists of a vector clock that indicates that for each node, +//! all items produced by that node with timestamps <= the value in the +//! vector clock has been seen, as well as a set of causal contexts for +//! individual items. + +use std::collections::BTreeMap; + +use base64::prelude::*; +use serde::{Deserialize, Serialize}; + +use garage_util::data::Uuid; +use garage_util::encode::{nonversioned_decode, nonversioned_encode}; +use garage_util::error::Error; + +use crate::helper::error::{Error as HelperError, OkOrBadRequest}; +use crate::k2v::causality::*; +use crate::k2v::item_table::*; +use crate::k2v::sub::*; + +#[derive(Debug, Serialize, Deserialize, Default)] +pub struct RangeSeenMarker { + vector_clock: VectorClock, + items: BTreeMap<String, VectorClock>, +} + +impl RangeSeenMarker { + pub fn new() -> Self { + Self::default() + } + + pub fn restrict(&mut self, range: &PollRange) { + if let Some(start) = &range.start { + self.items = self.items.split_off(start); + } + if let Some(end) = &range.end { + self.items.split_off(end); + } + if let Some(pfx) = &range.prefix { + self.items.retain(|k, _v| k.starts_with(pfx)); + } + } + + pub fn mark_seen_node_items<'a, I: IntoIterator<Item = &'a K2VItem>>( + &mut self, + node: Uuid, + items: I, + ) { + let node = make_node_id(node); + for item in items.into_iter() { + let cc = item.causal_context(); + + if let Some(ts) = cc.vector_clock.get(&node) { + let ent = self.vector_clock.entry(node).or_insert(0); + *ent = std::cmp::max(*ent, *ts); + } + + if vclock_gt(&cc.vector_clock, &self.vector_clock) { + match self.items.get_mut(&item.sort_key) { + None => { + self.items.insert(item.sort_key.clone(), cc.vector_clock); + } + Some(ent) => *ent = vclock_max(ent, &cc.vector_clock), + } + } + } + } + + pub fn canonicalize(&mut self) { + let self_vc = &self.vector_clock; + self.items.retain(|_sk, vc| vclock_gt(vc, self_vc)) + } + + pub fn encode(&mut self) -> Result<String, Error> { + self.canonicalize(); + + let bytes = nonversioned_encode(&self)?; + let bytes = zstd::stream::encode_all(&mut &bytes[..], zstd::DEFAULT_COMPRESSION_LEVEL)?; + Ok(BASE64_STANDARD.encode(&bytes)) + } + + /// Decode from msgpack+zstd+b64 representation, returns None on error. + pub fn decode(s: &str) -> Option<Self> { + let bytes = BASE64_STANDARD.decode(s).ok()?; + let bytes = zstd::stream::decode_all(&mut &bytes[..]).ok()?; + nonversioned_decode(&bytes).ok() + } + + pub fn decode_helper(s: &str) -> Result<Self, HelperError> { + Self::decode(s).ok_or_bad_request("Invalid causality token") + } + + pub fn is_new_item(&self, item: &K2VItem) -> bool { + let cc = item.causal_context(); + vclock_gt(&cc.vector_clock, &self.vector_clock) + && self + .items + .get(&item.sort_key) + .map(|vc| vclock_gt(&cc.vector_clock, vc)) + .unwrap_or(true) + } +} diff --git a/src/model/k2v/sub.rs b/src/model/k2v/sub.rs new file mode 100644 index 00000000..b1daa271 --- /dev/null +++ b/src/model/k2v/sub.rs @@ -0,0 +1,110 @@ +use std::collections::HashMap; +use std::sync::Mutex; + +use serde::{Deserialize, Serialize}; +use tokio::sync::broadcast; + +use crate::k2v::item_table::*; + +#[derive(Debug, Hash, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct PollKey { + pub partition: K2VItemPartition, + pub sort_key: String, +} + +#[derive(Debug, Hash, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct PollRange { + pub partition: K2VItemPartition, + pub prefix: Option<String>, + pub start: Option<String>, + pub end: Option<String>, +} + +#[derive(Default)] +pub struct SubscriptionManager(Mutex<SubscriptionManagerInner>); + +#[derive(Default)] +pub struct SubscriptionManagerInner { + item_subscriptions: HashMap<PollKey, broadcast::Sender<K2VItem>>, + part_subscriptions: HashMap<K2VItemPartition, broadcast::Sender<K2VItem>>, +} + +impl SubscriptionManager { + pub fn new() -> Self { + Self::default() + } + + pub(crate) fn subscribe_item(&self, key: &PollKey) -> broadcast::Receiver<K2VItem> { + let mut inner = self.0.lock().unwrap(); + if let Some(s) = inner.item_subscriptions.get(key) { + s.subscribe() + } else { + let (tx, rx) = broadcast::channel(8); + inner.item_subscriptions.insert(key.clone(), tx); + rx + } + } + + pub(crate) fn subscribe_partition( + &self, + part: &K2VItemPartition, + ) -> broadcast::Receiver<K2VItem> { + let mut inner = self.0.lock().unwrap(); + if let Some(s) = inner.part_subscriptions.get(part) { + s.subscribe() + } else { + let (tx, rx) = broadcast::channel(8); + inner.part_subscriptions.insert(part.clone(), tx); + rx + } + } + + pub(crate) fn notify(&self, item: &K2VItem) { + let mut inner = self.0.lock().unwrap(); + + // 1. Notify single item subscribers, + // removing subscriptions with no more listeners if any + let key = PollKey { + partition: item.partition.clone(), + sort_key: item.sort_key.clone(), + }; + if let Some(s) = inner.item_subscriptions.get(&key) { + if s.send(item.clone()).is_err() { + // no more subscribers, remove channel from here + // (we will re-create it later if we need to subscribe again) + inner.item_subscriptions.remove(&key); + } + } + + // 2. Notify partition subscribers, + // removing subscriptions with no more listeners if any + if let Some(s) = inner.part_subscriptions.get(&item.partition) { + if s.send(item.clone()).is_err() { + // no more subscribers, remove channel from here + // (we will re-create it later if we need to subscribe again) + inner.part_subscriptions.remove(&item.partition); + } + } + } +} + +impl PollRange { + pub fn matches(&self, item: &K2VItem) -> bool { + item.partition == self.partition + && self + .prefix + .as_ref() + .map(|x| item.sort_key.starts_with(x)) + .unwrap_or(true) + && self + .start + .as_ref() + .map(|x| item.sort_key >= *x) + .unwrap_or(true) + && self + .end + .as_ref() + .map(|x| item.sort_key < *x) + .unwrap_or(true) + } +} diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml index 3d4d3ff5..57c157d0 100644 --- a/src/rpc/Cargo.toml +++ b/src/rpc/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "garage_rpc" -version = "0.8.1" +version = "0.8.2" authors = ["Alex Auvolat <alex@adnab.me>"] edition = "2018" license = "AGPL-3.0" @@ -14,17 +14,18 @@ path = "lib.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -garage_util = { version = "0.8.1", path = "../util" } +garage_util = { version = "0.8.2", path = "../util" } arc-swap = "1.0" bytes = "1.0" bytesize = "1.1" gethostname = "0.2" hex = "0.4" -tracing = "0.1.30" +tracing = "0.1" rand = "0.8" itertools="0.10" sodiumoxide = { version = "0.2.5-0", package = "kuska-sodiumoxide" } +systemstat = "0.2.3" async-trait = "0.1.7" serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } @@ -38,8 +39,7 @@ k8s-openapi = { version = "0.16", features = ["v1_22"], optional = true } schemars = { version = "0.8", optional = true } reqwest = { version = "0.11", optional = true, default-features = false, features = ["rustls-tls-manual-roots", "json"] } -# newer version requires rust edition 2021 -pnet_datalink = "0.28" +pnet_datalink = "0.33" futures = "0.3" futures-util = "0.3" diff --git a/src/rpc/consul.rs b/src/rpc/consul.rs index b1772a1a..f85f789c 100644 --- a/src/rpc/consul.rs +++ b/src/rpc/consul.rs @@ -113,7 +113,7 @@ impl ConsulDiscovery { let pubkey = ent .node_meta .get("pubkey") - .and_then(|k| hex::decode(&k).ok()) + .and_then(|k| hex::decode(k).ok()) .and_then(|k| NodeID::from_slice(&k[..])); if let (Some(ip), Some(pubkey)) = (ip, pubkey) { ret.push((pubkey, SocketAddr::new(ip, ent.service_port))); diff --git a/src/rpc/lib.rs b/src/rpc/lib.rs index f734942d..a5f8fc6e 100644 --- a/src/rpc/lib.rs +++ b/src/rpc/lib.rs @@ -3,6 +3,9 @@ #[macro_use] extern crate tracing; +mod metrics; +mod system_metrics; + #[cfg(feature = "consul-discovery")] mod consul; #[cfg(feature = "kubernetes-discovery")] @@ -14,7 +17,6 @@ pub mod replication_mode; pub mod ring; pub mod system; -mod metrics; pub mod rpc_helper; pub use rpc_helper::*; diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index 1ec250c3..e59c372a 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -15,10 +15,9 @@ use opentelemetry::{ }; pub use netapp::endpoint::{Endpoint, EndpointHandler, StreamingEndpointHandler}; -use netapp::message::IntoReq; pub use netapp::message::{ - Message as Rpc, OrderTag, Req, RequestPriority, Resp, PRIO_BACKGROUND, PRIO_HIGH, PRIO_NORMAL, - PRIO_SECONDARY, + IntoReq, Message as Rpc, OrderTag, Req, RequestPriority, Resp, PRIO_BACKGROUND, PRIO_HIGH, + PRIO_NORMAL, PRIO_SECONDARY, }; use netapp::peering::fullmesh::FullMeshPeeringStrategy; pub use netapp::{self, NetApp, NodeID}; diff --git a/src/rpc/system.rs b/src/rpc/system.rs index 1f4d86e7..c549d8fc 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -3,6 +3,7 @@ use std::collections::HashMap; use std::io::{Read, Write}; use std::net::{IpAddr, SocketAddr}; use std::path::{Path, PathBuf}; +use std::sync::atomic::Ordering; use std::sync::{Arc, RwLock}; use std::time::{Duration, Instant}; @@ -38,6 +39,8 @@ use crate::replication_mode::*; use crate::ring::*; use crate::rpc_helper::*; +use crate::system_metrics::*; + const DISCOVERY_INTERVAL: Duration = Duration::from_secs(60); const STATUS_EXCHANGE_INTERVAL: Duration = Duration::from_secs(10); @@ -104,6 +107,8 @@ pub struct System { #[cfg(feature = "kubernetes-discovery")] kubernetes_discovery: Option<KubernetesDiscoveryConfig>, + metrics: SystemMetrics, + replication_mode: ReplicationMode, replication_factor: usize, @@ -113,18 +118,28 @@ pub struct System { /// Path to metadata directory pub metadata_dir: PathBuf, + /// Path to data directory + pub data_dir: PathBuf, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct NodeStatus { /// Hostname of the node pub hostname: String, + /// Replication factor configured on the node pub replication_factor: usize, /// Cluster layout version pub cluster_layout_version: u64, /// Hash of cluster layout staging data pub cluster_layout_staging_hash: Hash, + + /// Disk usage on partition containing metadata directory (tuple: `(avail, total)`) + #[serde(default)] + pub meta_disk_avail: Option<(u64, u64)>, + /// Disk usage on partition containing data directory (tuple: `(avail, total)`) + #[serde(default)] + pub data_disk_avail: Option<(u64, u64)>, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -200,7 +215,7 @@ pub fn gen_node_key(metadata_dir: &Path) -> Result<NodeKey, Error> { } else { if !metadata_dir.exists() { info!("Metadata directory does not exist, creating it."); - std::fs::create_dir(&metadata_dir)?; + std::fs::create_dir(metadata_dir)?; } info!("Generating new node key pair."); @@ -266,14 +281,10 @@ impl System { } }; - let local_status = NodeStatus { - hostname: gethostname::gethostname() - .into_string() - .unwrap_or_else(|_| "<invalid utf-8>".to_string()), - replication_factor, - cluster_layout_version: cluster_layout.version, - cluster_layout_staging_hash: cluster_layout.staging_hash, - }; + let metrics = SystemMetrics::new(replication_factor); + + let mut local_status = NodeStatus::initial(replication_factor, &cluster_layout); + local_status.update_disk_usage(&config.metadata_dir, &config.data_dir, &metrics); let ring = Ring::new(cluster_layout, replication_factor); let (update_ring, ring) = watch::channel(Arc::new(ring)); @@ -365,10 +376,12 @@ impl System { consul_discovery, #[cfg(feature = "kubernetes-discovery")] kubernetes_discovery: config.kubernetes_discovery.clone(), + metrics, ring, update_ring: Mutex::new(update_ring), metadata_dir: config.metadata_dir.clone(), + data_dir: config.data_dir.clone(), }); sys.system_endpoint.set_handler(sys.clone()); Ok(sys) @@ -406,12 +419,7 @@ impl System { .get(&n.id.into()) .cloned() .map(|(_, st)| st) - .unwrap_or(NodeStatus { - hostname: "?".to_string(), - replication_factor: 0, - cluster_layout_version: 0, - cluster_layout_staging_hash: Hash::from([0u8; 32]), - }), + .unwrap_or_else(NodeStatus::unknown), }) .collect::<Vec<_>>(); known_nodes @@ -590,6 +598,9 @@ impl System { let ring = self.ring.borrow(); new_si.cluster_layout_version = ring.layout.version; new_si.cluster_layout_staging_hash = ring.layout.staging_hash; + + new_si.update_disk_usage(&self.metadata_dir, &self.data_dir, &self.metrics); + self.local_status.swap(Arc::new(new_si)); } @@ -854,6 +865,69 @@ impl EndpointHandler<SystemRpc> for System { } } +impl NodeStatus { + fn initial(replication_factor: usize, layout: &ClusterLayout) -> Self { + NodeStatus { + hostname: gethostname::gethostname() + .into_string() + .unwrap_or_else(|_| "<invalid utf-8>".to_string()), + replication_factor, + cluster_layout_version: layout.version, + cluster_layout_staging_hash: layout.staging_hash, + meta_disk_avail: None, + data_disk_avail: None, + } + } + + fn unknown() -> Self { + NodeStatus { + hostname: "?".to_string(), + replication_factor: 0, + cluster_layout_version: 0, + cluster_layout_staging_hash: Hash::from([0u8; 32]), + meta_disk_avail: None, + data_disk_avail: None, + } + } + + fn update_disk_usage(&mut self, meta_dir: &Path, data_dir: &Path, metrics: &SystemMetrics) { + use systemstat::{Platform, System}; + let mounts = System::new().mounts().unwrap_or_default(); + + let mount_avail = |path: &Path| { + mounts + .iter() + .filter(|x| path.starts_with(&x.fs_mounted_on)) + .max_by_key(|x| x.fs_mounted_on.len()) + .map(|x| (x.avail.as_u64(), x.total.as_u64())) + }; + + self.meta_disk_avail = mount_avail(meta_dir); + self.data_disk_avail = mount_avail(data_dir); + + if let Some((avail, total)) = self.meta_disk_avail { + metrics + .values + .meta_disk_avail + .store(avail, Ordering::Relaxed); + metrics + .values + .meta_disk_total + .store(total, Ordering::Relaxed); + } + if let Some((avail, total)) = self.data_disk_avail { + metrics + .values + .data_disk_avail + .store(avail, Ordering::Relaxed); + metrics + .values + .data_disk_total + .store(total, Ordering::Relaxed); + } + } +} + fn get_default_ip() -> Option<IpAddr> { pnet_datalink::interfaces() .iter() diff --git a/src/rpc/system_metrics.rs b/src/rpc/system_metrics.rs new file mode 100644 index 00000000..af81b71f --- /dev/null +++ b/src/rpc/system_metrics.rs @@ -0,0 +1,77 @@ +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; + +use opentelemetry::{global, metrics::*, KeyValue}; + +/// TableMetrics reference all counter used for metrics +pub struct SystemMetrics { + pub(crate) _garage_build_info: ValueObserver<u64>, + pub(crate) _replication_factor: ValueObserver<u64>, + pub(crate) _disk_avail: ValueObserver<u64>, + pub(crate) _disk_total: ValueObserver<u64>, + pub(crate) values: Arc<SystemMetricsValues>, +} + +#[derive(Default)] +pub struct SystemMetricsValues { + pub(crate) data_disk_total: AtomicU64, + pub(crate) data_disk_avail: AtomicU64, + pub(crate) meta_disk_total: AtomicU64, + pub(crate) meta_disk_avail: AtomicU64, +} + +impl SystemMetrics { + pub fn new(replication_factor: usize) -> Self { + let meter = global::meter("garage_system"); + let values = Arc::new(SystemMetricsValues::default()); + let values1 = values.clone(); + let values2 = values.clone(); + Self { + _garage_build_info: meter + .u64_value_observer("garage_build_info", move |observer| { + observer.observe( + 1, + &[ + KeyValue::new("rustversion", garage_util::version::rust_version()), + KeyValue::new("version", garage_util::version::garage_version()), + ], + ) + }) + .with_description("Garage build info") + .init(), + _replication_factor: meter + .u64_value_observer("garage_replication_factor", move |observer| { + observer.observe(replication_factor as u64, &[]) + }) + .with_description("Garage replication factor setting") + .init(), + _disk_avail: meter + .u64_value_observer("garage_local_disk_avail", move |observer| { + match values1.data_disk_avail.load(Ordering::Relaxed) { + 0 => (), + x => observer.observe(x, &[KeyValue::new("volume", "data")]), + }; + match values1.meta_disk_avail.load(Ordering::Relaxed) { + 0 => (), + x => observer.observe(x, &[KeyValue::new("volume", "metadata")]), + }; + }) + .with_description("Garage available disk space on each node") + .init(), + _disk_total: meter + .u64_value_observer("garage_local_disk_total", move |observer| { + match values2.data_disk_total.load(Ordering::Relaxed) { + 0 => (), + x => observer.observe(x, &[KeyValue::new("volume", "data")]), + }; + match values2.meta_disk_total.load(Ordering::Relaxed) { + 0 => (), + x => observer.observe(x, &[KeyValue::new("volume", "metadata")]), + }; + }) + .with_description("Garage total disk space on each node") + .init(), + values, + } + } +} diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml index 3911c945..c794c924 100644 --- a/src/table/Cargo.toml +++ b/src/table/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "garage_table" -version = "0.8.1" +version = "0.8.2" authors = ["Alex Auvolat <alex@adnab.me>"] edition = "2018" license = "AGPL-3.0" @@ -14,9 +14,9 @@ path = "lib.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -garage_db = { version = "0.8.1", path = "../db" } -garage_rpc = { version = "0.8.1", path = "../rpc" } -garage_util = { version = "0.8.1", path = "../util" } +garage_db = { version = "0.8.2", path = "../db" } +garage_rpc = { version = "0.8.2", path = "../rpc" } +garage_util = { version = "0.8.2", path = "../util" } opentelemetry = "0.17" @@ -25,7 +25,7 @@ arc-swap = "1.0" bytes = "1.0" hex = "0.4" hexdump = "0.1" -tracing = "0.1.30" +tracing = "0.1" rand = "0.8" serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } diff --git a/src/table/data.rs b/src/table/data.rs index 5c792f1f..26cc3a5a 100644 --- a/src/table/data.rs +++ b/src/table/data.rs @@ -181,13 +181,17 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> { pub(crate) fn update_entry(&self, update_bytes: &[u8]) -> Result<(), Error> { let update = self.decode_entry(update_bytes)?; - self.update_entry_with(update.partition_key(), update.sort_key(), |ent| match ent { - Some(mut ent) => { - ent.merge(&update); - ent - } - None => update.clone(), - })?; + self.update_entry_with( + update.partition_key(), + update.sort_key(), + |_tx, ent| match ent { + Some(mut ent) => { + ent.merge(&update); + Ok(ent) + } + None => Ok(update.clone()), + }, + )?; Ok(()) } @@ -195,7 +199,7 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> { &self, partition_key: &F::P, sort_key: &F::S, - f: impl Fn(Option<F::E>) -> F::E, + update_fn: impl Fn(&mut db::Transaction, Option<F::E>) -> db::TxOpResult<F::E>, ) -> Result<Option<F::E>, Error> { let tree_key = self.tree_key(partition_key, sort_key); @@ -203,10 +207,10 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> { let (old_entry, old_bytes, new_entry) = match tx.get(&self.store, &tree_key)? { Some(old_bytes) => { let old_entry = self.decode_entry(&old_bytes).map_err(db::TxError::Abort)?; - let new_entry = f(Some(old_entry.clone())); + let new_entry = update_fn(&mut tx, Some(old_entry.clone()))?; (Some(old_entry), Some(old_bytes), new_entry) } - None => (None, None, f(None)), + None => (None, None, update_fn(&mut tx, None)?), }; // Changed can be true in two scenarios diff --git a/src/util/Cargo.toml b/src/util/Cargo.toml index 0468b7f4..387471ed 100644 --- a/src/util/Cargo.toml +++ b/src/util/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "garage_util" -version = "0.8.1" +version = "0.8.2" authors = ["Alex Auvolat <alex@adnab.me>"] edition = "2018" license = "AGPL-3.0" @@ -14,11 +14,11 @@ path = "lib.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -garage_db = { version = "0.8.1", path = "../db" } +garage_db = { version = "0.8.2", path = "../db" } arc-swap = "1.0" async-trait = "0.1" -blake2 = "0.9" +blake2 = "0.10" bytes = "1.0" digest = "0.10" err-derive = "0.3" @@ -27,7 +27,7 @@ hexdump = "0.1" xxhash-rust = { version = "0.8", default-features = false, features = ["xxh3"] } hex = "0.4" lazy_static = "1.4" -tracing = "0.1.30" +tracing = "0.1" rand = "0.8" sha2 = "0.10" @@ -35,7 +35,7 @@ chrono = "0.4" rmp-serde = "1.1" serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } serde_json = "1.0" -toml = "0.5" +toml = "0.6" futures = "0.3" tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } @@ -47,6 +47,11 @@ hyper = "0.14" opentelemetry = { version = "0.17", features = [ "rt-tokio", "metrics", "trace" ] } +[build-dependencies] +rustc_version = "0.4.0" + +[dev-dependencies] +mktemp = "0.5" [features] k2v = [] diff --git a/src/util/background/mod.rs b/src/util/background/mod.rs index 41b48e93..607cd7a3 100644 --- a/src/util/background/mod.rs +++ b/src/util/background/mod.rs @@ -1,5 +1,6 @@ //! Job runner for futures and async functions +pub mod vars; pub mod worker; use std::collections::HashMap; diff --git a/src/util/background/vars.rs b/src/util/background/vars.rs new file mode 100644 index 00000000..7a449c95 --- /dev/null +++ b/src/util/background/vars.rs @@ -0,0 +1,113 @@ +use std::collections::HashMap; +use std::str::FromStr; + +use crate::error::{Error, OkOrMessage}; +use crate::migrate::Migrate; +use crate::persister::PersisterShared; + +pub struct BgVars { + vars: HashMap<&'static str, Box<dyn BgVarTrait>>, +} + +impl BgVars { + pub fn new() -> Self { + Self { + vars: HashMap::new(), + } + } + + pub fn register_rw<V, T, GF, SF>( + &mut self, + p: &PersisterShared<V>, + name: &'static str, + get_fn: GF, + set_fn: SF, + ) where + V: Migrate + Default + Send + Sync, + T: FromStr + ToString + Send + Sync + 'static, + GF: Fn(&PersisterShared<V>) -> T + Send + Sync + 'static, + SF: Fn(&PersisterShared<V>, T) -> Result<(), Error> + Send + Sync + 'static, + { + let p1 = p.clone(); + let get_fn = move || get_fn(&p1); + + let p2 = p.clone(); + let set_fn = move |v| set_fn(&p2, v); + + self.vars.insert(name, Box::new(BgVar { get_fn, set_fn })); + } + + pub fn register_ro<V, T, GF>(&mut self, p: &PersisterShared<V>, name: &'static str, get_fn: GF) + where + V: Migrate + Default + Send + Sync, + T: FromStr + ToString + Send + Sync + 'static, + GF: Fn(&PersisterShared<V>) -> T + Send + Sync + 'static, + { + let p1 = p.clone(); + let get_fn = move || get_fn(&p1); + + let set_fn = move |_| Err(Error::Message(format!("Cannot set value of {}", name))); + + self.vars.insert(name, Box::new(BgVar { get_fn, set_fn })); + } + + pub fn get(&self, var: &str) -> Result<String, Error> { + Ok(self + .vars + .get(var) + .ok_or_message("variable does not exist")? + .get()) + } + + pub fn get_all(&self) -> Vec<(&'static str, String)> { + self.vars.iter().map(|(k, v)| (*k, v.get())).collect() + } + + pub fn set(&self, var: &str, val: &str) -> Result<(), Error> { + self.vars + .get(var) + .ok_or_message("variable does not exist")? + .set(val) + } +} + +impl Default for BgVars { + fn default() -> Self { + Self::new() + } +} + +// ---- + +trait BgVarTrait: Send + Sync + 'static { + fn get(&self) -> String; + fn set(&self, v: &str) -> Result<(), Error>; +} + +struct BgVar<T, GF, SF> +where + T: FromStr + ToString + Send + Sync + 'static, + GF: Fn() -> T + Send + Sync + 'static, + SF: Fn(T) -> Result<(), Error> + Sync + Send + 'static, +{ + get_fn: GF, + set_fn: SF, +} + +impl<T, GF, SF> BgVarTrait for BgVar<T, GF, SF> +where + T: FromStr + ToString + Sync + Send + 'static, + GF: Fn() -> T + Sync + Send + 'static, + SF: Fn(T) -> Result<(), Error> + Sync + Send + 'static, +{ + fn get(&self) -> String { + (self.get_fn)().to_string() + } + + fn set(&self, vstr: &str) -> Result<(), Error> { + let value = vstr + .parse() + .map_err(|_| Error::Message(format!("invalid value: {}", vstr)))?; + (self.set_fn)(value) + } +} diff --git a/src/util/build.rs b/src/util/build.rs new file mode 100644 index 00000000..a4e955b8 --- /dev/null +++ b/src/util/build.rs @@ -0,0 +1,8 @@ +use rustc_version::version; + +fn main() { + // Acquire the version of Rust used to compile, this is added as a label to + // the garage_build_info metric. + let v = version().unwrap(); + println!("cargo:rustc-env=RUSTC_VERSION={v}"); +} diff --git a/src/util/config.rs b/src/util/config.rs index 04f8375a..2176353e 100644 --- a/src/util/config.rs +++ b/src/util/config.rs @@ -34,7 +34,9 @@ pub struct Config { pub compression_level: Option<i32>, /// RPC secret key: 32 bytes hex encoded - pub rpc_secret: String, + pub rpc_secret: Option<String>, + /// Optional file where RPC secret key is read from + pub rpc_secret_file: Option<String>, /// Address to bind for RPC pub rpc_bind_addr: SocketAddr, @@ -118,10 +120,17 @@ pub struct WebConfig { pub struct AdminConfig { /// Address and port to bind for admin API serving pub api_bind_addr: Option<SocketAddr>, + /// Bearer token to use to scrape metrics pub metrics_token: Option<String>, + /// File to read metrics token from + pub metrics_token_file: Option<String>, + /// Bearer token to use to access Admin API endpoints pub admin_token: Option<String>, + /// File to read admin token from + pub admin_token_file: Option<String>, + /// OTLP server to where to export traces pub trace_sink: Option<String>, } @@ -177,7 +186,57 @@ pub fn read_config(config_file: PathBuf) -> Result<Config, Error> { let mut config = String::new(); file.read_to_string(&mut config)?; - Ok(toml::from_str(&config)?) + let mut parsed_config: Config = toml::from_str(&config)?; + + secret_from_file( + &mut parsed_config.rpc_secret, + &parsed_config.rpc_secret_file, + "rpc_secret", + )?; + secret_from_file( + &mut parsed_config.admin.metrics_token, + &parsed_config.admin.metrics_token_file, + "admin.metrics_token", + )?; + secret_from_file( + &mut parsed_config.admin.admin_token, + &parsed_config.admin.admin_token_file, + "admin.admin_token", + )?; + + Ok(parsed_config) +} + +fn secret_from_file( + secret: &mut Option<String>, + secret_file: &Option<String>, + name: &'static str, +) -> Result<(), Error> { + match (&secret, &secret_file) { + (_, None) => { + // no-op + } + (Some(_), Some(_)) => { + return Err(format!("only one of `{}` and `{}_file` can be set", name, name).into()); + } + (None, Some(file_path)) => { + #[cfg(unix)] + if std::env::var("GARAGE_ALLOW_WORLD_READABLE_SECRETS").as_deref() != Ok("true") { + use std::os::unix::fs::MetadataExt; + let metadata = std::fs::metadata(&file_path)?; + if metadata.mode() & 0o077 != 0 { + return Err(format!("File {} is world-readable! (mode: 0{:o}, expected 0600)\nRefusing to start until this is fixed, or environment variable GARAGE_ALLOW_WORLD_READABLE_SECRETS is set to true.", file_path, metadata.mode()).into()); + } + } + let mut file = std::fs::OpenOptions::new().read(true).open(file_path)?; + let mut secret_buf = String::new(); + file.read_to_string(&mut secret_buf)?; + // trim_end: allows for use case such as `echo "$(openssl rand -hex 32)" > somefile`. + // also editors sometimes add a trailing newline + *secret = Some(String::from(secret_buf.trim_end())); + } + } + Ok(()) } fn default_compression() -> Option<i32> { @@ -233,3 +292,116 @@ where deserializer.deserialize_any(OptionVisitor) } + +#[cfg(test)] +mod tests { + use crate::error::Error; + use std::fs::File; + use std::io::Write; + + #[test] + fn test_rpc_secret() -> Result<(), Error> { + let path2 = mktemp::Temp::new_file()?; + let mut file2 = File::create(path2.as_path())?; + writeln!( + file2, + r#" + metadata_dir = "/tmp/garage/meta" + data_dir = "/tmp/garage/data" + replication_mode = "3" + rpc_bind_addr = "[::]:3901" + rpc_secret = "foo" + + [s3_api] + s3_region = "garage" + api_bind_addr = "[::]:3900" + "# + )?; + + let config = super::read_config(path2.to_path_buf())?; + assert_eq!("foo", config.rpc_secret.unwrap()); + drop(path2); + drop(file2); + + Ok(()) + } + + #[test] + fn test_rpc_secret_file_works() -> Result<(), Error> { + let path_secret = mktemp::Temp::new_file()?; + let mut file_secret = File::create(path_secret.as_path())?; + writeln!(file_secret, "foo")?; + drop(file_secret); + + let path_config = mktemp::Temp::new_file()?; + let mut file_config = File::create(path_config.as_path())?; + let path_secret_path = path_secret.as_path(); + writeln!( + file_config, + r#" + metadata_dir = "/tmp/garage/meta" + data_dir = "/tmp/garage/data" + replication_mode = "3" + rpc_bind_addr = "[::]:3901" + rpc_secret_file = "{}" + + [s3_api] + s3_region = "garage" + api_bind_addr = "[::]:3900" + "#, + path_secret_path.display() + )?; + let config = super::read_config(path_config.to_path_buf())?; + assert_eq!("foo", config.rpc_secret.unwrap()); + + #[cfg(unix)] + { + use std::os::unix::fs::PermissionsExt; + let metadata = std::fs::metadata(&path_secret_path)?; + let mut perm = metadata.permissions(); + perm.set_mode(0o660); + std::fs::set_permissions(&path_secret_path, perm)?; + + std::env::set_var("GARAGE_ALLOW_WORLD_READABLE_SECRETS", "false"); + assert!(super::read_config(path_config.to_path_buf()).is_err()); + + std::env::set_var("GARAGE_ALLOW_WORLD_READABLE_SECRETS", "true"); + assert!(super::read_config(path_config.to_path_buf()).is_ok()); + } + + drop(path_config); + drop(path_secret); + drop(file_config); + Ok(()) + } + + #[test] + fn test_rcp_secret_and_rpc_secret_file_cannot_be_set_both() -> Result<(), Error> { + let path_config = mktemp::Temp::new_file()?; + let mut file_config = File::create(path_config.as_path())?; + writeln!( + file_config, + r#" + metadata_dir = "/tmp/garage/meta" + data_dir = "/tmp/garage/data" + replication_mode = "3" + rpc_bind_addr = "[::]:3901" + rpc_secret= "dummy" + rpc_secret_file = "dummy" + + [s3_api] + s3_region = "garage" + api_bind_addr = "[::]:3900" + "# + )?; + assert_eq!( + "only one of `rpc_secret` and `rpc_secret_file` can be set", + super::read_config(path_config.to_path_buf()) + .unwrap_err() + .to_string() + ); + drop(path_config); + drop(file_config); + Ok(()) + } +} diff --git a/src/util/data.rs b/src/util/data.rs index 3f61e301..bdd8daee 100644 --- a/src/util/data.rs +++ b/src/util/data.rs @@ -115,9 +115,9 @@ pub fn sha256sum(data: &[u8]) -> Hash { /// Compute the blake2 of a slice pub fn blake2sum(data: &[u8]) -> Hash { - use blake2::{Blake2b, Digest}; + use blake2::{Blake2b512, Digest}; - let mut hasher = Blake2b::new(); + let mut hasher = Blake2b512::new(); hasher.update(data); let mut hash = [0u8; 32]; hash.copy_from_slice(&hasher.finalize()[..32]); diff --git a/src/util/forwarded_headers.rs b/src/util/forwarded_headers.rs new file mode 100644 index 00000000..6ae275aa --- /dev/null +++ b/src/util/forwarded_headers.rs @@ -0,0 +1,63 @@ +use http::{HeaderMap, HeaderValue}; +use std::net::IpAddr; +use std::str::FromStr; + +use crate::error::{Error, OkOrMessage}; + +pub fn handle_forwarded_for_headers(headers: &HeaderMap<HeaderValue>) -> Result<String, Error> { + let forwarded_for_header = headers + .get("x-forwarded-for") + .ok_or_message("X-Forwarded-For header not provided")?; + + let forwarded_for_ip_str = forwarded_for_header + .to_str() + .ok_or_message("Error parsing X-Forwarded-For header")?; + + let client_ip = IpAddr::from_str(&forwarded_for_ip_str) + .ok_or_message("Valid IP address not found in X-Forwarded-For header")?; + + Ok(client_ip.to_string()) +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn test_handle_forwarded_for_headers_ipv4_client() { + let mut test_headers = HeaderMap::new(); + test_headers.insert("X-Forwarded-For", "192.0.2.100".parse().unwrap()); + + if let Ok(forwarded_ip) = handle_forwarded_for_headers(&test_headers) { + assert_eq!(forwarded_ip, "192.0.2.100"); + } + } + + #[test] + fn test_handle_forwarded_for_headers_ipv6_client() { + let mut test_headers = HeaderMap::new(); + test_headers.insert("X-Forwarded-For", "2001:db8::f00d:cafe".parse().unwrap()); + + if let Ok(forwarded_ip) = handle_forwarded_for_headers(&test_headers) { + assert_eq!(forwarded_ip, "2001:db8::f00d:cafe"); + } + } + + #[test] + fn test_handle_forwarded_for_headers_invalid_ip() { + let mut test_headers = HeaderMap::new(); + test_headers.insert("X-Forwarded-For", "www.example.com".parse().unwrap()); + + let result = handle_forwarded_for_headers(&test_headers); + assert!(result.is_err()); + } + + #[test] + fn test_handle_forwarded_for_headers_missing() { + let mut test_headers = HeaderMap::new(); + test_headers.insert("Host", "www.deuxfleurs.fr".parse().unwrap()); + + let result = handle_forwarded_for_headers(&test_headers); + assert!(result.is_err()); + } +} diff --git a/src/util/lib.rs b/src/util/lib.rs index be82061f..c9110fb2 100644 --- a/src/util/lib.rs +++ b/src/util/lib.rs @@ -11,10 +11,10 @@ pub mod data; pub mod encode; pub mod error; pub mod formater; +pub mod forwarded_headers; pub mod metrics; pub mod migrate; pub mod persister; pub mod time; -pub mod token_bucket; pub mod tranquilizer; pub mod version; diff --git a/src/util/persister.rs b/src/util/persister.rs index 4b9adf51..5c66bbed 100644 --- a/src/util/persister.rs +++ b/src/util/persister.rs @@ -1,5 +1,6 @@ use std::io::{Read, Write}; use std::path::{Path, PathBuf}; +use std::sync::{Arc, RwLock}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; @@ -84,3 +85,36 @@ impl<T: Migrate> Persister<T> { Ok(()) } } + +pub struct PersisterShared<V: Migrate + Default>(Arc<(Persister<V>, RwLock<V>)>); + +impl<V: Migrate + Default> Clone for PersisterShared<V> { + fn clone(&self) -> PersisterShared<V> { + PersisterShared(self.0.clone()) + } +} + +impl<V: Migrate + Default> PersisterShared<V> { + pub fn new(base_dir: &Path, file_name: &str) -> Self { + let persister = Persister::new(base_dir, file_name); + let value = persister.load().unwrap_or_default(); + Self(Arc::new((persister, RwLock::new(value)))) + } + + pub fn get_with<F, R>(&self, f: F) -> R + where + F: FnOnce(&V) -> R, + { + let value = self.0 .1.read().unwrap(); + f(&value) + } + + pub fn set_with<F>(&self, f: F) -> Result<(), Error> + where + F: FnOnce(&mut V), + { + let mut value = self.0 .1.write().unwrap(); + f(&mut value); + self.0 .0.save(&value) + } +} diff --git a/src/util/time.rs b/src/util/time.rs index 257b4d2a..42f41a44 100644 --- a/src/util/time.rs +++ b/src/util/time.rs @@ -25,6 +25,6 @@ pub fn increment_logical_clock_2(prev: u64, prev2: u64) -> u64 { pub fn msec_to_rfc3339(msecs: u64) -> String { let secs = msecs as i64 / 1000; let nanos = (msecs as i64 % 1000) as u32 * 1_000_000; - let timestamp = Utc.timestamp(secs, nanos); + let timestamp = Utc.timestamp_opt(secs, nanos).unwrap(); timestamp.to_rfc3339_opts(SecondsFormat::Millis, true) } diff --git a/src/util/token_bucket.rs b/src/util/token_bucket.rs deleted file mode 100644 index cc0dfa1f..00000000 --- a/src/util/token_bucket.rs +++ /dev/null @@ -1,40 +0,0 @@ -use std::time::{Duration, Instant}; - -use tokio::time::sleep; - -pub struct TokenBucket { - // Replenish rate: number of tokens per second - replenish_rate: u64, - // Current number of tokens - tokens: u64, - // Last replenish time - last_replenish: Instant, -} - -impl TokenBucket { - pub fn new(replenish_rate: u64) -> Self { - Self { - replenish_rate, - tokens: 0, - last_replenish: Instant::now(), - } - } - - pub async fn take(&mut self, tokens: u64) { - while self.tokens < tokens { - let needed = tokens - self.tokens; - let delay = (needed as f64) / (self.replenish_rate as f64); - sleep(Duration::from_secs_f64(delay)).await; - self.replenish(); - } - self.tokens -= tokens; - } - - pub fn replenish(&mut self) { - let now = Instant::now(); - let new_tokens = - ((now - self.last_replenish).as_secs_f64() * (self.replenish_rate as f64)) as u64; - self.tokens += new_tokens; - self.last_replenish = now; - } -} diff --git a/src/util/version.rs b/src/util/version.rs index b515dccc..2b2ea271 100644 --- a/src/util/version.rs +++ b/src/util/version.rs @@ -26,3 +26,7 @@ pub fn init_version(version: &'static str) { pub fn init_features(features: &'static [&'static str]) { FEATURES.store(Some(Arc::new(features))); } + +pub fn rust_version() -> &'static str { + env!("RUSTC_VERSION") +} diff --git a/src/web/Cargo.toml b/src/web/Cargo.toml index dbc5e5fb..d0a23af4 100644 --- a/src/web/Cargo.toml +++ b/src/web/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "garage_web" -version = "0.8.1" +version = "0.8.2" authors = ["Alex Auvolat <alex@adnab.me>", "Quentin Dufour <quentin@dufour.io>"] edition = "2018" license = "AGPL-3.0" @@ -14,13 +14,13 @@ path = "lib.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -garage_api = { version = "0.8.1", path = "../api" } -garage_model = { version = "0.8.1", path = "../model" } -garage_util = { version = "0.8.1", path = "../util" } -garage_table = { version = "0.8.1", path = "../table" } +garage_api = { version = "0.8.2", path = "../api" } +garage_model = { version = "0.8.2", path = "../model" } +garage_util = { version = "0.8.2", path = "../util" } +garage_table = { version = "0.8.2", path = "../table" } err-derive = "0.3" -tracing = "0.1.30" +tracing = "0.1" percent-encoding = "2.1.0" futures = "0.3" diff --git a/src/web/web_server.rs b/src/web/web_server.rs index 1541c297..0c7edf23 100644 --- a/src/web/web_server.rs +++ b/src/web/web_server.rs @@ -29,6 +29,7 @@ use garage_model::garage::Garage; use garage_table::*; use garage_util::error::Error as GarageError; +use garage_util::forwarded_headers; use garage_util::metrics::{gen_trace_id, RecordDuration}; struct WebMetrics { @@ -104,7 +105,19 @@ impl WebServer { req: Request<Body>, addr: SocketAddr, ) -> Result<Response<Body>, Infallible> { - info!("{} {} {}", addr, req.method(), req.uri()); + if let Ok(forwarded_for_ip_addr) = + forwarded_headers::handle_forwarded_for_headers(&req.headers()) + { + info!( + "{} (via {}) {} {}", + forwarded_for_ip_addr, + addr, + req.method(), + req.uri() + ); + } else { + info!("{} {} {}", addr, req.method(), req.uri()); + } // Lots of instrumentation let tracer = opentelemetry::global::tracer("garage"); @@ -249,7 +262,6 @@ impl WebServer { ); *error_doc.status_mut() = error.http_status_code(); - error.add_headers(error_doc.headers_mut()); // Preserve error message in a special header for error_line in error.to_string().split('\n') { |