aboutsummaryrefslogtreecommitdiff
path: root/src/garage
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2023-04-25 12:34:26 +0200
committerAlex Auvolat <alex@adnab.me>2023-04-25 12:34:26 +0200
commitfa78d806e3ae40031e80eebb86e4eb1756d7baea (patch)
tree144662fb430c484093f6f9a585a2441c2ff26494 /src/garage
parent654999e254e6c1f46bb5d668bc1230f226575716 (diff)
parenta16eb7e4b8344d2f58c09a249b7b1bd17d339a35 (diff)
downloadgarage-fa78d806e3ae40031e80eebb86e4eb1756d7baea.tar.gz
garage-fa78d806e3ae40031e80eebb86e4eb1756d7baea.zip
Merge branch 'main' into next
Diffstat (limited to 'src/garage')
-rw-r--r--src/garage/Cargo.toml26
-rw-r--r--src/garage/admin.rs249
-rw-r--r--src/garage/cli/cmd.rs18
-rw-r--r--src/garage/cli/init.rs4
-rw-r--r--src/garage/cli/structs.rs36
-rw-r--r--src/garage/cli/util.rs10
-rw-r--r--src/garage/main.rs48
-rw-r--r--src/garage/repair/offline.rs9
-rw-r--r--src/garage/repair/online.rs6
-rw-r--r--src/garage/server.rs5
-rw-r--r--src/garage/tests/admin.rs27
-rw-r--r--src/garage/tests/bucket.rs4
-rw-r--r--src/garage/tests/common/client.rs12
-rw-r--r--src/garage/tests/common/custom_requester.rs9
-rw-r--r--src/garage/tests/common/garage.rs49
-rw-r--r--src/garage/tests/common/mod.rs13
-rw-r--r--src/garage/tests/k2v/batch.rs103
-rw-r--r--src/garage/tests/k2v/item.rs37
-rw-r--r--src/garage/tests/k2v/poll.rs173
-rw-r--r--src/garage/tests/s3/streaming_signature.rs2
-rw-r--r--src/garage/tests/s3/website.rs173
21 files changed, 799 insertions, 214 deletions
diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml
index b43b0242..0cbdf890 100644
--- a/src/garage/Cargo.toml
+++ b/src/garage/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "garage"
-version = "0.8.1"
+version = "0.8.2"
authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license = "AGPL-3.0"
@@ -21,22 +21,22 @@ path = "tests/lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
-garage_db = { version = "0.8.1", path = "../db" }
-garage_api = { version = "0.8.1", path = "../api" }
-garage_block = { version = "0.8.1", path = "../block" }
-garage_model = { version = "0.8.1", path = "../model" }
-garage_rpc = { version = "0.8.1", path = "../rpc" }
-garage_table = { version = "0.8.1", path = "../table" }
-garage_util = { version = "0.8.1", path = "../util" }
-garage_web = { version = "0.8.1", path = "../web" }
+garage_db = { version = "0.8.2", path = "../db" }
+garage_api = { version = "0.8.2", path = "../api" }
+garage_block = { version = "0.8.2", path = "../block" }
+garage_model = { version = "0.8.2", path = "../model" }
+garage_rpc = { version = "0.8.2", path = "../rpc" }
+garage_table = { version = "0.8.2", path = "../table" }
+garage_util = { version = "0.8.2", path = "../util" }
+garage_web = { version = "0.8.2", path = "../web" }
backtrace = "0.3"
bytes = "1.0"
bytesize = "1.1"
-timeago = "0.3"
+timeago = "0.4"
parse_duration = "2.1"
hex = "0.4"
-tracing = { version = "0.1.30" }
+tracing = { version = "0.1" }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
rand = "0.8"
async-trait = "0.1.7"
@@ -45,7 +45,7 @@ sodiumoxide = { version = "0.2.5-0", package = "kuska-sodiumoxide" }
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
serde_bytes = "0.11"
structopt = { version = "0.3", default-features = false }
-toml = "0.5"
+toml = "0.6"
futures = "0.3"
futures-util = "0.3"
@@ -69,7 +69,7 @@ sha2 = "0.10"
static_init = "1.0"
assert-json-diff = "2.0"
serde_json = "1.0"
-base64 = "0.13"
+base64 = "0.21"
[features]
diff --git a/src/garage/admin.rs b/src/garage/admin.rs
index 58d645ac..e4e50520 100644
--- a/src/garage/admin.rs
+++ b/src/garage/admin.rs
@@ -15,10 +15,10 @@ use garage_util::time::*;
use garage_table::replication::*;
use garage_table::*;
+use garage_rpc::ring::PARTITION_BITS;
use garage_rpc::*;
use garage_block::manager::BlockResyncErrorInfo;
-use garage_block::repair::ScrubWorkerCommand;
use garage_model::bucket_alias_table::*;
use garage_model::bucket_table::*;
@@ -60,6 +60,7 @@ pub enum AdminRpc {
HashMap<usize, garage_util::background::WorkerInfo>,
WorkerListOpt,
),
+ WorkerVars(Vec<(Uuid, String, String)>),
WorkerInfo(usize, garage_util::background::WorkerInfo),
BlockErrorList(Vec<BlockResyncErrorInfo>),
BlockInfo {
@@ -783,6 +784,7 @@ impl AdminRpcHandler {
for node in ring.layout.node_ids().iter() {
let mut opt = opt.clone();
opt.all_nodes = false;
+ opt.skip_global = true;
writeln!(&mut ret, "\n======================").unwrap();
writeln!(&mut ret, "Stats for node {:?}:", node).unwrap();
@@ -799,6 +801,15 @@ impl AdminRpcHandler {
Err(e) => writeln!(&mut ret, "Network error: {}", e).unwrap(),
}
}
+
+ writeln!(&mut ret, "\n======================").unwrap();
+ write!(
+ &mut ret,
+ "Cluster statistics:\n\n{}",
+ self.gather_cluster_stats()
+ )
+ .unwrap();
+
Ok(AdminRpc::Ok(ret))
} else {
Ok(AdminRpc::Ok(self.gather_stats_local(opt)?))
@@ -809,32 +820,17 @@ impl AdminRpcHandler {
let mut ret = String::new();
writeln!(
&mut ret,
- "\nGarage version: {} [features: {}]",
+ "\nGarage version: {} [features: {}]\nRust compiler version: {}",
garage_util::version::garage_version(),
garage_util::version::garage_features()
.map(|list| list.join(", "))
.unwrap_or_else(|| "(unknown)".into()),
+ garage_util::version::rust_version(),
)
.unwrap();
writeln!(&mut ret, "\nDatabase engine: {}", self.garage.db.engine()).unwrap();
- // Gather ring statistics
- let ring = self.garage.system.ring.borrow().clone();
- let mut ring_nodes = HashMap::new();
- for (_i, loc) in ring.partitions().iter() {
- for n in ring.get_nodes(loc, ring.replication_factor).iter() {
- if !ring_nodes.contains_key(n) {
- ring_nodes.insert(*n, 0usize);
- }
- *ring_nodes.get_mut(n).unwrap() += 1;
- }
- }
- writeln!(&mut ret, "\nRing nodes & partition count:").unwrap();
- for (n, c) in ring_nodes.iter() {
- writeln!(&mut ret, " {:?} {}", n, c).unwrap();
- }
-
// Gather table statistics
let mut table = vec![" Table\tItems\tMklItems\tMklTodo\tGcTodo".into()];
table.push(self.gather_table_stats(&self.garage.bucket_table, opt.detailed)?);
@@ -881,12 +877,110 @@ impl AdminRpcHandler {
.unwrap();
if !opt.detailed {
- writeln!(&mut ret, "\nIf values are missing (marked as NC), consider adding the --detailed flag - this will be slow.").unwrap();
+ writeln!(&mut ret, "\nIf values are missing above (marked as NC), consider adding the --detailed flag (this will be slow).").unwrap();
+ }
+
+ if !opt.skip_global {
+ write!(&mut ret, "\n{}", self.gather_cluster_stats()).unwrap();
}
Ok(ret)
}
+ fn gather_cluster_stats(&self) -> String {
+ let mut ret = String::new();
+
+ // Gather storage node and free space statistics
+ let layout = &self.garage.system.ring.borrow().layout;
+ let mut node_partition_count = HashMap::<Uuid, u64>::new();
+ for short_id in layout.ring_assignment_data.iter() {
+ let id = layout.node_id_vec[*short_id as usize];
+ *node_partition_count.entry(id).or_default() += 1;
+ }
+ let node_info = self
+ .garage
+ .system
+ .get_known_nodes()
+ .into_iter()
+ .map(|n| (n.id, n))
+ .collect::<HashMap<_, _>>();
+
+ let mut table = vec![" ID\tHostname\tZone\tCapacity\tPart.\tDataAvail\tMetaAvail".into()];
+ for (id, parts) in node_partition_count.iter() {
+ let info = node_info.get(id);
+ let status = info.map(|x| &x.status);
+ let role = layout.roles.get(id).and_then(|x| x.0.as_ref());
+ let hostname = status.map(|x| x.hostname.as_str()).unwrap_or("?");
+ let zone = role.map(|x| x.zone.as_str()).unwrap_or("?");
+ let capacity = role
+ .map(|x| x.capacity_string())
+ .unwrap_or_else(|| "?".into());
+ let avail_str = |x| match x {
+ Some((avail, total)) => {
+ let pct = (avail as f64) / (total as f64) * 100.;
+ let avail = bytesize::ByteSize::b(avail);
+ let total = bytesize::ByteSize::b(total);
+ format!("{}/{} ({:.1}%)", avail, total, pct)
+ }
+ None => "?".into(),
+ };
+ let data_avail = avail_str(status.and_then(|x| x.data_disk_avail));
+ let meta_avail = avail_str(status.and_then(|x| x.meta_disk_avail));
+ table.push(format!(
+ " {:?}\t{}\t{}\t{}\t{}\t{}\t{}",
+ id, hostname, zone, capacity, parts, data_avail, meta_avail
+ ));
+ }
+ write!(
+ &mut ret,
+ "Storage nodes:\n{}",
+ format_table_to_string(table)
+ )
+ .unwrap();
+
+ let meta_part_avail = node_partition_count
+ .iter()
+ .filter_map(|(id, parts)| {
+ node_info
+ .get(id)
+ .and_then(|x| x.status.meta_disk_avail)
+ .map(|c| c.0 / *parts)
+ })
+ .collect::<Vec<_>>();
+ let data_part_avail = node_partition_count
+ .iter()
+ .filter_map(|(id, parts)| {
+ node_info
+ .get(id)
+ .and_then(|x| x.status.data_disk_avail)
+ .map(|c| c.0 / *parts)
+ })
+ .collect::<Vec<_>>();
+ if !meta_part_avail.is_empty() && !data_part_avail.is_empty() {
+ let meta_avail =
+ bytesize::ByteSize(meta_part_avail.iter().min().unwrap() * (1 << PARTITION_BITS));
+ let data_avail =
+ bytesize::ByteSize(data_part_avail.iter().min().unwrap() * (1 << PARTITION_BITS));
+ writeln!(
+ &mut ret,
+ "\nEstimated available storage space cluster-wide (might be lower in practice):"
+ )
+ .unwrap();
+ if meta_part_avail.len() < node_partition_count.len()
+ || data_part_avail.len() < node_partition_count.len()
+ {
+ writeln!(&mut ret, " data: < {}", data_avail).unwrap();
+ writeln!(&mut ret, " metadata: < {}", meta_avail).unwrap();
+ writeln!(&mut ret, "A precise estimate could not be given as information is missing for some storage nodes.").unwrap();
+ } else {
+ writeln!(&mut ret, " data: {}", data_avail).unwrap();
+ writeln!(&mut ret, " metadata: {}", meta_avail).unwrap();
+ }
+ }
+
+ ret
+ }
+
fn gather_table_stats<F, R>(
&self,
t: &Arc<Table<F, R>>,
@@ -943,32 +1037,101 @@ impl AdminRpcHandler {
.clone();
Ok(AdminRpc::WorkerInfo(*tid, info))
}
- WorkerOperation::Set { opt } => match opt {
- WorkerSetCmd::ScrubTranquility { tranquility } => {
- let scrub_command = ScrubWorkerCommand::SetTranquility(*tranquility);
- self.garage
- .block_manager
- .send_scrub_command(scrub_command)
- .await?;
- Ok(AdminRpc::Ok("Scrub tranquility updated".into()))
- }
- WorkerSetCmd::ResyncWorkerCount { worker_count } => {
- self.garage
- .block_manager
- .resync
- .set_n_workers(*worker_count)
- .await?;
- Ok(AdminRpc::Ok("Number of resync workers updated".into()))
+ WorkerOperation::Get {
+ all_nodes,
+ variable,
+ } => self.handle_get_var(*all_nodes, variable).await,
+ WorkerOperation::Set {
+ all_nodes,
+ variable,
+ value,
+ } => self.handle_set_var(*all_nodes, variable, value).await,
+ }
+ }
+
+ async fn handle_get_var(
+ &self,
+ all_nodes: bool,
+ variable: &Option<String>,
+ ) -> Result<AdminRpc, Error> {
+ if all_nodes {
+ let mut ret = vec![];
+ let ring = self.garage.system.ring.borrow().clone();
+ for node in ring.layout.node_ids().iter() {
+ let node = (*node).into();
+ match self
+ .endpoint
+ .call(
+ &node,
+ AdminRpc::Worker(WorkerOperation::Get {
+ all_nodes: false,
+ variable: variable.clone(),
+ }),
+ PRIO_NORMAL,
+ )
+ .await??
+ {
+ AdminRpc::WorkerVars(v) => ret.extend(v),
+ m => return Err(GarageError::unexpected_rpc_message(m).into()),
}
- WorkerSetCmd::ResyncTranquility { tranquility } => {
- self.garage
- .block_manager
- .resync
- .set_tranquility(*tranquility)
- .await?;
- Ok(AdminRpc::Ok("Resync tranquility updated".into()))
+ }
+ Ok(AdminRpc::WorkerVars(ret))
+ } else {
+ #[allow(clippy::collapsible_else_if)]
+ if let Some(v) = variable {
+ Ok(AdminRpc::WorkerVars(vec![(
+ self.garage.system.id,
+ v.clone(),
+ self.garage.bg_vars.get(v)?,
+ )]))
+ } else {
+ let mut vars = self.garage.bg_vars.get_all();
+ vars.sort();
+ Ok(AdminRpc::WorkerVars(
+ vars.into_iter()
+ .map(|(k, v)| (self.garage.system.id, k.to_string(), v))
+ .collect(),
+ ))
+ }
+ }
+ }
+
+ async fn handle_set_var(
+ &self,
+ all_nodes: bool,
+ variable: &str,
+ value: &str,
+ ) -> Result<AdminRpc, Error> {
+ if all_nodes {
+ let mut ret = vec![];
+ let ring = self.garage.system.ring.borrow().clone();
+ for node in ring.layout.node_ids().iter() {
+ let node = (*node).into();
+ match self
+ .endpoint
+ .call(
+ &node,
+ AdminRpc::Worker(WorkerOperation::Set {
+ all_nodes: false,
+ variable: variable.to_string(),
+ value: value.to_string(),
+ }),
+ PRIO_NORMAL,
+ )
+ .await??
+ {
+ AdminRpc::WorkerVars(v) => ret.extend(v),
+ m => return Err(GarageError::unexpected_rpc_message(m).into()),
}
- },
+ }
+ Ok(AdminRpc::WorkerVars(ret))
+ } else {
+ self.garage.bg_vars.set(variable, value)?;
+ Ok(AdminRpc::WorkerVars(vec![(
+ self.garage.system.id,
+ variable.to_string(),
+ value.to_string(),
+ )]))
}
}
diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs
index 0d180ecd..905b14d3 100644
--- a/src/garage/cli/cmd.rs
+++ b/src/garage/cli/cmd.rs
@@ -59,18 +59,29 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) ->
let layout = fetch_layout(rpc_cli, rpc_host).await?;
println!("==== HEALTHY NODES ====");
- let mut healthy_nodes = vec!["ID\tHostname\tAddress\tTags\tZone\tCapacity".to_string()];
+ let mut healthy_nodes =
+ vec!["ID\tHostname\tAddress\tTags\tZone\tCapacity\tDataAvail".to_string()];
for adv in status.iter().filter(|adv| adv.is_up) {
match layout.roles.get(&adv.id) {
Some(NodeRoleV(Some(cfg))) => {
+ let data_avail = match &adv.status.data_disk_avail {
+ _ if cfg.capacity.is_none() => "N/A".into(),
+ Some((avail, total)) => {
+ let pct = (*avail as f64) / (*total as f64) * 100.;
+ let avail = bytesize::ByteSize::b(*avail);
+ format!("{} ({:.1}%)", avail, pct)
+ }
+ None => "?".into(),
+ };
healthy_nodes.push(format!(
- "{id:?}\t{host}\t{addr}\t[{tags}]\t{zone}\t{capacity}",
+ "{id:?}\t{host}\t{addr}\t[{tags}]\t{zone}\t{capacity}\t{data_avail}",
id = adv.id,
host = adv.status.hostname,
addr = adv.addr,
tags = cfg.tags.join(","),
zone = cfg.zone,
capacity = cfg.capacity_string(),
+ data_avail = data_avail,
));
}
_ => {
@@ -191,6 +202,9 @@ pub async fn cmd_admin(
AdminRpc::WorkerList(wi, wlo) => {
print_worker_list(wi, wlo);
}
+ AdminRpc::WorkerVars(wv) => {
+ print_worker_vars(wv);
+ }
AdminRpc::WorkerInfo(tid, wi) => {
print_worker_info(tid, wi);
}
diff --git a/src/garage/cli/init.rs b/src/garage/cli/init.rs
index 511b53a6..20813f1c 100644
--- a/src/garage/cli/init.rs
+++ b/src/garage/cli/init.rs
@@ -14,11 +14,11 @@ pub fn node_id_command(config_file: PathBuf, quiet: bool) -> Result<(), Error> {
garage_rpc::system::read_node_id(&config.metadata_dir).err_context(READ_KEY_ERROR)?;
let idstr = if let Some(addr) = config.rpc_public_addr {
- let idstr = format!("{}@{}", hex::encode(&node_id), addr);
+ let idstr = format!("{}@{}", hex::encode(node_id), addr);
println!("{}", idstr);
idstr
} else {
- let idstr = hex::encode(&node_id);
+ let idstr = hex::encode(node_id);
println!("{}", idstr);
if !quiet {
diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs
index dcb9fef9..986592ae 100644
--- a/src/garage/cli/structs.rs
+++ b/src/garage/cli/structs.rs
@@ -515,6 +515,11 @@ pub struct StatsOpt {
/// Gather detailed statistics (this can be long)
#[structopt(short = "d", long = "detailed")]
pub detailed: bool,
+
+ /// Don't show global cluster stats (internal use in RPC)
+ #[structopt(skip)]
+ #[serde(default)]
+ pub skip_global: bool,
}
#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
@@ -528,11 +533,25 @@ pub enum WorkerOperation {
/// Get detailed information about a worker
#[structopt(name = "info", version = garage_version())]
Info { tid: usize },
+ /// Get worker parameter
+ #[structopt(name = "get", version = garage_version())]
+ Get {
+ /// Gather variable values from all nodes
+ #[structopt(short = "a", long = "all-nodes")]
+ all_nodes: bool,
+ /// Variable name to get, or none to get all variables
+ variable: Option<String>,
+ },
/// Set worker parameter
#[structopt(name = "set", version = garage_version())]
Set {
- #[structopt(subcommand)]
- opt: WorkerSetCmd,
+ /// Set variable values on all nodes
+ #[structopt(short = "a", long = "all-nodes")]
+ all_nodes: bool,
+ /// Variable node to set
+ variable: String,
+ /// Value to set the variable to
+ value: String,
},
}
@@ -547,19 +566,6 @@ pub struct WorkerListOpt {
}
#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
-pub enum WorkerSetCmd {
- /// Set tranquility of scrub operations
- #[structopt(name = "scrub-tranquility", version = garage_version())]
- ScrubTranquility { tranquility: u32 },
- /// Set number of concurrent block resync workers
- #[structopt(name = "resync-worker-count", version = garage_version())]
- ResyncWorkerCount { worker_count: usize },
- /// Set tranquility of block resync operations
- #[structopt(name = "resync-tranquility", version = garage_version())]
- ResyncTranquility { tranquility: u32 },
-}
-
-#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
pub enum BlockOperation {
/// List all blocks that currently have a resync error
#[structopt(name = "list-errors", version = garage_version())]
diff --git a/src/garage/cli/util.rs b/src/garage/cli/util.rs
index 63fd9eba..2c6be2f4 100644
--- a/src/garage/cli/util.rs
+++ b/src/garage/cli/util.rs
@@ -229,7 +229,7 @@ pub fn find_matching_node(
) -> Result<Uuid, Error> {
let mut candidates = vec![];
for c in cand {
- if hex::encode(&c).starts_with(&pattern) && !candidates.contains(&c) {
+ if hex::encode(c).starts_with(pattern) && !candidates.contains(&c) {
candidates.push(c);
}
}
@@ -357,6 +357,14 @@ pub fn print_worker_info(tid: usize, info: WorkerInfo) {
format_table(table);
}
+pub fn print_worker_vars(wv: Vec<(Uuid, String, String)>) {
+ let table = wv
+ .into_iter()
+ .map(|(n, k, v)| format!("{:?}\t{}\t{}", n, k, v))
+ .collect::<Vec<_>>();
+ format_table(table);
+}
+
pub fn print_block_error_list(el: Vec<BlockResyncErrorInfo>) {
let now = now_msec();
let tf = timeago::Formatter::new();
diff --git a/src/garage/main.rs b/src/garage/main.rs
index cd1d6228..9b069d82 100644
--- a/src/garage/main.rs
+++ b/src/garage/main.rs
@@ -28,6 +28,7 @@ use structopt::StructOpt;
use netapp::util::parse_and_resolve_peer_addr;
use netapp::NetworkKey;
+use garage_util::config::Config;
use garage_util::error::*;
use garage_rpc::system::*;
@@ -49,11 +50,10 @@ struct Opt {
#[structopt(short = "h", long = "rpc-host", env = "GARAGE_RPC_HOST")]
pub rpc_host: Option<String>,
- /// RPC secret network key for admin operations
- #[structopt(short = "s", long = "rpc-secret", env = "GARAGE_RPC_SECRET")]
- pub rpc_secret: Option<String>,
+ #[structopt(flatten)]
+ pub secrets: Secrets,
- /// Configuration file (garage.toml)
+ /// Path to configuration file
#[structopt(
short = "c",
long = "config",
@@ -66,6 +66,24 @@ struct Opt {
cmd: Command,
}
+#[derive(StructOpt, Debug)]
+pub struct Secrets {
+ /// RPC secret network key, used to replace rpc_secret in config.toml when running the
+ /// daemon or doing admin operations
+ #[structopt(short = "s", long = "rpc-secret", env = "GARAGE_RPC_SECRET")]
+ pub rpc_secret: Option<String>,
+
+ /// Metrics API authentication token, replaces admin.metrics_token in config.toml when
+ /// running the Garage daemon
+ #[structopt(long = "admin-token", env = "GARAGE_ADMIN_TOKEN")]
+ pub admin_token: Option<String>,
+
+ /// Metrics API authentication token, replaces admin.metrics_token in config.toml when
+ /// running the Garage daemon
+ #[structopt(long = "metrics-token", env = "GARAGE_METRICS_TOKEN")]
+ pub metrics_token: Option<String>,
+}
+
#[tokio::main]
async fn main() {
// Initialize version and features info
@@ -148,9 +166,9 @@ async fn main() {
sodiumoxide::init().expect("Unable to init sodiumoxide");
let res = match opt.cmd {
- Command::Server => server::run_server(opt.config_file).await,
+ Command::Server => server::run_server(opt.config_file, opt.secrets).await,
Command::OfflineRepair(repair_opt) => {
- repair::offline::offline_repair(opt.config_file, repair_opt).await
+ repair::offline::offline_repair(opt.config_file, opt.secrets, repair_opt).await
}
Command::Node(NodeOperation::NodeId(node_id_opt)) => {
node_id_command(opt.config_file, node_id_opt.quiet)
@@ -165,7 +183,7 @@ async fn main() {
}
async fn cli_command(opt: Opt) -> Result<(), Error> {
- let config = if opt.rpc_secret.is_none() || opt.rpc_host.is_none() {
+ let config = if opt.secrets.rpc_secret.is_none() || opt.rpc_host.is_none() {
Some(garage_util::config::read_config(opt.config_file.clone())
.err_context(format!("Unable to read configuration file {}. Configuration file is needed because -h or -s is not provided on the command line.", opt.config_file.to_string_lossy()))?)
} else {
@@ -174,9 +192,10 @@ async fn cli_command(opt: Opt) -> Result<(), Error> {
// Find and parse network RPC secret
let net_key_hex_str = opt
+ .secrets
.rpc_secret
.as_ref()
- .or_else(|| config.as_ref().map(|c| &c.rpc_secret))
+ .or_else(|| config.as_ref().and_then(|c| c.rpc_secret.as_ref()))
.ok_or("No RPC secret provided")?;
let network_key = NetworkKey::from_slice(
&hex::decode(net_key_hex_str).err_context("Invalid RPC secret key (bad hex)")?[..],
@@ -233,3 +252,16 @@ async fn cli_command(opt: Opt) -> Result<(), Error> {
Ok(x) => Ok(x),
}
}
+
+fn fill_secrets(mut config: Config, secrets: Secrets) -> Config {
+ if secrets.rpc_secret.is_some() {
+ config.rpc_secret = secrets.rpc_secret;
+ }
+ if secrets.admin_token.is_some() {
+ config.admin.admin_token = secrets.admin_token;
+ }
+ if secrets.metrics_token.is_some() {
+ config.admin.metrics_token = secrets.metrics_token;
+ }
+ config
+}
diff --git a/src/garage/repair/offline.rs b/src/garage/repair/offline.rs
index 25193e4a..f4edcf03 100644
--- a/src/garage/repair/offline.rs
+++ b/src/garage/repair/offline.rs
@@ -6,8 +6,13 @@ use garage_util::error::*;
use garage_model::garage::Garage;
use crate::cli::structs::*;
+use crate::{fill_secrets, Secrets};
-pub async fn offline_repair(config_file: PathBuf, opt: OfflineRepairOpt) -> Result<(), Error> {
+pub async fn offline_repair(
+ config_file: PathBuf,
+ secrets: Secrets,
+ opt: OfflineRepairOpt,
+) -> Result<(), Error> {
if !opt.yes {
return Err(Error::Message(
"Please add the --yes flag to launch repair operation".into(),
@@ -15,7 +20,7 @@ pub async fn offline_repair(config_file: PathBuf, opt: OfflineRepairOpt) -> Resu
}
info!("Loading configuration...");
- let config = read_config(config_file)?;
+ let config = fill_secrets(read_config(config_file)?, secrets);
info!("Initializing Garage main data store...");
let garage = Garage::new(config)?;
diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs
index 627e3bf3..0e14ed51 100644
--- a/src/garage/repair/online.rs
+++ b/src/garage/repair/online.rs
@@ -51,7 +51,11 @@ pub async fn launch_online_repair(
ScrubCmd::Resume => ScrubWorkerCommand::Resume,
ScrubCmd::Cancel => ScrubWorkerCommand::Cancel,
ScrubCmd::SetTranquility { tranquility } => {
- ScrubWorkerCommand::SetTranquility(tranquility)
+ garage
+ .block_manager
+ .scrub_persister
+ .set_with(|x| x.tranquility = tranquility)?;
+ return Ok(());
}
};
info!("Sending command to scrub worker: {:?}", cmd);
diff --git a/src/garage/server.rs b/src/garage/server.rs
index 16f1b625..958089c6 100644
--- a/src/garage/server.rs
+++ b/src/garage/server.rs
@@ -17,6 +17,7 @@ use garage_api::k2v::api_server::K2VApiServer;
use crate::admin::*;
#[cfg(feature = "telemetry-otlp")]
use crate::tracing_setup::*;
+use crate::{fill_secrets, Secrets};
async fn wait_from(mut chan: watch::Receiver<bool>) {
while !*chan.borrow() {
@@ -26,9 +27,9 @@ async fn wait_from(mut chan: watch::Receiver<bool>) {
}
}
-pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
+pub async fn run_server(config_file: PathBuf, secrets: Secrets) -> Result<(), Error> {
info!("Loading configuration...");
- let config = read_config(config_file)?;
+ let config = fill_secrets(read_config(config_file)?, secrets);
// ---- Initialize Garage internals ----
diff --git a/src/garage/tests/admin.rs b/src/garage/tests/admin.rs
index 37aefe38..d3becf0a 100644
--- a/src/garage/tests/admin.rs
+++ b/src/garage/tests/admin.rs
@@ -21,14 +21,7 @@ async fn test_admin_bucket_perms() {
ctx.garage
.command()
- .args([
- "bucket",
- "allow",
- "--read",
- "--key",
- &ctx.garage.key.id,
- BCKT_NAME,
- ])
+ .args(["bucket", "allow", "--read", "--key", &ctx.key.id, BCKT_NAME])
.quiet()
.expect_success_status("Could not create bucket");
@@ -36,14 +29,7 @@ async fn test_admin_bucket_perms() {
ctx.garage
.command()
- .args([
- "bucket",
- "deny",
- "--read",
- "--key",
- &ctx.garage.key.name,
- BCKT_NAME,
- ])
+ .args(["bucket", "deny", "--read", "--key", &ctx.key.id, BCKT_NAME])
.quiet()
.expect_success_status("Could not create bucket");
@@ -51,14 +37,7 @@ async fn test_admin_bucket_perms() {
ctx.garage
.command()
- .args([
- "bucket",
- "allow",
- "--read",
- "--key",
- &ctx.garage.key.name,
- BCKT_NAME,
- ])
+ .args(["bucket", "allow", "--read", "--key", &ctx.key.id, BCKT_NAME])
.quiet()
.expect_success_status("Could not create bucket");
diff --git a/src/garage/tests/bucket.rs b/src/garage/tests/bucket.rs
index 9c363013..0dec3cfa 100644
--- a/src/garage/tests/bucket.rs
+++ b/src/garage/tests/bucket.rs
@@ -13,7 +13,7 @@ async fn test_bucket_all() {
ctx.garage
.command()
.args(["key", "deny"])
- .args(["--create-bucket", &ctx.garage.key.id])
+ .args(["--create-bucket", &ctx.key.id])
.quiet()
.expect_success_output("Could not deny key to create buckets");
@@ -26,7 +26,7 @@ async fn test_bucket_all() {
ctx.garage
.command()
.args(["key", "allow"])
- .args(["--create-bucket", &ctx.garage.key.id])
+ .args(["--create-bucket", &ctx.key.id])
.quiet()
.expect_success_output("Could not deny key to create buckets");
diff --git a/src/garage/tests/common/client.rs b/src/garage/tests/common/client.rs
index 212588b5..e9d4849a 100644
--- a/src/garage/tests/common/client.rs
+++ b/src/garage/tests/common/client.rs
@@ -1,15 +1,9 @@
use aws_sdk_s3::{Client, Config, Credentials, Endpoint};
-use super::garage::Instance;
+use super::garage::{Instance, Key};
-pub fn build_client(instance: &Instance) -> Client {
- let credentials = Credentials::new(
- &instance.key.id,
- &instance.key.secret,
- None,
- None,
- "garage-integ-test",
- );
+pub fn build_client(instance: &Instance, key: &Key) -> Client {
+ let credentials = Credentials::new(&key.id, &key.secret, None, None, "garage-integ-test");
let endpoint = Endpoint::immutable(instance.s3_uri());
let config = Config::builder()
diff --git a/src/garage/tests/common/custom_requester.rs b/src/garage/tests/common/custom_requester.rs
index 1700cc90..609eda97 100644
--- a/src/garage/tests/common/custom_requester.rs
+++ b/src/garage/tests/common/custom_requester.rs
@@ -14,6 +14,7 @@ use garage_api::signature;
/// You should ever only use this to send requests AWS sdk won't send,
/// like to reproduce behavior of unusual implementations found to be
/// problematic.
+#[derive(Clone)]
pub struct CustomRequester {
key: Key,
uri: Uri,
@@ -22,18 +23,18 @@ pub struct CustomRequester {
}
impl CustomRequester {
- pub fn new_s3(instance: &Instance) -> Self {
+ pub fn new_s3(instance: &Instance, key: &Key) -> Self {
CustomRequester {
- key: instance.key.clone(),
+ key: key.clone(),
uri: instance.s3_uri(),
service: "s3",
client: Client::new(),
}
}
- pub fn new_k2v(instance: &Instance) -> Self {
+ pub fn new_k2v(instance: &Instance, key: &Key) -> Self {
CustomRequester {
- key: instance.key.clone(),
+ key: key.clone(),
uri: instance.k2v_uri(),
service: "k2v",
client: Client::new(),
diff --git a/src/garage/tests/common/garage.rs b/src/garage/tests/common/garage.rs
index dbebe5b1..3beed7c4 100644
--- a/src/garage/tests/common/garage.rs
+++ b/src/garage/tests/common/garage.rs
@@ -13,7 +13,7 @@ static GARAGE_TEST_SECRET: &str =
#[derive(Debug, Default, Clone)]
pub struct Key {
- pub name: String,
+ pub name: Option<String>,
pub id: String,
pub secret: String,
}
@@ -21,10 +21,11 @@ pub struct Key {
pub struct Instance {
process: process::Child,
pub path: PathBuf,
- pub key: Key,
+ pub default_key: Key,
pub s3_port: u16,
pub k2v_port: u16,
pub web_port: u16,
+ pub admin_port: u16,
}
impl Instance {
@@ -101,22 +102,36 @@ api_bind_addr = "127.0.0.1:{admin_port}"
Instance {
process: child,
path,
- key: Key::default(),
+ default_key: Key::default(),
s3_port: port,
k2v_port: port + 1,
web_port: port + 3,
+ admin_port: port + 4,
}
}
fn setup(&mut self) {
- use std::{thread, time::Duration};
-
- // Wait for node to be ready
- thread::sleep(Duration::from_secs(2));
-
+ self.wait_for_boot();
self.setup_layout();
+ self.default_key = self.key(Some("garage_test"));
+ }
+
+ fn wait_for_boot(&mut self) {
+ use std::{thread, time::Duration};
- self.key = self.new_key("garage_test");
+ // 60 * 2 seconds = 120 seconds = 2min
+ for _ in 0..60 {
+ let termination = self
+ .command()
+ .args(["status"])
+ .quiet()
+ .status()
+ .expect("Unable to run command");
+ if termination.success() {
+ break;
+ }
+ thread::sleep(Duration::from_secs(2));
+ }
}
fn setup_layout(&self) {
@@ -167,13 +182,17 @@ api_bind_addr = "127.0.0.1:{admin_port}"
.expect("Could not build garage endpoint URI")
}
- pub fn new_key(&self, name: &str) -> Key {
+ pub fn key(&self, maybe_name: Option<&str>) -> Key {
let mut key = Key::default();
- let output = self
- .command()
- .args(["key", "create", name])
- .expect_success_output("Could not create key");
+ let mut cmd = self.command();
+ let base = cmd.args(["key", "create"]);
+ let with_name = match maybe_name {
+ Some(name) => base.args([name]),
+ None => base,
+ };
+
+ let output = with_name.expect_success_output("Could not create key");
let stdout = String::from_utf8(output.stdout).unwrap();
for line in stdout.lines() {
@@ -190,7 +209,7 @@ api_bind_addr = "127.0.0.1:{admin_port}"
assert!(!key.secret.is_empty(), "Invalid key: Key secret is empty");
Key {
- name: name.to_owned(),
+ name: maybe_name.map(String::from),
..key
}
}
diff --git a/src/garage/tests/common/mod.rs b/src/garage/tests/common/mod.rs
index 28874b02..eca3e42b 100644
--- a/src/garage/tests/common/mod.rs
+++ b/src/garage/tests/common/mod.rs
@@ -13,13 +13,16 @@ use custom_requester::CustomRequester;
const REGION: Region = Region::from_static("garage-integ-test");
+#[derive(Clone)]
pub struct Context {
pub garage: &'static garage::Instance,
+ pub key: garage::Key,
pub client: Client,
pub custom_request: CustomRequester,
pub k2v: K2VContext,
}
+#[derive(Clone)]
pub struct K2VContext {
pub request: CustomRequester,
}
@@ -27,13 +30,15 @@ pub struct K2VContext {
impl Context {
fn new() -> Self {
let garage = garage::instance();
- let client = client::build_client(garage);
- let custom_request = CustomRequester::new_s3(garage);
- let k2v_request = CustomRequester::new_k2v(garage);
+ let key = garage.key(None);
+ let client = client::build_client(garage, &key);
+ let custom_request = CustomRequester::new_s3(garage, &key);
+ let k2v_request = CustomRequester::new_k2v(garage, &key);
Context {
garage,
client,
+ key,
custom_request,
k2v: K2VContext {
request: k2v_request,
@@ -57,7 +62,7 @@ impl Context {
.args(["bucket", "allow"])
.args(["--owner", "--read", "--write"])
.arg(&bucket_name)
- .args(["--key", &self.garage.key.name])
+ .args(["--key", &self.key.id])
.quiet()
.expect_success_status("Could not allow key for bucket");
diff --git a/src/garage/tests/k2v/batch.rs b/src/garage/tests/k2v/batch.rs
index 6abba1c5..595d0ba8 100644
--- a/src/garage/tests/k2v/batch.rs
+++ b/src/garage/tests/k2v/batch.rs
@@ -3,6 +3,7 @@ use std::collections::HashMap;
use crate::common;
use assert_json_diff::assert_json_eq;
+use base64::prelude::*;
use serde_json::json;
use super::json_body;
@@ -36,12 +37,12 @@ async fn test_batch() {
{{"pk": "root", "sk": "d.2", "ct": null, "v": "{}"}},
{{"pk": "root", "sk": "e", "ct": null, "v": "{}"}}
]"#,
- base64::encode(values.get(&"a").unwrap()),
- base64::encode(values.get(&"b").unwrap()),
- base64::encode(values.get(&"c").unwrap()),
- base64::encode(values.get(&"d.1").unwrap()),
- base64::encode(values.get(&"d.2").unwrap()),
- base64::encode(values.get(&"e").unwrap()),
+ BASE64_STANDARD.encode(values.get(&"a").unwrap()),
+ BASE64_STANDARD.encode(values.get(&"b").unwrap()),
+ BASE64_STANDARD.encode(values.get(&"c").unwrap()),
+ BASE64_STANDARD.encode(values.get(&"d.1").unwrap()),
+ BASE64_STANDARD.encode(values.get(&"d.2").unwrap()),
+ BASE64_STANDARD.encode(values.get(&"e").unwrap()),
)
.into_bytes(),
)
@@ -120,12 +121,12 @@ async fn test_batch() {
"tombstones": false,
"singleItem": false,
"items": [
- {"sk": "a", "ct": ct.get("a").unwrap(), "v": [base64::encode(values.get("a").unwrap())]},
- {"sk": "b", "ct": ct.get("b").unwrap(), "v": [base64::encode(values.get("b").unwrap())]},
- {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap())]},
- {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1").unwrap())]},
- {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap())]},
- {"sk": "e", "ct": ct.get("e").unwrap(), "v": [base64::encode(values.get("e").unwrap())]}
+ {"sk": "a", "ct": ct.get("a").unwrap(), "v": [BASE64_STANDARD.encode(values.get("a").unwrap())]},
+ {"sk": "b", "ct": ct.get("b").unwrap(), "v": [BASE64_STANDARD.encode(values.get("b").unwrap())]},
+ {"sk": "c", "ct": ct.get("c").unwrap(), "v": [BASE64_STANDARD.encode(values.get("c").unwrap())]},
+ {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.1").unwrap())]},
+ {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.2").unwrap())]},
+ {"sk": "e", "ct": ct.get("e").unwrap(), "v": [BASE64_STANDARD.encode(values.get("e").unwrap())]}
],
"more": false,
"nextStart": null,
@@ -141,10 +142,10 @@ async fn test_batch() {
"tombstones": false,
"singleItem": false,
"items": [
- {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap())]},
- {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1").unwrap())]},
- {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap())]},
- {"sk": "e", "ct": ct.get("e").unwrap(), "v": [base64::encode(values.get("e").unwrap())]}
+ {"sk": "c", "ct": ct.get("c").unwrap(), "v": [BASE64_STANDARD.encode(values.get("c").unwrap())]},
+ {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.1").unwrap())]},
+ {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.2").unwrap())]},
+ {"sk": "e", "ct": ct.get("e").unwrap(), "v": [BASE64_STANDARD.encode(values.get("e").unwrap())]}
],
"more": false,
"nextStart": null,
@@ -160,9 +161,9 @@ async fn test_batch() {
"tombstones": false,
"singleItem": false,
"items": [
- {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap())]},
- {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1").unwrap())]},
- {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap())]},
+ {"sk": "c", "ct": ct.get("c").unwrap(), "v": [BASE64_STANDARD.encode(values.get("c").unwrap())]},
+ {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.1").unwrap())]},
+ {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.2").unwrap())]},
],
"more": false,
"nextStart": null,
@@ -178,8 +179,8 @@ async fn test_batch() {
"tombstones": false,
"singleItem": false,
"items": [
- {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap())]},
- {"sk": "b", "ct": ct.get("b").unwrap(), "v": [base64::encode(values.get("b").unwrap())]},
+ {"sk": "c", "ct": ct.get("c").unwrap(), "v": [BASE64_STANDARD.encode(values.get("c").unwrap())]},
+ {"sk": "b", "ct": ct.get("b").unwrap(), "v": [BASE64_STANDARD.encode(values.get("b").unwrap())]},
],
"more": false,
"nextStart": null,
@@ -195,8 +196,8 @@ async fn test_batch() {
"tombstones": false,
"singleItem": false,
"items": [
- {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap())]},
- {"sk": "b", "ct": ct.get("b").unwrap(), "v": [base64::encode(values.get("b").unwrap())]},
+ {"sk": "c", "ct": ct.get("c").unwrap(), "v": [BASE64_STANDARD.encode(values.get("c").unwrap())]},
+ {"sk": "b", "ct": ct.get("b").unwrap(), "v": [BASE64_STANDARD.encode(values.get("b").unwrap())]},
],
"more": false,
"nextStart": null,
@@ -212,7 +213,7 @@ async fn test_batch() {
"tombstones": false,
"singleItem": false,
"items": [
- {"sk": "a", "ct": ct.get("a").unwrap(), "v": [base64::encode(values.get("a").unwrap())]}
+ {"sk": "a", "ct": ct.get("a").unwrap(), "v": [BASE64_STANDARD.encode(values.get("a").unwrap())]}
],
"more": true,
"nextStart": "b",
@@ -228,8 +229,8 @@ async fn test_batch() {
"tombstones": false,
"singleItem": false,
"items": [
- {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1").unwrap())]},
- {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap())]}
+ {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.1").unwrap())]},
+ {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.2").unwrap())]}
],
"more": false,
"nextStart": null,
@@ -255,10 +256,10 @@ async fn test_batch() {
{{"pk": "root", "sk": "d.2", "ct": null, "v": "{}"}}
]"#,
ct.get(&"b").unwrap(),
- base64::encode(values.get(&"c'").unwrap()),
+ BASE64_STANDARD.encode(values.get(&"c'").unwrap()),
ct.get(&"d.1").unwrap(),
- base64::encode(values.get(&"d.1'").unwrap()),
- base64::encode(values.get(&"d.2'").unwrap()),
+ BASE64_STANDARD.encode(values.get(&"d.1'").unwrap()),
+ BASE64_STANDARD.encode(values.get(&"d.2'").unwrap()),
)
.into_bytes(),
)
@@ -333,11 +334,11 @@ async fn test_batch() {
"tombstones": false,
"singleItem": false,
"items": [
- {"sk": "a", "ct": ct.get("a").unwrap(), "v": [base64::encode(values.get("a").unwrap())]},
- {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap()), base64::encode(values.get("c'").unwrap())]},
- {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]},
- {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap()), base64::encode(values.get("d.2'").unwrap())]},
- {"sk": "e", "ct": ct.get("e").unwrap(), "v": [base64::encode(values.get("e").unwrap())]}
+ {"sk": "a", "ct": ct.get("a").unwrap(), "v": [BASE64_STANDARD.encode(values.get("a").unwrap())]},
+ {"sk": "c", "ct": ct.get("c").unwrap(), "v": [BASE64_STANDARD.encode(values.get("c").unwrap()), BASE64_STANDARD.encode(values.get("c'").unwrap())]},
+ {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.1'").unwrap())]},
+ {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.2").unwrap()), BASE64_STANDARD.encode(values.get("d.2'").unwrap())]},
+ {"sk": "e", "ct": ct.get("e").unwrap(), "v": [BASE64_STANDARD.encode(values.get("e").unwrap())]}
],
"more": false,
"nextStart": null,
@@ -353,8 +354,8 @@ async fn test_batch() {
"tombstones": false,
"singleItem": false,
"items": [
- {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]},
- {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap()), base64::encode(values.get("d.2'").unwrap())]},
+ {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.1'").unwrap())]},
+ {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.2").unwrap()), BASE64_STANDARD.encode(values.get("d.2'").unwrap())]},
],
"more": false,
"nextStart": null,
@@ -370,7 +371,7 @@ async fn test_batch() {
"tombstones": false,
"singleItem": false,
"items": [
- {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]},
+ {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.1'").unwrap())]},
],
"more": false,
"nextStart": null,
@@ -386,7 +387,7 @@ async fn test_batch() {
"tombstones": false,
"singleItem": false,
"items": [
- {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]},
+ {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.1'").unwrap())]},
],
"more": true,
"nextStart": "d.2",
@@ -402,7 +403,7 @@ async fn test_batch() {
"tombstones": false,
"singleItem": false,
"items": [
- {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap()), base64::encode(values.get("d.2'").unwrap())]},
+ {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.2").unwrap()), BASE64_STANDARD.encode(values.get("d.2'").unwrap())]},
],
"more": false,
"nextStart": null,
@@ -418,8 +419,8 @@ async fn test_batch() {
"tombstones": false,
"singleItem": false,
"items": [
- {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap()), base64::encode(values.get("d.2'").unwrap())]},
- {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]},
+ {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.2").unwrap()), BASE64_STANDARD.encode(values.get("d.2'").unwrap())]},
+ {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.1'").unwrap())]},
],
"more": false,
"nextStart": null,
@@ -435,8 +436,8 @@ async fn test_batch() {
"tombstones": false,
"singleItem": false,
"items": [
- {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap()), base64::encode(values.get("d.2'").unwrap())]},
- {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]},
+ {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.2").unwrap()), BASE64_STANDARD.encode(values.get("d.2'").unwrap())]},
+ {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.1'").unwrap())]},
],
"more": false,
"nextStart": null,
@@ -452,8 +453,8 @@ async fn test_batch() {
"tombstones": false,
"singleItem": false,
"items": [
- {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]},
- {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap()), base64::encode(values.get("d.2'").unwrap())]},
+ {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.1'").unwrap())]},
+ {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.2").unwrap()), BASE64_STANDARD.encode(values.get("d.2'").unwrap())]},
],
"more": false,
"nextStart": null,
@@ -563,8 +564,8 @@ async fn test_batch() {
"tombstones": false,
"singleItem": false,
"items": [
- {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap()), base64::encode(values.get("c'").unwrap())]},
- {"sk": "e", "ct": ct.get("e").unwrap(), "v": [base64::encode(values.get("e").unwrap())]}
+ {"sk": "c", "ct": ct.get("c").unwrap(), "v": [BASE64_STANDARD.encode(values.get("c").unwrap()), BASE64_STANDARD.encode(values.get("c'").unwrap())]},
+ {"sk": "e", "ct": ct.get("e").unwrap(), "v": [BASE64_STANDARD.encode(values.get("e").unwrap())]}
],
"more": false,
"nextStart": null,
@@ -580,8 +581,8 @@ async fn test_batch() {
"tombstones": false,
"singleItem": false,
"items": [
- {"sk": "e", "ct": ct.get("e").unwrap(), "v": [base64::encode(values.get("e").unwrap())]},
- {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap()), base64::encode(values.get("c'").unwrap())]},
+ {"sk": "e", "ct": ct.get("e").unwrap(), "v": [BASE64_STANDARD.encode(values.get("e").unwrap())]},
+ {"sk": "c", "ct": ct.get("c").unwrap(), "v": [BASE64_STANDARD.encode(values.get("c").unwrap()), BASE64_STANDARD.encode(values.get("c'").unwrap())]},
],
"more": false,
"nextStart": null,
@@ -599,10 +600,10 @@ async fn test_batch() {
"items": [
{"sk": "a", "ct": ct.get("a").unwrap(), "v": [null]},
{"sk": "b", "ct": ct.get("b").unwrap(), "v": [null]},
- {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap()), base64::encode(values.get("c'").unwrap())]},
+ {"sk": "c", "ct": ct.get("c").unwrap(), "v": [BASE64_STANDARD.encode(values.get("c").unwrap()), BASE64_STANDARD.encode(values.get("c'").unwrap())]},
{"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [null]},
{"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [null]},
- {"sk": "e", "ct": ct.get("e").unwrap(), "v": [base64::encode(values.get("e").unwrap())]},
+ {"sk": "e", "ct": ct.get("e").unwrap(), "v": [BASE64_STANDARD.encode(values.get("e").unwrap())]},
],
"more": false,
"nextStart": null,
diff --git a/src/garage/tests/k2v/item.rs b/src/garage/tests/k2v/item.rs
index 2641386f..588836c7 100644
--- a/src/garage/tests/k2v/item.rs
+++ b/src/garage/tests/k2v/item.rs
@@ -3,6 +3,7 @@ use std::time::Duration;
use crate::common;
use assert_json_diff::assert_json_eq;
+use base64::prelude::*;
use serde_json::json;
use super::json_body;
@@ -222,7 +223,10 @@ async fn test_items_and_indices() {
let res_json = json_body(res).await;
assert_json_eq!(
res_json,
- [base64::encode(&content2), base64::encode(&content3)]
+ [
+ BASE64_STANDARD.encode(&content2),
+ BASE64_STANDARD.encode(&content3)
+ ]
);
// ReadIndex -- now there should be some stuff
@@ -411,7 +415,7 @@ async fn test_item_return_format() {
"application/json"
);
let res_body = json_body(res).await;
- assert_json_eq!(res_body, json!([base64::encode(&single_value)]));
+ assert_json_eq!(res_body, json!([BASE64_STANDARD.encode(&single_value)]));
// f2: binary
let res = ctx
@@ -452,7 +456,7 @@ async fn test_item_return_format() {
"application/json"
);
let res_body = json_body(res).await;
- assert_json_eq!(res_body, json!([base64::encode(&single_value)]));
+ assert_json_eq!(res_body, json!([BASE64_STANDARD.encode(&single_value)]));
// -- Test with a second, concurrent value --
let res = ctx
@@ -488,8 +492,8 @@ async fn test_item_return_format() {
assert_json_eq!(
res_body,
json!([
- base64::encode(&single_value),
- base64::encode(&concurrent_value)
+ BASE64_STANDARD.encode(&single_value),
+ BASE64_STANDARD.encode(&concurrent_value)
])
);
@@ -512,8 +516,8 @@ async fn test_item_return_format() {
assert_json_eq!(
res_body,
json!([
- base64::encode(&single_value),
- base64::encode(&concurrent_value)
+ BASE64_STANDARD.encode(&single_value),
+ BASE64_STANDARD.encode(&concurrent_value)
])
);
@@ -550,8 +554,8 @@ async fn test_item_return_format() {
assert_json_eq!(
res_body,
json!([
- base64::encode(&single_value),
- base64::encode(&concurrent_value)
+ BASE64_STANDARD.encode(&single_value),
+ BASE64_STANDARD.encode(&concurrent_value)
])
);
@@ -587,7 +591,10 @@ async fn test_item_return_format() {
"application/json"
);
let res_body = json_body(res).await;
- assert_json_eq!(res_body, json!([base64::encode(&concurrent_value), null]));
+ assert_json_eq!(
+ res_body,
+ json!([BASE64_STANDARD.encode(&concurrent_value), null])
+ );
// f1: not specified
let res = ctx
@@ -612,7 +619,10 @@ async fn test_item_return_format() {
.unwrap()
.to_string();
let res_body = json_body(res).await;
- assert_json_eq!(res_body, json!([base64::encode(&concurrent_value), null]));
+ assert_json_eq!(
+ res_body,
+ json!([BASE64_STANDARD.encode(&concurrent_value), null])
+ );
// f2: binary
let res = ctx
@@ -644,7 +654,10 @@ async fn test_item_return_format() {
"application/json"
);
let res_body = json_body(res).await;
- assert_json_eq!(res_body, json!([base64::encode(&concurrent_value), null]));
+ assert_json_eq!(
+ res_body,
+ json!([BASE64_STANDARD.encode(&concurrent_value), null])
+ );
// -- Delete everything --
let res = ctx
diff --git a/src/garage/tests/k2v/poll.rs b/src/garage/tests/k2v/poll.rs
index e56705ae..dd44aed9 100644
--- a/src/garage/tests/k2v/poll.rs
+++ b/src/garage/tests/k2v/poll.rs
@@ -1,12 +1,17 @@
+use base64::prelude::*;
use hyper::{Method, StatusCode};
use std::time::Duration;
+use assert_json_diff::assert_json_eq;
+use serde_json::json;
+
+use super::json_body;
use crate::common;
#[tokio::test]
-async fn test_poll() {
+async fn test_poll_item() {
let ctx = common::context();
- let bucket = ctx.create_bucket("test-k2v-poll");
+ let bucket = ctx.create_bucket("test-k2v-poll-item");
// Write initial value
let res = ctx
@@ -52,8 +57,8 @@ async fn test_poll() {
let poll = {
let bucket = bucket.clone();
let ct = ct.clone();
+ let ctx = ctx.clone();
tokio::spawn(async move {
- let ctx = common::context();
ctx.k2v
.request
.builder(bucket.clone())
@@ -96,3 +101,165 @@ async fn test_poll() {
.to_vec();
assert_eq!(poll_res_body, b"New value");
}
+
+#[tokio::test]
+async fn test_poll_range() {
+ let ctx = common::context();
+ let bucket = ctx.create_bucket("test-k2v-poll-range");
+
+ // Write initial value
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .method(Method::PUT)
+ .path("root")
+ .query_param("sort_key", Some("test1"))
+ .body(b"Initial value".to_vec())
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), StatusCode::NO_CONTENT);
+
+ // Retrieve initial value to get its causality token
+ let res2 = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("test1"))
+ .signed_header("accept", "application/octet-stream")
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res2.status(), StatusCode::OK);
+ let ct = res2
+ .headers()
+ .get("x-garage-causality-token")
+ .unwrap()
+ .to_str()
+ .unwrap()
+ .to_string();
+
+ // Initial poll range, retrieve single item and first seen_marker
+ let res2 = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .method(Method::POST)
+ .path("root")
+ .query_param("poll_range", None::<String>)
+ .body(b"{}".to_vec())
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res2.status(), StatusCode::OK);
+ let json_res = json_body(res2).await;
+ let seen_marker = json_res["seenMarker"].as_str().unwrap().to_string();
+ assert_json_eq!(
+ json_res,
+ json!(
+ {
+ "items": [
+ {"sk": "test1", "ct": ct, "v": [BASE64_STANDARD.encode(b"Initial value")]},
+ ],
+ "seenMarker": seen_marker,
+ }
+ )
+ );
+
+ // Second poll range, which will complete later
+ let poll = {
+ let bucket = bucket.clone();
+ let ctx = ctx.clone();
+ tokio::spawn(async move {
+ ctx.k2v
+ .request
+ .builder(bucket.clone())
+ .method(Method::POST)
+ .path("root")
+ .query_param("poll_range", None::<String>)
+ .body(format!(r#"{{"seenMarker": "{}"}}"#, seen_marker).into_bytes())
+ .send()
+ .await
+ })
+ };
+
+ // Write new value that supersedes initial one
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .method(Method::PUT)
+ .path("root")
+ .query_param("sort_key", Some("test1"))
+ .signed_header("x-garage-causality-token", ct)
+ .body(b"New value".to_vec())
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), StatusCode::NO_CONTENT);
+
+ // Check poll finishes with correct value
+ let poll_res = tokio::select! {
+ _ = tokio::time::sleep(Duration::from_secs(10)) => panic!("poll did not terminate in time"),
+ res = poll => res.unwrap().unwrap(),
+ };
+
+ assert_eq!(poll_res.status(), StatusCode::OK);
+ let json_res = json_body(poll_res).await;
+ let seen_marker = json_res["seenMarker"].as_str().unwrap().to_string();
+ assert_eq!(json_res["items"].as_array().unwrap().len(), 1);
+ assert_json_eq!(&json_res["items"][0]["sk"], json!("test1"));
+ assert_json_eq!(
+ &json_res["items"][0]["v"],
+ json!([BASE64_STANDARD.encode(b"New value")])
+ );
+
+ // Now we will add a value on a different key
+ // Start a new poll operation
+ let poll = {
+ let bucket = bucket.clone();
+ let ctx = ctx.clone();
+ tokio::spawn(async move {
+ ctx.k2v
+ .request
+ .builder(bucket.clone())
+ .method(Method::POST)
+ .path("root")
+ .query_param("poll_range", None::<String>)
+ .body(format!(r#"{{"seenMarker": "{}"}}"#, seen_marker).into_bytes())
+ .send()
+ .await
+ })
+ };
+
+ // Write value on different key
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .method(Method::PUT)
+ .path("root")
+ .query_param("sort_key", Some("test2"))
+ .body(b"Other value".to_vec())
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), StatusCode::NO_CONTENT);
+
+ // Check poll finishes with correct value
+ let poll_res = tokio::select! {
+ _ = tokio::time::sleep(Duration::from_secs(10)) => panic!("poll did not terminate in time"),
+ res = poll => res.unwrap().unwrap(),
+ };
+
+ assert_eq!(poll_res.status(), StatusCode::OK);
+ let json_res = json_body(poll_res).await;
+ assert_eq!(json_res["items"].as_array().unwrap().len(), 1);
+ assert_json_eq!(&json_res["items"][0]["sk"], json!("test2"));
+ assert_json_eq!(
+ &json_res["items"][0]["v"],
+ json!([BASE64_STANDARD.encode(b"Other value")])
+ );
+}
diff --git a/src/garage/tests/s3/streaming_signature.rs b/src/garage/tests/s3/streaming_signature.rs
index 48da7607..b7a1acae 100644
--- a/src/garage/tests/s3/streaming_signature.rs
+++ b/src/garage/tests/s3/streaming_signature.rs
@@ -109,7 +109,7 @@ async fn test_create_bucket_streaming() {
ctx.garage
.command()
.args(["key", "allow"])
- .args(["--create-bucket", &ctx.garage.key.id])
+ .args(["--create-bucket", &ctx.key.id])
.quiet()
.expect_success_output("Could not allow key to create buckets");
diff --git a/src/garage/tests/s3/website.rs b/src/garage/tests/s3/website.rs
index 244a2fa0..f61838e4 100644
--- a/src/garage/tests/s3/website.rs
+++ b/src/garage/tests/s3/website.rs
@@ -1,5 +1,8 @@
use crate::common;
use crate::common::ext::*;
+use crate::k2v::json_body;
+
+use assert_json_diff::assert_json_eq;
use aws_sdk_s3::{
model::{CorsConfiguration, CorsRule, ErrorDocument, IndexDocument, WebsiteConfiguration},
types::ByteStream,
@@ -9,6 +12,7 @@ use hyper::{
body::{to_bytes, Body},
Client,
};
+use serde_json::json;
const BODY: &[u8; 16] = b"<h1>bonjour</h1>";
const BODY_ERR: &[u8; 6] = b"erreur";
@@ -49,6 +53,31 @@ async fn test_website() {
BODY.as_ref()
); /* check that we do not leak body */
+ let admin_req = || {
+ Request::builder()
+ .method("GET")
+ .uri(format!(
+ "http://127.0.0.1:{0}/check?domain={1}",
+ ctx.garage.admin_port,
+ BCKT_NAME.to_string()
+ ))
+ .body(Body::empty())
+ .unwrap()
+ };
+
+ let admin_resp = client.request(admin_req()).await.unwrap();
+ assert_eq!(admin_resp.status(), StatusCode::BAD_REQUEST);
+ let res_body = json_body(admin_resp).await;
+ assert_json_eq!(
+ res_body,
+ json!({
+ "code": "InvalidRequest",
+ "message": "Bad request: Bucket 'my-website' is not authorized for website hosting",
+ "region": "garage-integ-test",
+ "path": "/check",
+ })
+ );
+
ctx.garage
.command()
.args(["bucket", "website", "--allow", BCKT_NAME])
@@ -62,6 +91,25 @@ async fn test_website() {
BODY.as_ref()
);
+ let admin_req = || {
+ Request::builder()
+ .method("GET")
+ .uri(format!(
+ "http://127.0.0.1:{0}/check?domain={1}",
+ ctx.garage.admin_port,
+ BCKT_NAME.to_string()
+ ))
+ .body(Body::empty())
+ .unwrap()
+ };
+
+ let mut admin_resp = client.request(admin_req()).await.unwrap();
+ assert_eq!(admin_resp.status(), StatusCode::OK);
+ assert_eq!(
+ to_bytes(admin_resp.body_mut()).await.unwrap().as_ref(),
+ format!("Bucket '{BCKT_NAME}' is authorized for website hosting").as_bytes()
+ );
+
ctx.garage
.command()
.args(["bucket", "website", "--deny", BCKT_NAME])
@@ -74,6 +122,31 @@ async fn test_website() {
to_bytes(resp.body_mut()).await.unwrap().as_ref(),
BODY.as_ref()
); /* check that we do not leak body */
+
+ let admin_req = || {
+ Request::builder()
+ .method("GET")
+ .uri(format!(
+ "http://127.0.0.1:{0}/check?domain={1}",
+ ctx.garage.admin_port,
+ BCKT_NAME.to_string()
+ ))
+ .body(Body::empty())
+ .unwrap()
+ };
+
+ let admin_resp = client.request(admin_req()).await.unwrap();
+ assert_eq!(admin_resp.status(), StatusCode::BAD_REQUEST);
+ let res_body = json_body(admin_resp).await;
+ assert_json_eq!(
+ res_body,
+ json!({
+ "code": "InvalidRequest",
+ "message": "Bad request: Bucket 'my-website' is not authorized for website hosting",
+ "region": "garage-integ-test",
+ "path": "/check",
+ })
+ );
}
#[tokio::test]
@@ -322,3 +395,103 @@ async fn test_website_s3_api() {
);
}
}
+
+#[tokio::test]
+async fn test_website_check_website_enabled() {
+ let ctx = common::context();
+
+ let client = Client::new();
+
+ let admin_req = || {
+ Request::builder()
+ .method("GET")
+ .uri(format!("http://127.0.0.1:{}/check", ctx.garage.admin_port))
+ .body(Body::empty())
+ .unwrap()
+ };
+
+ let admin_resp = client.request(admin_req()).await.unwrap();
+ assert_eq!(admin_resp.status(), StatusCode::BAD_REQUEST);
+ let res_body = json_body(admin_resp).await;
+ assert_json_eq!(
+ res_body,
+ json!({
+ "code": "InvalidRequest",
+ "message": "Bad request: No domain query string found",
+ "region": "garage-integ-test",
+ "path": "/check",
+ })
+ );
+
+ let admin_req = || {
+ Request::builder()
+ .method("GET")
+ .uri(format!(
+ "http://127.0.0.1:{}/check?domain=",
+ ctx.garage.admin_port
+ ))
+ .body(Body::empty())
+ .unwrap()
+ };
+
+ let admin_resp = client.request(admin_req()).await.unwrap();
+ assert_eq!(admin_resp.status(), StatusCode::NOT_FOUND);
+ let res_body = json_body(admin_resp).await;
+ assert_json_eq!(
+ res_body,
+ json!({
+ "code": "NoSuchBucket",
+ "message": "Bucket not found: ",
+ "region": "garage-integ-test",
+ "path": "/check",
+ })
+ );
+
+ let admin_req = || {
+ Request::builder()
+ .method("GET")
+ .uri(format!(
+ "http://127.0.0.1:{}/check?domain=foobar",
+ ctx.garage.admin_port
+ ))
+ .body(Body::empty())
+ .unwrap()
+ };
+
+ let admin_resp = client.request(admin_req()).await.unwrap();
+ assert_eq!(admin_resp.status(), StatusCode::NOT_FOUND);
+ let res_body = json_body(admin_resp).await;
+ assert_json_eq!(
+ res_body,
+ json!({
+ "code": "NoSuchBucket",
+ "message": "Bucket not found: foobar",
+ "region": "garage-integ-test",
+ "path": "/check",
+ })
+ );
+
+ let admin_req = || {
+ Request::builder()
+ .method("GET")
+ .uri(format!(
+ "http://127.0.0.1:{}/check?domain=%E2%98%B9",
+ ctx.garage.admin_port
+ ))
+ .body(Body::empty())
+ .unwrap()
+ };
+
+ let admin_resp = client.request(admin_req()).await.unwrap();
+ assert_eq!(admin_resp.status(), StatusCode::NOT_FOUND);
+ let res_body = json_body(admin_resp).await;
+ assert_json_eq!(
+ res_body,
+ json!({
+ "code": "NoSuchBucket",
+ "message": "Bucket not found: ☹",
+ "region": "garage-integ-test",
+ "path": "/check",
+ })
+ );
+}