aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/api/Cargo.toml14
-rw-r--r--src/api/admin/api_server.rs70
-rw-r--r--src/api/admin/bucket.rs4
-rw-r--r--src/api/admin/key.rs4
-rw-r--r--src/api/admin/router.rs6
-rw-r--r--src/api/generic_server.rs2
-rw-r--r--src/api/k2v/batch.rs4
-rw-r--r--src/api/s3/get.rs4
-rw-r--r--src/api/s3/post_object.rs12
-rw-r--r--src/api/signature/payload.rs44
-rw-r--r--src/block/Cargo.toml12
-rw-r--r--src/block/manager.rs30
-rw-r--r--src/block/repair.rs14
-rw-r--r--src/db/Cargo.toml6
-rw-r--r--src/db/lmdb_adapter.rs2
-rw-r--r--src/format-table/Cargo.toml12
-rw-r--r--src/format-table/README.md13
-rw-r--r--src/format-table/lib.rs (renamed from src/util/formater.rs)17
-rw-r--r--src/garage/Cargo.toml27
-rw-r--r--src/garage/admin.rs1298
-rw-r--r--src/garage/admin/block.rs160
-rw-r--r--src/garage/admin/bucket.rs496
-rw-r--r--src/garage/admin/key.rs149
-rw-r--r--src/garage/admin/mod.rs538
-rw-r--r--src/garage/cli/cmd.rs2
-rw-r--r--src/garage/cli/layout.rs2
-rw-r--r--src/garage/cli/structs.rs2
-rw-r--r--src/garage/cli/util.rs9
-rw-r--r--src/garage/main.rs6
-rw-r--r--src/garage/tests/bucket.rs8
-rw-r--r--src/garage/tests/common/client.rs11
-rw-r--r--src/garage/tests/common/custom_requester.rs1
-rw-r--r--src/garage/tests/common/mod.rs19
-rw-r--r--src/garage/tests/k2v_client/mod.rs1
-rw-r--r--src/garage/tests/k2v_client/simple.rs60
-rw-r--r--src/garage/tests/lib.rs2
-rw-r--r--src/garage/tests/s3/multipart.rs4
-rw-r--r--src/garage/tests/s3/objects.rs4
-rw-r--r--src/garage/tests/s3/simple.rs2
-rw-r--r--src/garage/tests/s3/website.rs51
-rw-r--r--src/k2v-client/Cargo.toml22
-rw-r--r--src/k2v-client/bin/k2v-cli.rs62
-rw-r--r--src/k2v-client/error.rs16
-rw-r--r--src/k2v-client/lib.rs333
-rw-r--r--src/model/Cargo.toml10
-rw-r--r--src/model/k2v/seen.rs2
-rw-r--r--src/rpc/Cargo.toml4
-rw-r--r--src/rpc/consul.rs171
-rw-r--r--src/table/Cargo.toml6
-rw-r--r--src/table/data.rs16
-rw-r--r--src/table/util.rs9
-rw-r--r--src/util/Cargo.toml3
-rw-r--r--src/util/config.rs25
-rw-r--r--src/util/forwarded_headers.rs2
-rw-r--r--src/util/lib.rs1
-rw-r--r--src/util/migrate.rs2
-rw-r--r--src/util/version.rs12
-rw-r--r--src/web/Cargo.toml8
-rw-r--r--src/web/web_server.rs130
59 files changed, 2210 insertions, 1746 deletions
diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml
index 9babec02..c2155eb9 100644
--- a/src/api/Cargo.toml
+++ b/src/api/Cargo.toml
@@ -14,11 +14,11 @@ path = "lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
-garage_model = { version = "0.8.2", path = "../model" }
-garage_table = { version = "0.8.2", path = "../table" }
-garage_block = { version = "0.8.2", path = "../block" }
-garage_util = { version = "0.8.2", path = "../util" }
-garage_rpc = { version = "0.8.2", path = "../rpc" }
+garage_model.workspace = true
+garage_table.workspace = true
+garage_block.workspace = true
+garage_util.workspace = true
+garage_rpc.workspace = true
async-trait = "0.1.7"
base64 = "0.21"
@@ -28,7 +28,7 @@ crypto-common = "0.1"
err-derive = "0.3"
hex = "0.4"
hmac = "0.12"
-idna = "0.3"
+idna = "0.4"
tracing = "0.1"
md-5 = "0.10"
nom = "7.1"
@@ -47,7 +47,7 @@ http-range = "0.1"
hyper = { version = "0.14", features = ["server", "http1", "runtime", "tcp", "stream"] }
multer = "2.0"
percent-encoding = "2.1.0"
-roxmltree = "0.14"
+roxmltree = "0.18"
serde = { version = "1.0", features = ["derive"] }
serde_bytes = "0.11"
serde_json = "1.0"
diff --git a/src/api/admin/api_server.rs b/src/api/admin/api_server.rs
index 58dd38d8..50c79120 100644
--- a/src/api/admin/api_server.rs
+++ b/src/api/admin/api_server.rs
@@ -26,6 +26,7 @@ use crate::admin::cluster::*;
use crate::admin::error::*;
use crate::admin::key::*;
use crate::admin::router::{Authorization, Endpoint};
+use crate::helpers::host_to_bucket;
pub struct AdminApiServer {
garage: Arc<Garage>,
@@ -78,10 +79,7 @@ impl AdminApiServer {
.body(Body::empty())?)
}
- async fn handle_check_website_enabled(
- &self,
- req: Request<Body>,
- ) -> Result<Response<Body>, Error> {
+ async fn handle_check_domain(&self, req: Request<Body>) -> Result<Response<Body>, Error> {
let query_params: HashMap<String, String> = req
.uri()
.query()
@@ -102,12 +100,56 @@ impl AdminApiServer {
.get("domain")
.ok_or_internal_error("Could not parse domain query string")?;
- let bucket_id = self
+ if self.check_domain(domain).await? {
+ Ok(Response::builder()
+ .status(StatusCode::OK)
+ .body(Body::from(format!(
+ "Domain '{domain}' is managed by Garage"
+ )))?)
+ } else {
+ Err(Error::bad_request(format!(
+ "Domain '{domain}' is not managed by Garage"
+ )))
+ }
+ }
+
+ async fn check_domain(&self, domain: &str) -> Result<bool, Error> {
+ // Resolve bucket from domain name, inferring if the website must be activated for the
+ // domain to be valid.
+ let (bucket_name, must_check_website) = if let Some(bname) = self
+ .garage
+ .config
+ .s3_api
+ .root_domain
+ .as_ref()
+ .and_then(|rd| host_to_bucket(domain, rd))
+ {
+ (bname.to_string(), false)
+ } else if let Some(bname) = self
+ .garage
+ .config
+ .s3_web
+ .as_ref()
+ .and_then(|sw| host_to_bucket(domain, sw.root_domain.as_str()))
+ {
+ (bname.to_string(), true)
+ } else {
+ (domain.to_string(), true)
+ };
+
+ let bucket_id = match self
.garage
.bucket_helper()
- .resolve_global_bucket_name(&domain)
+ .resolve_global_bucket_name(&bucket_name)
.await?
- .ok_or(HelperError::NoSuchBucket(domain.to_string()))?;
+ {
+ Some(bucket_id) => bucket_id,
+ None => return Ok(false),
+ };
+
+ if !must_check_website {
+ return Ok(true);
+ }
let bucket = self
.garage
@@ -119,16 +161,8 @@ impl AdminApiServer {
let bucket_website_config = bucket_state.website_config.get();
match bucket_website_config {
- Some(_v) => {
- Ok(Response::builder()
- .status(StatusCode::OK)
- .body(Body::from(format!(
- "Bucket '{domain}' is authorized for website hosting"
- )))?)
- }
- None => Err(Error::bad_request(format!(
- "Bucket '{domain}' is not authorized for website hosting"
- ))),
+ Some(_v) => Ok(true),
+ None => Ok(false),
}
}
@@ -229,7 +263,7 @@ impl ApiHandler for AdminApiServer {
match endpoint {
Endpoint::Options => self.handle_options(&req),
- Endpoint::CheckWebsiteEnabled => self.handle_check_website_enabled(req).await,
+ Endpoint::CheckDomain => self.handle_check_domain(req).await,
Endpoint::Health => self.handle_health(),
Endpoint::Metrics => self.handle_metrics(),
Endpoint::GetClusterStatus => handle_get_cluster_status(&self.garage).await,
diff --git a/src/api/admin/bucket.rs b/src/api/admin/bucket.rs
index e60f07ca..f0a4a9e7 100644
--- a/src/api/admin/bucket.rs
+++ b/src/api/admin/bucket.rs
@@ -183,8 +183,8 @@ async fn bucket_info_results(
}
}),
keys: relevant_keys
- .into_iter()
- .map(|(_, key)| {
+ .into_values()
+ .map(|key| {
let p = key.state.as_option().unwrap();
GetBucketInfoKey {
access_key_id: key.key_id,
diff --git a/src/api/admin/key.rs b/src/api/admin/key.rs
index 2bbabb7b..d74ca361 100644
--- a/src/api/admin/key.rs
+++ b/src/api/admin/key.rs
@@ -183,8 +183,8 @@ async fn key_info_results(garage: &Arc<Garage>, key: Key) -> Result<Response<Bod
create_bucket: *key_state.allow_create_bucket.get(),
},
buckets: relevant_buckets
- .into_iter()
- .map(|(_, bucket)| {
+ .into_values()
+ .map(|bucket| {
let state = bucket.state.as_option().unwrap();
KeyInfoBucketResult {
id: hex::encode(bucket.id),
diff --git a/src/api/admin/router.rs b/src/api/admin/router.rs
index 0dcb1546..0225f18b 100644
--- a/src/api/admin/router.rs
+++ b/src/api/admin/router.rs
@@ -17,7 +17,7 @@ router_match! {@func
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Endpoint {
Options,
- CheckWebsiteEnabled,
+ CheckDomain,
Health,
Metrics,
GetClusterStatus,
@@ -92,7 +92,7 @@ impl Endpoint {
let res = router_match!(@gen_path_parser (req.method(), path, query) [
OPTIONS _ => Options,
- GET "/check" => CheckWebsiteEnabled,
+ GET "/check" => CheckDomain,
GET "/health" => Health,
GET "/metrics" => Metrics,
GET "/v0/status" => GetClusterStatus,
@@ -138,7 +138,7 @@ impl Endpoint {
pub fn authorization_type(&self) -> Authorization {
match self {
Self::Health => Authorization::None,
- Self::CheckWebsiteEnabled => Authorization::None,
+ Self::CheckDomain => Authorization::None,
Self::Metrics => Authorization::MetricsToken,
_ => Authorization::AdminToken,
}
diff --git a/src/api/generic_server.rs b/src/api/generic_server.rs
index d0354d28..757b85ec 100644
--- a/src/api/generic_server.rs
+++ b/src/api/generic_server.rs
@@ -128,7 +128,7 @@ impl<A: ApiHandler> ApiServer<A> {
let uri = req.uri().clone();
if let Ok(forwarded_for_ip_addr) =
- forwarded_headers::handle_forwarded_for_headers(&req.headers())
+ forwarded_headers::handle_forwarded_for_headers(req.headers())
{
info!(
"{} (via {}) {} {}",
diff --git a/src/api/k2v/batch.rs b/src/api/k2v/batch.rs
index 26d678da..294380ea 100644
--- a/src/api/k2v/batch.rs
+++ b/src/api/k2v/batch.rs
@@ -282,8 +282,8 @@ pub(crate) async fn handle_poll_range(
if let Some((items, seen_marker)) = resp {
let resp = PollRangeResponse {
items: items
- .into_iter()
- .map(|(_k, i)| ReadBatchResponseItem::from(i))
+ .into_values()
+ .map(ReadBatchResponseItem::from)
.collect::<Vec<_>>(),
seen_marker,
};
diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs
index 2a99551a..cde7b461 100644
--- a/src/api/s3/get.rs
+++ b/src/api/s3/get.rs
@@ -443,7 +443,7 @@ fn body_from_blocks_range(
// block.part_number, which is not the same in the case of a multipart upload)
let mut blocks: Vec<(VersionBlock, u64)> = Vec::with_capacity(std::cmp::min(
all_blocks.len(),
- 4 + ((end - begin) / std::cmp::max(all_blocks[0].1.size as u64, 1024)) as usize,
+ 4 + ((end - begin) / std::cmp::max(all_blocks[0].1.size, 1024)) as usize,
));
let mut block_offset: u64 = 0;
for (_, b) in all_blocks.iter() {
@@ -454,7 +454,7 @@ fn body_from_blocks_range(
if block_offset < end && block_offset + b.size > begin {
blocks.push((*b, block_offset));
}
- block_offset += b.size as u64;
+ block_offset += b.size;
}
let order_stream = OrderTag::stream();
diff --git a/src/api/s3/post_object.rs b/src/api/s3/post_object.rs
index f2098ab0..542b7a81 100644
--- a/src/api/s3/post_object.rs
+++ b/src/api/s3/post_object.rs
@@ -30,7 +30,7 @@ pub async fn handle_post_object(
.get(header::CONTENT_TYPE)
.and_then(|ct| ct.to_str().ok())
.and_then(|ct| multer::parse_boundary(ct).ok())
- .ok_or_bad_request("Counld not get multipart boundary")?;
+ .ok_or_bad_request("Could not get multipart boundary")?;
// 16k seems plenty for a header. 5G is the max size of a single part, so it seems reasonable
// for a PostObject
@@ -64,15 +64,13 @@ pub async fn handle_post_object(
"tag" => (/* tag need to be reencoded, but we don't support them yet anyway */),
"acl" => {
if params.insert("x-amz-acl", content).is_some() {
- return Err(Error::bad_request(
- "Field 'acl' provided more than one time",
- ));
+ return Err(Error::bad_request("Field 'acl' provided more than once"));
}
}
_ => {
if params.insert(&name, content).is_some() {
return Err(Error::bad_request(format!(
- "Field '{}' provided more than one time",
+ "Field '{}' provided more than once",
name
)));
}
@@ -149,7 +147,7 @@ pub async fn handle_post_object(
.ok_or_bad_request("Invalid expiration date")?
.into();
if Utc::now() - expiration > Duration::zero() {
- return Err(Error::bad_request("Expiration date is in the paste"));
+ return Err(Error::bad_request("Expiration date is in the past"));
}
let mut conditions = decoded_policy.into_conditions()?;
@@ -330,7 +328,7 @@ impl Policy {
if map.len() != 1 {
return Err(Error::bad_request("Invalid policy item"));
}
- let (mut k, v) = map.into_iter().next().expect("size was verified");
+ let (mut k, v) = map.into_iter().next().expect("Size could not be verified");
k.make_ascii_lowercase();
params.entry(k).or_default().push(Operation::Equal(v));
}
diff --git a/src/api/signature/payload.rs b/src/api/signature/payload.rs
index 4c7934e5..b50fb3bb 100644
--- a/src/api/signature/payload.rs
+++ b/src/api/signature/payload.rs
@@ -19,7 +19,7 @@ use crate::signature::error::*;
pub async fn check_payload_signature(
garage: &Garage,
- service: &str,
+ service: &'static str,
request: &Request<Body>,
) -> Result<(Option<Key>, Option<Hash>), Error> {
let mut headers = HashMap::new();
@@ -51,6 +51,7 @@ pub async fn check_payload_signature(
};
let canonical_request = canonical_request(
+ service,
request.method(),
request.uri(),
&headers,
@@ -184,7 +185,7 @@ fn parse_query_authorization(
if duration > 7 * 24 * 3600 {
return Err(Error::bad_request(
- "X-Amz-Exprires may not exceed a week".to_string(),
+ "X-Amz-Expires may not exceed a week".to_string(),
));
}
@@ -210,7 +211,7 @@ fn parse_query_authorization(
fn parse_credential(cred: &str) -> Result<(String, String), Error> {
let first_slash = cred
.find('/')
- .ok_or_bad_request("Credentials does not contain / in authorization field")?;
+ .ok_or_bad_request("Credentials does not contain '/' in authorization field")?;
let (key_id, scope) = cred.split_at(first_slash);
Ok((
key_id.to_string(),
@@ -231,15 +232,50 @@ pub fn string_to_sign(datetime: &DateTime<Utc>, scope_string: &str, canonical_re
}
pub fn canonical_request(
+ service: &'static str,
method: &Method,
uri: &hyper::Uri,
headers: &HashMap<String, String>,
signed_headers: &str,
content_sha256: &str,
) -> String {
+ // There seems to be evidence that in AWSv4 signatures, the path component is url-encoded
+ // a second time when building the canonical request, as specified in this documentation page:
+ // -> https://docs.aws.amazon.com/rolesanywhere/latest/userguide/authentication-sign-process.html
+ // However this documentation page is for a specific service ("roles anywhere"), and
+ // in the S3 service we know for a fact that there is no double-urlencoding, because all of
+ // the tests we made with external software work without it.
+ //
+ // The theory is that double-urlencoding occurs for all services except S3,
+ // which is what is implemented in rusoto_signature:
+ // -> https://docs.rs/rusoto_signature/latest/src/rusoto_signature/signature.rs.html#464
+ //
+ // Digging into the code of the official AWS Rust SDK, we learn that double-URI-encoding can
+ // be set or unset on a per-request basis (the signature crates, aws-sigv4 and aws-sig-auth,
+ // are agnostic to this). Grepping the codebase confirms that S3 is the only API for which
+ // double_uri_encode is set to false, meaning it is true (its default value) for all other
+ // AWS services. We will therefore implement this behavior in Garage as well.
+ //
+ // Note that this documentation page, which is touted as the "authoritative reference" on
+ // AWSv4 signatures, makes no mention of either single- or double-urlencoding:
+ // -> https://docs.aws.amazon.com/IAM/latest/UserGuide/create-signed-request.html
+ // This page of the S3 documentation does also not mention anything specific:
+ // -> https://docs.aws.amazon.com/AmazonS3/latest/API/sig-v4-header-based-auth.html
+ //
+ // Note that there is also the issue of path normalization, which I hope is unrelated to the
+ // one of URI-encoding. At least in aws-sigv4 both parameters can be set independently,
+ // and rusoto_signature does not seem to do any effective path normalization, even though
+ // it mentions it in the comments (same link to the souce code as above).
+ // We make the explicit choice of NOT normalizing paths in the K2V API because doing so
+ // would make non-normalized paths invalid K2V partition keys, and we don't want that.
+ let path: std::borrow::Cow<str> = if service != "s3" {
+ uri_encode(uri.path(), false).into()
+ } else {
+ uri.path().into()
+ };
[
method.as_str(),
- uri.path(),
+ &path,
&canonical_query_string(uri),
&canonical_header_string(headers, signed_headers),
"",
diff --git a/src/block/Cargo.toml b/src/block/Cargo.toml
index c6985754..df16959b 100644
--- a/src/block/Cargo.toml
+++ b/src/block/Cargo.toml
@@ -14,10 +14,10 @@ path = "lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
-garage_db = { version = "0.8.2", path = "../db" }
-garage_rpc = { version = "0.8.2", path = "../rpc" }
-garage_util = { version = "0.8.2", path = "../util" }
-garage_table = { version = "0.8.2", path = "../table" }
+garage_db.workspace = true
+garage_rpc.workspace = true
+garage_util.workspace = true
+garage_table.workspace = true
opentelemetry = "0.17"
@@ -28,7 +28,7 @@ hex = "0.4"
tracing = "0.1"
rand = "0.8"
-async-compression = { version = "0.3", features = ["tokio", "zstd"] }
+async-compression = { version = "0.4", features = ["tokio", "zstd"] }
zstd = { version = "0.12", default-features = false }
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
@@ -37,7 +37,7 @@ serde_bytes = "0.11"
futures = "0.3"
futures-util = "0.3"
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
-tokio-util = { version = "0.6", features = ["io"] }
+tokio-util = { version = "0.7", features = ["io"] }
[features]
system-libs = [ "zstd/pkg-config" ]
diff --git a/src/block/manager.rs b/src/block/manager.rs
index 26278974..3ece9a8a 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -600,12 +600,32 @@ impl BlockManager {
/// Utility: check if block is stored compressed. Error if block is not stored
async fn is_block_compressed(&self, hash: &Hash) -> Result<bool, Error> {
let mut path = self.block_path(hash);
- path.set_extension("zst");
- if fs::metadata(&path).await.is_ok() {
- return Ok(true);
+
+ // If compression is disabled on node - check for the raw block
+ // first and then a compressed one (as compression may have been
+ // previously enabled).
+ match self.compression_level {
+ None => {
+ if fs::metadata(&path).await.is_ok() {
+ return Ok(false);
+ }
+
+ path.set_extension("zst");
+
+ fs::metadata(&path).await.map(|_| true).map_err(Into::into)
+ }
+ _ => {
+ path.set_extension("zst");
+
+ if fs::metadata(&path).await.is_ok() {
+ return Ok(true);
+ }
+
+ path.set_extension("");
+
+ fs::metadata(&path).await.map(|_| false).map_err(Into::into)
+ }
}
- path.set_extension("");
- fs::metadata(&path).await.map(|_| false).map_err(Into::into)
}
async fn lock_mutate(&self, hash: &Hash) -> MutexGuard<'_, BlockManagerLocked> {
diff --git a/src/block/repair.rs b/src/block/repair.rs
index c89484d9..71093d69 100644
--- a/src/block/repair.rs
+++ b/src/block/repair.rs
@@ -220,14 +220,12 @@ fn randomize_next_scrub_run_time(timestamp: u64) -> u64 {
// Take SCRUB_INTERVAL and mix in a random interval of 10 days to attempt to
// balance scrub load across different cluster nodes.
- let next_run_timestamp = timestamp
+ timestamp
+ SCRUB_INTERVAL
.saturating_add(Duration::from_secs(
rand::thread_rng().gen_range(0..3600 * 24 * 10),
))
- .as_millis() as u64;
-
- next_run_timestamp
+ .as_millis() as u64
}
impl Default for ScrubWorkerPersisted {
@@ -241,18 +239,14 @@ impl Default for ScrubWorkerPersisted {
}
}
+#[derive(Default)]
enum ScrubWorkerState {
Running(BlockStoreIterator),
Paused(BlockStoreIterator, u64), // u64 = time when to resume scrub
+ #[default]
Finished,
}
-impl Default for ScrubWorkerState {
- fn default() -> Self {
- ScrubWorkerState::Finished
- }
-}
-
#[derive(Debug)]
pub enum ScrubWorkerCommand {
Start,
diff --git a/src/db/Cargo.toml b/src/db/Cargo.toml
index e3a65857..a00a2eed 100644
--- a/src/db/Cargo.toml
+++ b/src/db/Cargo.toml
@@ -22,19 +22,19 @@ hexdump = "0.1"
tracing = "0.1"
heed = { version = "0.11", default-features = false, features = ["lmdb"], optional = true }
-rusqlite = { version = "0.28", optional = true }
+rusqlite = { version = "0.29", optional = true }
sled = { version = "0.34", optional = true }
# cli deps
clap = { version = "4.1", optional = true, features = ["derive", "env"] }
-pretty_env_logger = { version = "0.4", optional = true }
+pretty_env_logger = { version = "0.5", optional = true }
[dev-dependencies]
mktemp = "0.5"
[features]
default = [ "sled" ]
-bundled-libs = [ "rusqlite/bundled" ]
+bundled-libs = [ "rusqlite?/bundled" ]
cli = ["clap", "pretty_env_logger"]
lmdb = [ "heed" ]
sqlite = [ "rusqlite" ]
diff --git a/src/db/lmdb_adapter.rs b/src/db/lmdb_adapter.rs
index 31956612..ecbc3b81 100644
--- a/src/db/lmdb_adapter.rs
+++ b/src/db/lmdb_adapter.rs
@@ -349,6 +349,6 @@ pub fn recommended_map_size() -> usize {
#[cfg(target_pointer_width = "32")]
pub fn recommended_map_size() -> usize {
- warn!("LMDB is not recommended on 32-bit systems, database size will be limited");
+ tracing::warn!("LMDB is not recommended on 32-bit systems, database size will be limited");
1usize << 30
}
diff --git a/src/format-table/Cargo.toml b/src/format-table/Cargo.toml
new file mode 100644
index 00000000..9e31e211
--- /dev/null
+++ b/src/format-table/Cargo.toml
@@ -0,0 +1,12 @@
+[package]
+name = "format_table"
+version = "0.1.1"
+authors = ["Alex Auvolat <alex@adnab.me>"]
+edition = "2018"
+license = "AGPL-3.0"
+description = "Format tables with a stupid API"
+repository = "https://git.deuxfleurs.fr/Deuxfleurs/garage"
+readme = "README.md"
+
+[lib]
+path = "lib.rs"
diff --git a/src/format-table/README.md b/src/format-table/README.md
new file mode 100644
index 00000000..d918bdf4
--- /dev/null
+++ b/src/format-table/README.md
@@ -0,0 +1,13 @@
+# `format_table`
+
+Format tables with a stupid API. [Documentation](https://docs.rs/format_table).
+
+Example:
+
+```rust
+let mut table = vec!["product\tquantity\tprice".to_string()];
+for (p, q, r) in [("tomato", 12, 15), ("potato", 10, 20), ("rice", 5, 12)] {
+ table.push(format!("{}\t{}\t{}", p, q, r));
+}
+format_table::format_table(table);
+```
diff --git a/src/util/formater.rs b/src/format-table/lib.rs
index 2ea53ebb..55252ba9 100644
--- a/src/util/formater.rs
+++ b/src/format-table/lib.rs
@@ -1,3 +1,19 @@
+//! Format tables with a stupid API.
+//!
+//! Example:
+//!
+//! ```rust
+//! let mut table = vec!["product\tquantity\tprice".to_string()];
+//! for (p, q, r) in [("tomato", 12, 15), ("potato", 10, 20), ("rice", 5, 12)] {
+//! table.push(format!("{}\t{}\t{}", p, q, r));
+//! }
+//! format_table::format_table(table);
+//! ```
+//!
+//! A table to be formatted is a `Vec<String>`, containing one string per line.
+//! Table columns in each line are separated by a `\t` character.
+
+/// Format a table and return the result as a string.
pub fn format_table_to_string(data: Vec<String>) -> String {
let data = data
.iter()
@@ -27,6 +43,7 @@ pub fn format_table_to_string(data: Vec<String>) -> String {
out
}
+/// Format a table and print the result to stdout.
pub fn format_table(data: Vec<String>) {
print!("{}", format_table_to_string(data));
}
diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml
index 0cbdf890..a6187729 100644
--- a/src/garage/Cargo.toml
+++ b/src/garage/Cargo.toml
@@ -21,19 +21,20 @@ path = "tests/lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
-garage_db = { version = "0.8.2", path = "../db" }
-garage_api = { version = "0.8.2", path = "../api" }
-garage_block = { version = "0.8.2", path = "../block" }
-garage_model = { version = "0.8.2", path = "../model" }
-garage_rpc = { version = "0.8.2", path = "../rpc" }
-garage_table = { version = "0.8.2", path = "../table" }
-garage_util = { version = "0.8.2", path = "../util" }
-garage_web = { version = "0.8.2", path = "../web" }
+format_table.workspace = true
+garage_db.workspace = true
+garage_api.workspace = true
+garage_block.workspace = true
+garage_model.workspace = true
+garage_rpc.workspace = true
+garage_table.workspace = true
+garage_util.workspace = true
+garage_web.workspace = true
backtrace = "0.3"
bytes = "1.0"
-bytesize = "1.1"
-timeago = "0.4"
+bytesize = "1.2"
+timeago = { version = "0.4", default-features = false }
parse_duration = "2.1"
hex = "0.4"
tracing = { version = "0.1" }
@@ -41,6 +42,7 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] }
rand = "0.8"
async-trait = "0.1.7"
sodiumoxide = { version = "0.2.5-0", package = "kuska-sodiumoxide" }
+git-version = "0.3.4"
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
serde_bytes = "0.11"
@@ -59,7 +61,8 @@ opentelemetry-otlp = { version = "0.10", optional = true }
prometheus = { version = "0.13", optional = true }
[dev-dependencies]
-aws-sdk-s3 = "0.19"
+aws-config = "0.55.2"
+aws-sdk-s3 = "0.28"
chrono = "0.4"
http = "0.2"
hmac = "0.12"
@@ -71,6 +74,8 @@ assert-json-diff = "2.0"
serde_json = "1.0"
base64 = "0.21"
+k2v-client.workspace = true
+
[features]
default = [ "bundled-libs", "metrics", "sled", "k2v" ]
diff --git a/src/garage/admin.rs b/src/garage/admin.rs
deleted file mode 100644
index 34141cb2..00000000
--- a/src/garage/admin.rs
+++ /dev/null
@@ -1,1298 +0,0 @@
-use std::collections::HashMap;
-use std::fmt::Write;
-use std::sync::Arc;
-
-use async_trait::async_trait;
-use serde::{Deserialize, Serialize};
-
-use garage_util::background::BackgroundRunner;
-use garage_util::crdt::*;
-use garage_util::data::*;
-use garage_util::error::Error as GarageError;
-use garage_util::formater::format_table_to_string;
-use garage_util::time::*;
-
-use garage_table::replication::*;
-use garage_table::*;
-
-use garage_rpc::ring::PARTITION_BITS;
-use garage_rpc::*;
-
-use garage_block::manager::BlockResyncErrorInfo;
-
-use garage_model::bucket_alias_table::*;
-use garage_model::bucket_table::*;
-use garage_model::garage::Garage;
-use garage_model::helper::error::{Error, OkOrBadRequest};
-use garage_model::key_table::*;
-use garage_model::migrate::Migrate;
-use garage_model::permission::*;
-use garage_model::s3::object_table::*;
-use garage_model::s3::version_table::Version;
-
-use crate::cli::*;
-use crate::repair::online::launch_online_repair;
-
-pub const ADMIN_RPC_PATH: &str = "garage/admin_rpc.rs/Rpc";
-
-#[derive(Debug, Serialize, Deserialize)]
-#[allow(clippy::large_enum_variant)]
-pub enum AdminRpc {
- BucketOperation(BucketOperation),
- KeyOperation(KeyOperation),
- LaunchRepair(RepairOpt),
- Migrate(MigrateOpt),
- Stats(StatsOpt),
- Worker(WorkerOperation),
- BlockOperation(BlockOperation),
-
- // Replies
- Ok(String),
- BucketList(Vec<Bucket>),
- BucketInfo {
- bucket: Bucket,
- relevant_keys: HashMap<String, Key>,
- counters: HashMap<String, i64>,
- },
- KeyList(Vec<(String, String)>),
- KeyInfo(Key, HashMap<Uuid, Bucket>),
- WorkerList(
- HashMap<usize, garage_util::background::WorkerInfo>,
- WorkerListOpt,
- ),
- WorkerVars(Vec<(Uuid, String, String)>),
- WorkerInfo(usize, garage_util::background::WorkerInfo),
- BlockErrorList(Vec<BlockResyncErrorInfo>),
- BlockInfo {
- hash: Hash,
- refcount: u64,
- versions: Vec<Result<Version, Uuid>>,
- },
-}
-
-impl Rpc for AdminRpc {
- type Response = Result<AdminRpc, Error>;
-}
-
-pub struct AdminRpcHandler {
- garage: Arc<Garage>,
- background: Arc<BackgroundRunner>,
- endpoint: Arc<Endpoint<AdminRpc, Self>>,
-}
-
-impl AdminRpcHandler {
- pub fn new(garage: Arc<Garage>, background: Arc<BackgroundRunner>) -> Arc<Self> {
- let endpoint = garage.system.netapp.endpoint(ADMIN_RPC_PATH.into());
- let admin = Arc::new(Self {
- garage,
- background,
- endpoint,
- });
- admin.endpoint.set_handler(admin.clone());
- admin
- }
-
- // ================ BUCKET COMMANDS ====================
-
- async fn handle_bucket_cmd(&self, cmd: &BucketOperation) -> Result<AdminRpc, Error> {
- match cmd {
- BucketOperation::List => self.handle_list_buckets().await,
- BucketOperation::Info(query) => self.handle_bucket_info(query).await,
- BucketOperation::Create(query) => self.handle_create_bucket(&query.name).await,
- BucketOperation::Delete(query) => self.handle_delete_bucket(query).await,
- BucketOperation::Alias(query) => self.handle_alias_bucket(query).await,
- BucketOperation::Unalias(query) => self.handle_unalias_bucket(query).await,
- BucketOperation::Allow(query) => self.handle_bucket_allow(query).await,
- BucketOperation::Deny(query) => self.handle_bucket_deny(query).await,
- BucketOperation::Website(query) => self.handle_bucket_website(query).await,
- BucketOperation::SetQuotas(query) => self.handle_bucket_set_quotas(query).await,
- BucketOperation::CleanupIncompleteUploads(query) => {
- self.handle_bucket_cleanup_incomplete_uploads(query).await
- }
- }
- }
-
- async fn handle_list_buckets(&self) -> Result<AdminRpc, Error> {
- let buckets = self
- .garage
- .bucket_table
- .get_range(
- &EmptyKey,
- None,
- Some(DeletedFilter::NotDeleted),
- 10000,
- EnumerationOrder::Forward,
- )
- .await?;
-
- Ok(AdminRpc::BucketList(buckets))
- }
-
- async fn handle_bucket_info(&self, query: &BucketOpt) -> Result<AdminRpc, Error> {
- let bucket_id = self
- .garage
- .bucket_helper()
- .resolve_global_bucket_name(&query.name)
- .await?
- .ok_or_bad_request("Bucket not found")?;
-
- let bucket = self
- .garage
- .bucket_helper()
- .get_existing_bucket(bucket_id)
- .await?;
-
- let counters = self
- .garage
- .object_counter_table
- .table
- .get(&bucket_id, &EmptyKey)
- .await?
- .map(|x| x.filtered_values(&self.garage.system.ring.borrow()))
- .unwrap_or_default();
-
- let mut relevant_keys = HashMap::new();
- for (k, _) in bucket
- .state
- .as_option()
- .unwrap()
- .authorized_keys
- .items()
- .iter()
- {
- if let Some(key) = self
- .garage
- .key_table
- .get(&EmptyKey, k)
- .await?
- .filter(|k| !k.is_deleted())
- {
- relevant_keys.insert(k.clone(), key);
- }
- }
- for ((k, _), _, _) in bucket
- .state
- .as_option()
- .unwrap()
- .local_aliases
- .items()
- .iter()
- {
- if relevant_keys.contains_key(k) {
- continue;
- }
- if let Some(key) = self.garage.key_table.get(&EmptyKey, k).await? {
- relevant_keys.insert(k.clone(), key);
- }
- }
-
- Ok(AdminRpc::BucketInfo {
- bucket,
- relevant_keys,
- counters,
- })
- }
-
- #[allow(clippy::ptr_arg)]
- async fn handle_create_bucket(&self, name: &String) -> Result<AdminRpc, Error> {
- if !is_valid_bucket_name(name) {
- return Err(Error::BadRequest(format!(
- "{}: {}",
- name, INVALID_BUCKET_NAME_MESSAGE
- )));
- }
-
- if let Some(alias) = self.garage.bucket_alias_table.get(&EmptyKey, name).await? {
- if alias.state.get().is_some() {
- return Err(Error::BadRequest(format!("Bucket {} already exists", name)));
- }
- }
-
- // ---- done checking, now commit ----
-
- let bucket = Bucket::new();
- self.garage.bucket_table.insert(&bucket).await?;
-
- self.garage
- .bucket_helper()
- .set_global_bucket_alias(bucket.id, name)
- .await?;
-
- Ok(AdminRpc::Ok(format!("Bucket {} was created.", name)))
- }
-
- async fn handle_delete_bucket(&self, query: &DeleteBucketOpt) -> Result<AdminRpc, Error> {
- let helper = self.garage.bucket_helper();
-
- let bucket_id = helper
- .resolve_global_bucket_name(&query.name)
- .await?
- .ok_or_bad_request("Bucket not found")?;
-
- // Get the alias, but keep in minde here the bucket name
- // given in parameter can also be directly the bucket's ID.
- // In that case bucket_alias will be None, and
- // we can still delete the bucket if it has zero aliases
- // (a condition which we try to prevent but that could still happen somehow).
- // We just won't try to delete an alias entry because there isn't one.
- let bucket_alias = self
- .garage
- .bucket_alias_table
- .get(&EmptyKey, &query.name)
- .await?;
-
- // Check bucket doesn't have other aliases
- let mut bucket = helper.get_existing_bucket(bucket_id).await?;
- let bucket_state = bucket.state.as_option().unwrap();
- if bucket_state
- .aliases
- .items()
- .iter()
- .filter(|(_, _, active)| *active)
- .any(|(name, _, _)| name != &query.name)
- {
- return Err(Error::BadRequest(format!("Bucket {} still has other global aliases. Use `bucket unalias` to delete them one by one.", query.name)));
- }
- if bucket_state
- .local_aliases
- .items()
- .iter()
- .any(|(_, _, active)| *active)
- {
- return Err(Error::BadRequest(format!("Bucket {} still has other local aliases. Use `bucket unalias` to delete them one by one.", query.name)));
- }
-
- // Check bucket is empty
- if !helper.is_bucket_empty(bucket_id).await? {
- return Err(Error::BadRequest(format!(
- "Bucket {} is not empty",
- query.name
- )));
- }
-
- if !query.yes {
- return Err(Error::BadRequest(
- "Add --yes flag to really perform this operation".to_string(),
- ));
- }
-
- // --- done checking, now commit ---
- // 1. delete authorization from keys that had access
- for (key_id, _) in bucket.authorized_keys() {
- helper
- .set_bucket_key_permissions(bucket.id, key_id, BucketKeyPerm::NO_PERMISSIONS)
- .await?;
- }
-
- // 2. delete bucket alias
- if bucket_alias.is_some() {
- helper
- .purge_global_bucket_alias(bucket_id, &query.name)
- .await?;
- }
-
- // 3. delete bucket
- bucket.state = Deletable::delete();
- self.garage.bucket_table.insert(&bucket).await?;
-
- Ok(AdminRpc::Ok(format!("Bucket {} was deleted.", query.name)))
- }
-
- async fn handle_alias_bucket(&self, query: &AliasBucketOpt) -> Result<AdminRpc, Error> {
- let helper = self.garage.bucket_helper();
- let key_helper = self.garage.key_helper();
-
- let bucket_id = helper
- .resolve_global_bucket_name(&query.existing_bucket)
- .await?
- .ok_or_bad_request("Bucket not found")?;
-
- if let Some(key_pattern) = &query.local {
- let key = key_helper.get_existing_matching_key(key_pattern).await?;
-
- helper
- .set_local_bucket_alias(bucket_id, &key.key_id, &query.new_name)
- .await?;
- Ok(AdminRpc::Ok(format!(
- "Alias {} now points to bucket {:?} in namespace of key {}",
- query.new_name, bucket_id, key.key_id
- )))
- } else {
- helper
- .set_global_bucket_alias(bucket_id, &query.new_name)
- .await?;
- Ok(AdminRpc::Ok(format!(
- "Alias {} now points to bucket {:?}",
- query.new_name, bucket_id
- )))
- }
- }
-
- async fn handle_unalias_bucket(&self, query: &UnaliasBucketOpt) -> Result<AdminRpc, Error> {
- let helper = self.garage.bucket_helper();
- let key_helper = self.garage.key_helper();
-
- if let Some(key_pattern) = &query.local {
- let key = key_helper.get_existing_matching_key(key_pattern).await?;
-
- let bucket_id = key
- .state
- .as_option()
- .unwrap()
- .local_aliases
- .get(&query.name)
- .cloned()
- .flatten()
- .ok_or_bad_request("Bucket not found")?;
-
- helper
- .unset_local_bucket_alias(bucket_id, &key.key_id, &query.name)
- .await?;
-
- Ok(AdminRpc::Ok(format!(
- "Alias {} no longer points to bucket {:?} in namespace of key {}",
- &query.name, bucket_id, key.key_id
- )))
- } else {
- let bucket_id = helper
- .resolve_global_bucket_name(&query.name)
- .await?
- .ok_or_bad_request("Bucket not found")?;
-
- helper
- .unset_global_bucket_alias(bucket_id, &query.name)
- .await?;
-
- Ok(AdminRpc::Ok(format!(
- "Alias {} no longer points to bucket {:?}",
- &query.name, bucket_id
- )))
- }
- }
-
- async fn handle_bucket_allow(&self, query: &PermBucketOpt) -> Result<AdminRpc, Error> {
- let helper = self.garage.bucket_helper();
- let key_helper = self.garage.key_helper();
-
- let bucket_id = helper
- .resolve_global_bucket_name(&query.bucket)
- .await?
- .ok_or_bad_request("Bucket not found")?;
- let key = key_helper
- .get_existing_matching_key(&query.key_pattern)
- .await?;
-
- let allow_read = query.read || key.allow_read(&bucket_id);
- let allow_write = query.write || key.allow_write(&bucket_id);
- let allow_owner = query.owner || key.allow_owner(&bucket_id);
-
- helper
- .set_bucket_key_permissions(
- bucket_id,
- &key.key_id,
- BucketKeyPerm {
- timestamp: now_msec(),
- allow_read,
- allow_write,
- allow_owner,
- },
- )
- .await?;
-
- Ok(AdminRpc::Ok(format!(
- "New permissions for {} on {}: read {}, write {}, owner {}.",
- &key.key_id, &query.bucket, allow_read, allow_write, allow_owner
- )))
- }
-
- async fn handle_bucket_deny(&self, query: &PermBucketOpt) -> Result<AdminRpc, Error> {
- let helper = self.garage.bucket_helper();
- let key_helper = self.garage.key_helper();
-
- let bucket_id = helper
- .resolve_global_bucket_name(&query.bucket)
- .await?
- .ok_or_bad_request("Bucket not found")?;
- let key = key_helper
- .get_existing_matching_key(&query.key_pattern)
- .await?;
-
- let allow_read = !query.read && key.allow_read(&bucket_id);
- let allow_write = !query.write && key.allow_write(&bucket_id);
- let allow_owner = !query.owner && key.allow_owner(&bucket_id);
-
- helper
- .set_bucket_key_permissions(
- bucket_id,
- &key.key_id,
- BucketKeyPerm {
- timestamp: now_msec(),
- allow_read,
- allow_write,
- allow_owner,
- },
- )
- .await?;
-
- Ok(AdminRpc::Ok(format!(
- "New permissions for {} on {}: read {}, write {}, owner {}.",
- &key.key_id, &query.bucket, allow_read, allow_write, allow_owner
- )))
- }
-
- async fn handle_bucket_website(&self, query: &WebsiteOpt) -> Result<AdminRpc, Error> {
- let bucket_id = self
- .garage
- .bucket_helper()
- .resolve_global_bucket_name(&query.bucket)
- .await?
- .ok_or_bad_request("Bucket not found")?;
-
- let mut bucket = self
- .garage
- .bucket_helper()
- .get_existing_bucket(bucket_id)
- .await?;
- let bucket_state = bucket.state.as_option_mut().unwrap();
-
- if !(query.allow ^ query.deny) {
- return Err(Error::BadRequest(
- "You must specify exactly one flag, either --allow or --deny".to_string(),
- ));
- }
-
- let website = if query.allow {
- Some(WebsiteConfig {
- index_document: query.index_document.clone(),
- error_document: query.error_document.clone(),
- })
- } else {
- None
- };
-
- bucket_state.website_config.update(website);
- self.garage.bucket_table.insert(&bucket).await?;
-
- let msg = if query.allow {
- format!("Website access allowed for {}", &query.bucket)
- } else {
- format!("Website access denied for {}", &query.bucket)
- };
-
- Ok(AdminRpc::Ok(msg))
- }
-
- async fn handle_bucket_set_quotas(&self, query: &SetQuotasOpt) -> Result<AdminRpc, Error> {
- let bucket_id = self
- .garage
- .bucket_helper()
- .resolve_global_bucket_name(&query.bucket)
- .await?
- .ok_or_bad_request("Bucket not found")?;
-
- let mut bucket = self
- .garage
- .bucket_helper()
- .get_existing_bucket(bucket_id)
- .await?;
- let bucket_state = bucket.state.as_option_mut().unwrap();
-
- if query.max_size.is_none() && query.max_objects.is_none() {
- return Err(Error::BadRequest(
- "You must specify either --max-size or --max-objects (or both) for this command to do something.".to_string(),
- ));
- }
-
- let mut quotas = bucket_state.quotas.get().clone();
-
- match query.max_size.as_ref().map(String::as_ref) {
- Some("none") => quotas.max_size = None,
- Some(v) => {
- let bs = v
- .parse::<bytesize::ByteSize>()
- .ok_or_bad_request(format!("Invalid size specified: {}", v))?;
- quotas.max_size = Some(bs.as_u64());
- }
- _ => (),
- }
-
- match query.max_objects.as_ref().map(String::as_ref) {
- Some("none") => quotas.max_objects = None,
- Some(v) => {
- let mo = v
- .parse::<u64>()
- .ok_or_bad_request(format!("Invalid number specified: {}", v))?;
- quotas.max_objects = Some(mo);
- }
- _ => (),
- }
-
- bucket_state.quotas.update(quotas);
- self.garage.bucket_table.insert(&bucket).await?;
-
- Ok(AdminRpc::Ok(format!(
- "Quotas updated for {}",
- &query.bucket
- )))
- }
-
- async fn handle_bucket_cleanup_incomplete_uploads(
- &self,
- query: &CleanupIncompleteUploadsOpt,
- ) -> Result<AdminRpc, Error> {
- let mut bucket_ids = vec![];
- for b in query.buckets.iter() {
- bucket_ids.push(
- self.garage
- .bucket_helper()
- .resolve_global_bucket_name(b)
- .await?
- .ok_or_bad_request(format!("Bucket not found: {}", b))?,
- );
- }
-
- let duration = parse_duration::parse::parse(&query.older_than)
- .ok_or_bad_request("Invalid duration passed for --older-than parameter")?;
-
- let mut ret = String::new();
- for bucket in bucket_ids {
- let count = self
- .garage
- .bucket_helper()
- .cleanup_incomplete_uploads(&bucket, duration)
- .await?;
- writeln!(
- &mut ret,
- "Bucket {:?}: {} incomplete uploads aborted",
- bucket, count
- )
- .unwrap();
- }
-
- Ok(AdminRpc::Ok(ret))
- }
-
- // ================ KEY COMMANDS ====================
-
- async fn handle_key_cmd(&self, cmd: &KeyOperation) -> Result<AdminRpc, Error> {
- match cmd {
- KeyOperation::List => self.handle_list_keys().await,
- KeyOperation::Info(query) => self.handle_key_info(query).await,
- KeyOperation::New(query) => self.handle_create_key(query).await,
- KeyOperation::Rename(query) => self.handle_rename_key(query).await,
- KeyOperation::Delete(query) => self.handle_delete_key(query).await,
- KeyOperation::Allow(query) => self.handle_allow_key(query).await,
- KeyOperation::Deny(query) => self.handle_deny_key(query).await,
- KeyOperation::Import(query) => self.handle_import_key(query).await,
- }
- }
-
- async fn handle_list_keys(&self) -> Result<AdminRpc, Error> {
- let key_ids = self
- .garage
- .key_table
- .get_range(
- &EmptyKey,
- None,
- Some(KeyFilter::Deleted(DeletedFilter::NotDeleted)),
- 10000,
- EnumerationOrder::Forward,
- )
- .await?
- .iter()
- .map(|k| (k.key_id.to_string(), k.params().unwrap().name.get().clone()))
- .collect::<Vec<_>>();
- Ok(AdminRpc::KeyList(key_ids))
- }
-
- async fn handle_key_info(&self, query: &KeyOpt) -> Result<AdminRpc, Error> {
- let key = self
- .garage
- .key_helper()
- .get_existing_matching_key(&query.key_pattern)
- .await?;
- self.key_info_result(key).await
- }
-
- async fn handle_create_key(&self, query: &KeyNewOpt) -> Result<AdminRpc, Error> {
- let key = Key::new(&query.name);
- self.garage.key_table.insert(&key).await?;
- self.key_info_result(key).await
- }
-
- async fn handle_rename_key(&self, query: &KeyRenameOpt) -> Result<AdminRpc, Error> {
- let mut key = self
- .garage
- .key_helper()
- .get_existing_matching_key(&query.key_pattern)
- .await?;
- key.params_mut()
- .unwrap()
- .name
- .update(query.new_name.clone());
- self.garage.key_table.insert(&key).await?;
- self.key_info_result(key).await
- }
-
- async fn handle_delete_key(&self, query: &KeyDeleteOpt) -> Result<AdminRpc, Error> {
- let key_helper = self.garage.key_helper();
-
- let mut key = key_helper
- .get_existing_matching_key(&query.key_pattern)
- .await?;
-
- if !query.yes {
- return Err(Error::BadRequest(
- "Add --yes flag to really perform this operation".to_string(),
- ));
- }
-
- key_helper.delete_key(&mut key).await?;
-
- Ok(AdminRpc::Ok(format!(
- "Key {} was deleted successfully.",
- key.key_id
- )))
- }
-
- async fn handle_allow_key(&self, query: &KeyPermOpt) -> Result<AdminRpc, Error> {
- let mut key = self
- .garage
- .key_helper()
- .get_existing_matching_key(&query.key_pattern)
- .await?;
- if query.create_bucket {
- key.params_mut().unwrap().allow_create_bucket.update(true);
- }
- self.garage.key_table.insert(&key).await?;
- self.key_info_result(key).await
- }
-
- async fn handle_deny_key(&self, query: &KeyPermOpt) -> Result<AdminRpc, Error> {
- let mut key = self
- .garage
- .key_helper()
- .get_existing_matching_key(&query.key_pattern)
- .await?;
- if query.create_bucket {
- key.params_mut().unwrap().allow_create_bucket.update(false);
- }
- self.garage.key_table.insert(&key).await?;
- self.key_info_result(key).await
- }
-
- async fn handle_import_key(&self, query: &KeyImportOpt) -> Result<AdminRpc, Error> {
- let prev_key = self.garage.key_table.get(&EmptyKey, &query.key_id).await?;
- if prev_key.is_some() {
- return Err(Error::BadRequest(format!("Key {} already exists in data store. Even if it is deleted, we can't let you create a new key with the same ID. Sorry.", query.key_id)));
- }
- let imported_key = Key::import(&query.key_id, &query.secret_key, &query.name);
- self.garage.key_table.insert(&imported_key).await?;
-
- self.key_info_result(imported_key).await
- }
-
- async fn key_info_result(&self, key: Key) -> Result<AdminRpc, Error> {
- let mut relevant_buckets = HashMap::new();
-
- for (id, _) in key
- .state
- .as_option()
- .unwrap()
- .authorized_buckets
- .items()
- .iter()
- {
- if let Some(b) = self.garage.bucket_table.get(&EmptyKey, id).await? {
- relevant_buckets.insert(*id, b);
- }
- }
-
- Ok(AdminRpc::KeyInfo(key, relevant_buckets))
- }
-
- // ================ MIGRATION COMMANDS ====================
-
- async fn handle_migrate(self: &Arc<Self>, opt: MigrateOpt) -> Result<AdminRpc, Error> {
- if !opt.yes {
- return Err(Error::BadRequest(
- "Please provide the --yes flag to initiate migration operation.".to_string(),
- ));
- }
-
- let m = Migrate {
- garage: self.garage.clone(),
- };
- match opt.what {
- MigrateWhat::Buckets050 => m.migrate_buckets050().await,
- }?;
- Ok(AdminRpc::Ok("Migration successfull.".into()))
- }
-
- // ================ REPAIR COMMANDS ====================
-
- async fn handle_launch_repair(self: &Arc<Self>, opt: RepairOpt) -> Result<AdminRpc, Error> {
- if !opt.yes {
- return Err(Error::BadRequest(
- "Please provide the --yes flag to initiate repair operations.".to_string(),
- ));
- }
- if opt.all_nodes {
- let mut opt_to_send = opt.clone();
- opt_to_send.all_nodes = false;
-
- let mut failures = vec![];
- let ring = self.garage.system.ring.borrow().clone();
- for node in ring.layout.node_ids().iter() {
- let node = (*node).into();
- let resp = self
- .endpoint
- .call(
- &node,
- AdminRpc::LaunchRepair(opt_to_send.clone()),
- PRIO_NORMAL,
- )
- .await;
- if !matches!(resp, Ok(Ok(_))) {
- failures.push(node);
- }
- }
- if failures.is_empty() {
- Ok(AdminRpc::Ok("Repair launched on all nodes".to_string()))
- } else {
- Err(Error::BadRequest(format!(
- "Could not launch repair on nodes: {:?} (launched successfully on other nodes)",
- failures
- )))
- }
- } else {
- launch_online_repair(&self.garage, &self.background, opt).await?;
- Ok(AdminRpc::Ok(format!(
- "Repair launched on {:?}",
- self.garage.system.id
- )))
- }
- }
-
- // ================ STATS COMMANDS ====================
-
- async fn handle_stats(&self, opt: StatsOpt) -> Result<AdminRpc, Error> {
- if opt.all_nodes {
- let mut ret = String::new();
- let ring = self.garage.system.ring.borrow().clone();
-
- for node in ring.layout.node_ids().iter() {
- let mut opt = opt.clone();
- opt.all_nodes = false;
- opt.skip_global = true;
-
- writeln!(&mut ret, "\n======================").unwrap();
- writeln!(&mut ret, "Stats for node {:?}:", node).unwrap();
-
- let node_id = (*node).into();
- match self
- .endpoint
- .call(&node_id, AdminRpc::Stats(opt), PRIO_NORMAL)
- .await
- {
- Ok(Ok(AdminRpc::Ok(s))) => writeln!(&mut ret, "{}", s).unwrap(),
- Ok(Ok(x)) => writeln!(&mut ret, "Bad answer: {:?}", x).unwrap(),
- Ok(Err(e)) => writeln!(&mut ret, "Remote error: {}", e).unwrap(),
- Err(e) => writeln!(&mut ret, "Network error: {}", e).unwrap(),
- }
- }
-
- writeln!(&mut ret, "\n======================").unwrap();
- write!(
- &mut ret,
- "Cluster statistics:\n\n{}",
- self.gather_cluster_stats()
- )
- .unwrap();
-
- Ok(AdminRpc::Ok(ret))
- } else {
- Ok(AdminRpc::Ok(self.gather_stats_local(opt)?))
- }
- }
-
- fn gather_stats_local(&self, opt: StatsOpt) -> Result<String, Error> {
- let mut ret = String::new();
- writeln!(
- &mut ret,
- "\nGarage version: {} [features: {}]\nRust compiler version: {}",
- garage_util::version::garage_version(),
- garage_util::version::garage_features()
- .map(|list| list.join(", "))
- .unwrap_or_else(|| "(unknown)".into()),
- garage_util::version::rust_version(),
- )
- .unwrap();
-
- writeln!(&mut ret, "\nDatabase engine: {}", self.garage.db.engine()).unwrap();
-
- // Gather table statistics
- let mut table = vec![" Table\tItems\tMklItems\tMklTodo\tGcTodo".into()];
- table.push(self.gather_table_stats(&self.garage.bucket_table, opt.detailed)?);
- table.push(self.gather_table_stats(&self.garage.key_table, opt.detailed)?);
- table.push(self.gather_table_stats(&self.garage.object_table, opt.detailed)?);
- table.push(self.gather_table_stats(&self.garage.version_table, opt.detailed)?);
- table.push(self.gather_table_stats(&self.garage.block_ref_table, opt.detailed)?);
- write!(
- &mut ret,
- "\nTable stats:\n{}",
- format_table_to_string(table)
- )
- .unwrap();
-
- // Gather block manager statistics
- writeln!(&mut ret, "\nBlock manager stats:").unwrap();
- let rc_len = if opt.detailed {
- self.garage.block_manager.rc_len()?.to_string()
- } else {
- self.garage
- .block_manager
- .rc_fast_len()?
- .map(|x| x.to_string())
- .unwrap_or_else(|| "NC".into())
- };
-
- writeln!(
- &mut ret,
- " number of RC entries (~= number of blocks): {}",
- rc_len
- )
- .unwrap();
- writeln!(
- &mut ret,
- " resync queue length: {}",
- self.garage.block_manager.resync.queue_len()?
- )
- .unwrap();
- writeln!(
- &mut ret,
- " blocks with resync errors: {}",
- self.garage.block_manager.resync.errors_len()?
- )
- .unwrap();
-
- if !opt.detailed {
- writeln!(&mut ret, "\nIf values are missing above (marked as NC), consider adding the --detailed flag (this will be slow).").unwrap();
- }
-
- if !opt.skip_global {
- write!(&mut ret, "\n{}", self.gather_cluster_stats()).unwrap();
- }
-
- Ok(ret)
- }
-
- fn gather_cluster_stats(&self) -> String {
- let mut ret = String::new();
-
- // Gather storage node and free space statistics
- let layout = &self.garage.system.ring.borrow().layout;
- let mut node_partition_count = HashMap::<Uuid, u64>::new();
- for short_id in layout.ring_assignation_data.iter() {
- let id = layout.node_id_vec[*short_id as usize];
- *node_partition_count.entry(id).or_default() += 1;
- }
- let node_info = self
- .garage
- .system
- .get_known_nodes()
- .into_iter()
- .map(|n| (n.id, n))
- .collect::<HashMap<_, _>>();
-
- let mut table = vec![" ID\tHostname\tZone\tCapacity\tPart.\tDataAvail\tMetaAvail".into()];
- for (id, parts) in node_partition_count.iter() {
- let info = node_info.get(id);
- let status = info.map(|x| &x.status);
- let role = layout.roles.get(id).and_then(|x| x.0.as_ref());
- let hostname = status.map(|x| x.hostname.as_str()).unwrap_or("?");
- let zone = role.map(|x| x.zone.as_str()).unwrap_or("?");
- let capacity = role
- .map(|x| x.capacity_string())
- .unwrap_or_else(|| "?".into());
- let avail_str = |x| match x {
- Some((avail, total)) => {
- let pct = (avail as f64) / (total as f64) * 100.;
- let avail = bytesize::ByteSize::b(avail);
- let total = bytesize::ByteSize::b(total);
- format!("{}/{} ({:.1}%)", avail, total, pct)
- }
- None => "?".into(),
- };
- let data_avail = avail_str(status.and_then(|x| x.data_disk_avail));
- let meta_avail = avail_str(status.and_then(|x| x.meta_disk_avail));
- table.push(format!(
- " {:?}\t{}\t{}\t{}\t{}\t{}\t{}",
- id, hostname, zone, capacity, parts, data_avail, meta_avail
- ));
- }
- write!(
- &mut ret,
- "Storage nodes:\n{}",
- format_table_to_string(table)
- )
- .unwrap();
-
- let meta_part_avail = node_partition_count
- .iter()
- .filter_map(|(id, parts)| {
- node_info
- .get(id)
- .and_then(|x| x.status.meta_disk_avail)
- .map(|c| c.0 / *parts)
- })
- .collect::<Vec<_>>();
- let data_part_avail = node_partition_count
- .iter()
- .filter_map(|(id, parts)| {
- node_info
- .get(id)
- .and_then(|x| x.status.data_disk_avail)
- .map(|c| c.0 / *parts)
- })
- .collect::<Vec<_>>();
- if !meta_part_avail.is_empty() && !data_part_avail.is_empty() {
- let meta_avail =
- bytesize::ByteSize(meta_part_avail.iter().min().unwrap() * (1 << PARTITION_BITS));
- let data_avail =
- bytesize::ByteSize(data_part_avail.iter().min().unwrap() * (1 << PARTITION_BITS));
- writeln!(
- &mut ret,
- "\nEstimated available storage space cluster-wide (might be lower in practice):"
- )
- .unwrap();
- if meta_part_avail.len() < node_partition_count.len()
- || data_part_avail.len() < node_partition_count.len()
- {
- writeln!(&mut ret, " data: < {}", data_avail).unwrap();
- writeln!(&mut ret, " metadata: < {}", meta_avail).unwrap();
- writeln!(&mut ret, "A precise estimate could not be given as information is missing for some storage nodes.").unwrap();
- } else {
- writeln!(&mut ret, " data: {}", data_avail).unwrap();
- writeln!(&mut ret, " metadata: {}", meta_avail).unwrap();
- }
- }
-
- ret
- }
-
- fn gather_table_stats<F, R>(
- &self,
- t: &Arc<Table<F, R>>,
- detailed: bool,
- ) -> Result<String, Error>
- where
- F: TableSchema + 'static,
- R: TableReplication + 'static,
- {
- let (data_len, mkl_len) = if detailed {
- (
- t.data.store.len().map_err(GarageError::from)?.to_string(),
- t.merkle_updater.merkle_tree_len()?.to_string(),
- )
- } else {
- (
- t.data
- .store
- .fast_len()
- .map_err(GarageError::from)?
- .map(|x| x.to_string())
- .unwrap_or_else(|| "NC".into()),
- t.merkle_updater
- .merkle_tree_fast_len()?
- .map(|x| x.to_string())
- .unwrap_or_else(|| "NC".into()),
- )
- };
-
- Ok(format!(
- " {}\t{}\t{}\t{}\t{}",
- F::TABLE_NAME,
- data_len,
- mkl_len,
- t.merkle_updater.todo_len()?,
- t.data.gc_todo_len()?
- ))
- }
-
- // ================ WORKER COMMANDS ====================
-
- async fn handle_worker_cmd(&self, cmd: &WorkerOperation) -> Result<AdminRpc, Error> {
- match cmd {
- WorkerOperation::List { opt } => {
- let workers = self.background.get_worker_info();
- Ok(AdminRpc::WorkerList(workers, *opt))
- }
- WorkerOperation::Info { tid } => {
- let info = self
- .background
- .get_worker_info()
- .get(tid)
- .ok_or_bad_request(format!("No worker with TID {}", tid))?
- .clone();
- Ok(AdminRpc::WorkerInfo(*tid, info))
- }
- WorkerOperation::Get {
- all_nodes,
- variable,
- } => self.handle_get_var(*all_nodes, variable).await,
- WorkerOperation::Set {
- all_nodes,
- variable,
- value,
- } => self.handle_set_var(*all_nodes, variable, value).await,
- }
- }
-
- async fn handle_get_var(
- &self,
- all_nodes: bool,
- variable: &Option<String>,
- ) -> Result<AdminRpc, Error> {
- if all_nodes {
- let mut ret = vec![];
- let ring = self.garage.system.ring.borrow().clone();
- for node in ring.layout.node_ids().iter() {
- let node = (*node).into();
- match self
- .endpoint
- .call(
- &node,
- AdminRpc::Worker(WorkerOperation::Get {
- all_nodes: false,
- variable: variable.clone(),
- }),
- PRIO_NORMAL,
- )
- .await??
- {
- AdminRpc::WorkerVars(v) => ret.extend(v),
- m => return Err(GarageError::unexpected_rpc_message(m).into()),
- }
- }
- Ok(AdminRpc::WorkerVars(ret))
- } else {
- #[allow(clippy::collapsible_else_if)]
- if let Some(v) = variable {
- Ok(AdminRpc::WorkerVars(vec![(
- self.garage.system.id,
- v.clone(),
- self.garage.bg_vars.get(v)?,
- )]))
- } else {
- let mut vars = self.garage.bg_vars.get_all();
- vars.sort();
- Ok(AdminRpc::WorkerVars(
- vars.into_iter()
- .map(|(k, v)| (self.garage.system.id, k.to_string(), v))
- .collect(),
- ))
- }
- }
- }
-
- async fn handle_set_var(
- &self,
- all_nodes: bool,
- variable: &str,
- value: &str,
- ) -> Result<AdminRpc, Error> {
- if all_nodes {
- let mut ret = vec![];
- let ring = self.garage.system.ring.borrow().clone();
- for node in ring.layout.node_ids().iter() {
- let node = (*node).into();
- match self
- .endpoint
- .call(
- &node,
- AdminRpc::Worker(WorkerOperation::Set {
- all_nodes: false,
- variable: variable.to_string(),
- value: value.to_string(),
- }),
- PRIO_NORMAL,
- )
- .await??
- {
- AdminRpc::WorkerVars(v) => ret.extend(v),
- m => return Err(GarageError::unexpected_rpc_message(m).into()),
- }
- }
- Ok(AdminRpc::WorkerVars(ret))
- } else {
- self.garage.bg_vars.set(variable, value)?;
- Ok(AdminRpc::WorkerVars(vec![(
- self.garage.system.id,
- variable.to_string(),
- value.to_string(),
- )]))
- }
- }
-
- // ================ BLOCK COMMANDS ====================
-
- async fn handle_block_cmd(&self, cmd: &BlockOperation) -> Result<AdminRpc, Error> {
- match cmd {
- BlockOperation::ListErrors => Ok(AdminRpc::BlockErrorList(
- self.garage.block_manager.list_resync_errors()?,
- )),
- BlockOperation::Info { hash } => {
- let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?;
- let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?;
- let refcount = self.garage.block_manager.get_block_rc(&hash)?;
- let block_refs = self
- .garage
- .block_ref_table
- .get_range(&hash, None, None, 10000, Default::default())
- .await?;
- let mut versions = vec![];
- for br in block_refs {
- if let Some(v) = self
- .garage
- .version_table
- .get(&br.version, &EmptyKey)
- .await?
- {
- versions.push(Ok(v));
- } else {
- versions.push(Err(br.version));
- }
- }
- Ok(AdminRpc::BlockInfo {
- hash,
- refcount,
- versions,
- })
- }
- BlockOperation::RetryNow { all, blocks } => {
- if *all {
- if !blocks.is_empty() {
- return Err(Error::BadRequest(
- "--all was specified, cannot also specify blocks".into(),
- ));
- }
- let blocks = self.garage.block_manager.list_resync_errors()?;
- for b in blocks.iter() {
- self.garage.block_manager.resync.clear_backoff(&b.hash)?;
- }
- Ok(AdminRpc::Ok(format!(
- "{} blocks returned in queue for a retry now (check logs to see results)",
- blocks.len()
- )))
- } else {
- for hash in blocks {
- let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?;
- let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?;
- self.garage.block_manager.resync.clear_backoff(&hash)?;
- }
- Ok(AdminRpc::Ok(format!(
- "{} blocks returned in queue for a retry now (check logs to see results)",
- blocks.len()
- )))
- }
- }
- BlockOperation::Purge { yes, blocks } => {
- if !yes {
- return Err(Error::BadRequest(
- "Pass the --yes flag to confirm block purge operation.".into(),
- ));
- }
-
- let mut obj_dels = 0;
- let mut ver_dels = 0;
-
- for hash in blocks {
- let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?;
- let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?;
- let block_refs = self
- .garage
- .block_ref_table
- .get_range(&hash, None, None, 10000, Default::default())
- .await?;
-
- for br in block_refs {
- let version = match self
- .garage
- .version_table
- .get(&br.version, &EmptyKey)
- .await?
- {
- Some(v) => v,
- None => continue,
- };
-
- if let Some(object) = self
- .garage
- .object_table
- .get(&version.bucket_id, &version.key)
- .await?
- {
- let ov = object.versions().iter().rev().find(|v| v.is_complete());
- if let Some(ov) = ov {
- if ov.uuid == br.version {
- let del_uuid = gen_uuid();
- let deleted_object = Object::new(
- version.bucket_id,
- version.key.clone(),
- vec![ObjectVersion {
- uuid: del_uuid,
- timestamp: ov.timestamp + 1,
- state: ObjectVersionState::Complete(
- ObjectVersionData::DeleteMarker,
- ),
- }],
- );
- self.garage.object_table.insert(&deleted_object).await?;
- obj_dels += 1;
- }
- }
- }
-
- if !version.deleted.get() {
- let deleted_version = Version::new(
- version.uuid,
- version.bucket_id,
- version.key.clone(),
- true,
- );
- self.garage.version_table.insert(&deleted_version).await?;
- ver_dels += 1;
- }
- }
- }
- Ok(AdminRpc::Ok(format!(
- "{} blocks were purged: {} object deletion markers added, {} versions marked deleted",
- blocks.len(),
- obj_dels,
- ver_dels
- )))
- }
- }
- }
-}
-
-#[async_trait]
-impl EndpointHandler<AdminRpc> for AdminRpcHandler {
- async fn handle(
- self: &Arc<Self>,
- message: &AdminRpc,
- _from: NodeID,
- ) -> Result<AdminRpc, Error> {
- match message {
- AdminRpc::BucketOperation(bo) => self.handle_bucket_cmd(bo).await,
- AdminRpc::KeyOperation(ko) => self.handle_key_cmd(ko).await,
- AdminRpc::Migrate(opt) => self.handle_migrate(opt.clone()).await,
- AdminRpc::LaunchRepair(opt) => self.handle_launch_repair(opt.clone()).await,
- AdminRpc::Stats(opt) => self.handle_stats(opt.clone()).await,
- AdminRpc::Worker(wo) => self.handle_worker_cmd(wo).await,
- AdminRpc::BlockOperation(bo) => self.handle_block_cmd(bo).await,
- m => Err(GarageError::unexpected_rpc_message(m).into()),
- }
- }
-}
diff --git a/src/garage/admin/block.rs b/src/garage/admin/block.rs
new file mode 100644
index 00000000..e9e3ff96
--- /dev/null
+++ b/src/garage/admin/block.rs
@@ -0,0 +1,160 @@
+use garage_util::data::*;
+
+use garage_table::*;
+
+use garage_model::helper::error::{Error, OkOrBadRequest};
+use garage_model::s3::object_table::*;
+use garage_model::s3::version_table::*;
+
+use crate::cli::*;
+
+use super::*;
+
+impl AdminRpcHandler {
+ pub(super) async fn handle_block_cmd(&self, cmd: &BlockOperation) -> Result<AdminRpc, Error> {
+ match cmd {
+ BlockOperation::ListErrors => Ok(AdminRpc::BlockErrorList(
+ self.garage.block_manager.list_resync_errors()?,
+ )),
+ BlockOperation::Info { hash } => self.handle_block_info(hash).await,
+ BlockOperation::RetryNow { all, blocks } => {
+ self.handle_block_retry_now(*all, blocks).await
+ }
+ BlockOperation::Purge { yes, blocks } => self.handle_block_purge(*yes, blocks).await,
+ }
+ }
+
+ async fn handle_block_info(&self, hash: &String) -> Result<AdminRpc, Error> {
+ let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?;
+ let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?;
+ let refcount = self.garage.block_manager.get_block_rc(&hash)?;
+ let block_refs = self
+ .garage
+ .block_ref_table
+ .get_range(&hash, None, None, 10000, Default::default())
+ .await?;
+ let mut versions = vec![];
+ for br in block_refs {
+ if let Some(v) = self
+ .garage
+ .version_table
+ .get(&br.version, &EmptyKey)
+ .await?
+ {
+ versions.push(Ok(v));
+ } else {
+ versions.push(Err(br.version));
+ }
+ }
+ Ok(AdminRpc::BlockInfo {
+ hash,
+ refcount,
+ versions,
+ })
+ }
+
+ async fn handle_block_retry_now(
+ &self,
+ all: bool,
+ blocks: &[String],
+ ) -> Result<AdminRpc, Error> {
+ if all {
+ if !blocks.is_empty() {
+ return Err(Error::BadRequest(
+ "--all was specified, cannot also specify blocks".into(),
+ ));
+ }
+ let blocks = self.garage.block_manager.list_resync_errors()?;
+ for b in blocks.iter() {
+ self.garage.block_manager.resync.clear_backoff(&b.hash)?;
+ }
+ Ok(AdminRpc::Ok(format!(
+ "{} blocks returned in queue for a retry now (check logs to see results)",
+ blocks.len()
+ )))
+ } else {
+ for hash in blocks {
+ let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?;
+ let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?;
+ self.garage.block_manager.resync.clear_backoff(&hash)?;
+ }
+ Ok(AdminRpc::Ok(format!(
+ "{} blocks returned in queue for a retry now (check logs to see results)",
+ blocks.len()
+ )))
+ }
+ }
+
+ async fn handle_block_purge(&self, yes: bool, blocks: &[String]) -> Result<AdminRpc, Error> {
+ if !yes {
+ return Err(Error::BadRequest(
+ "Pass the --yes flag to confirm block purge operation.".into(),
+ ));
+ }
+
+ let mut obj_dels = 0;
+ let mut ver_dels = 0;
+
+ for hash in blocks {
+ let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?;
+ let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?;
+ let block_refs = self
+ .garage
+ .block_ref_table
+ .get_range(&hash, None, None, 10000, Default::default())
+ .await?;
+
+ for br in block_refs {
+ let version = match self
+ .garage
+ .version_table
+ .get(&br.version, &EmptyKey)
+ .await?
+ {
+ Some(v) => v,
+ None => continue,
+ };
+
+ if let Some(object) = self
+ .garage
+ .object_table
+ .get(&version.bucket_id, &version.key)
+ .await?
+ {
+ let ov = object.versions().iter().rev().find(|v| v.is_complete());
+ if let Some(ov) = ov {
+ if ov.uuid == br.version {
+ let del_uuid = gen_uuid();
+ let deleted_object = Object::new(
+ version.bucket_id,
+ version.key.clone(),
+ vec![ObjectVersion {
+ uuid: del_uuid,
+ timestamp: ov.timestamp + 1,
+ state: ObjectVersionState::Complete(
+ ObjectVersionData::DeleteMarker,
+ ),
+ }],
+ );
+ self.garage.object_table.insert(&deleted_object).await?;
+ obj_dels += 1;
+ }
+ }
+ }
+
+ if !version.deleted.get() {
+ let deleted_version =
+ Version::new(version.uuid, version.bucket_id, version.key.clone(), true);
+ self.garage.version_table.insert(&deleted_version).await?;
+ ver_dels += 1;
+ }
+ }
+ }
+ Ok(AdminRpc::Ok(format!(
+ "{} blocks were purged: {} object deletion markers added, {} versions marked deleted",
+ blocks.len(),
+ obj_dels,
+ ver_dels
+ )))
+ }
+}
diff --git a/src/garage/admin/bucket.rs b/src/garage/admin/bucket.rs
new file mode 100644
index 00000000..11bb8730
--- /dev/null
+++ b/src/garage/admin/bucket.rs
@@ -0,0 +1,496 @@
+use std::collections::HashMap;
+use std::fmt::Write;
+
+use garage_util::crdt::*;
+use garage_util::time::*;
+
+use garage_table::*;
+
+use garage_model::bucket_alias_table::*;
+use garage_model::bucket_table::*;
+use garage_model::helper::error::{Error, OkOrBadRequest};
+use garage_model::permission::*;
+
+use crate::cli::*;
+
+use super::*;
+
+impl AdminRpcHandler {
+ pub(super) async fn handle_bucket_cmd(&self, cmd: &BucketOperation) -> Result<AdminRpc, Error> {
+ match cmd {
+ BucketOperation::List => self.handle_list_buckets().await,
+ BucketOperation::Info(query) => self.handle_bucket_info(query).await,
+ BucketOperation::Create(query) => self.handle_create_bucket(&query.name).await,
+ BucketOperation::Delete(query) => self.handle_delete_bucket(query).await,
+ BucketOperation::Alias(query) => self.handle_alias_bucket(query).await,
+ BucketOperation::Unalias(query) => self.handle_unalias_bucket(query).await,
+ BucketOperation::Allow(query) => self.handle_bucket_allow(query).await,
+ BucketOperation::Deny(query) => self.handle_bucket_deny(query).await,
+ BucketOperation::Website(query) => self.handle_bucket_website(query).await,
+ BucketOperation::SetQuotas(query) => self.handle_bucket_set_quotas(query).await,
+ BucketOperation::CleanupIncompleteUploads(query) => {
+ self.handle_bucket_cleanup_incomplete_uploads(query).await
+ }
+ }
+ }
+
+ async fn handle_list_buckets(&self) -> Result<AdminRpc, Error> {
+ let buckets = self
+ .garage
+ .bucket_table
+ .get_range(
+ &EmptyKey,
+ None,
+ Some(DeletedFilter::NotDeleted),
+ 10000,
+ EnumerationOrder::Forward,
+ )
+ .await?;
+
+ Ok(AdminRpc::BucketList(buckets))
+ }
+
+ async fn handle_bucket_info(&self, query: &BucketOpt) -> Result<AdminRpc, Error> {
+ let bucket_id = self
+ .garage
+ .bucket_helper()
+ .resolve_global_bucket_name(&query.name)
+ .await?
+ .ok_or_bad_request("Bucket not found")?;
+
+ let bucket = self
+ .garage
+ .bucket_helper()
+ .get_existing_bucket(bucket_id)
+ .await?;
+
+ let counters = self
+ .garage
+ .object_counter_table
+ .table
+ .get(&bucket_id, &EmptyKey)
+ .await?
+ .map(|x| x.filtered_values(&self.garage.system.ring.borrow()))
+ .unwrap_or_default();
+
+ let mut relevant_keys = HashMap::new();
+ for (k, _) in bucket
+ .state
+ .as_option()
+ .unwrap()
+ .authorized_keys
+ .items()
+ .iter()
+ {
+ if let Some(key) = self
+ .garage
+ .key_table
+ .get(&EmptyKey, k)
+ .await?
+ .filter(|k| !k.is_deleted())
+ {
+ relevant_keys.insert(k.clone(), key);
+ }
+ }
+ for ((k, _), _, _) in bucket
+ .state
+ .as_option()
+ .unwrap()
+ .local_aliases
+ .items()
+ .iter()
+ {
+ if relevant_keys.contains_key(k) {
+ continue;
+ }
+ if let Some(key) = self.garage.key_table.get(&EmptyKey, k).await? {
+ relevant_keys.insert(k.clone(), key);
+ }
+ }
+
+ Ok(AdminRpc::BucketInfo {
+ bucket,
+ relevant_keys,
+ counters,
+ })
+ }
+
+ #[allow(clippy::ptr_arg)]
+ async fn handle_create_bucket(&self, name: &String) -> Result<AdminRpc, Error> {
+ if !is_valid_bucket_name(name) {
+ return Err(Error::BadRequest(format!(
+ "{}: {}",
+ name, INVALID_BUCKET_NAME_MESSAGE
+ )));
+ }
+
+ if let Some(alias) = self.garage.bucket_alias_table.get(&EmptyKey, name).await? {
+ if alias.state.get().is_some() {
+ return Err(Error::BadRequest(format!("Bucket {} already exists", name)));
+ }
+ }
+
+ // ---- done checking, now commit ----
+
+ let bucket = Bucket::new();
+ self.garage.bucket_table.insert(&bucket).await?;
+
+ self.garage
+ .bucket_helper()
+ .set_global_bucket_alias(bucket.id, name)
+ .await?;
+
+ Ok(AdminRpc::Ok(format!("Bucket {} was created.", name)))
+ }
+
+ async fn handle_delete_bucket(&self, query: &DeleteBucketOpt) -> Result<AdminRpc, Error> {
+ let helper = self.garage.bucket_helper();
+
+ let bucket_id = helper
+ .resolve_global_bucket_name(&query.name)
+ .await?
+ .ok_or_bad_request("Bucket not found")?;
+
+ // Get the alias, but keep in minde here the bucket name
+ // given in parameter can also be directly the bucket's ID.
+ // In that case bucket_alias will be None, and
+ // we can still delete the bucket if it has zero aliases
+ // (a condition which we try to prevent but that could still happen somehow).
+ // We just won't try to delete an alias entry because there isn't one.
+ let bucket_alias = self
+ .garage
+ .bucket_alias_table
+ .get(&EmptyKey, &query.name)
+ .await?;
+
+ // Check bucket doesn't have other aliases
+ let mut bucket = helper.get_existing_bucket(bucket_id).await?;
+ let bucket_state = bucket.state.as_option().unwrap();
+ if bucket_state
+ .aliases
+ .items()
+ .iter()
+ .filter(|(_, _, active)| *active)
+ .any(|(name, _, _)| name != &query.name)
+ {
+ return Err(Error::BadRequest(format!("Bucket {} still has other global aliases. Use `bucket unalias` to delete them one by one.", query.name)));
+ }
+ if bucket_state
+ .local_aliases
+ .items()
+ .iter()
+ .any(|(_, _, active)| *active)
+ {
+ return Err(Error::BadRequest(format!("Bucket {} still has other local aliases. Use `bucket unalias` to delete them one by one.", query.name)));
+ }
+
+ // Check bucket is empty
+ if !helper.is_bucket_empty(bucket_id).await? {
+ return Err(Error::BadRequest(format!(
+ "Bucket {} is not empty",
+ query.name
+ )));
+ }
+
+ if !query.yes {
+ return Err(Error::BadRequest(
+ "Add --yes flag to really perform this operation".to_string(),
+ ));
+ }
+
+ // --- done checking, now commit ---
+ // 1. delete authorization from keys that had access
+ for (key_id, _) in bucket.authorized_keys() {
+ helper
+ .set_bucket_key_permissions(bucket.id, key_id, BucketKeyPerm::NO_PERMISSIONS)
+ .await?;
+ }
+
+ // 2. delete bucket alias
+ if bucket_alias.is_some() {
+ helper
+ .purge_global_bucket_alias(bucket_id, &query.name)
+ .await?;
+ }
+
+ // 3. delete bucket
+ bucket.state = Deletable::delete();
+ self.garage.bucket_table.insert(&bucket).await?;
+
+ Ok(AdminRpc::Ok(format!("Bucket {} was deleted.", query.name)))
+ }
+
+ async fn handle_alias_bucket(&self, query: &AliasBucketOpt) -> Result<AdminRpc, Error> {
+ let helper = self.garage.bucket_helper();
+ let key_helper = self.garage.key_helper();
+
+ let bucket_id = helper
+ .resolve_global_bucket_name(&query.existing_bucket)
+ .await?
+ .ok_or_bad_request("Bucket not found")?;
+
+ if let Some(key_pattern) = &query.local {
+ let key = key_helper.get_existing_matching_key(key_pattern).await?;
+
+ helper
+ .set_local_bucket_alias(bucket_id, &key.key_id, &query.new_name)
+ .await?;
+ Ok(AdminRpc::Ok(format!(
+ "Alias {} now points to bucket {:?} in namespace of key {}",
+ query.new_name, bucket_id, key.key_id
+ )))
+ } else {
+ helper
+ .set_global_bucket_alias(bucket_id, &query.new_name)
+ .await?;
+ Ok(AdminRpc::Ok(format!(
+ "Alias {} now points to bucket {:?}",
+ query.new_name, bucket_id
+ )))
+ }
+ }
+
+ async fn handle_unalias_bucket(&self, query: &UnaliasBucketOpt) -> Result<AdminRpc, Error> {
+ let helper = self.garage.bucket_helper();
+ let key_helper = self.garage.key_helper();
+
+ if let Some(key_pattern) = &query.local {
+ let key = key_helper.get_existing_matching_key(key_pattern).await?;
+
+ let bucket_id = key
+ .state
+ .as_option()
+ .unwrap()
+ .local_aliases
+ .get(&query.name)
+ .cloned()
+ .flatten()
+ .ok_or_bad_request("Bucket not found")?;
+
+ helper
+ .unset_local_bucket_alias(bucket_id, &key.key_id, &query.name)
+ .await?;
+
+ Ok(AdminRpc::Ok(format!(
+ "Alias {} no longer points to bucket {:?} in namespace of key {}",
+ &query.name, bucket_id, key.key_id
+ )))
+ } else {
+ let bucket_id = helper
+ .resolve_global_bucket_name(&query.name)
+ .await?
+ .ok_or_bad_request("Bucket not found")?;
+
+ helper
+ .unset_global_bucket_alias(bucket_id, &query.name)
+ .await?;
+
+ Ok(AdminRpc::Ok(format!(
+ "Alias {} no longer points to bucket {:?}",
+ &query.name, bucket_id
+ )))
+ }
+ }
+
+ async fn handle_bucket_allow(&self, query: &PermBucketOpt) -> Result<AdminRpc, Error> {
+ let helper = self.garage.bucket_helper();
+ let key_helper = self.garage.key_helper();
+
+ let bucket_id = helper
+ .resolve_global_bucket_name(&query.bucket)
+ .await?
+ .ok_or_bad_request("Bucket not found")?;
+ let key = key_helper
+ .get_existing_matching_key(&query.key_pattern)
+ .await?;
+
+ let allow_read = query.read || key.allow_read(&bucket_id);
+ let allow_write = query.write || key.allow_write(&bucket_id);
+ let allow_owner = query.owner || key.allow_owner(&bucket_id);
+
+ helper
+ .set_bucket_key_permissions(
+ bucket_id,
+ &key.key_id,
+ BucketKeyPerm {
+ timestamp: now_msec(),
+ allow_read,
+ allow_write,
+ allow_owner,
+ },
+ )
+ .await?;
+
+ Ok(AdminRpc::Ok(format!(
+ "New permissions for {} on {}: read {}, write {}, owner {}.",
+ &key.key_id, &query.bucket, allow_read, allow_write, allow_owner
+ )))
+ }
+
+ async fn handle_bucket_deny(&self, query: &PermBucketOpt) -> Result<AdminRpc, Error> {
+ let helper = self.garage.bucket_helper();
+ let key_helper = self.garage.key_helper();
+
+ let bucket_id = helper
+ .resolve_global_bucket_name(&query.bucket)
+ .await?
+ .ok_or_bad_request("Bucket not found")?;
+ let key = key_helper
+ .get_existing_matching_key(&query.key_pattern)
+ .await?;
+
+ let allow_read = !query.read && key.allow_read(&bucket_id);
+ let allow_write = !query.write && key.allow_write(&bucket_id);
+ let allow_owner = !query.owner && key.allow_owner(&bucket_id);
+
+ helper
+ .set_bucket_key_permissions(
+ bucket_id,
+ &key.key_id,
+ BucketKeyPerm {
+ timestamp: now_msec(),
+ allow_read,
+ allow_write,
+ allow_owner,
+ },
+ )
+ .await?;
+
+ Ok(AdminRpc::Ok(format!(
+ "New permissions for {} on {}: read {}, write {}, owner {}.",
+ &key.key_id, &query.bucket, allow_read, allow_write, allow_owner
+ )))
+ }
+
+ async fn handle_bucket_website(&self, query: &WebsiteOpt) -> Result<AdminRpc, Error> {
+ let bucket_id = self
+ .garage
+ .bucket_helper()
+ .resolve_global_bucket_name(&query.bucket)
+ .await?
+ .ok_or_bad_request("Bucket not found")?;
+
+ let mut bucket = self
+ .garage
+ .bucket_helper()
+ .get_existing_bucket(bucket_id)
+ .await?;
+ let bucket_state = bucket.state.as_option_mut().unwrap();
+
+ if !(query.allow ^ query.deny) {
+ return Err(Error::BadRequest(
+ "You must specify exactly one flag, either --allow or --deny".to_string(),
+ ));
+ }
+
+ let website = if query.allow {
+ Some(WebsiteConfig {
+ index_document: query.index_document.clone(),
+ error_document: query.error_document.clone(),
+ })
+ } else {
+ None
+ };
+
+ bucket_state.website_config.update(website);
+ self.garage.bucket_table.insert(&bucket).await?;
+
+ let msg = if query.allow {
+ format!("Website access allowed for {}", &query.bucket)
+ } else {
+ format!("Website access denied for {}", &query.bucket)
+ };
+
+ Ok(AdminRpc::Ok(msg))
+ }
+
+ async fn handle_bucket_set_quotas(&self, query: &SetQuotasOpt) -> Result<AdminRpc, Error> {
+ let bucket_id = self
+ .garage
+ .bucket_helper()
+ .resolve_global_bucket_name(&query.bucket)
+ .await?
+ .ok_or_bad_request("Bucket not found")?;
+
+ let mut bucket = self
+ .garage
+ .bucket_helper()
+ .get_existing_bucket(bucket_id)
+ .await?;
+ let bucket_state = bucket.state.as_option_mut().unwrap();
+
+ if query.max_size.is_none() && query.max_objects.is_none() {
+ return Err(Error::BadRequest(
+ "You must specify either --max-size or --max-objects (or both) for this command to do something.".to_string(),
+ ));
+ }
+
+ let mut quotas = bucket_state.quotas.get().clone();
+
+ match query.max_size.as_ref().map(String::as_ref) {
+ Some("none") => quotas.max_size = None,
+ Some(v) => {
+ let bs = v
+ .parse::<bytesize::ByteSize>()
+ .ok_or_bad_request(format!("Invalid size specified: {}", v))?;
+ quotas.max_size = Some(bs.as_u64());
+ }
+ _ => (),
+ }
+
+ match query.max_objects.as_ref().map(String::as_ref) {
+ Some("none") => quotas.max_objects = None,
+ Some(v) => {
+ let mo = v
+ .parse::<u64>()
+ .ok_or_bad_request(format!("Invalid number specified: {}", v))?;
+ quotas.max_objects = Some(mo);
+ }
+ _ => (),
+ }
+
+ bucket_state.quotas.update(quotas);
+ self.garage.bucket_table.insert(&bucket).await?;
+
+ Ok(AdminRpc::Ok(format!(
+ "Quotas updated for {}",
+ &query.bucket
+ )))
+ }
+
+ async fn handle_bucket_cleanup_incomplete_uploads(
+ &self,
+ query: &CleanupIncompleteUploadsOpt,
+ ) -> Result<AdminRpc, Error> {
+ let mut bucket_ids = vec![];
+ for b in query.buckets.iter() {
+ bucket_ids.push(
+ self.garage
+ .bucket_helper()
+ .resolve_global_bucket_name(b)
+ .await?
+ .ok_or_bad_request(format!("Bucket not found: {}", b))?,
+ );
+ }
+
+ let duration = parse_duration::parse::parse(&query.older_than)
+ .ok_or_bad_request("Invalid duration passed for --older-than parameter")?;
+
+ let mut ret = String::new();
+ for bucket in bucket_ids {
+ let count = self
+ .garage
+ .bucket_helper()
+ .cleanup_incomplete_uploads(&bucket, duration)
+ .await?;
+ writeln!(
+ &mut ret,
+ "Bucket {:?}: {} incomplete uploads aborted",
+ bucket, count
+ )
+ .unwrap();
+ }
+
+ Ok(AdminRpc::Ok(ret))
+ }
+}
diff --git a/src/garage/admin/key.rs b/src/garage/admin/key.rs
new file mode 100644
index 00000000..cab13bcf
--- /dev/null
+++ b/src/garage/admin/key.rs
@@ -0,0 +1,149 @@
+use std::collections::HashMap;
+
+use garage_table::*;
+
+use garage_model::helper::error::Error;
+use garage_model::key_table::*;
+
+use crate::cli::*;
+
+use super::*;
+
+impl AdminRpcHandler {
+ pub(super) async fn handle_key_cmd(&self, cmd: &KeyOperation) -> Result<AdminRpc, Error> {
+ match cmd {
+ KeyOperation::List => self.handle_list_keys().await,
+ KeyOperation::Info(query) => self.handle_key_info(query).await,
+ KeyOperation::New(query) => self.handle_create_key(query).await,
+ KeyOperation::Rename(query) => self.handle_rename_key(query).await,
+ KeyOperation::Delete(query) => self.handle_delete_key(query).await,
+ KeyOperation::Allow(query) => self.handle_allow_key(query).await,
+ KeyOperation::Deny(query) => self.handle_deny_key(query).await,
+ KeyOperation::Import(query) => self.handle_import_key(query).await,
+ }
+ }
+
+ async fn handle_list_keys(&self) -> Result<AdminRpc, Error> {
+ let key_ids = self
+ .garage
+ .key_table
+ .get_range(
+ &EmptyKey,
+ None,
+ Some(KeyFilter::Deleted(DeletedFilter::NotDeleted)),
+ 10000,
+ EnumerationOrder::Forward,
+ )
+ .await?
+ .iter()
+ .map(|k| (k.key_id.to_string(), k.params().unwrap().name.get().clone()))
+ .collect::<Vec<_>>();
+ Ok(AdminRpc::KeyList(key_ids))
+ }
+
+ async fn handle_key_info(&self, query: &KeyOpt) -> Result<AdminRpc, Error> {
+ let key = self
+ .garage
+ .key_helper()
+ .get_existing_matching_key(&query.key_pattern)
+ .await?;
+ self.key_info_result(key).await
+ }
+
+ async fn handle_create_key(&self, query: &KeyNewOpt) -> Result<AdminRpc, Error> {
+ let key = Key::new(&query.name);
+ self.garage.key_table.insert(&key).await?;
+ self.key_info_result(key).await
+ }
+
+ async fn handle_rename_key(&self, query: &KeyRenameOpt) -> Result<AdminRpc, Error> {
+ let mut key = self
+ .garage
+ .key_helper()
+ .get_existing_matching_key(&query.key_pattern)
+ .await?;
+ key.params_mut()
+ .unwrap()
+ .name
+ .update(query.new_name.clone());
+ self.garage.key_table.insert(&key).await?;
+ self.key_info_result(key).await
+ }
+
+ async fn handle_delete_key(&self, query: &KeyDeleteOpt) -> Result<AdminRpc, Error> {
+ let key_helper = self.garage.key_helper();
+
+ let mut key = key_helper
+ .get_existing_matching_key(&query.key_pattern)
+ .await?;
+
+ if !query.yes {
+ return Err(Error::BadRequest(
+ "Add --yes flag to really perform this operation".to_string(),
+ ));
+ }
+
+ key_helper.delete_key(&mut key).await?;
+
+ Ok(AdminRpc::Ok(format!(
+ "Key {} was deleted successfully.",
+ key.key_id
+ )))
+ }
+
+ async fn handle_allow_key(&self, query: &KeyPermOpt) -> Result<AdminRpc, Error> {
+ let mut key = self
+ .garage
+ .key_helper()
+ .get_existing_matching_key(&query.key_pattern)
+ .await?;
+ if query.create_bucket {
+ key.params_mut().unwrap().allow_create_bucket.update(true);
+ }
+ self.garage.key_table.insert(&key).await?;
+ self.key_info_result(key).await
+ }
+
+ async fn handle_deny_key(&self, query: &KeyPermOpt) -> Result<AdminRpc, Error> {
+ let mut key = self
+ .garage
+ .key_helper()
+ .get_existing_matching_key(&query.key_pattern)
+ .await?;
+ if query.create_bucket {
+ key.params_mut().unwrap().allow_create_bucket.update(false);
+ }
+ self.garage.key_table.insert(&key).await?;
+ self.key_info_result(key).await
+ }
+
+ async fn handle_import_key(&self, query: &KeyImportOpt) -> Result<AdminRpc, Error> {
+ let prev_key = self.garage.key_table.get(&EmptyKey, &query.key_id).await?;
+ if prev_key.is_some() {
+ return Err(Error::BadRequest(format!("Key {} already exists in data store. Even if it is deleted, we can't let you create a new key with the same ID. Sorry.", query.key_id)));
+ }
+ let imported_key = Key::import(&query.key_id, &query.secret_key, &query.name);
+ self.garage.key_table.insert(&imported_key).await?;
+
+ self.key_info_result(imported_key).await
+ }
+
+ async fn key_info_result(&self, key: Key) -> Result<AdminRpc, Error> {
+ let mut relevant_buckets = HashMap::new();
+
+ for (id, _) in key
+ .state
+ .as_option()
+ .unwrap()
+ .authorized_buckets
+ .items()
+ .iter()
+ {
+ if let Some(b) = self.garage.bucket_table.get(&EmptyKey, id).await? {
+ relevant_buckets.insert(*id, b);
+ }
+ }
+
+ Ok(AdminRpc::KeyInfo(key, relevant_buckets))
+ }
+}
diff --git a/src/garage/admin/mod.rs b/src/garage/admin/mod.rs
new file mode 100644
index 00000000..2709f08a
--- /dev/null
+++ b/src/garage/admin/mod.rs
@@ -0,0 +1,538 @@
+mod block;
+mod bucket;
+mod key;
+
+use std::collections::HashMap;
+use std::fmt::Write;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use serde::{Deserialize, Serialize};
+
+use format_table::format_table_to_string;
+
+use garage_util::background::BackgroundRunner;
+use garage_util::data::*;
+use garage_util::error::Error as GarageError;
+
+use garage_table::replication::*;
+use garage_table::*;
+
+use garage_rpc::ring::PARTITION_BITS;
+use garage_rpc::*;
+
+use garage_block::manager::BlockResyncErrorInfo;
+
+use garage_model::bucket_table::*;
+use garage_model::garage::Garage;
+use garage_model::helper::error::{Error, OkOrBadRequest};
+use garage_model::key_table::*;
+use garage_model::migrate::Migrate;
+use garage_model::s3::version_table::Version;
+
+use crate::cli::*;
+use crate::repair::online::launch_online_repair;
+
+pub const ADMIN_RPC_PATH: &str = "garage/admin_rpc.rs/Rpc";
+
+#[derive(Debug, Serialize, Deserialize)]
+#[allow(clippy::large_enum_variant)]
+pub enum AdminRpc {
+ BucketOperation(BucketOperation),
+ KeyOperation(KeyOperation),
+ LaunchRepair(RepairOpt),
+ Migrate(MigrateOpt),
+ Stats(StatsOpt),
+ Worker(WorkerOperation),
+ BlockOperation(BlockOperation),
+
+ // Replies
+ Ok(String),
+ BucketList(Vec<Bucket>),
+ BucketInfo {
+ bucket: Bucket,
+ relevant_keys: HashMap<String, Key>,
+ counters: HashMap<String, i64>,
+ },
+ KeyList(Vec<(String, String)>),
+ KeyInfo(Key, HashMap<Uuid, Bucket>),
+ WorkerList(
+ HashMap<usize, garage_util::background::WorkerInfo>,
+ WorkerListOpt,
+ ),
+ WorkerVars(Vec<(Uuid, String, String)>),
+ WorkerInfo(usize, garage_util::background::WorkerInfo),
+ BlockErrorList(Vec<BlockResyncErrorInfo>),
+ BlockInfo {
+ hash: Hash,
+ refcount: u64,
+ versions: Vec<Result<Version, Uuid>>,
+ },
+}
+
+impl Rpc for AdminRpc {
+ type Response = Result<AdminRpc, Error>;
+}
+
+pub struct AdminRpcHandler {
+ garage: Arc<Garage>,
+ background: Arc<BackgroundRunner>,
+ endpoint: Arc<Endpoint<AdminRpc, Self>>,
+}
+
+impl AdminRpcHandler {
+ pub fn new(garage: Arc<Garage>, background: Arc<BackgroundRunner>) -> Arc<Self> {
+ let endpoint = garage.system.netapp.endpoint(ADMIN_RPC_PATH.into());
+ let admin = Arc::new(Self {
+ garage,
+ background,
+ endpoint,
+ });
+ admin.endpoint.set_handler(admin.clone());
+ admin
+ }
+
+ // ================ MIGRATION COMMANDS ====================
+
+ async fn handle_migrate(self: &Arc<Self>, opt: MigrateOpt) -> Result<AdminRpc, Error> {
+ if !opt.yes {
+ return Err(Error::BadRequest(
+ "Please provide the --yes flag to initiate migration operation.".to_string(),
+ ));
+ }
+
+ let m = Migrate {
+ garage: self.garage.clone(),
+ };
+ match opt.what {
+ MigrateWhat::Buckets050 => m.migrate_buckets050().await,
+ }?;
+ Ok(AdminRpc::Ok("Migration successfull.".into()))
+ }
+
+ // ================ REPAIR COMMANDS ====================
+
+ async fn handle_launch_repair(self: &Arc<Self>, opt: RepairOpt) -> Result<AdminRpc, Error> {
+ if !opt.yes {
+ return Err(Error::BadRequest(
+ "Please provide the --yes flag to initiate repair operations.".to_string(),
+ ));
+ }
+ if opt.all_nodes {
+ let mut opt_to_send = opt.clone();
+ opt_to_send.all_nodes = false;
+
+ let mut failures = vec![];
+ let ring = self.garage.system.ring.borrow().clone();
+ for node in ring.layout.node_ids().iter() {
+ let node = (*node).into();
+ let resp = self
+ .endpoint
+ .call(
+ &node,
+ AdminRpc::LaunchRepair(opt_to_send.clone()),
+ PRIO_NORMAL,
+ )
+ .await;
+ if !matches!(resp, Ok(Ok(_))) {
+ failures.push(node);
+ }
+ }
+ if failures.is_empty() {
+ Ok(AdminRpc::Ok("Repair launched on all nodes".to_string()))
+ } else {
+ Err(Error::BadRequest(format!(
+ "Could not launch repair on nodes: {:?} (launched successfully on other nodes)",
+ failures
+ )))
+ }
+ } else {
+ launch_online_repair(&self.garage, &self.background, opt).await?;
+ Ok(AdminRpc::Ok(format!(
+ "Repair launched on {:?}",
+ self.garage.system.id
+ )))
+ }
+ }
+
+ // ================ STATS COMMANDS ====================
+
+ async fn handle_stats(&self, opt: StatsOpt) -> Result<AdminRpc, Error> {
+ if opt.all_nodes {
+ let mut ret = String::new();
+ let ring = self.garage.system.ring.borrow().clone();
+
+ for node in ring.layout.node_ids().iter() {
+ let mut opt = opt.clone();
+ opt.all_nodes = false;
+ opt.skip_global = true;
+
+ writeln!(&mut ret, "\n======================").unwrap();
+ writeln!(&mut ret, "Stats for node {:?}:", node).unwrap();
+
+ let node_id = (*node).into();
+ match self
+ .endpoint
+ .call(&node_id, AdminRpc::Stats(opt), PRIO_NORMAL)
+ .await
+ {
+ Ok(Ok(AdminRpc::Ok(s))) => writeln!(&mut ret, "{}", s).unwrap(),
+ Ok(Ok(x)) => writeln!(&mut ret, "Bad answer: {:?}", x).unwrap(),
+ Ok(Err(e)) => writeln!(&mut ret, "Remote error: {}", e).unwrap(),
+ Err(e) => writeln!(&mut ret, "Network error: {}", e).unwrap(),
+ }
+ }
+
+ writeln!(&mut ret, "\n======================").unwrap();
+ write!(
+ &mut ret,
+ "Cluster statistics:\n\n{}",
+ self.gather_cluster_stats()
+ )
+ .unwrap();
+
+ Ok(AdminRpc::Ok(ret))
+ } else {
+ Ok(AdminRpc::Ok(self.gather_stats_local(opt)?))
+ }
+ }
+
+ fn gather_stats_local(&self, opt: StatsOpt) -> Result<String, Error> {
+ let mut ret = String::new();
+ writeln!(
+ &mut ret,
+ "\nGarage version: {} [features: {}]\nRust compiler version: {}",
+ garage_util::version::garage_version(),
+ garage_util::version::garage_features()
+ .map(|list| list.join(", "))
+ .unwrap_or_else(|| "(unknown)".into()),
+ garage_util::version::rust_version(),
+ )
+ .unwrap();
+
+ writeln!(&mut ret, "\nDatabase engine: {}", self.garage.db.engine()).unwrap();
+
+ // Gather table statistics
+ let mut table = vec![" Table\tItems\tMklItems\tMklTodo\tGcTodo".into()];
+ table.push(self.gather_table_stats(&self.garage.bucket_table, opt.detailed)?);
+ table.push(self.gather_table_stats(&self.garage.key_table, opt.detailed)?);
+ table.push(self.gather_table_stats(&self.garage.object_table, opt.detailed)?);
+ table.push(self.gather_table_stats(&self.garage.version_table, opt.detailed)?);
+ table.push(self.gather_table_stats(&self.garage.block_ref_table, opt.detailed)?);
+ write!(
+ &mut ret,
+ "\nTable stats:\n{}",
+ format_table_to_string(table)
+ )
+ .unwrap();
+
+ // Gather block manager statistics
+ writeln!(&mut ret, "\nBlock manager stats:").unwrap();
+ let rc_len = if opt.detailed {
+ self.garage.block_manager.rc_len()?.to_string()
+ } else {
+ self.garage
+ .block_manager
+ .rc_fast_len()?
+ .map(|x| x.to_string())
+ .unwrap_or_else(|| "NC".into())
+ };
+
+ writeln!(
+ &mut ret,
+ " number of RC entries (~= number of blocks): {}",
+ rc_len
+ )
+ .unwrap();
+ writeln!(
+ &mut ret,
+ " resync queue length: {}",
+ self.garage.block_manager.resync.queue_len()?
+ )
+ .unwrap();
+ writeln!(
+ &mut ret,
+ " blocks with resync errors: {}",
+ self.garage.block_manager.resync.errors_len()?
+ )
+ .unwrap();
+
+ if !opt.detailed {
+ writeln!(&mut ret, "\nIf values are missing above (marked as NC), consider adding the --detailed flag (this will be slow).").unwrap();
+ }
+
+ if !opt.skip_global {
+ write!(&mut ret, "\n{}", self.gather_cluster_stats()).unwrap();
+ }
+
+ Ok(ret)
+ }
+
+ fn gather_cluster_stats(&self) -> String {
+ let mut ret = String::new();
+
+ // Gather storage node and free space statistics
+ let layout = &self.garage.system.ring.borrow().layout;
+ let mut node_partition_count = HashMap::<Uuid, u64>::new();
+ for short_id in layout.ring_assignation_data.iter() {
+ let id = layout.node_id_vec[*short_id as usize];
+ *node_partition_count.entry(id).or_default() += 1;
+ }
+ let node_info = self
+ .garage
+ .system
+ .get_known_nodes()
+ .into_iter()
+ .map(|n| (n.id, n))
+ .collect::<HashMap<_, _>>();
+
+ let mut table = vec![" ID\tHostname\tZone\tCapacity\tPart.\tDataAvail\tMetaAvail".into()];
+ for (id, parts) in node_partition_count.iter() {
+ let info = node_info.get(id);
+ let status = info.map(|x| &x.status);
+ let role = layout.roles.get(id).and_then(|x| x.0.as_ref());
+ let hostname = status.map(|x| x.hostname.as_str()).unwrap_or("?");
+ let zone = role.map(|x| x.zone.as_str()).unwrap_or("?");
+ let capacity = role
+ .map(|x| x.capacity_string())
+ .unwrap_or_else(|| "?".into());
+ let avail_str = |x| match x {
+ Some((avail, total)) => {
+ let pct = (avail as f64) / (total as f64) * 100.;
+ let avail = bytesize::ByteSize::b(avail);
+ let total = bytesize::ByteSize::b(total);
+ format!("{}/{} ({:.1}%)", avail, total, pct)
+ }
+ None => "?".into(),
+ };
+ let data_avail = avail_str(status.and_then(|x| x.data_disk_avail));
+ let meta_avail = avail_str(status.and_then(|x| x.meta_disk_avail));
+ table.push(format!(
+ " {:?}\t{}\t{}\t{}\t{}\t{}\t{}",
+ id, hostname, zone, capacity, parts, data_avail, meta_avail
+ ));
+ }
+ write!(
+ &mut ret,
+ "Storage nodes:\n{}",
+ format_table_to_string(table)
+ )
+ .unwrap();
+
+ let meta_part_avail = node_partition_count
+ .iter()
+ .filter_map(|(id, parts)| {
+ node_info
+ .get(id)
+ .and_then(|x| x.status.meta_disk_avail)
+ .map(|c| c.0 / *parts)
+ })
+ .collect::<Vec<_>>();
+ let data_part_avail = node_partition_count
+ .iter()
+ .filter_map(|(id, parts)| {
+ node_info
+ .get(id)
+ .and_then(|x| x.status.data_disk_avail)
+ .map(|c| c.0 / *parts)
+ })
+ .collect::<Vec<_>>();
+ if !meta_part_avail.is_empty() && !data_part_avail.is_empty() {
+ let meta_avail =
+ bytesize::ByteSize(meta_part_avail.iter().min().unwrap() * (1 << PARTITION_BITS));
+ let data_avail =
+ bytesize::ByteSize(data_part_avail.iter().min().unwrap() * (1 << PARTITION_BITS));
+ writeln!(
+ &mut ret,
+ "\nEstimated available storage space cluster-wide (might be lower in practice):"
+ )
+ .unwrap();
+ if meta_part_avail.len() < node_partition_count.len()
+ || data_part_avail.len() < node_partition_count.len()
+ {
+ writeln!(&mut ret, " data: < {}", data_avail).unwrap();
+ writeln!(&mut ret, " metadata: < {}", meta_avail).unwrap();
+ writeln!(&mut ret, "A precise estimate could not be given as information is missing for some storage nodes.").unwrap();
+ } else {
+ writeln!(&mut ret, " data: {}", data_avail).unwrap();
+ writeln!(&mut ret, " metadata: {}", meta_avail).unwrap();
+ }
+ }
+
+ ret
+ }
+
+ fn gather_table_stats<F, R>(
+ &self,
+ t: &Arc<Table<F, R>>,
+ detailed: bool,
+ ) -> Result<String, Error>
+ where
+ F: TableSchema + 'static,
+ R: TableReplication + 'static,
+ {
+ let (data_len, mkl_len) = if detailed {
+ (
+ t.data.store.len().map_err(GarageError::from)?.to_string(),
+ t.merkle_updater.merkle_tree_len()?.to_string(),
+ )
+ } else {
+ (
+ t.data
+ .store
+ .fast_len()
+ .map_err(GarageError::from)?
+ .map(|x| x.to_string())
+ .unwrap_or_else(|| "NC".into()),
+ t.merkle_updater
+ .merkle_tree_fast_len()?
+ .map(|x| x.to_string())
+ .unwrap_or_else(|| "NC".into()),
+ )
+ };
+
+ Ok(format!(
+ " {}\t{}\t{}\t{}\t{}",
+ F::TABLE_NAME,
+ data_len,
+ mkl_len,
+ t.merkle_updater.todo_len()?,
+ t.data.gc_todo_len()?
+ ))
+ }
+
+ // ================ WORKER COMMANDS ====================
+
+ async fn handle_worker_cmd(&self, cmd: &WorkerOperation) -> Result<AdminRpc, Error> {
+ match cmd {
+ WorkerOperation::List { opt } => {
+ let workers = self.background.get_worker_info();
+ Ok(AdminRpc::WorkerList(workers, *opt))
+ }
+ WorkerOperation::Info { tid } => {
+ let info = self
+ .background
+ .get_worker_info()
+ .get(tid)
+ .ok_or_bad_request(format!("No worker with TID {}", tid))?
+ .clone();
+ Ok(AdminRpc::WorkerInfo(*tid, info))
+ }
+ WorkerOperation::Get {
+ all_nodes,
+ variable,
+ } => self.handle_get_var(*all_nodes, variable).await,
+ WorkerOperation::Set {
+ all_nodes,
+ variable,
+ value,
+ } => self.handle_set_var(*all_nodes, variable, value).await,
+ }
+ }
+
+ async fn handle_get_var(
+ &self,
+ all_nodes: bool,
+ variable: &Option<String>,
+ ) -> Result<AdminRpc, Error> {
+ if all_nodes {
+ let mut ret = vec![];
+ let ring = self.garage.system.ring.borrow().clone();
+ for node in ring.layout.node_ids().iter() {
+ let node = (*node).into();
+ match self
+ .endpoint
+ .call(
+ &node,
+ AdminRpc::Worker(WorkerOperation::Get {
+ all_nodes: false,
+ variable: variable.clone(),
+ }),
+ PRIO_NORMAL,
+ )
+ .await??
+ {
+ AdminRpc::WorkerVars(v) => ret.extend(v),
+ m => return Err(GarageError::unexpected_rpc_message(m).into()),
+ }
+ }
+ Ok(AdminRpc::WorkerVars(ret))
+ } else {
+ #[allow(clippy::collapsible_else_if)]
+ if let Some(v) = variable {
+ Ok(AdminRpc::WorkerVars(vec![(
+ self.garage.system.id,
+ v.clone(),
+ self.garage.bg_vars.get(v)?,
+ )]))
+ } else {
+ let mut vars = self.garage.bg_vars.get_all();
+ vars.sort();
+ Ok(AdminRpc::WorkerVars(
+ vars.into_iter()
+ .map(|(k, v)| (self.garage.system.id, k.to_string(), v))
+ .collect(),
+ ))
+ }
+ }
+ }
+
+ async fn handle_set_var(
+ &self,
+ all_nodes: bool,
+ variable: &str,
+ value: &str,
+ ) -> Result<AdminRpc, Error> {
+ if all_nodes {
+ let mut ret = vec![];
+ let ring = self.garage.system.ring.borrow().clone();
+ for node in ring.layout.node_ids().iter() {
+ let node = (*node).into();
+ match self
+ .endpoint
+ .call(
+ &node,
+ AdminRpc::Worker(WorkerOperation::Set {
+ all_nodes: false,
+ variable: variable.to_string(),
+ value: value.to_string(),
+ }),
+ PRIO_NORMAL,
+ )
+ .await??
+ {
+ AdminRpc::WorkerVars(v) => ret.extend(v),
+ m => return Err(GarageError::unexpected_rpc_message(m).into()),
+ }
+ }
+ Ok(AdminRpc::WorkerVars(ret))
+ } else {
+ self.garage.bg_vars.set(variable, value)?;
+ Ok(AdminRpc::WorkerVars(vec![(
+ self.garage.system.id,
+ variable.to_string(),
+ value.to_string(),
+ )]))
+ }
+ }
+}
+
+#[async_trait]
+impl EndpointHandler<AdminRpc> for AdminRpcHandler {
+ async fn handle(
+ self: &Arc<Self>,
+ message: &AdminRpc,
+ _from: NodeID,
+ ) -> Result<AdminRpc, Error> {
+ match message {
+ AdminRpc::BucketOperation(bo) => self.handle_bucket_cmd(bo).await,
+ AdminRpc::KeyOperation(ko) => self.handle_key_cmd(ko).await,
+ AdminRpc::Migrate(opt) => self.handle_migrate(opt.clone()).await,
+ AdminRpc::LaunchRepair(opt) => self.handle_launch_repair(opt.clone()).await,
+ AdminRpc::Stats(opt) => self.handle_stats(opt.clone()).await,
+ AdminRpc::Worker(wo) => self.handle_worker_cmd(wo).await,
+ AdminRpc::BlockOperation(bo) => self.handle_block_cmd(bo).await,
+ m => Err(GarageError::unexpected_rpc_message(m).into()),
+ }
+ }
+}
diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs
index 0d735885..cb7a898c 100644
--- a/src/garage/cli/cmd.rs
+++ b/src/garage/cli/cmd.rs
@@ -1,8 +1,8 @@
use std::collections::HashSet;
use std::time::Duration;
+use format_table::format_table;
use garage_util::error::*;
-use garage_util::formater::format_table;
use garage_rpc::layout::*;
use garage_rpc::system::*;
diff --git a/src/garage/cli/layout.rs b/src/garage/cli/layout.rs
index 3884bb92..dc5315a1 100644
--- a/src/garage/cli/layout.rs
+++ b/src/garage/cli/layout.rs
@@ -1,6 +1,6 @@
+use format_table::format_table;
use garage_util::crdt::Crdt;
use garage_util::error::*;
-use garage_util::formater::format_table;
use garage_rpc::layout::*;
use garage_rpc::system::*;
diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs
index 01ae92da..6e585e53 100644
--- a/src/garage/cli/structs.rs
+++ b/src/garage/cli/structs.rs
@@ -216,7 +216,7 @@ pub struct WebsiteOpt {
#[structopt(short = "i", long = "index-document", default_value = "index.html")]
pub index_document: String,
- /// Error document: the optionnal document returned when an error occurs
+ /// Error document: the optional document returned when an error occurs
#[structopt(short = "e", long = "error-document")]
pub error_document: Option<String>,
}
diff --git a/src/garage/cli/util.rs b/src/garage/cli/util.rs
index 2c6be2f4..1140cf22 100644
--- a/src/garage/cli/util.rs
+++ b/src/garage/cli/util.rs
@@ -1,11 +1,11 @@
use std::collections::HashMap;
use std::time::Duration;
+use format_table::format_table;
use garage_util::background::*;
use garage_util::crdt::*;
use garage_util::data::*;
use garage_util::error::*;
-use garage_util::formater::format_table;
use garage_util::time::*;
use garage_block::manager::BlockResyncErrorInfo;
@@ -373,13 +373,18 @@ pub fn print_block_error_list(el: Vec<BlockResyncErrorInfo>) {
let mut table = vec!["Hash\tRC\tErrors\tLast error\tNext try".into()];
for e in el {
+ let next_try = if e.next_try > now {
+ tf2.convert(Duration::from_millis(e.next_try - now))
+ } else {
+ "asap".to_string()
+ };
table.push(format!(
"{}\t{}\t{}\t{}\tin {}",
hex::encode(e.hash.as_slice()),
e.refcount,
e.error_count,
tf.convert(Duration::from_millis(now - e.last_try)),
- tf2.convert(Duration::from_millis(e.next_try - now))
+ next_try
));
}
format_table(table);
diff --git a/src/garage/main.rs b/src/garage/main.rs
index 1ab18bb2..e8aee892 100644
--- a/src/garage/main.rs
+++ b/src/garage/main.rs
@@ -108,6 +108,12 @@ async fn main() {
][..];
if let Some(git_version) = option_env!("GIT_VERSION") {
garage_util::version::init_version(git_version);
+ } else {
+ garage_util::version::init_version(git_version::git_version!(
+ prefix = "git:",
+ cargo_prefix = "cargo:",
+ fallback = "unknown"
+ ));
}
garage_util::version::init_features(features);
diff --git a/src/garage/tests/bucket.rs b/src/garage/tests/bucket.rs
index 0dec3cfa..2dda7e6f 100644
--- a/src/garage/tests/bucket.rs
+++ b/src/garage/tests/bucket.rs
@@ -1,7 +1,6 @@
use crate::common;
use crate::common::ext::CommandExt;
-use aws_sdk_s3::model::BucketLocationConstraint;
-use aws_sdk_s3::output::DeleteBucketOutput;
+use aws_sdk_s3::operation::delete_bucket::DeleteBucketOutput;
#[tokio::test]
async fn test_bucket_all() {
@@ -63,10 +62,7 @@ async fn test_bucket_all() {
.await
.unwrap();
- match r.location_constraint.unwrap() {
- BucketLocationConstraint::Unknown(v) if v.as_str() == "garage-integ-test" => (),
- _ => unreachable!("wrong region"),
- }
+ assert_eq!(r.location_constraint.unwrap().as_str(), "garage-integ-test");
}
{
// (Stub) check GetVersioning
diff --git a/src/garage/tests/common/client.rs b/src/garage/tests/common/client.rs
index e9d4849a..ef4daa5d 100644
--- a/src/garage/tests/common/client.rs
+++ b/src/garage/tests/common/client.rs
@@ -1,15 +1,16 @@
-use aws_sdk_s3::{Client, Config, Credentials, Endpoint};
+use aws_sdk_s3::config::Credentials;
+use aws_sdk_s3::{Client, Config};
-use super::garage::{Instance, Key};
+use super::garage::Key;
+use crate::common::garage::DEFAULT_PORT;
-pub fn build_client(instance: &Instance, key: &Key) -> Client {
+pub fn build_client(key: &Key) -> Client {
let credentials = Credentials::new(&key.id, &key.secret, None, None, "garage-integ-test");
- let endpoint = Endpoint::immutable(instance.s3_uri());
let config = Config::builder()
+ .endpoint_url(format!("http://127.0.0.1:{}", DEFAULT_PORT))
.region(super::REGION)
.credentials_provider(credentials)
- .endpoint_resolver(endpoint)
.build();
Client::from_conf(config)
diff --git a/src/garage/tests/common/custom_requester.rs b/src/garage/tests/common/custom_requester.rs
index 609eda97..4133bb8b 100644
--- a/src/garage/tests/common/custom_requester.rs
+++ b/src/garage/tests/common/custom_requester.rs
@@ -209,6 +209,7 @@ impl<'a> RequestBuilder<'a> {
all_headers.extend(self.unsigned_headers.clone());
let canonical_request = signature::payload::canonical_request(
+ self.service,
&self.method,
&Uri::try_from(&uri).unwrap(),
&all_headers,
diff --git a/src/garage/tests/common/mod.rs b/src/garage/tests/common/mod.rs
index eca3e42b..54efd1ea 100644
--- a/src/garage/tests/common/mod.rs
+++ b/src/garage/tests/common/mod.rs
@@ -1,5 +1,7 @@
-use aws_sdk_s3::{Client, Region};
+use aws_sdk_s3::config::Region;
+use aws_sdk_s3::Client;
use ext::*;
+use k2v_client::K2vClient;
#[macro_use]
pub mod macros;
@@ -31,7 +33,7 @@ impl Context {
fn new() -> Self {
let garage = garage::instance();
let key = garage.key(None);
- let client = client::build_client(garage, &key);
+ let client = client::build_client(&key);
let custom_request = CustomRequester::new_s3(garage, &key);
let k2v_request = CustomRequester::new_k2v(garage, &key);
@@ -68,6 +70,19 @@ impl Context {
bucket_name
}
+
+ /// Build a K2vClient for a given bucket
+ pub fn k2v_client(&self, bucket: &str) -> K2vClient {
+ let config = k2v_client::K2vClientConfig {
+ region: REGION.to_string(),
+ endpoint: self.garage.k2v_uri().to_string(),
+ aws_access_key_id: self.key.id.clone(),
+ aws_secret_access_key: self.key.secret.clone(),
+ bucket: bucket.to_string(),
+ user_agent: None,
+ };
+ K2vClient::new(config).expect("Could not create K2V client")
+ }
}
pub fn context() -> Context {
diff --git a/src/garage/tests/k2v_client/mod.rs b/src/garage/tests/k2v_client/mod.rs
new file mode 100644
index 00000000..b252f36b
--- /dev/null
+++ b/src/garage/tests/k2v_client/mod.rs
@@ -0,0 +1 @@
+pub mod simple;
diff --git a/src/garage/tests/k2v_client/simple.rs b/src/garage/tests/k2v_client/simple.rs
new file mode 100644
index 00000000..1a3118ef
--- /dev/null
+++ b/src/garage/tests/k2v_client/simple.rs
@@ -0,0 +1,60 @@
+use std::time::Duration;
+
+use k2v_client::*;
+
+use crate::common;
+
+#[tokio::test]
+async fn test_simple() {
+ let ctx = common::context();
+ let bucket = ctx.create_bucket("test-k2v-client-simple");
+ let k2v_client = ctx.k2v_client(&bucket);
+
+ k2v_client
+ .insert_item("root", "test1", b"Hello, world!".to_vec(), None)
+ .await
+ .unwrap();
+
+ let res = k2v_client.read_item("root", "test1").await.unwrap();
+
+ assert_eq!(res.value.len(), 1);
+ assert_eq!(res.value[0], K2vValue::Value(b"Hello, world!".to_vec()));
+}
+
+#[tokio::test]
+async fn test_special_chars() {
+ let ctx = common::context();
+ let bucket = ctx.create_bucket("test-k2v-client-simple-special-chars");
+ let k2v_client = ctx.k2v_client(&bucket);
+
+ let (pk, sk) = ("root@plépp", "≤≤««");
+ k2v_client
+ .insert_item(pk, sk, b"Hello, world!".to_vec(), None)
+ .await
+ .unwrap();
+
+ let res = k2v_client.read_item(pk, sk).await.unwrap();
+ assert_eq!(res.value.len(), 1);
+ assert_eq!(res.value[0], K2vValue::Value(b"Hello, world!".to_vec()));
+
+ // sleep a bit before read_index
+ tokio::time::sleep(Duration::from_secs(1)).await;
+ let res = k2v_client.read_index(Default::default()).await.unwrap();
+ assert_eq!(res.items.len(), 1);
+ assert_eq!(res.items.keys().next().unwrap(), pk);
+
+ let res = k2v_client
+ .read_batch(&[BatchReadOp {
+ partition_key: pk,
+ filter: Default::default(),
+ single_item: false,
+ conflicts_only: false,
+ tombstones: false,
+ }])
+ .await
+ .unwrap();
+ assert_eq!(res.len(), 1);
+ let res = &res[0];
+ assert_eq!(res.items.len(), 1);
+ assert_eq!(res.items.keys().next().unwrap(), sk);
+}
diff --git a/src/garage/tests/lib.rs b/src/garage/tests/lib.rs
index 87be1327..e450baac 100644
--- a/src/garage/tests/lib.rs
+++ b/src/garage/tests/lib.rs
@@ -8,3 +8,5 @@ mod s3;
#[cfg(feature = "k2v")]
mod k2v;
+#[cfg(feature = "k2v")]
+mod k2v_client;
diff --git a/src/garage/tests/s3/multipart.rs b/src/garage/tests/s3/multipart.rs
index 895a2993..aeff94b4 100644
--- a/src/garage/tests/s3/multipart.rs
+++ b/src/garage/tests/s3/multipart.rs
@@ -1,6 +1,6 @@
use crate::common;
-use aws_sdk_s3::model::{CompletedMultipartUpload, CompletedPart};
-use aws_sdk_s3::types::ByteStream;
+use aws_sdk_s3::primitives::ByteStream;
+use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart};
const SZ_5MB: usize = 5 * 1024 * 1024;
const SZ_10MB: usize = 10 * 1024 * 1024;
diff --git a/src/garage/tests/s3/objects.rs b/src/garage/tests/s3/objects.rs
index 65f9e867..27697d45 100644
--- a/src/garage/tests/s3/objects.rs
+++ b/src/garage/tests/s3/objects.rs
@@ -1,6 +1,6 @@
use crate::common;
-use aws_sdk_s3::model::{Delete, ObjectIdentifier};
-use aws_sdk_s3::types::ByteStream;
+use aws_sdk_s3::primitives::ByteStream;
+use aws_sdk_s3::types::{Delete, ObjectIdentifier};
const STD_KEY: &str = "hello world";
const CTRL_KEY: &str = "\x00\x01\x02\x00";
diff --git a/src/garage/tests/s3/simple.rs b/src/garage/tests/s3/simple.rs
index f54ae9ac..41ec44c6 100644
--- a/src/garage/tests/s3/simple.rs
+++ b/src/garage/tests/s3/simple.rs
@@ -2,7 +2,7 @@ use crate::common;
#[tokio::test]
async fn test_simple() {
- use aws_sdk_s3::types::ByteStream;
+ use aws_sdk_s3::primitives::ByteStream;
let ctx = common::context();
let bucket = ctx.create_bucket("test-simple");
diff --git a/src/garage/tests/s3/website.rs b/src/garage/tests/s3/website.rs
index f61838e4..7c2b0deb 100644
--- a/src/garage/tests/s3/website.rs
+++ b/src/garage/tests/s3/website.rs
@@ -4,8 +4,8 @@ use crate::k2v::json_body;
use assert_json_diff::assert_json_eq;
use aws_sdk_s3::{
- model::{CorsConfiguration, CorsRule, ErrorDocument, IndexDocument, WebsiteConfiguration},
- types::ByteStream,
+ primitives::ByteStream,
+ types::{CorsConfiguration, CorsRule, ErrorDocument, IndexDocument, WebsiteConfiguration},
};
use http::{Request, StatusCode};
use hyper::{
@@ -72,7 +72,7 @@ async fn test_website() {
res_body,
json!({
"code": "InvalidRequest",
- "message": "Bad request: Bucket 'my-website' is not authorized for website hosting",
+ "message": "Bad request: Domain 'my-website' is not managed by Garage",
"region": "garage-integ-test",
"path": "/check",
})
@@ -91,24 +91,29 @@ async fn test_website() {
BODY.as_ref()
);
- let admin_req = || {
- Request::builder()
- .method("GET")
- .uri(format!(
- "http://127.0.0.1:{0}/check?domain={1}",
- ctx.garage.admin_port,
- BCKT_NAME.to_string()
- ))
- .body(Body::empty())
- .unwrap()
- };
-
- let mut admin_resp = client.request(admin_req()).await.unwrap();
- assert_eq!(admin_resp.status(), StatusCode::OK);
- assert_eq!(
- to_bytes(admin_resp.body_mut()).await.unwrap().as_ref(),
- format!("Bucket '{BCKT_NAME}' is authorized for website hosting").as_bytes()
- );
+ for bname in [
+ BCKT_NAME.to_string(),
+ format!("{BCKT_NAME}.web.garage"),
+ format!("{BCKT_NAME}.s3.garage"),
+ ] {
+ let admin_req = || {
+ Request::builder()
+ .method("GET")
+ .uri(format!(
+ "http://127.0.0.1:{0}/check?domain={1}",
+ ctx.garage.admin_port, bname
+ ))
+ .body(Body::empty())
+ .unwrap()
+ };
+
+ let mut admin_resp = client.request(admin_req()).await.unwrap();
+ assert_eq!(admin_resp.status(), StatusCode::OK);
+ assert_eq!(
+ to_bytes(admin_resp.body_mut()).await.unwrap().as_ref(),
+ format!("Domain '{bname}' is managed by Garage").as_bytes()
+ );
+ }
ctx.garage
.command()
@@ -142,7 +147,7 @@ async fn test_website() {
res_body,
json!({
"code": "InvalidRequest",
- "message": "Bad request: Bucket 'my-website' is not authorized for website hosting",
+ "message": "Bad request: Domain 'my-website' is not managed by Garage",
"region": "garage-integ-test",
"path": "/check",
})
@@ -397,7 +402,7 @@ async fn test_website_s3_api() {
}
#[tokio::test]
-async fn test_website_check_website_enabled() {
+async fn test_website_check_domain() {
let ctx = common::context();
let client = Client::new();
diff --git a/src/k2v-client/Cargo.toml b/src/k2v-client/Cargo.toml
index 52c16d89..2ccb9fe5 100644
--- a/src/k2v-client/Cargo.toml
+++ b/src/k2v-client/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "k2v-client"
-version = "0.1.1"
+version = "0.0.4"
authors = ["Trinity Pointard <trinity.pointard@gmail.com>", "Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license = "AGPL-3.0"
@@ -10,24 +10,28 @@ readme = "../../README.md"
[dependencies]
base64 = "0.21"
+sha2 = "0.10"
+hex = "0.4"
http = "0.2"
log = "0.4"
-rusoto_core = { version = "0.48.0", default-features = false, features = ["rustls"] }
-rusoto_credential = "0.48.0"
-rusoto_signature = "0.48.0"
-hyper-rustls = { version = "0.23", default-features = false, features = [ "http1", "http2", "tls12" ] }
-serde = "1.0"
+aws-sigv4 = "0.55"
+percent-encoding = "2.2"
+hyper = { version = "0.14", default-features = false, features = ["client", "http1", "http2"] }
+hyper-rustls = { version = "0.24", features = ["http2"] }
+serde = { version = "1.0", features = [ "derive" ] }
serde_json = "1.0"
thiserror = "1.0"
-tokio = "1.24"
+tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
# cli deps
clap = { version = "4.1", optional = true, features = ["derive", "env"] }
-garage_util = { version = "0.8.2", path = "../util", optional = true }
+format_table = { workspace = true, optional = true }
+tracing = { version = "0.1", optional = true }
+tracing-subscriber = { version = "0.3", optional = true, features = ["env-filter"] }
[features]
-cli = ["clap", "tokio/fs", "tokio/io-std", "garage_util"]
+cli = ["clap", "tokio/fs", "tokio/io-std", "tracing", "tracing-subscriber", "format_table"]
[lib]
path = "lib.rs"
diff --git a/src/k2v-client/bin/k2v-cli.rs b/src/k2v-client/bin/k2v-cli.rs
index cdd63cce..b9461c89 100644
--- a/src/k2v-client/bin/k2v-cli.rs
+++ b/src/k2v-client/bin/k2v-cli.rs
@@ -2,12 +2,11 @@ use std::collections::BTreeMap;
use std::process::exit;
use std::time::Duration;
-use k2v_client::*;
+use base64::prelude::*;
-use garage_util::formater::format_table;
+use k2v_client::*;
-use rusoto_core::credential::AwsCredentials;
-use rusoto_core::Region;
+use format_table::format_table;
use clap::{Parser, Subcommand};
@@ -155,7 +154,9 @@ impl Value {
if let Some(ref text) = self.text {
Ok(text.as_bytes().to_vec())
} else if let Some(ref b64) = self.b64 {
- base64::decode(b64).map_err(|_| Error::Message("invalid base64 input".into()))
+ BASE64_STANDARD
+ .decode(b64)
+ .map_err(|_| Error::Message("invalid base64 input".into()))
} else if let Some(ref path) = self.file {
use tokio::io::AsyncReadExt;
if path == "-" {
@@ -230,7 +231,7 @@ impl ReadOutputKind {
for val in val.value {
match val {
K2vValue::Value(v) => {
- println!("{}", base64::encode(&v))
+ println!("{}", BASE64_STANDARD.encode(&v))
}
K2vValue::Tombstone => {
println!();
@@ -249,7 +250,7 @@ impl ReadOutputKind {
if let Ok(string) = std::str::from_utf8(&v) {
println!(" utf-8: {}", string);
} else {
- println!(" base64: {}", base64::encode(&v));
+ println!(" base64: {}", BASE64_STANDARD.encode(&v));
}
}
K2vValue::Tombstone => {
@@ -275,7 +276,7 @@ struct BatchOutputKind {
impl BatchOutputKind {
fn display_human_output(&self, values: BTreeMap<String, CausalValue>) -> ! {
for (key, values) in values {
- println!("key: {}", key);
+ println!("sort_key: {}", key);
let causality: String = values.causality.into();
println!("causality: {}", causality);
for value in values.value {
@@ -284,7 +285,7 @@ impl BatchOutputKind {
if let Ok(string) = std::str::from_utf8(&v) {
println!(" value(utf-8): {}", string);
} else {
- println!(" value(base64): {}", base64::encode(&v));
+ println!(" value(base64): {}", BASE64_STANDARD.encode(&v));
}
}
K2vValue::Tombstone => {
@@ -310,23 +311,19 @@ impl BatchOutputKind {
.collect::<Vec<_>>()
}
- fn display_poll_range_output(
- &self,
- seen_marker: String,
- values: BTreeMap<String, CausalValue>,
- ) -> ! {
+ fn display_poll_range_output(&self, poll_range: PollRangeResult) -> ! {
if self.json {
let json = serde_json::json!({
- "values": self.values_json(values),
- "seen_marker": seen_marker,
+ "values": self.values_json(poll_range.items),
+ "seen_marker": poll_range.seen_marker,
});
let stdout = std::io::stdout();
serde_json::to_writer_pretty(stdout, &json).unwrap();
exit(0)
} else {
- println!("seen marker: {}", seen_marker);
- self.display_human_output(values)
+ println!("seen marker: {}", poll_range.seen_marker);
+ self.display_human_output(poll_range.items)
}
}
@@ -393,16 +390,27 @@ impl Filter {
#[tokio::main]
async fn main() -> Result<(), Error> {
+ if std::env::var("RUST_LOG").is_err() {
+ std::env::set_var("RUST_LOG", "warn")
+ }
+
+ tracing_subscriber::fmt()
+ .with_writer(std::io::stderr)
+ .with_env_filter(tracing_subscriber::filter::EnvFilter::from_default_env())
+ .init();
+
let args = Args::parse();
- let region = Region::Custom {
- name: args.region,
+ let config = K2vClientConfig {
endpoint: args.endpoint,
+ region: args.region,
+ aws_access_key_id: args.key_id,
+ aws_secret_access_key: args.secret,
+ bucket: args.bucket,
+ user_agent: None,
};
- let creds = AwsCredentials::new(args.key_id, args.secret, None, None);
-
- let client = K2vClient::new(region, args.bucket, creds, None)?;
+ let client = K2vClient::new(config)?;
match args.command {
Command::Insert {
@@ -489,8 +497,8 @@ async fn main() -> Result<(), Error> {
)
.await?;
match res {
- Some((items, seen_marker)) => {
- output_kind.display_poll_range_output(seen_marker, items);
+ Some(poll_range_output) => {
+ output_kind.display_poll_range_output(poll_range_output);
}
None => {
if output_kind.json {
@@ -520,7 +528,7 @@ async fn main() -> Result<(), Error> {
value
.as_object_mut()
.unwrap()
- .insert("sort_key".to_owned(), k.into());
+ .insert("partition_key".to_owned(), k.into());
value
})
.collect::<Vec<_>>();
@@ -537,7 +545,7 @@ async fn main() -> Result<(), Error> {
}
let mut to_print = Vec::new();
- to_print.push(format!("key:\tentries\tconflicts\tvalues\tbytes"));
+ to_print.push(format!("partition_key\tentries\tconflicts\tvalues\tbytes"));
for (k, v) in res.items {
to_print.push(format!(
"{}\t{}\t{}\t{}\t{}",
diff --git a/src/k2v-client/error.rs b/src/k2v-client/error.rs
index 37c221f2..564ce497 100644
--- a/src/k2v-client/error.rs
+++ b/src/k2v-client/error.rs
@@ -18,12 +18,20 @@ pub enum Error {
NotFound,
#[error("io error: {0}")]
IoError(#[from] std::io::Error),
- #[error("rusoto tls error: {0}")]
- RusotoTls(#[from] rusoto_core::request::TlsError),
- #[error("rusoto http error: {0}")]
- RusotoHttp(#[from] rusoto_core::HttpDispatchError),
+ #[error("http error: {0}")]
+ Http(#[from] http::Error),
+ #[error("hyper error: {0}")]
+ Hyper(#[from] hyper::Error),
+ #[error("invalid header: {0}")]
+ Header(#[from] hyper::header::ToStrError),
#[error("deserialization error: {0}")]
Deserialization(#[from] serde_json::Error),
+ #[error("invalid signature parameters: {0}")]
+ SignParameters(#[from] aws_sigv4::signing_params::BuildError),
+ #[error("could not sign request: {0}")]
+ SignRequest(#[from] aws_sigv4::http_request::SigningError),
+ #[error("request timed out")]
+ Timeout,
#[error("{0}")]
Message(Cow<'static, str>),
}
diff --git a/src/k2v-client/lib.rs b/src/k2v-client/lib.rs
index ca52d0cf..4aa7a20a 100644
--- a/src/k2v-client/lib.rs
+++ b/src/k2v-client/lib.rs
@@ -1,20 +1,23 @@
use std::collections::BTreeMap;
-use std::time::Duration;
+use std::convert::TryInto;
+use std::time::{Duration, SystemTime};
-use http::header::{ACCEPT, CONTENT_LENGTH, CONTENT_TYPE};
-use http::status::StatusCode;
-use http::HeaderMap;
+use base64::prelude::*;
use log::{debug, error};
+use percent_encoding::{utf8_percent_encode, AsciiSet, NON_ALPHANUMERIC};
+
+use http::header::{ACCEPT, CONTENT_TYPE};
+use http::status::StatusCode;
+use http::{HeaderName, HeaderValue, Request};
+use hyper::{body::Bytes, Body};
+use hyper::{client::connect::HttpConnector, Client as HttpClient};
+use hyper_rustls::HttpsConnector;
+
+use aws_sigv4::http_request::{sign, SignableRequest, SigningParams, SigningSettings};
-use rusoto_core::{ByteStream, DispatchSignedRequest, HttpClient};
-use rusoto_credential::AwsCredentials;
-use rusoto_signature::region::Region;
-use rusoto_signature::signature::SignedRequest;
use serde::de::Error as DeError;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
-use tokio::io::AsyncReadExt;
-
mod error;
pub use error::Error;
@@ -22,41 +25,57 @@ pub use error::Error;
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5);
const DEFAULT_POLL_TIMEOUT: Duration = Duration::from_secs(300);
const SERVICE: &str = "k2v";
-const GARAGE_CAUSALITY_TOKEN: &str = "X-Garage-Causality-Token";
+const AMZ_CONTENT_SHA256: HeaderName = HeaderName::from_static("x-amz-content-sha256");
+const GARAGE_CAUSALITY_TOKEN: HeaderName = HeaderName::from_static("x-garage-causality-token");
+
+const STRICT_ENCODE_SET: AsciiSet = NON_ALPHANUMERIC
+ .remove(b'_')
+ .remove(b'-')
+ .remove(b'.')
+ .remove(b'~');
+const PATH_ENCODE_SET: AsciiSet = NON_ALPHANUMERIC
+ .remove(b'/')
+ .remove(b'_')
+ .remove(b'-')
+ .remove(b'.')
+ .remove(b'~');
+
+pub struct K2vClientConfig {
+ pub endpoint: String,
+ pub region: String,
+ pub aws_access_key_id: String,
+ pub aws_secret_access_key: String,
+ pub bucket: String,
+ pub user_agent: Option<String>,
+}
/// Client used to query a K2V server.
pub struct K2vClient {
- region: Region,
- bucket: String,
- creds: AwsCredentials,
- client: HttpClient,
+ config: K2vClientConfig,
+ user_agent: HeaderValue,
+ client: HttpClient<HttpsConnector<HttpConnector>>,
}
impl K2vClient {
/// Create a new K2V client.
- pub fn new(
- region: Region,
- bucket: String,
- creds: AwsCredentials,
- user_agent: Option<String>,
- ) -> Result<Self, Error> {
+ pub fn new(config: K2vClientConfig) -> Result<Self, Error> {
let connector = hyper_rustls::HttpsConnectorBuilder::new()
.with_native_roots()
.https_or_http()
.enable_http1()
.enable_http2()
.build();
- let mut client = HttpClient::from_connector(connector);
- if let Some(ua) = user_agent {
- client.local_agent_prepend(ua);
- } else {
- client.local_agent_prepend(format!("k2v/{}", env!("CARGO_PKG_VERSION")));
- }
+ let client = HttpClient::builder().build(connector);
+ let user_agent: std::borrow::Cow<str> = match &config.user_agent {
+ Some(ua) => ua.into(),
+ None => format!("k2v/{}", env!("CARGO_PKG_VERSION")).into(),
+ };
+ let user_agent = HeaderValue::from_str(&user_agent)
+ .map_err(|_| Error::Message("invalid user agent".into()))?;
Ok(K2vClient {
- region,
- bucket,
- creds,
+ config,
client,
+ user_agent,
})
}
@@ -66,15 +85,10 @@ impl K2vClient {
partition_key: &str,
sort_key: &str,
) -> Result<CausalValue, Error> {
- let mut req = SignedRequest::new(
- "GET",
- SERVICE,
- &self.region,
- &format!("/{}/{}", self.bucket, partition_key),
- );
- req.add_param("sort_key", sort_key);
- req.add_header(ACCEPT, "application/octet-stream, application/json");
-
+ let url = self.build_url(Some(partition_key), &[("sort_key", sort_key)]);
+ let req = Request::get(url)
+ .header(ACCEPT, "application/octet-stream, application/json")
+ .body(Bytes::new())?;
let res = self.dispatch(req, None).await?;
let causality = res
@@ -91,7 +105,7 @@ impl K2vClient {
match res.content_type.as_deref() {
Some("application/octet-stream") => Ok(CausalValue {
causality,
- value: vec![K2vValue::Value(res.body)],
+ value: vec![K2vValue::Value(res.body.to_vec())],
}),
Some("application/json") => {
let value = serde_json::from_slice(&res.body)?;
@@ -115,16 +129,17 @@ impl K2vClient {
) -> Result<Option<CausalValue>, Error> {
let timeout = timeout.unwrap_or(DEFAULT_POLL_TIMEOUT);
- let mut req = SignedRequest::new(
- "GET",
- SERVICE,
- &self.region,
- &format!("/{}/{}", self.bucket, partition_key),
+ let url = self.build_url(
+ Some(partition_key),
+ &[
+ ("sort_key", sort_key),
+ ("causality_token", &causality.0),
+ ("timeout", &timeout.as_secs().to_string()),
+ ],
);
- req.add_param("sort_key", sort_key);
- req.add_param("causality_token", &causality.0);
- req.add_param("timeout", &timeout.as_secs().to_string());
- req.add_header(ACCEPT, "application/octet-stream, application/json");
+ let req = Request::get(url)
+ .header(ACCEPT, "application/octet-stream, application/json")
+ .body(Bytes::new())?;
let res = self.dispatch(req, Some(timeout + DEFAULT_TIMEOUT)).await?;
@@ -146,7 +161,7 @@ impl K2vClient {
match res.content_type.as_deref() {
Some("application/octet-stream") => Ok(Some(CausalValue {
causality,
- value: vec![K2vValue::Value(res.body)],
+ value: vec![K2vValue::Value(res.body.to_vec())],
})),
Some("application/json") => {
let value = serde_json::from_slice(&res.body)?;
@@ -167,7 +182,7 @@ impl K2vClient {
filter: Option<PollRangeFilter<'_>>,
seen_marker: Option<&str>,
timeout: Option<Duration>,
- ) -> Result<Option<(BTreeMap<String, CausalValue>, String)>, Error> {
+ ) -> Result<Option<PollRangeResult>, Error> {
let timeout = timeout.unwrap_or(DEFAULT_POLL_TIMEOUT);
let request = PollRangeRequest {
@@ -176,16 +191,10 @@ impl K2vClient {
timeout: timeout.as_secs(),
};
- let mut req = SignedRequest::new(
- "POST",
- SERVICE,
- &self.region,
- &format!("/{}/{}", self.bucket, partition_key),
- );
- req.add_param("poll_range", "");
-
+ let url = self.build_url(Some(partition_key), &[("poll_range", "")]);
let payload = serde_json::to_vec(&request)?;
- req.set_payload(Some(payload));
+ let req = Request::post(url).body(Bytes::from(payload))?;
+
let res = self.dispatch(req, Some(timeout + DEFAULT_TIMEOUT)).await?;
if res.status == StatusCode::NOT_MODIFIED {
@@ -208,7 +217,10 @@ impl K2vClient {
})
.collect::<BTreeMap<_, _>>();
- Ok(Some((items, resp.seen_marker)))
+ Ok(Some(PollRangeResult {
+ items,
+ seen_marker: resp.seen_marker,
+ }))
}
/// Perform an InsertItem request, inserting a value for a single pk+sk.
@@ -219,18 +231,12 @@ impl K2vClient {
value: Vec<u8>,
causality: Option<CausalityToken>,
) -> Result<(), Error> {
- let mut req = SignedRequest::new(
- "PUT",
- SERVICE,
- &self.region,
- &format!("/{}/{}", self.bucket, partition_key),
- );
- req.add_param("sort_key", sort_key);
- req.set_payload(Some(value));
-
+ let url = self.build_url(Some(partition_key), &[("sort_key", sort_key)]);
+ let mut req = Request::put(url);
if let Some(causality) = causality {
- req.add_header(GARAGE_CAUSALITY_TOKEN, &causality.0);
+ req = req.header(GARAGE_CAUSALITY_TOKEN, &causality.0);
}
+ let req = req.body(Bytes::from(value))?;
self.dispatch(req, None).await?;
Ok(())
@@ -243,14 +249,10 @@ impl K2vClient {
sort_key: &str,
causality: CausalityToken,
) -> Result<(), Error> {
- let mut req = SignedRequest::new(
- "DELETE",
- SERVICE,
- &self.region,
- &format!("/{}/{}", self.bucket, partition_key),
- );
- req.add_param("sort_key", sort_key);
- req.add_header(GARAGE_CAUSALITY_TOKEN, &causality.0);
+ let url = self.build_url(Some(partition_key), &[("sort_key", sort_key)]);
+ let req = Request::delete(url)
+ .header(GARAGE_CAUSALITY_TOKEN, &causality.0)
+ .body(Bytes::new())?;
self.dispatch(req, None).await?;
Ok(())
@@ -262,9 +264,9 @@ impl K2vClient {
&self,
filter: Filter<'_>,
) -> Result<PaginatedRange<PartitionInfo>, Error> {
- let mut req =
- SignedRequest::new("GET", SERVICE, &self.region, &format!("/{}", self.bucket));
- filter.insert_params(&mut req);
+ let params = filter.query_params();
+ let url = self.build_url(None, &params);
+ let req = Request::get(url).body(Bytes::new())?;
let res = self.dispatch(req, None).await?;
@@ -286,11 +288,10 @@ impl K2vClient {
/// *not* atomic: it is possible for some sub-operations to fails and others to success. In
/// that case, failure is reported.
pub async fn insert_batch(&self, operations: &[BatchInsertOp<'_>]) -> Result<(), Error> {
- let mut req =
- SignedRequest::new("POST", SERVICE, &self.region, &format!("/{}", self.bucket));
-
+ let url = self.build_url::<&str>(None, &[]);
let payload = serde_json::to_vec(operations)?;
- req.set_payload(Some(payload));
+ let req = Request::post(url).body(payload.into())?;
+
self.dispatch(req, None).await?;
Ok(())
}
@@ -300,12 +301,10 @@ impl K2vClient {
&self,
operations: &[BatchReadOp<'_>],
) -> Result<Vec<PaginatedRange<CausalValue>>, Error> {
- let mut req =
- SignedRequest::new("POST", SERVICE, &self.region, &format!("/{}", self.bucket));
- req.add_param("search", "");
-
+ let url = self.build_url(None, &[("search", "")]);
let payload = serde_json::to_vec(operations)?;
- req.set_payload(Some(payload));
+ let req = Request::post(url).body(payload.into())?;
+
let res = self.dispatch(req, None).await?;
let resp: Vec<BatchReadResponse> = serde_json::from_slice(&res.body)?;
@@ -334,12 +333,10 @@ impl K2vClient {
/// Perform a DeleteBatch request, deleting mutiple values or range of values at once, without
/// providing causality information.
pub async fn delete_batch(&self, operations: &[BatchDeleteOp<'_>]) -> Result<Vec<u64>, Error> {
- let mut req =
- SignedRequest::new("POST", SERVICE, &self.region, &format!("/{}", self.bucket));
- req.add_param("delete", "");
-
+ let url = self.build_url(None, &[("delete", "")]);
let payload = serde_json::to_vec(operations)?;
- req.set_payload(Some(payload));
+ let req = Request::post(url).body(payload.into())?;
+
let res = self.dispatch(req, None).await?;
let resp: Vec<BatchDeleteResponse> = serde_json::from_slice(&res.body)?;
@@ -349,33 +346,67 @@ impl K2vClient {
async fn dispatch(
&self,
- mut req: SignedRequest,
+ mut req: Request<Bytes>,
timeout: Option<Duration>,
) -> Result<Response, Error> {
- req.sign(&self.creds);
- let mut res = self
- .client
- .dispatch(req, Some(timeout.unwrap_or(DEFAULT_TIMEOUT)))
- .await?;
-
- let causality_token = res
- .headers
- .remove(GARAGE_CAUSALITY_TOKEN)
- .map(CausalityToken);
- let content_type = res.headers.remove(CONTENT_TYPE);
+ req.headers_mut()
+ .insert(http::header::USER_AGENT, self.user_agent.clone());
+
+ use sha2::{Digest, Sha256};
+ let mut hasher = Sha256::new();
+ hasher.update(req.body());
+ let hash = hex::encode(&hasher.finalize());
+ req.headers_mut()
+ .insert(AMZ_CONTENT_SHA256, hash.try_into().unwrap());
+
+ debug!("request uri: {:?}", req.uri());
+
+ // Sign request
+ let signing_settings = SigningSettings::default();
+ let signing_params = SigningParams::builder()
+ .access_key(&self.config.aws_access_key_id)
+ .secret_key(&self.config.aws_secret_access_key)
+ .region(&self.config.region)
+ .service_name(SERVICE)
+ .time(SystemTime::now())
+ .settings(signing_settings)
+ .build()?;
+ // Convert the HTTP request into a signable request
+ let signable_request = SignableRequest::from(&req);
+
+ // Sign and then apply the signature to the request
+ let (signing_instructions, _signature) =
+ sign(signable_request, &signing_params)?.into_parts();
+ signing_instructions.apply_to_request(&mut req);
+
+ // Send and wait for timeout
+ let res = tokio::select! {
+ res = self.client.request(req.map(Body::from)) => res?,
+ _ = tokio::time::sleep(timeout.unwrap_or(DEFAULT_TIMEOUT)) => {
+ return Err(Error::Timeout);
+ }
+ };
+
+ let (mut res, body) = res.into_parts();
+ let causality_token = match res.headers.remove(GARAGE_CAUSALITY_TOKEN) {
+ Some(v) => Some(CausalityToken(v.to_str()?.to_string())),
+ None => None,
+ };
+ let content_type = match res.headers.remove(CONTENT_TYPE) {
+ Some(v) => Some(v.to_str()?.to_string()),
+ None => None,
+ };
let body = match res.status {
- StatusCode::OK => read_body(&mut res.headers, res.body).await?,
- StatusCode::NO_CONTENT => Vec::new(),
+ StatusCode::OK => hyper::body::to_bytes(body).await?,
+ StatusCode::NO_CONTENT => Bytes::new(),
StatusCode::NOT_FOUND => return Err(Error::NotFound),
- StatusCode::NOT_MODIFIED => Vec::new(),
+ StatusCode::NOT_MODIFIED => Bytes::new(),
s => {
- let err_body = read_body(&mut res.headers, res.body)
- .await
- .unwrap_or_default();
+ let err_body = hyper::body::to_bytes(body).await.unwrap_or_default();
let err_body_str = std::str::from_utf8(&err_body)
.map(String::from)
- .unwrap_or_else(|_| base64::encode(&err_body));
+ .unwrap_or_else(|_| BASE64_STANDARD.encode(&err_body));
if s.is_client_error() || s.is_server_error() {
error!("Error response {}: {}", res.status, err_body_str);
@@ -408,7 +439,7 @@ impl K2vClient {
"Response body: {}",
std::str::from_utf8(&body)
.map(String::from)
- .unwrap_or_else(|_| base64::encode(&body))
+ .unwrap_or_else(|_| BASE64_STANDARD.encode(&body))
);
Ok(Response {
@@ -418,16 +449,26 @@ impl K2vClient {
content_type,
})
}
-}
-async fn read_body(headers: &mut HeaderMap<String>, body: ByteStream) -> Result<Vec<u8>, Error> {
- let body_len = headers
- .get(CONTENT_LENGTH)
- .and_then(|h| h.parse().ok())
- .unwrap_or(0);
- let mut res = Vec::with_capacity(body_len);
- body.into_async_read().read_to_end(&mut res).await?;
- Ok(res)
+ fn build_url<V: AsRef<str>>(&self, partition_key: Option<&str>, query: &[(&str, V)]) -> String {
+ let mut url = format!("{}/{}", self.config.endpoint, self.config.bucket);
+ if let Some(pk) = partition_key {
+ url.push('/');
+ url.extend(utf8_percent_encode(pk, &PATH_ENCODE_SET));
+ }
+ if !query.is_empty() {
+ url.push('?');
+ for (i, (k, v)) in query.iter().enumerate() {
+ if i > 0 {
+ url.push('&');
+ }
+ url.extend(utf8_percent_encode(k, &STRICT_ENCODE_SET));
+ url.push('=');
+ url.extend(utf8_percent_encode(v.as_ref(), &STRICT_ENCODE_SET));
+ }
+ }
+ url
+ }
}
/// An opaque token used to convey causality between operations.
@@ -482,9 +523,11 @@ impl<'de> Deserialize<'de> for K2vValue {
{
let val: Option<&str> = Option::deserialize(d)?;
Ok(match val {
- Some(s) => {
- K2vValue::Value(base64::decode(s).map_err(|_| DeError::custom("invalid base64"))?)
- }
+ Some(s) => K2vValue::Value(
+ BASE64_STANDARD
+ .decode(s)
+ .map_err(|_| DeError::custom("invalid base64"))?,
+ ),
None => K2vValue::Tombstone,
})
}
@@ -498,7 +541,7 @@ impl Serialize for K2vValue {
match self {
K2vValue::Tombstone => serializer.serialize_none(),
K2vValue::Value(v) => {
- let b64 = base64::encode(v);
+ let b64 = BASE64_STANDARD.encode(v);
serializer.serialize_str(&b64)
}
}
@@ -530,6 +573,7 @@ pub struct Filter<'a> {
pub reverse: bool,
}
+/// Filter for a poll range operations.
#[derive(Debug, Default, Clone, Serialize)]
pub struct PollRangeFilter<'a> {
pub start: Option<&'a str>,
@@ -537,6 +581,15 @@ pub struct PollRangeFilter<'a> {
pub prefix: Option<&'a str>,
}
+/// Response to a poll_range query
+#[derive(Debug, Default, Clone, Serialize)]
+pub struct PollRangeResult {
+ /// List of items that have changed since last PollRange call.
+ pub items: BTreeMap<String, CausalValue>,
+ /// opaque string representing items already seen for future PollRange calls.
+ pub seen_marker: String,
+}
+
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
struct PollRangeRequest<'a> {
@@ -554,22 +607,24 @@ struct PollRangeResponse {
}
impl<'a> Filter<'a> {
- fn insert_params(&self, req: &mut SignedRequest) {
- if let Some(start) = &self.start {
- req.add_param("start", start);
+ fn query_params(&self) -> Vec<(&'static str, std::borrow::Cow<str>)> {
+ let mut res = Vec::<(&'static str, std::borrow::Cow<str>)>::with_capacity(8);
+ if let Some(start) = self.start.as_deref() {
+ res.push(("start", start.into()));
}
- if let Some(end) = &self.end {
- req.add_param("end", end);
+ if let Some(end) = self.end.as_deref() {
+ res.push(("end", end.into()));
}
- if let Some(prefix) = &self.prefix {
- req.add_param("prefix", prefix);
+ if let Some(prefix) = self.prefix.as_deref() {
+ res.push(("prefix", prefix.into()));
}
if let Some(limit) = &self.limit {
- req.add_param("limit", &limit.to_string());
+ res.push(("limit", limit.to_string().into()));
}
if self.reverse {
- req.add_param("reverse", "true");
+ res.push(("reverse", "true".into()));
}
+ res
}
}
@@ -691,7 +746,7 @@ struct ErrorResponse {
}
struct Response {
- body: Vec<u8>,
+ body: Bytes,
status: StatusCode,
causality_token: Option<CausalityToken>,
content_type: Option<String>,
diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml
index 2b525a42..6dc954d4 100644
--- a/src/model/Cargo.toml
+++ b/src/model/Cargo.toml
@@ -14,11 +14,11 @@ path = "lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
-garage_db = { version = "0.8.2", default-features = false, path = "../db" }
-garage_rpc = { version = "0.8.2", path = "../rpc" }
-garage_table = { version = "0.8.2", path = "../table" }
-garage_block = { version = "0.8.2", path = "../block" }
-garage_util = { version = "0.8.2", path = "../util" }
+garage_db.workspace = true
+garage_rpc.workspace = true
+garage_table.workspace = true
+garage_block.workspace = true
+garage_util.workspace = true
async-trait = "0.1.7"
arc-swap = "1.0"
diff --git a/src/model/k2v/seen.rs b/src/model/k2v/seen.rs
index 8fe3a582..59d4ca5b 100644
--- a/src/model/k2v/seen.rs
+++ b/src/model/k2v/seen.rs
@@ -79,7 +79,7 @@ impl RangeSeenMarker {
let bytes = nonversioned_encode(&self)?;
let bytes = zstd::stream::encode_all(&mut &bytes[..], zstd::DEFAULT_COMPRESSION_LEVEL)?;
- Ok(BASE64_STANDARD.encode(&bytes))
+ Ok(BASE64_STANDARD.encode(bytes))
}
/// Decode from msgpack+zstd+b64 representation, returns None on error.
diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml
index dcf44f4a..25ed839d 100644
--- a/src/rpc/Cargo.toml
+++ b/src/rpc/Cargo.toml
@@ -14,11 +14,11 @@ path = "lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
-garage_util = { version = "0.8.2", path = "../util" }
+garage_util.workspace = true
arc-swap = "1.0"
bytes = "1.0"
-gethostname = "0.2"
+gethostname = "0.4"
hex = "0.4"
tracing = "0.1"
rand = "0.8"
diff --git a/src/rpc/consul.rs b/src/rpc/consul.rs
index f85f789c..ab8d1112 100644
--- a/src/rpc/consul.rs
+++ b/src/rpc/consul.rs
@@ -8,16 +8,26 @@ use serde::{Deserialize, Serialize};
use netapp::NodeID;
+use garage_util::config::ConsulDiscoveryAPI;
use garage_util::config::ConsulDiscoveryConfig;
+const META_PREFIX: &str = "fr-deuxfleurs-garagehq";
+
#[derive(Deserialize, Clone, Debug)]
struct ConsulQueryEntry {
#[serde(rename = "Address")]
address: String,
#[serde(rename = "ServicePort")]
service_port: u16,
- #[serde(rename = "NodeMeta")]
- node_meta: HashMap<String, String>,
+ #[serde(rename = "ServiceMeta")]
+ meta: HashMap<String, String>,
+}
+
+#[derive(Serialize, Clone, Debug)]
+#[serde(untagged)]
+enum PublishRequest {
+ Catalog(ConsulPublishEntry),
+ Service(ConsulPublishService),
}
#[derive(Serialize, Clone, Debug)]
@@ -26,28 +36,43 @@ struct ConsulPublishEntry {
node: String,
#[serde(rename = "Address")]
address: IpAddr,
- #[serde(rename = "NodeMeta")]
- node_meta: HashMap<String, String>,
#[serde(rename = "Service")]
- service: ConsulPublishService,
+ service: ConsulPublishCatalogService,
}
#[derive(Serialize, Clone, Debug)]
-struct ConsulPublishService {
+struct ConsulPublishCatalogService {
#[serde(rename = "ID")]
service_id: String,
#[serde(rename = "Service")]
service_name: String,
#[serde(rename = "Tags")]
tags: Vec<String>,
+ #[serde(rename = "Meta")]
+ meta: HashMap<String, String>,
#[serde(rename = "Address")]
address: IpAddr,
#[serde(rename = "Port")]
port: u16,
}
-// ----
+#[derive(Serialize, Clone, Debug)]
+struct ConsulPublishService {
+ #[serde(rename = "ID")]
+ service_id: String,
+ #[serde(rename = "Name")]
+ service_name: String,
+ #[serde(rename = "Tags")]
+ tags: Vec<String>,
+ #[serde(rename = "Address")]
+ address: IpAddr,
+ #[serde(rename = "Port")]
+ port: u16,
+ #[serde(rename = "Meta")]
+ meta: HashMap<String, String>,
+}
+// ----
pub struct ConsulDiscovery {
config: ConsulDiscoveryConfig,
client: reqwest::Client,
@@ -55,44 +80,48 @@ pub struct ConsulDiscovery {
impl ConsulDiscovery {
pub fn new(config: ConsulDiscoveryConfig) -> Result<Self, ConsulError> {
- let client = match (&config.client_cert, &config.client_key) {
- (Some(client_cert), Some(client_key)) => {
- let mut client_cert_buf = vec![];
- File::open(client_cert)?.read_to_end(&mut client_cert_buf)?;
-
- let mut client_key_buf = vec![];
- File::open(client_key)?.read_to_end(&mut client_key_buf)?;
-
- let identity = reqwest::Identity::from_pem(
- &[&client_cert_buf[..], &client_key_buf[..]].concat()[..],
- )?;
-
- if config.tls_skip_verify {
- reqwest::Client::builder()
- .use_rustls_tls()
- .danger_accept_invalid_certs(true)
- .identity(identity)
- .build()?
- } else if let Some(ca_cert) = &config.ca_cert {
- let mut ca_cert_buf = vec![];
- File::open(ca_cert)?.read_to_end(&mut ca_cert_buf)?;
-
- reqwest::Client::builder()
- .use_rustls_tls()
- .add_root_certificate(reqwest::Certificate::from_pem(&ca_cert_buf[..])?)
- .identity(identity)
- .build()?
- } else {
- reqwest::Client::builder()
- .use_rustls_tls()
- .identity(identity)
- .build()?
+ let mut builder: reqwest::ClientBuilder = reqwest::Client::builder().use_rustls_tls();
+ if config.tls_skip_verify {
+ builder = builder.danger_accept_invalid_certs(true);
+ } else if let Some(ca_cert) = &config.ca_cert {
+ let mut ca_cert_buf = vec![];
+ File::open(ca_cert)?.read_to_end(&mut ca_cert_buf)?;
+ builder =
+ builder.add_root_certificate(reqwest::Certificate::from_pem(&ca_cert_buf[..])?);
+ }
+
+ match &config.api {
+ ConsulDiscoveryAPI::Catalog => match (&config.client_cert, &config.client_key) {
+ (Some(client_cert), Some(client_key)) => {
+ let mut client_cert_buf = vec![];
+ File::open(client_cert)?.read_to_end(&mut client_cert_buf)?;
+
+ let mut client_key_buf = vec![];
+ File::open(client_key)?.read_to_end(&mut client_key_buf)?;
+
+ let identity = reqwest::Identity::from_pem(
+ &[&client_cert_buf[..], &client_key_buf[..]].concat()[..],
+ )?;
+
+ builder = builder.identity(identity);
+ }
+ (None, None) => {}
+ _ => return Err(ConsulError::InvalidTLSConfig),
+ },
+ ConsulDiscoveryAPI::Agent => {
+ if let Some(token) = &config.token {
+ let mut headers = reqwest::header::HeaderMap::new();
+ headers.insert(
+ "x-consul-token",
+ reqwest::header::HeaderValue::from_str(&token)?,
+ );
+ builder = builder.default_headers(headers);
}
}
- (None, None) => reqwest::Client::new(),
- _ => return Err(ConsulError::InvalidTLSConfig),
};
+ let client: reqwest::Client = builder.build()?;
+
Ok(Self { client, config })
}
@@ -111,8 +140,8 @@ impl ConsulDiscovery {
for ent in entries {
let ip = ent.address.parse::<IpAddr>().ok();
let pubkey = ent
- .node_meta
- .get("pubkey")
+ .meta
+ .get(&format!("{}-pubkey", META_PREFIX))
.and_then(|k| hex::decode(k).ok())
.and_then(|k| NodeID::from_slice(&k[..]));
if let (Some(ip), Some(pubkey)) = (ip, pubkey) {
@@ -138,29 +167,49 @@ impl ConsulDiscovery {
rpc_public_addr: SocketAddr,
) -> Result<(), ConsulError> {
let node = format!("garage:{}", hex::encode(&node_id[..8]));
+ let tags = [
+ vec!["advertised-by-garage".into(), hostname.into()],
+ self.config.tags.clone(),
+ ]
+ .concat();
- let advertisement = ConsulPublishEntry {
- node: node.clone(),
- address: rpc_public_addr.ip(),
- node_meta: [
- ("pubkey".to_string(), hex::encode(node_id)),
- ("hostname".to_string(), hostname.to_string()),
- ]
- .iter()
- .cloned()
- .collect(),
- service: ConsulPublishService {
+ let mut meta = self.config.meta.clone().unwrap_or_default();
+ meta.insert(format!("{}-pubkey", META_PREFIX), hex::encode(node_id));
+ meta.insert(format!("{}-hostname", META_PREFIX), hostname.to_string());
+
+ let url = format!(
+ "{}/v1/{}",
+ self.config.consul_http_addr,
+ (match &self.config.api {
+ ConsulDiscoveryAPI::Catalog => "catalog/register",
+ ConsulDiscoveryAPI::Agent => "agent/service/register?replace-existing-checks",
+ })
+ );
+
+ let req = self.client.put(&url);
+ let advertisement: PublishRequest = match &self.config.api {
+ ConsulDiscoveryAPI::Catalog => PublishRequest::Catalog(ConsulPublishEntry {
+ node: node.clone(),
+ address: rpc_public_addr.ip(),
+ service: ConsulPublishCatalogService {
+ service_id: node.clone(),
+ service_name: self.config.service_name.clone(),
+ tags,
+ meta: meta.clone(),
+ address: rpc_public_addr.ip(),
+ port: rpc_public_addr.port(),
+ },
+ }),
+ ConsulDiscoveryAPI::Agent => PublishRequest::Service(ConsulPublishService {
service_id: node.clone(),
service_name: self.config.service_name.clone(),
- tags: vec!["advertised-by-garage".into(), hostname.into()],
+ tags,
+ meta,
address: rpc_public_addr.ip(),
port: rpc_public_addr.port(),
- },
+ }),
};
-
- let url = format!("{}/v1/catalog/register", self.config.consul_http_addr);
-
- let http = self.client.put(&url).json(&advertisement).send().await?;
+ let http = req.json(&advertisement).send().await?;
http.error_for_status()?;
Ok(())
@@ -176,4 +225,6 @@ pub enum ConsulError {
Reqwest(#[error(source)] reqwest::Error),
#[error(display = "Invalid Consul TLS configuration")]
InvalidTLSConfig,
+ #[error(display = "Token error: {}", _0)]
+ Token(#[error(source)] reqwest::header::InvalidHeaderValue),
}
diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml
index c794c924..d0776945 100644
--- a/src/table/Cargo.toml
+++ b/src/table/Cargo.toml
@@ -14,9 +14,9 @@ path = "lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
-garage_db = { version = "0.8.2", path = "../db" }
-garage_rpc = { version = "0.8.2", path = "../rpc" }
-garage_util = { version = "0.8.2", path = "../util" }
+garage_db.workspace = true
+garage_rpc.workspace = true
+garage_util.workspace = true
opentelemetry = "0.17"
diff --git a/src/table/data.rs b/src/table/data.rs
index 26cc3a5a..e76836ca 100644
--- a/src/table/data.rs
+++ b/src/table/data.rs
@@ -44,22 +44,22 @@ pub struct TableData<F: TableSchema, R: TableReplication> {
impl<F: TableSchema, R: TableReplication> TableData<F, R> {
pub fn new(system: Arc<System>, instance: F, replication: R, db: &db::Db) -> Arc<Self> {
let store = db
- .open_tree(&format!("{}:table", F::TABLE_NAME))
+ .open_tree(format!("{}:table", F::TABLE_NAME))
.expect("Unable to open DB tree");
let merkle_tree = db
- .open_tree(&format!("{}:merkle_tree", F::TABLE_NAME))
+ .open_tree(format!("{}:merkle_tree", F::TABLE_NAME))
.expect("Unable to open DB Merkle tree tree");
let merkle_todo = db
- .open_tree(&format!("{}:merkle_todo", F::TABLE_NAME))
+ .open_tree(format!("{}:merkle_todo", F::TABLE_NAME))
.expect("Unable to open DB Merkle TODO tree");
let insert_queue = db
- .open_tree(&format!("{}:insert_queue", F::TABLE_NAME))
+ .open_tree(format!("{}:insert_queue", F::TABLE_NAME))
.expect("Unable to open insert queue DB tree");
let gc_todo = db
- .open_tree(&format!("{}:gc_todo_v2", F::TABLE_NAME))
+ .open_tree(format!("{}:gc_todo_v2", F::TABLE_NAME))
.expect("Unable to open GC DB tree");
let gc_todo = CountedTree::new(gc_todo).expect("Cannot count gc_todo_v2");
@@ -90,7 +90,7 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
pub fn read_entry(&self, p: &F::P, s: &F::S) -> Result<Option<ByteBuf>, Error> {
let tree_key = self.tree_key(p, s);
- if let Some(bytes) = self.store.get(&tree_key)? {
+ if let Some(bytes) = self.store.get(tree_key)? {
Ok(Some(ByteBuf::from(bytes.to_vec())))
} else {
Ok(None)
@@ -132,10 +132,10 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
}
}
- fn read_range_aux<'a>(
+ fn read_range_aux(
&self,
partition_hash: Hash,
- range: db::ValueIter<'a>,
+ range: db::ValueIter,
filter: &Option<F::Filter>,
limit: usize,
) -> Result<Vec<Arc<ByteBuf>>, Error> {
diff --git a/src/table/util.rs b/src/table/util.rs
index 0b10cf3f..663a7e11 100644
--- a/src/table/util.rs
+++ b/src/table/util.rs
@@ -34,8 +34,9 @@ impl DeletedFilter {
}
}
-#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq)]
+#[derive(Clone, Copy, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
pub enum EnumerationOrder {
+ #[default]
Forward,
Reverse,
}
@@ -49,9 +50,3 @@ impl EnumerationOrder {
}
}
}
-
-impl Default for EnumerationOrder {
- fn default() -> Self {
- EnumerationOrder::Forward
- }
-}
diff --git a/src/util/Cargo.toml b/src/util/Cargo.toml
index 2e6231f6..f72051b9 100644
--- a/src/util/Cargo.toml
+++ b/src/util/Cargo.toml
@@ -14,7 +14,7 @@ path = "lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
-garage_db = { version = "0.8.2", path = "../db" }
+garage_db.workspace = true
arc-swap = "1.0"
async-trait = "0.1"
@@ -22,7 +22,6 @@ blake2 = "0.10"
bytes = "1.0"
digest = "0.10"
err-derive = "0.3"
-git-version = "0.3.4"
hexdump = "0.1"
xxhash-rust = { version = "0.8", default-features = false, features = ["xxh3"] }
hex = "0.4"
diff --git a/src/util/config.rs b/src/util/config.rs
index 2176353e..1da95b2f 100644
--- a/src/util/config.rs
+++ b/src/util/config.rs
@@ -135,8 +135,19 @@ pub struct AdminConfig {
pub trace_sink: Option<String>,
}
+#[derive(Deserialize, Debug, Clone, Default)]
+#[serde(rename_all = "lowercase")]
+pub enum ConsulDiscoveryAPI {
+ #[default]
+ Catalog,
+ Agent,
+}
+
#[derive(Deserialize, Debug, Clone)]
pub struct ConsulDiscoveryConfig {
+ /// The consul api to use when registering: either `catalog` (the default) or `agent`
+ #[serde(default)]
+ pub api: ConsulDiscoveryAPI,
/// Consul http or https address to connect to to discover more peers
pub consul_http_addr: String,
/// Consul service name to use
@@ -147,9 +158,17 @@ pub struct ConsulDiscoveryConfig {
pub client_cert: Option<String>,
/// Client TLS key to use when connecting to Consul
pub client_key: Option<String>,
+ /// /// Token to use for connecting to consul
+ pub token: Option<String>,
/// Skip TLS hostname verification
#[serde(default)]
pub tls_skip_verify: bool,
+ /// Additional tags to add to the service
+ #[serde(default)]
+ pub tags: Vec<String>,
+ /// Additional service metadata to add
+ #[serde(default)]
+ pub meta: Option<std::collections::HashMap<String, String>>,
}
#[derive(Deserialize, Debug, Clone)]
@@ -223,7 +242,7 @@ fn secret_from_file(
#[cfg(unix)]
if std::env::var("GARAGE_ALLOW_WORLD_READABLE_SECRETS").as_deref() != Ok("true") {
use std::os::unix::fs::MetadataExt;
- let metadata = std::fs::metadata(&file_path)?;
+ let metadata = std::fs::metadata(file_path)?;
if metadata.mode() & 0o077 != 0 {
return Err(format!("File {} is world-readable! (mode: 0{:o}, expected 0600)\nRefusing to start until this is fixed, or environment variable GARAGE_ALLOW_WORLD_READABLE_SECRETS is set to true.", file_path, metadata.mode()).into());
}
@@ -344,7 +363,7 @@ mod tests {
replication_mode = "3"
rpc_bind_addr = "[::]:3901"
rpc_secret_file = "{}"
-
+
[s3_api]
s3_region = "garage"
api_bind_addr = "[::]:3900"
@@ -388,7 +407,7 @@ mod tests {
rpc_bind_addr = "[::]:3901"
rpc_secret= "dummy"
rpc_secret_file = "dummy"
-
+
[s3_api]
s3_region = "garage"
api_bind_addr = "[::]:3900"
diff --git a/src/util/forwarded_headers.rs b/src/util/forwarded_headers.rs
index 6ae275aa..12f76434 100644
--- a/src/util/forwarded_headers.rs
+++ b/src/util/forwarded_headers.rs
@@ -13,7 +13,7 @@ pub fn handle_forwarded_for_headers(headers: &HeaderMap<HeaderValue>) -> Result<
.to_str()
.ok_or_message("Error parsing X-Forwarded-For header")?;
- let client_ip = IpAddr::from_str(&forwarded_for_ip_str)
+ let client_ip = IpAddr::from_str(forwarded_for_ip_str)
.ok_or_message("Valid IP address not found in X-Forwarded-For header")?;
Ok(client_ip.to_string())
diff --git a/src/util/lib.rs b/src/util/lib.rs
index c9110fb2..15f0f829 100644
--- a/src/util/lib.rs
+++ b/src/util/lib.rs
@@ -10,7 +10,6 @@ pub mod crdt;
pub mod data;
pub mod encode;
pub mod error;
-pub mod formater;
pub mod forwarded_headers;
pub mod metrics;
pub mod migrate;
diff --git a/src/util/migrate.rs b/src/util/migrate.rs
index 1229fd9c..b9cce08a 100644
--- a/src/util/migrate.rs
+++ b/src/util/migrate.rs
@@ -27,7 +27,7 @@ pub trait Migrate: Serialize + for<'de> Deserialize<'de> + 'static {
Self::Previous::decode(bytes).map(Self::migrate)
}
- /// Encode this type with optionnal version marker
+ /// Encode this type with optional version marker
fn encode(&self) -> Result<Vec<u8>, rmp_serde::encode::Error> {
let mut wr = Vec::with_capacity(128);
wr.extend_from_slice(Self::VERSION_MARKER);
diff --git a/src/util/version.rs b/src/util/version.rs
index 2b2ea271..19907ed1 100644
--- a/src/util/version.rs
+++ b/src/util/version.rs
@@ -1,18 +1,14 @@
use std::sync::Arc;
-use arc_swap::{ArcSwap, ArcSwapOption};
+use arc_swap::ArcSwapOption;
lazy_static::lazy_static! {
- static ref VERSION: ArcSwap<&'static str> = ArcSwap::new(Arc::new(git_version::git_version!(
- prefix = "git:",
- cargo_prefix = "cargo:",
- fallback = "unknown"
- )));
+ static ref VERSION: ArcSwapOption<&'static str> = ArcSwapOption::new(None);
static ref FEATURES: ArcSwapOption<&'static [&'static str]> = ArcSwapOption::new(None);
}
pub fn garage_version() -> &'static str {
- &VERSION.load()
+ VERSION.load().as_ref().unwrap()
}
pub fn garage_features() -> Option<&'static [&'static str]> {
@@ -20,7 +16,7 @@ pub fn garage_features() -> Option<&'static [&'static str]> {
}
pub fn init_version(version: &'static str) {
- VERSION.store(Arc::new(version));
+ VERSION.store(Some(Arc::new(version)));
}
pub fn init_features(features: &'static [&'static str]) {
diff --git a/src/web/Cargo.toml b/src/web/Cargo.toml
index d0a23af4..423d3829 100644
--- a/src/web/Cargo.toml
+++ b/src/web/Cargo.toml
@@ -14,10 +14,10 @@ path = "lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
-garage_api = { version = "0.8.2", path = "../api" }
-garage_model = { version = "0.8.2", path = "../model" }
-garage_util = { version = "0.8.2", path = "../util" }
-garage_table = { version = "0.8.2", path = "../table" }
+garage_api.workspace = true
+garage_model.workspace = true
+garage_util.workspace = true
+garage_table.workspace = true
err-derive = "0.3"
tracing = "0.1"
diff --git a/src/web/web_server.rs b/src/web/web_server.rs
index 0c7edf23..287aef1a 100644
--- a/src/web/web_server.rs
+++ b/src/web/web_server.rs
@@ -1,4 +1,4 @@
-use std::{borrow::Cow, convert::Infallible, net::SocketAddr, sync::Arc};
+use std::{convert::Infallible, net::SocketAddr, sync::Arc};
use futures::future::Future;
@@ -6,7 +6,7 @@ use hyper::{
header::{HeaderValue, HOST},
server::conn::AddrStream,
service::{make_service_fn, service_fn},
- Body, Method, Request, Response, Server,
+ Body, Method, Request, Response, Server, StatusCode,
};
use opentelemetry::{
@@ -28,6 +28,7 @@ use garage_api::s3::get::{handle_get, handle_head};
use garage_model::garage::Garage;
use garage_table::*;
+use garage_util::data::Uuid;
use garage_util::error::Error as GarageError;
use garage_util::forwarded_headers;
use garage_util::metrics::{gen_trace_id, RecordDuration};
@@ -106,7 +107,7 @@ impl WebServer {
addr: SocketAddr,
) -> Result<Response<Body>, Infallible> {
if let Ok(forwarded_for_ip_addr) =
- forwarded_headers::handle_forwarded_for_headers(&req.headers())
+ forwarded_headers::handle_forwarded_for_headers(req.headers())
{
info!(
"{} (via {}) {} {}",
@@ -168,6 +169,17 @@ impl WebServer {
}
}
+ async fn check_key_exists(self: &Arc<Self>, bucket_id: Uuid, key: &str) -> Result<bool, Error> {
+ let exists = self
+ .garage
+ .object_table
+ .get(&bucket_id, &key.to_string())
+ .await?
+ .map(|object| object.versions().iter().any(|v| v.is_data()))
+ .unwrap_or(false);
+ Ok(exists)
+ }
+
async fn serve_file(self: &Arc<Self>, req: &Request<Body>) -> Result<Response<Body>, Error> {
// Get http authority string (eg. [::1]:3902 or garage.tld:80)
let authority = req
@@ -207,11 +219,11 @@ impl WebServer {
// Get path
let path = req.uri().path().to_string();
let index = &website_config.index_document;
- let key = path_to_key(&path, index)?;
+ let (key, may_redirect) = path_to_keys(&path, index)?;
debug!(
- "Selected bucket: \"{}\" {:?}, selected key: \"{}\"",
- bucket_name, bucket_id, key
+ "Selected bucket: \"{}\" {:?}, target key: \"{}\", may redirect to: {:?}",
+ bucket_name, bucket_id, key, may_redirect
);
let ret_doc = match *req.method() {
@@ -219,10 +231,23 @@ impl WebServer {
Method::HEAD => handle_head(self.garage.clone(), req, bucket_id, &key, None).await,
Method::GET => handle_get(self.garage.clone(), req, bucket_id, &key, None).await,
_ => Err(ApiError::bad_request("HTTP method not supported")),
- }
- .map_err(Error::from);
+ };
+
+ // Try implicit redirect on error
+ let ret_doc_with_redir = match (&ret_doc, may_redirect) {
+ (Err(ApiError::NoSuchKey), ImplicitRedirect::To { key, url })
+ if self.check_key_exists(bucket_id, key.as_str()).await? =>
+ {
+ Ok(Response::builder()
+ .status(StatusCode::FOUND)
+ .header("Location", url)
+ .body(Body::empty())
+ .unwrap())
+ }
+ _ => ret_doc,
+ };
- match ret_doc {
+ match ret_doc_with_redir.map_err(Error::from) {
Err(error) => {
// For a HEAD or OPTIONS method, and for non-4xx errors,
// we don't return the error document as content,
@@ -308,30 +333,45 @@ fn error_to_res(e: Error) -> Response<Body> {
http_error
}
+#[derive(Debug, PartialEq)]
+enum ImplicitRedirect {
+ No,
+ To { key: String, url: String },
+}
+
/// Path to key
///
/// Convert the provided path to the internal key
/// When a path ends with "/", we append the index name to match traditional web server behavior
/// which is also AWS S3 behavior.
-fn path_to_key<'a>(path: &'a str, index: &str) -> Result<Cow<'a, str>, Error> {
+///
+/// Check: https://docs.aws.amazon.com/AmazonS3/latest/userguide/IndexDocumentSupport.html
+fn path_to_keys<'a>(path: &'a str, index: &str) -> Result<(String, ImplicitRedirect), Error> {
let path_utf8 = percent_encoding::percent_decode_str(path).decode_utf8()?;
- if !path_utf8.starts_with('/') {
- return Err(Error::BadRequest("Path must start with a / (slash)".into()));
- }
-
- match path_utf8.chars().last() {
- None => unreachable!(),
- Some('/') => {
- let mut key = String::with_capacity(path_utf8.len() + index.len());
- key.push_str(&path_utf8[1..]);
- key.push_str(index);
- Ok(key.into())
- }
- Some(_) => match path_utf8 {
- Cow::Borrowed(pu8) => Ok((&pu8[1..]).into()),
- Cow::Owned(pu8) => Ok(pu8[1..].to_string().into()),
- },
+ let base_key = match path_utf8.strip_prefix("/") {
+ Some(bk) => bk,
+ None => return Err(Error::BadRequest("Path must start with a / (slash)".into())),
+ };
+ let is_bucket_root = base_key.len() == 0;
+ let is_trailing_slash = path_utf8.ends_with("/");
+
+ match (is_bucket_root, is_trailing_slash) {
+ // It is not possible to store something at the root of the bucket (ie. empty key),
+ // the only option is to fetch the index
+ (true, _) => Ok((index.to_string(), ImplicitRedirect::No)),
+
+ // "If you create a folder structure in your bucket, you must have an index document at each level. In each folder, the index document must have the same name, for example, index.html. When a user specifies a URL that resembles a folder lookup, the presence or absence of a trailing slash determines the behavior of the website. For example, the following URL, with a trailing slash, returns the photos/index.html index document."
+ (false, true) => Ok((format!("{base_key}{index}"), ImplicitRedirect::No)),
+
+ // "However, if you exclude the trailing slash from the preceding URL, Amazon S3 first looks for an object photos in the bucket. If the photos object is not found, it searches for an index document, photos/index.html. If that document is found, Amazon S3 returns a 302 Found message and points to the photos/ key. For subsequent requests to photos/, Amazon S3 returns photos/index.html. If the index document is not found, Amazon S3 returns an error."
+ (false, false) => Ok((
+ base_key.to_string(),
+ ImplicitRedirect::To {
+ key: format!("{base_key}/{index}"),
+ url: format!("{path}/"),
+ },
+ )),
}
}
@@ -340,13 +380,37 @@ mod tests {
use super::*;
#[test]
- fn path_to_key_test() -> Result<(), Error> {
- assert_eq!(path_to_key("/file%20.jpg", "index.html")?, "file .jpg");
- assert_eq!(path_to_key("/%20t/", "index.html")?, " t/index.html");
- assert_eq!(path_to_key("/", "index.html")?, "index.html");
- assert_eq!(path_to_key("/hello", "index.html")?, "hello");
- assert!(path_to_key("", "index.html").is_err());
- assert!(path_to_key("i/am/relative", "index.html").is_err());
+ fn path_to_keys_test() -> Result<(), Error> {
+ assert_eq!(
+ path_to_keys("/file%20.jpg", "index.html")?,
+ (
+ "file .jpg".to_string(),
+ ImplicitRedirect::To {
+ key: "file .jpg/index.html".to_string(),
+ url: "/file%20.jpg/".to_string()
+ }
+ )
+ );
+ assert_eq!(
+ path_to_keys("/%20t/", "index.html")?,
+ (" t/index.html".to_string(), ImplicitRedirect::No)
+ );
+ assert_eq!(
+ path_to_keys("/", "index.html")?,
+ ("index.html".to_string(), ImplicitRedirect::No)
+ );
+ assert_eq!(
+ path_to_keys("/hello", "index.html")?,
+ (
+ "hello".to_string(),
+ ImplicitRedirect::To {
+ key: "hello/index.html".to_string(),
+ url: "/hello/".to_string()
+ }
+ )
+ );
+ assert!(path_to_keys("", "index.html").is_err());
+ assert!(path_to_keys("i/am/relative", "index.html").is_err());
Ok(())
}
}