author      Alex Auvolat <alex@adnab.me>          2023-04-25 12:34:26 +0200
committer   Alex Auvolat <alex@adnab.me>          2023-04-25 12:34:26 +0200
commit      fa78d806e3ae40031e80eebb86e4eb1756d7baea (patch)
tree        144662fb430c484093f6f9a585a2441c2ff26494 /src/garage
parent      654999e254e6c1f46bb5d668bc1230f226575716 (diff)
parent      a16eb7e4b8344d2f58c09a249b7b1bd17d339a35 (diff)
download    garage-fa78d806e3ae40031e80eebb86e4eb1756d7baea.tar.gz
            garage-fa78d806e3ae40031e80eebb86e4eb1756d7baea.zip
Merge branch 'main' into next
Diffstat (limited to 'src/garage')
-rw-r--r-- | src/garage/Cargo.toml | 26
-rw-r--r-- | src/garage/admin.rs | 249
-rw-r--r-- | src/garage/cli/cmd.rs | 18
-rw-r--r-- | src/garage/cli/init.rs | 4
-rw-r--r-- | src/garage/cli/structs.rs | 36
-rw-r--r-- | src/garage/cli/util.rs | 10
-rw-r--r-- | src/garage/main.rs | 48
-rw-r--r-- | src/garage/repair/offline.rs | 9
-rw-r--r-- | src/garage/repair/online.rs | 6
-rw-r--r-- | src/garage/server.rs | 5
-rw-r--r-- | src/garage/tests/admin.rs | 27
-rw-r--r-- | src/garage/tests/bucket.rs | 4
-rw-r--r-- | src/garage/tests/common/client.rs | 12
-rw-r--r-- | src/garage/tests/common/custom_requester.rs | 9
-rw-r--r-- | src/garage/tests/common/garage.rs | 49
-rw-r--r-- | src/garage/tests/common/mod.rs | 13
-rw-r--r-- | src/garage/tests/k2v/batch.rs | 103
-rw-r--r-- | src/garage/tests/k2v/item.rs | 37
-rw-r--r-- | src/garage/tests/k2v/poll.rs | 173
-rw-r--r-- | src/garage/tests/s3/streaming_signature.rs | 2
-rw-r--r-- | src/garage/tests/s3/website.rs | 173 |
21 files changed, 799 insertions, 214 deletions
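The largest change in this set lands in src/garage/admin.rs, which makes `garage stats` report an "estimated available storage space cluster-wide". As a rough, self-contained illustration of the arithmetic that diff uses (free space of the most constrained node per partition it holds, scaled back up to the full partition space), here is a sketch; the node figures and the `PARTITION_BITS = 8` constant are assumptions for the example, not values read from a real cluster:

```rust
// Sketch of the cluster-wide free-space estimate (assumed values).
const PARTITION_BITS: usize = 8; // 256 partitions, assumed to match garage_rpc::ring

/// `nodes` holds, for every storage node, the number of partitions it is
/// assigned and the number of bytes still free on its data disk.
fn estimate_cluster_data_avail(nodes: &[(u64, u64)]) -> Option<u64> {
    nodes
        .iter()
        // Free space the node can offer to each partition it stores.
        .map(|(partitions, avail)| avail / partitions)
        // The most constrained node is the bottleneck for the whole cluster.
        .min()
        // Scale back up to the total number of partitions.
        .map(|per_partition| per_partition * (1u64 << PARTITION_BITS))
}

fn main() {
    // Three nodes, each assigned all 256 partitions (replication factor 3),
    // with 120 GB, 80 GB and 100 GB of free data space respectively.
    let nodes = [
        (256, 120_000_000_000),
        (256, 80_000_000_000),
        (256, 100_000_000_000),
    ];
    // Bottleneck node: 80 GB / 256 partitions, times 256 partitions = 80 GB.
    assert_eq!(estimate_cluster_data_avail(&nodes), Some(80_000_000_000));
}
```

As the diff itself notes, the real estimate "might be lower in practice", and it degrades to an upper bound when some nodes do not report their free space.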
diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml index b43b0242..0cbdf890 100644 --- a/src/garage/Cargo.toml +++ b/src/garage/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "garage" -version = "0.8.1" +version = "0.8.2" authors = ["Alex Auvolat <alex@adnab.me>"] edition = "2018" license = "AGPL-3.0" @@ -21,22 +21,22 @@ path = "tests/lib.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -garage_db = { version = "0.8.1", path = "../db" } -garage_api = { version = "0.8.1", path = "../api" } -garage_block = { version = "0.8.1", path = "../block" } -garage_model = { version = "0.8.1", path = "../model" } -garage_rpc = { version = "0.8.1", path = "../rpc" } -garage_table = { version = "0.8.1", path = "../table" } -garage_util = { version = "0.8.1", path = "../util" } -garage_web = { version = "0.8.1", path = "../web" } +garage_db = { version = "0.8.2", path = "../db" } +garage_api = { version = "0.8.2", path = "../api" } +garage_block = { version = "0.8.2", path = "../block" } +garage_model = { version = "0.8.2", path = "../model" } +garage_rpc = { version = "0.8.2", path = "../rpc" } +garage_table = { version = "0.8.2", path = "../table" } +garage_util = { version = "0.8.2", path = "../util" } +garage_web = { version = "0.8.2", path = "../web" } backtrace = "0.3" bytes = "1.0" bytesize = "1.1" -timeago = "0.3" +timeago = "0.4" parse_duration = "2.1" hex = "0.4" -tracing = { version = "0.1.30" } +tracing = { version = "0.1" } tracing-subscriber = { version = "0.3", features = ["env-filter"] } rand = "0.8" async-trait = "0.1.7" @@ -45,7 +45,7 @@ sodiumoxide = { version = "0.2.5-0", package = "kuska-sodiumoxide" } serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } serde_bytes = "0.11" structopt = { version = "0.3", default-features = false } -toml = "0.5" +toml = "0.6" futures = "0.3" futures-util = "0.3" @@ -69,7 +69,7 @@ sha2 = "0.10" static_init = "1.0" assert-json-diff = "2.0" serde_json = "1.0" -base64 = "0.13" +base64 = "0.21" [features] diff --git a/src/garage/admin.rs b/src/garage/admin.rs index 58d645ac..e4e50520 100644 --- a/src/garage/admin.rs +++ b/src/garage/admin.rs @@ -15,10 +15,10 @@ use garage_util::time::*; use garage_table::replication::*; use garage_table::*; +use garage_rpc::ring::PARTITION_BITS; use garage_rpc::*; use garage_block::manager::BlockResyncErrorInfo; -use garage_block::repair::ScrubWorkerCommand; use garage_model::bucket_alias_table::*; use garage_model::bucket_table::*; @@ -60,6 +60,7 @@ pub enum AdminRpc { HashMap<usize, garage_util::background::WorkerInfo>, WorkerListOpt, ), + WorkerVars(Vec<(Uuid, String, String)>), WorkerInfo(usize, garage_util::background::WorkerInfo), BlockErrorList(Vec<BlockResyncErrorInfo>), BlockInfo { @@ -783,6 +784,7 @@ impl AdminRpcHandler { for node in ring.layout.node_ids().iter() { let mut opt = opt.clone(); opt.all_nodes = false; + opt.skip_global = true; writeln!(&mut ret, "\n======================").unwrap(); writeln!(&mut ret, "Stats for node {:?}:", node).unwrap(); @@ -799,6 +801,15 @@ impl AdminRpcHandler { Err(e) => writeln!(&mut ret, "Network error: {}", e).unwrap(), } } + + writeln!(&mut ret, "\n======================").unwrap(); + write!( + &mut ret, + "Cluster statistics:\n\n{}", + self.gather_cluster_stats() + ) + .unwrap(); + Ok(AdminRpc::Ok(ret)) } else { Ok(AdminRpc::Ok(self.gather_stats_local(opt)?)) @@ -809,32 +820,17 @@ impl AdminRpcHandler { let mut ret = String::new(); writeln!( &mut ret, - "\nGarage version: 
{} [features: {}]", + "\nGarage version: {} [features: {}]\nRust compiler version: {}", garage_util::version::garage_version(), garage_util::version::garage_features() .map(|list| list.join(", ")) .unwrap_or_else(|| "(unknown)".into()), + garage_util::version::rust_version(), ) .unwrap(); writeln!(&mut ret, "\nDatabase engine: {}", self.garage.db.engine()).unwrap(); - // Gather ring statistics - let ring = self.garage.system.ring.borrow().clone(); - let mut ring_nodes = HashMap::new(); - for (_i, loc) in ring.partitions().iter() { - for n in ring.get_nodes(loc, ring.replication_factor).iter() { - if !ring_nodes.contains_key(n) { - ring_nodes.insert(*n, 0usize); - } - *ring_nodes.get_mut(n).unwrap() += 1; - } - } - writeln!(&mut ret, "\nRing nodes & partition count:").unwrap(); - for (n, c) in ring_nodes.iter() { - writeln!(&mut ret, " {:?} {}", n, c).unwrap(); - } - // Gather table statistics let mut table = vec![" Table\tItems\tMklItems\tMklTodo\tGcTodo".into()]; table.push(self.gather_table_stats(&self.garage.bucket_table, opt.detailed)?); @@ -881,12 +877,110 @@ impl AdminRpcHandler { .unwrap(); if !opt.detailed { - writeln!(&mut ret, "\nIf values are missing (marked as NC), consider adding the --detailed flag - this will be slow.").unwrap(); + writeln!(&mut ret, "\nIf values are missing above (marked as NC), consider adding the --detailed flag (this will be slow).").unwrap(); + } + + if !opt.skip_global { + write!(&mut ret, "\n{}", self.gather_cluster_stats()).unwrap(); } Ok(ret) } + fn gather_cluster_stats(&self) -> String { + let mut ret = String::new(); + + // Gather storage node and free space statistics + let layout = &self.garage.system.ring.borrow().layout; + let mut node_partition_count = HashMap::<Uuid, u64>::new(); + for short_id in layout.ring_assignment_data.iter() { + let id = layout.node_id_vec[*short_id as usize]; + *node_partition_count.entry(id).or_default() += 1; + } + let node_info = self + .garage + .system + .get_known_nodes() + .into_iter() + .map(|n| (n.id, n)) + .collect::<HashMap<_, _>>(); + + let mut table = vec![" ID\tHostname\tZone\tCapacity\tPart.\tDataAvail\tMetaAvail".into()]; + for (id, parts) in node_partition_count.iter() { + let info = node_info.get(id); + let status = info.map(|x| &x.status); + let role = layout.roles.get(id).and_then(|x| x.0.as_ref()); + let hostname = status.map(|x| x.hostname.as_str()).unwrap_or("?"); + let zone = role.map(|x| x.zone.as_str()).unwrap_or("?"); + let capacity = role + .map(|x| x.capacity_string()) + .unwrap_or_else(|| "?".into()); + let avail_str = |x| match x { + Some((avail, total)) => { + let pct = (avail as f64) / (total as f64) * 100.; + let avail = bytesize::ByteSize::b(avail); + let total = bytesize::ByteSize::b(total); + format!("{}/{} ({:.1}%)", avail, total, pct) + } + None => "?".into(), + }; + let data_avail = avail_str(status.and_then(|x| x.data_disk_avail)); + let meta_avail = avail_str(status.and_then(|x| x.meta_disk_avail)); + table.push(format!( + " {:?}\t{}\t{}\t{}\t{}\t{}\t{}", + id, hostname, zone, capacity, parts, data_avail, meta_avail + )); + } + write!( + &mut ret, + "Storage nodes:\n{}", + format_table_to_string(table) + ) + .unwrap(); + + let meta_part_avail = node_partition_count + .iter() + .filter_map(|(id, parts)| { + node_info + .get(id) + .and_then(|x| x.status.meta_disk_avail) + .map(|c| c.0 / *parts) + }) + .collect::<Vec<_>>(); + let data_part_avail = node_partition_count + .iter() + .filter_map(|(id, parts)| { + node_info + .get(id) + .and_then(|x| x.status.data_disk_avail) + 
.map(|c| c.0 / *parts) + }) + .collect::<Vec<_>>(); + if !meta_part_avail.is_empty() && !data_part_avail.is_empty() { + let meta_avail = + bytesize::ByteSize(meta_part_avail.iter().min().unwrap() * (1 << PARTITION_BITS)); + let data_avail = + bytesize::ByteSize(data_part_avail.iter().min().unwrap() * (1 << PARTITION_BITS)); + writeln!( + &mut ret, + "\nEstimated available storage space cluster-wide (might be lower in practice):" + ) + .unwrap(); + if meta_part_avail.len() < node_partition_count.len() + || data_part_avail.len() < node_partition_count.len() + { + writeln!(&mut ret, " data: < {}", data_avail).unwrap(); + writeln!(&mut ret, " metadata: < {}", meta_avail).unwrap(); + writeln!(&mut ret, "A precise estimate could not be given as information is missing for some storage nodes.").unwrap(); + } else { + writeln!(&mut ret, " data: {}", data_avail).unwrap(); + writeln!(&mut ret, " metadata: {}", meta_avail).unwrap(); + } + } + + ret + } + fn gather_table_stats<F, R>( &self, t: &Arc<Table<F, R>>, @@ -943,32 +1037,101 @@ impl AdminRpcHandler { .clone(); Ok(AdminRpc::WorkerInfo(*tid, info)) } - WorkerOperation::Set { opt } => match opt { - WorkerSetCmd::ScrubTranquility { tranquility } => { - let scrub_command = ScrubWorkerCommand::SetTranquility(*tranquility); - self.garage - .block_manager - .send_scrub_command(scrub_command) - .await?; - Ok(AdminRpc::Ok("Scrub tranquility updated".into())) - } - WorkerSetCmd::ResyncWorkerCount { worker_count } => { - self.garage - .block_manager - .resync - .set_n_workers(*worker_count) - .await?; - Ok(AdminRpc::Ok("Number of resync workers updated".into())) + WorkerOperation::Get { + all_nodes, + variable, + } => self.handle_get_var(*all_nodes, variable).await, + WorkerOperation::Set { + all_nodes, + variable, + value, + } => self.handle_set_var(*all_nodes, variable, value).await, + } + } + + async fn handle_get_var( + &self, + all_nodes: bool, + variable: &Option<String>, + ) -> Result<AdminRpc, Error> { + if all_nodes { + let mut ret = vec![]; + let ring = self.garage.system.ring.borrow().clone(); + for node in ring.layout.node_ids().iter() { + let node = (*node).into(); + match self + .endpoint + .call( + &node, + AdminRpc::Worker(WorkerOperation::Get { + all_nodes: false, + variable: variable.clone(), + }), + PRIO_NORMAL, + ) + .await?? 
+ { + AdminRpc::WorkerVars(v) => ret.extend(v), + m => return Err(GarageError::unexpected_rpc_message(m).into()), } - WorkerSetCmd::ResyncTranquility { tranquility } => { - self.garage - .block_manager - .resync - .set_tranquility(*tranquility) - .await?; - Ok(AdminRpc::Ok("Resync tranquility updated".into())) + } + Ok(AdminRpc::WorkerVars(ret)) + } else { + #[allow(clippy::collapsible_else_if)] + if let Some(v) = variable { + Ok(AdminRpc::WorkerVars(vec![( + self.garage.system.id, + v.clone(), + self.garage.bg_vars.get(v)?, + )])) + } else { + let mut vars = self.garage.bg_vars.get_all(); + vars.sort(); + Ok(AdminRpc::WorkerVars( + vars.into_iter() + .map(|(k, v)| (self.garage.system.id, k.to_string(), v)) + .collect(), + )) + } + } + } + + async fn handle_set_var( + &self, + all_nodes: bool, + variable: &str, + value: &str, + ) -> Result<AdminRpc, Error> { + if all_nodes { + let mut ret = vec![]; + let ring = self.garage.system.ring.borrow().clone(); + for node in ring.layout.node_ids().iter() { + let node = (*node).into(); + match self + .endpoint + .call( + &node, + AdminRpc::Worker(WorkerOperation::Set { + all_nodes: false, + variable: variable.to_string(), + value: value.to_string(), + }), + PRIO_NORMAL, + ) + .await?? + { + AdminRpc::WorkerVars(v) => ret.extend(v), + m => return Err(GarageError::unexpected_rpc_message(m).into()), } - }, + } + Ok(AdminRpc::WorkerVars(ret)) + } else { + self.garage.bg_vars.set(variable, value)?; + Ok(AdminRpc::WorkerVars(vec![( + self.garage.system.id, + variable.to_string(), + value.to_string(), + )])) } } diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs index 0d180ecd..905b14d3 100644 --- a/src/garage/cli/cmd.rs +++ b/src/garage/cli/cmd.rs @@ -59,18 +59,29 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) -> let layout = fetch_layout(rpc_cli, rpc_host).await?; println!("==== HEALTHY NODES ===="); - let mut healthy_nodes = vec!["ID\tHostname\tAddress\tTags\tZone\tCapacity".to_string()]; + let mut healthy_nodes = + vec!["ID\tHostname\tAddress\tTags\tZone\tCapacity\tDataAvail".to_string()]; for adv in status.iter().filter(|adv| adv.is_up) { match layout.roles.get(&adv.id) { Some(NodeRoleV(Some(cfg))) => { + let data_avail = match &adv.status.data_disk_avail { + _ if cfg.capacity.is_none() => "N/A".into(), + Some((avail, total)) => { + let pct = (*avail as f64) / (*total as f64) * 100.; + let avail = bytesize::ByteSize::b(*avail); + format!("{} ({:.1}%)", avail, pct) + } + None => "?".into(), + }; healthy_nodes.push(format!( - "{id:?}\t{host}\t{addr}\t[{tags}]\t{zone}\t{capacity}", + "{id:?}\t{host}\t{addr}\t[{tags}]\t{zone}\t{capacity}\t{data_avail}", id = adv.id, host = adv.status.hostname, addr = adv.addr, tags = cfg.tags.join(","), zone = cfg.zone, capacity = cfg.capacity_string(), + data_avail = data_avail, )); } _ => { @@ -191,6 +202,9 @@ pub async fn cmd_admin( AdminRpc::WorkerList(wi, wlo) => { print_worker_list(wi, wlo); } + AdminRpc::WorkerVars(wv) => { + print_worker_vars(wv); + } AdminRpc::WorkerInfo(tid, wi) => { print_worker_info(tid, wi); } diff --git a/src/garage/cli/init.rs b/src/garage/cli/init.rs index 511b53a6..20813f1c 100644 --- a/src/garage/cli/init.rs +++ b/src/garage/cli/init.rs @@ -14,11 +14,11 @@ pub fn node_id_command(config_file: PathBuf, quiet: bool) -> Result<(), Error> { garage_rpc::system::read_node_id(&config.metadata_dir).err_context(READ_KEY_ERROR)?; let idstr = if let Some(addr) = config.rpc_public_addr { - let idstr = format!("{}@{}", hex::encode(&node_id), addr); + let idstr 
= format!("{}@{}", hex::encode(node_id), addr); println!("{}", idstr); idstr } else { - let idstr = hex::encode(&node_id); + let idstr = hex::encode(node_id); println!("{}", idstr); if !quiet { diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs index dcb9fef9..986592ae 100644 --- a/src/garage/cli/structs.rs +++ b/src/garage/cli/structs.rs @@ -515,6 +515,11 @@ pub struct StatsOpt { /// Gather detailed statistics (this can be long) #[structopt(short = "d", long = "detailed")] pub detailed: bool, + + /// Don't show global cluster stats (internal use in RPC) + #[structopt(skip)] + #[serde(default)] + pub skip_global: bool, } #[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)] @@ -528,11 +533,25 @@ pub enum WorkerOperation { /// Get detailed information about a worker #[structopt(name = "info", version = garage_version())] Info { tid: usize }, + /// Get worker parameter + #[structopt(name = "get", version = garage_version())] + Get { + /// Gather variable values from all nodes + #[structopt(short = "a", long = "all-nodes")] + all_nodes: bool, + /// Variable name to get, or none to get all variables + variable: Option<String>, + }, /// Set worker parameter #[structopt(name = "set", version = garage_version())] Set { - #[structopt(subcommand)] - opt: WorkerSetCmd, + /// Set variable values on all nodes + #[structopt(short = "a", long = "all-nodes")] + all_nodes: bool, + /// Variable node to set + variable: String, + /// Value to set the variable to + value: String, }, } @@ -547,19 +566,6 @@ pub struct WorkerListOpt { } #[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)] -pub enum WorkerSetCmd { - /// Set tranquility of scrub operations - #[structopt(name = "scrub-tranquility", version = garage_version())] - ScrubTranquility { tranquility: u32 }, - /// Set number of concurrent block resync workers - #[structopt(name = "resync-worker-count", version = garage_version())] - ResyncWorkerCount { worker_count: usize }, - /// Set tranquility of block resync operations - #[structopt(name = "resync-tranquility", version = garage_version())] - ResyncTranquility { tranquility: u32 }, -} - -#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)] pub enum BlockOperation { /// List all blocks that currently have a resync error #[structopt(name = "list-errors", version = garage_version())] diff --git a/src/garage/cli/util.rs b/src/garage/cli/util.rs index 63fd9eba..2c6be2f4 100644 --- a/src/garage/cli/util.rs +++ b/src/garage/cli/util.rs @@ -229,7 +229,7 @@ pub fn find_matching_node( ) -> Result<Uuid, Error> { let mut candidates = vec![]; for c in cand { - if hex::encode(&c).starts_with(&pattern) && !candidates.contains(&c) { + if hex::encode(c).starts_with(pattern) && !candidates.contains(&c) { candidates.push(c); } } @@ -357,6 +357,14 @@ pub fn print_worker_info(tid: usize, info: WorkerInfo) { format_table(table); } +pub fn print_worker_vars(wv: Vec<(Uuid, String, String)>) { + let table = wv + .into_iter() + .map(|(n, k, v)| format!("{:?}\t{}\t{}", n, k, v)) + .collect::<Vec<_>>(); + format_table(table); +} + pub fn print_block_error_list(el: Vec<BlockResyncErrorInfo>) { let now = now_msec(); let tf = timeago::Formatter::new(); diff --git a/src/garage/main.rs b/src/garage/main.rs index cd1d6228..9b069d82 100644 --- a/src/garage/main.rs +++ b/src/garage/main.rs @@ -28,6 +28,7 @@ use structopt::StructOpt; use netapp::util::parse_and_resolve_peer_addr; use netapp::NetworkKey; +use garage_util::config::Config; use garage_util::error::*; 
use garage_rpc::system::*; @@ -49,11 +50,10 @@ struct Opt { #[structopt(short = "h", long = "rpc-host", env = "GARAGE_RPC_HOST")] pub rpc_host: Option<String>, - /// RPC secret network key for admin operations - #[structopt(short = "s", long = "rpc-secret", env = "GARAGE_RPC_SECRET")] - pub rpc_secret: Option<String>, + #[structopt(flatten)] + pub secrets: Secrets, - /// Configuration file (garage.toml) + /// Path to configuration file #[structopt( short = "c", long = "config", @@ -66,6 +66,24 @@ struct Opt { cmd: Command, } +#[derive(StructOpt, Debug)] +pub struct Secrets { + /// RPC secret network key, used to replace rpc_secret in config.toml when running the + /// daemon or doing admin operations + #[structopt(short = "s", long = "rpc-secret", env = "GARAGE_RPC_SECRET")] + pub rpc_secret: Option<String>, + + /// Metrics API authentication token, replaces admin.metrics_token in config.toml when + /// running the Garage daemon + #[structopt(long = "admin-token", env = "GARAGE_ADMIN_TOKEN")] + pub admin_token: Option<String>, + + /// Metrics API authentication token, replaces admin.metrics_token in config.toml when + /// running the Garage daemon + #[structopt(long = "metrics-token", env = "GARAGE_METRICS_TOKEN")] + pub metrics_token: Option<String>, +} + #[tokio::main] async fn main() { // Initialize version and features info @@ -148,9 +166,9 @@ async fn main() { sodiumoxide::init().expect("Unable to init sodiumoxide"); let res = match opt.cmd { - Command::Server => server::run_server(opt.config_file).await, + Command::Server => server::run_server(opt.config_file, opt.secrets).await, Command::OfflineRepair(repair_opt) => { - repair::offline::offline_repair(opt.config_file, repair_opt).await + repair::offline::offline_repair(opt.config_file, opt.secrets, repair_opt).await } Command::Node(NodeOperation::NodeId(node_id_opt)) => { node_id_command(opt.config_file, node_id_opt.quiet) @@ -165,7 +183,7 @@ async fn main() { } async fn cli_command(opt: Opt) -> Result<(), Error> { - let config = if opt.rpc_secret.is_none() || opt.rpc_host.is_none() { + let config = if opt.secrets.rpc_secret.is_none() || opt.rpc_host.is_none() { Some(garage_util::config::read_config(opt.config_file.clone()) .err_context(format!("Unable to read configuration file {}. Configuration file is needed because -h or -s is not provided on the command line.", opt.config_file.to_string_lossy()))?) 
} else { @@ -174,9 +192,10 @@ async fn cli_command(opt: Opt) -> Result<(), Error> { // Find and parse network RPC secret let net_key_hex_str = opt + .secrets .rpc_secret .as_ref() - .or_else(|| config.as_ref().map(|c| &c.rpc_secret)) + .or_else(|| config.as_ref().and_then(|c| c.rpc_secret.as_ref())) .ok_or("No RPC secret provided")?; let network_key = NetworkKey::from_slice( &hex::decode(net_key_hex_str).err_context("Invalid RPC secret key (bad hex)")?[..], @@ -233,3 +252,16 @@ async fn cli_command(opt: Opt) -> Result<(), Error> { Ok(x) => Ok(x), } } + +fn fill_secrets(mut config: Config, secrets: Secrets) -> Config { + if secrets.rpc_secret.is_some() { + config.rpc_secret = secrets.rpc_secret; + } + if secrets.admin_token.is_some() { + config.admin.admin_token = secrets.admin_token; + } + if secrets.metrics_token.is_some() { + config.admin.metrics_token = secrets.metrics_token; + } + config +} diff --git a/src/garage/repair/offline.rs b/src/garage/repair/offline.rs index 25193e4a..f4edcf03 100644 --- a/src/garage/repair/offline.rs +++ b/src/garage/repair/offline.rs @@ -6,8 +6,13 @@ use garage_util::error::*; use garage_model::garage::Garage; use crate::cli::structs::*; +use crate::{fill_secrets, Secrets}; -pub async fn offline_repair(config_file: PathBuf, opt: OfflineRepairOpt) -> Result<(), Error> { +pub async fn offline_repair( + config_file: PathBuf, + secrets: Secrets, + opt: OfflineRepairOpt, +) -> Result<(), Error> { if !opt.yes { return Err(Error::Message( "Please add the --yes flag to launch repair operation".into(), @@ -15,7 +20,7 @@ pub async fn offline_repair(config_file: PathBuf, opt: OfflineRepairOpt) -> Resu } info!("Loading configuration..."); - let config = read_config(config_file)?; + let config = fill_secrets(read_config(config_file)?, secrets); info!("Initializing Garage main data store..."); let garage = Garage::new(config)?; diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs index 627e3bf3..0e14ed51 100644 --- a/src/garage/repair/online.rs +++ b/src/garage/repair/online.rs @@ -51,7 +51,11 @@ pub async fn launch_online_repair( ScrubCmd::Resume => ScrubWorkerCommand::Resume, ScrubCmd::Cancel => ScrubWorkerCommand::Cancel, ScrubCmd::SetTranquility { tranquility } => { - ScrubWorkerCommand::SetTranquility(tranquility) + garage + .block_manager + .scrub_persister + .set_with(|x| x.tranquility = tranquility)?; + return Ok(()); } }; info!("Sending command to scrub worker: {:?}", cmd); diff --git a/src/garage/server.rs b/src/garage/server.rs index 16f1b625..958089c6 100644 --- a/src/garage/server.rs +++ b/src/garage/server.rs @@ -17,6 +17,7 @@ use garage_api::k2v::api_server::K2VApiServer; use crate::admin::*; #[cfg(feature = "telemetry-otlp")] use crate::tracing_setup::*; +use crate::{fill_secrets, Secrets}; async fn wait_from(mut chan: watch::Receiver<bool>) { while !*chan.borrow() { @@ -26,9 +27,9 @@ async fn wait_from(mut chan: watch::Receiver<bool>) { } } -pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { +pub async fn run_server(config_file: PathBuf, secrets: Secrets) -> Result<(), Error> { info!("Loading configuration..."); - let config = read_config(config_file)?; + let config = fill_secrets(read_config(config_file)?, secrets); // ---- Initialize Garage internals ---- diff --git a/src/garage/tests/admin.rs b/src/garage/tests/admin.rs index 37aefe38..d3becf0a 100644 --- a/src/garage/tests/admin.rs +++ b/src/garage/tests/admin.rs @@ -21,14 +21,7 @@ async fn test_admin_bucket_perms() { ctx.garage .command() - .args([ - "bucket", - 
"allow", - "--read", - "--key", - &ctx.garage.key.id, - BCKT_NAME, - ]) + .args(["bucket", "allow", "--read", "--key", &ctx.key.id, BCKT_NAME]) .quiet() .expect_success_status("Could not create bucket"); @@ -36,14 +29,7 @@ async fn test_admin_bucket_perms() { ctx.garage .command() - .args([ - "bucket", - "deny", - "--read", - "--key", - &ctx.garage.key.name, - BCKT_NAME, - ]) + .args(["bucket", "deny", "--read", "--key", &ctx.key.id, BCKT_NAME]) .quiet() .expect_success_status("Could not create bucket"); @@ -51,14 +37,7 @@ async fn test_admin_bucket_perms() { ctx.garage .command() - .args([ - "bucket", - "allow", - "--read", - "--key", - &ctx.garage.key.name, - BCKT_NAME, - ]) + .args(["bucket", "allow", "--read", "--key", &ctx.key.id, BCKT_NAME]) .quiet() .expect_success_status("Could not create bucket"); diff --git a/src/garage/tests/bucket.rs b/src/garage/tests/bucket.rs index 9c363013..0dec3cfa 100644 --- a/src/garage/tests/bucket.rs +++ b/src/garage/tests/bucket.rs @@ -13,7 +13,7 @@ async fn test_bucket_all() { ctx.garage .command() .args(["key", "deny"]) - .args(["--create-bucket", &ctx.garage.key.id]) + .args(["--create-bucket", &ctx.key.id]) .quiet() .expect_success_output("Could not deny key to create buckets"); @@ -26,7 +26,7 @@ async fn test_bucket_all() { ctx.garage .command() .args(["key", "allow"]) - .args(["--create-bucket", &ctx.garage.key.id]) + .args(["--create-bucket", &ctx.key.id]) .quiet() .expect_success_output("Could not deny key to create buckets"); diff --git a/src/garage/tests/common/client.rs b/src/garage/tests/common/client.rs index 212588b5..e9d4849a 100644 --- a/src/garage/tests/common/client.rs +++ b/src/garage/tests/common/client.rs @@ -1,15 +1,9 @@ use aws_sdk_s3::{Client, Config, Credentials, Endpoint}; -use super::garage::Instance; +use super::garage::{Instance, Key}; -pub fn build_client(instance: &Instance) -> Client { - let credentials = Credentials::new( - &instance.key.id, - &instance.key.secret, - None, - None, - "garage-integ-test", - ); +pub fn build_client(instance: &Instance, key: &Key) -> Client { + let credentials = Credentials::new(&key.id, &key.secret, None, None, "garage-integ-test"); let endpoint = Endpoint::immutable(instance.s3_uri()); let config = Config::builder() diff --git a/src/garage/tests/common/custom_requester.rs b/src/garage/tests/common/custom_requester.rs index 1700cc90..609eda97 100644 --- a/src/garage/tests/common/custom_requester.rs +++ b/src/garage/tests/common/custom_requester.rs @@ -14,6 +14,7 @@ use garage_api::signature; /// You should ever only use this to send requests AWS sdk won't send, /// like to reproduce behavior of unusual implementations found to be /// problematic. 
+#[derive(Clone)] pub struct CustomRequester { key: Key, uri: Uri, @@ -22,18 +23,18 @@ pub struct CustomRequester { } impl CustomRequester { - pub fn new_s3(instance: &Instance) -> Self { + pub fn new_s3(instance: &Instance, key: &Key) -> Self { CustomRequester { - key: instance.key.clone(), + key: key.clone(), uri: instance.s3_uri(), service: "s3", client: Client::new(), } } - pub fn new_k2v(instance: &Instance) -> Self { + pub fn new_k2v(instance: &Instance, key: &Key) -> Self { CustomRequester { - key: instance.key.clone(), + key: key.clone(), uri: instance.k2v_uri(), service: "k2v", client: Client::new(), diff --git a/src/garage/tests/common/garage.rs b/src/garage/tests/common/garage.rs index dbebe5b1..3beed7c4 100644 --- a/src/garage/tests/common/garage.rs +++ b/src/garage/tests/common/garage.rs @@ -13,7 +13,7 @@ static GARAGE_TEST_SECRET: &str = #[derive(Debug, Default, Clone)] pub struct Key { - pub name: String, + pub name: Option<String>, pub id: String, pub secret: String, } @@ -21,10 +21,11 @@ pub struct Key { pub struct Instance { process: process::Child, pub path: PathBuf, - pub key: Key, + pub default_key: Key, pub s3_port: u16, pub k2v_port: u16, pub web_port: u16, + pub admin_port: u16, } impl Instance { @@ -101,22 +102,36 @@ api_bind_addr = "127.0.0.1:{admin_port}" Instance { process: child, path, - key: Key::default(), + default_key: Key::default(), s3_port: port, k2v_port: port + 1, web_port: port + 3, + admin_port: port + 4, } } fn setup(&mut self) { - use std::{thread, time::Duration}; - - // Wait for node to be ready - thread::sleep(Duration::from_secs(2)); - + self.wait_for_boot(); self.setup_layout(); + self.default_key = self.key(Some("garage_test")); + } + + fn wait_for_boot(&mut self) { + use std::{thread, time::Duration}; - self.key = self.new_key("garage_test"); + // 60 * 2 seconds = 120 seconds = 2min + for _ in 0..60 { + let termination = self + .command() + .args(["status"]) + .quiet() + .status() + .expect("Unable to run command"); + if termination.success() { + break; + } + thread::sleep(Duration::from_secs(2)); + } } fn setup_layout(&self) { @@ -167,13 +182,17 @@ api_bind_addr = "127.0.0.1:{admin_port}" .expect("Could not build garage endpoint URI") } - pub fn new_key(&self, name: &str) -> Key { + pub fn key(&self, maybe_name: Option<&str>) -> Key { let mut key = Key::default(); - let output = self - .command() - .args(["key", "create", name]) - .expect_success_output("Could not create key"); + let mut cmd = self.command(); + let base = cmd.args(["key", "create"]); + let with_name = match maybe_name { + Some(name) => base.args([name]), + None => base, + }; + + let output = with_name.expect_success_output("Could not create key"); let stdout = String::from_utf8(output.stdout).unwrap(); for line in stdout.lines() { @@ -190,7 +209,7 @@ api_bind_addr = "127.0.0.1:{admin_port}" assert!(!key.secret.is_empty(), "Invalid key: Key secret is empty"); Key { - name: name.to_owned(), + name: maybe_name.map(String::from), ..key } } diff --git a/src/garage/tests/common/mod.rs b/src/garage/tests/common/mod.rs index 28874b02..eca3e42b 100644 --- a/src/garage/tests/common/mod.rs +++ b/src/garage/tests/common/mod.rs @@ -13,13 +13,16 @@ use custom_requester::CustomRequester; const REGION: Region = Region::from_static("garage-integ-test"); +#[derive(Clone)] pub struct Context { pub garage: &'static garage::Instance, + pub key: garage::Key, pub client: Client, pub custom_request: CustomRequester, pub k2v: K2VContext, } +#[derive(Clone)] pub struct K2VContext { pub request: 
CustomRequester, } @@ -27,13 +30,15 @@ pub struct K2VContext { impl Context { fn new() -> Self { let garage = garage::instance(); - let client = client::build_client(garage); - let custom_request = CustomRequester::new_s3(garage); - let k2v_request = CustomRequester::new_k2v(garage); + let key = garage.key(None); + let client = client::build_client(garage, &key); + let custom_request = CustomRequester::new_s3(garage, &key); + let k2v_request = CustomRequester::new_k2v(garage, &key); Context { garage, client, + key, custom_request, k2v: K2VContext { request: k2v_request, @@ -57,7 +62,7 @@ impl Context { .args(["bucket", "allow"]) .args(["--owner", "--read", "--write"]) .arg(&bucket_name) - .args(["--key", &self.garage.key.name]) + .args(["--key", &self.key.id]) .quiet() .expect_success_status("Could not allow key for bucket"); diff --git a/src/garage/tests/k2v/batch.rs b/src/garage/tests/k2v/batch.rs index 6abba1c5..595d0ba8 100644 --- a/src/garage/tests/k2v/batch.rs +++ b/src/garage/tests/k2v/batch.rs @@ -3,6 +3,7 @@ use std::collections::HashMap; use crate::common; use assert_json_diff::assert_json_eq; +use base64::prelude::*; use serde_json::json; use super::json_body; @@ -36,12 +37,12 @@ async fn test_batch() { {{"pk": "root", "sk": "d.2", "ct": null, "v": "{}"}}, {{"pk": "root", "sk": "e", "ct": null, "v": "{}"}} ]"#, - base64::encode(values.get(&"a").unwrap()), - base64::encode(values.get(&"b").unwrap()), - base64::encode(values.get(&"c").unwrap()), - base64::encode(values.get(&"d.1").unwrap()), - base64::encode(values.get(&"d.2").unwrap()), - base64::encode(values.get(&"e").unwrap()), + BASE64_STANDARD.encode(values.get(&"a").unwrap()), + BASE64_STANDARD.encode(values.get(&"b").unwrap()), + BASE64_STANDARD.encode(values.get(&"c").unwrap()), + BASE64_STANDARD.encode(values.get(&"d.1").unwrap()), + BASE64_STANDARD.encode(values.get(&"d.2").unwrap()), + BASE64_STANDARD.encode(values.get(&"e").unwrap()), ) .into_bytes(), ) @@ -120,12 +121,12 @@ async fn test_batch() { "tombstones": false, "singleItem": false, "items": [ - {"sk": "a", "ct": ct.get("a").unwrap(), "v": [base64::encode(values.get("a").unwrap())]}, - {"sk": "b", "ct": ct.get("b").unwrap(), "v": [base64::encode(values.get("b").unwrap())]}, - {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap())]}, - {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1").unwrap())]}, - {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap())]}, - {"sk": "e", "ct": ct.get("e").unwrap(), "v": [base64::encode(values.get("e").unwrap())]} + {"sk": "a", "ct": ct.get("a").unwrap(), "v": [BASE64_STANDARD.encode(values.get("a").unwrap())]}, + {"sk": "b", "ct": ct.get("b").unwrap(), "v": [BASE64_STANDARD.encode(values.get("b").unwrap())]}, + {"sk": "c", "ct": ct.get("c").unwrap(), "v": [BASE64_STANDARD.encode(values.get("c").unwrap())]}, + {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.1").unwrap())]}, + {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.2").unwrap())]}, + {"sk": "e", "ct": ct.get("e").unwrap(), "v": [BASE64_STANDARD.encode(values.get("e").unwrap())]} ], "more": false, "nextStart": null, @@ -141,10 +142,10 @@ async fn test_batch() { "tombstones": false, "singleItem": false, "items": [ - {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap())]}, - {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": 
[base64::encode(values.get("d.1").unwrap())]}, - {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap())]}, - {"sk": "e", "ct": ct.get("e").unwrap(), "v": [base64::encode(values.get("e").unwrap())]} + {"sk": "c", "ct": ct.get("c").unwrap(), "v": [BASE64_STANDARD.encode(values.get("c").unwrap())]}, + {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.1").unwrap())]}, + {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.2").unwrap())]}, + {"sk": "e", "ct": ct.get("e").unwrap(), "v": [BASE64_STANDARD.encode(values.get("e").unwrap())]} ], "more": false, "nextStart": null, @@ -160,9 +161,9 @@ async fn test_batch() { "tombstones": false, "singleItem": false, "items": [ - {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap())]}, - {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1").unwrap())]}, - {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap())]}, + {"sk": "c", "ct": ct.get("c").unwrap(), "v": [BASE64_STANDARD.encode(values.get("c").unwrap())]}, + {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.1").unwrap())]}, + {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.2").unwrap())]}, ], "more": false, "nextStart": null, @@ -178,8 +179,8 @@ async fn test_batch() { "tombstones": false, "singleItem": false, "items": [ - {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap())]}, - {"sk": "b", "ct": ct.get("b").unwrap(), "v": [base64::encode(values.get("b").unwrap())]}, + {"sk": "c", "ct": ct.get("c").unwrap(), "v": [BASE64_STANDARD.encode(values.get("c").unwrap())]}, + {"sk": "b", "ct": ct.get("b").unwrap(), "v": [BASE64_STANDARD.encode(values.get("b").unwrap())]}, ], "more": false, "nextStart": null, @@ -195,8 +196,8 @@ async fn test_batch() { "tombstones": false, "singleItem": false, "items": [ - {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap())]}, - {"sk": "b", "ct": ct.get("b").unwrap(), "v": [base64::encode(values.get("b").unwrap())]}, + {"sk": "c", "ct": ct.get("c").unwrap(), "v": [BASE64_STANDARD.encode(values.get("c").unwrap())]}, + {"sk": "b", "ct": ct.get("b").unwrap(), "v": [BASE64_STANDARD.encode(values.get("b").unwrap())]}, ], "more": false, "nextStart": null, @@ -212,7 +213,7 @@ async fn test_batch() { "tombstones": false, "singleItem": false, "items": [ - {"sk": "a", "ct": ct.get("a").unwrap(), "v": [base64::encode(values.get("a").unwrap())]} + {"sk": "a", "ct": ct.get("a").unwrap(), "v": [BASE64_STANDARD.encode(values.get("a").unwrap())]} ], "more": true, "nextStart": "b", @@ -228,8 +229,8 @@ async fn test_batch() { "tombstones": false, "singleItem": false, "items": [ - {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1").unwrap())]}, - {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap())]} + {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.1").unwrap())]}, + {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.2").unwrap())]} ], "more": false, "nextStart": null, @@ -255,10 +256,10 @@ async fn test_batch() { {{"pk": "root", "sk": "d.2", "ct": null, "v": "{}"}} ]"#, ct.get(&"b").unwrap(), - base64::encode(values.get(&"c'").unwrap()), + BASE64_STANDARD.encode(values.get(&"c'").unwrap()), 
ct.get(&"d.1").unwrap(), - base64::encode(values.get(&"d.1'").unwrap()), - base64::encode(values.get(&"d.2'").unwrap()), + BASE64_STANDARD.encode(values.get(&"d.1'").unwrap()), + BASE64_STANDARD.encode(values.get(&"d.2'").unwrap()), ) .into_bytes(), ) @@ -333,11 +334,11 @@ async fn test_batch() { "tombstones": false, "singleItem": false, "items": [ - {"sk": "a", "ct": ct.get("a").unwrap(), "v": [base64::encode(values.get("a").unwrap())]}, - {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap()), base64::encode(values.get("c'").unwrap())]}, - {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]}, - {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap()), base64::encode(values.get("d.2'").unwrap())]}, - {"sk": "e", "ct": ct.get("e").unwrap(), "v": [base64::encode(values.get("e").unwrap())]} + {"sk": "a", "ct": ct.get("a").unwrap(), "v": [BASE64_STANDARD.encode(values.get("a").unwrap())]}, + {"sk": "c", "ct": ct.get("c").unwrap(), "v": [BASE64_STANDARD.encode(values.get("c").unwrap()), BASE64_STANDARD.encode(values.get("c'").unwrap())]}, + {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.1'").unwrap())]}, + {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.2").unwrap()), BASE64_STANDARD.encode(values.get("d.2'").unwrap())]}, + {"sk": "e", "ct": ct.get("e").unwrap(), "v": [BASE64_STANDARD.encode(values.get("e").unwrap())]} ], "more": false, "nextStart": null, @@ -353,8 +354,8 @@ async fn test_batch() { "tombstones": false, "singleItem": false, "items": [ - {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]}, - {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap()), base64::encode(values.get("d.2'").unwrap())]}, + {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.1'").unwrap())]}, + {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.2").unwrap()), BASE64_STANDARD.encode(values.get("d.2'").unwrap())]}, ], "more": false, "nextStart": null, @@ -370,7 +371,7 @@ async fn test_batch() { "tombstones": false, "singleItem": false, "items": [ - {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]}, + {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.1'").unwrap())]}, ], "more": false, "nextStart": null, @@ -386,7 +387,7 @@ async fn test_batch() { "tombstones": false, "singleItem": false, "items": [ - {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]}, + {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.1'").unwrap())]}, ], "more": true, "nextStart": "d.2", @@ -402,7 +403,7 @@ async fn test_batch() { "tombstones": false, "singleItem": false, "items": [ - {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap()), base64::encode(values.get("d.2'").unwrap())]}, + {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.2").unwrap()), BASE64_STANDARD.encode(values.get("d.2'").unwrap())]}, ], "more": false, "nextStart": null, @@ -418,8 +419,8 @@ async fn test_batch() { "tombstones": false, "singleItem": false, "items": [ - {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap()), 
base64::encode(values.get("d.2'").unwrap())]}, - {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]}, + {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.2").unwrap()), BASE64_STANDARD.encode(values.get("d.2'").unwrap())]}, + {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.1'").unwrap())]}, ], "more": false, "nextStart": null, @@ -435,8 +436,8 @@ async fn test_batch() { "tombstones": false, "singleItem": false, "items": [ - {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap()), base64::encode(values.get("d.2'").unwrap())]}, - {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]}, + {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.2").unwrap()), BASE64_STANDARD.encode(values.get("d.2'").unwrap())]}, + {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.1'").unwrap())]}, ], "more": false, "nextStart": null, @@ -452,8 +453,8 @@ async fn test_batch() { "tombstones": false, "singleItem": false, "items": [ - {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]}, - {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap()), base64::encode(values.get("d.2'").unwrap())]}, + {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.1'").unwrap())]}, + {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.2").unwrap()), BASE64_STANDARD.encode(values.get("d.2'").unwrap())]}, ], "more": false, "nextStart": null, @@ -563,8 +564,8 @@ async fn test_batch() { "tombstones": false, "singleItem": false, "items": [ - {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap()), base64::encode(values.get("c'").unwrap())]}, - {"sk": "e", "ct": ct.get("e").unwrap(), "v": [base64::encode(values.get("e").unwrap())]} + {"sk": "c", "ct": ct.get("c").unwrap(), "v": [BASE64_STANDARD.encode(values.get("c").unwrap()), BASE64_STANDARD.encode(values.get("c'").unwrap())]}, + {"sk": "e", "ct": ct.get("e").unwrap(), "v": [BASE64_STANDARD.encode(values.get("e").unwrap())]} ], "more": false, "nextStart": null, @@ -580,8 +581,8 @@ async fn test_batch() { "tombstones": false, "singleItem": false, "items": [ - {"sk": "e", "ct": ct.get("e").unwrap(), "v": [base64::encode(values.get("e").unwrap())]}, - {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap()), base64::encode(values.get("c'").unwrap())]}, + {"sk": "e", "ct": ct.get("e").unwrap(), "v": [BASE64_STANDARD.encode(values.get("e").unwrap())]}, + {"sk": "c", "ct": ct.get("c").unwrap(), "v": [BASE64_STANDARD.encode(values.get("c").unwrap()), BASE64_STANDARD.encode(values.get("c'").unwrap())]}, ], "more": false, "nextStart": null, @@ -599,10 +600,10 @@ async fn test_batch() { "items": [ {"sk": "a", "ct": ct.get("a").unwrap(), "v": [null]}, {"sk": "b", "ct": ct.get("b").unwrap(), "v": [null]}, - {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap()), base64::encode(values.get("c'").unwrap())]}, + {"sk": "c", "ct": ct.get("c").unwrap(), "v": [BASE64_STANDARD.encode(values.get("c").unwrap()), BASE64_STANDARD.encode(values.get("c'").unwrap())]}, {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [null]}, {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [null]}, - {"sk": "e", "ct": 
ct.get("e").unwrap(), "v": [base64::encode(values.get("e").unwrap())]}, + {"sk": "e", "ct": ct.get("e").unwrap(), "v": [BASE64_STANDARD.encode(values.get("e").unwrap())]}, ], "more": false, "nextStart": null, diff --git a/src/garage/tests/k2v/item.rs b/src/garage/tests/k2v/item.rs index 2641386f..588836c7 100644 --- a/src/garage/tests/k2v/item.rs +++ b/src/garage/tests/k2v/item.rs @@ -3,6 +3,7 @@ use std::time::Duration; use crate::common; use assert_json_diff::assert_json_eq; +use base64::prelude::*; use serde_json::json; use super::json_body; @@ -222,7 +223,10 @@ async fn test_items_and_indices() { let res_json = json_body(res).await; assert_json_eq!( res_json, - [base64::encode(&content2), base64::encode(&content3)] + [ + BASE64_STANDARD.encode(&content2), + BASE64_STANDARD.encode(&content3) + ] ); // ReadIndex -- now there should be some stuff @@ -411,7 +415,7 @@ async fn test_item_return_format() { "application/json" ); let res_body = json_body(res).await; - assert_json_eq!(res_body, json!([base64::encode(&single_value)])); + assert_json_eq!(res_body, json!([BASE64_STANDARD.encode(&single_value)])); // f2: binary let res = ctx @@ -452,7 +456,7 @@ async fn test_item_return_format() { "application/json" ); let res_body = json_body(res).await; - assert_json_eq!(res_body, json!([base64::encode(&single_value)])); + assert_json_eq!(res_body, json!([BASE64_STANDARD.encode(&single_value)])); // -- Test with a second, concurrent value -- let res = ctx @@ -488,8 +492,8 @@ async fn test_item_return_format() { assert_json_eq!( res_body, json!([ - base64::encode(&single_value), - base64::encode(&concurrent_value) + BASE64_STANDARD.encode(&single_value), + BASE64_STANDARD.encode(&concurrent_value) ]) ); @@ -512,8 +516,8 @@ async fn test_item_return_format() { assert_json_eq!( res_body, json!([ - base64::encode(&single_value), - base64::encode(&concurrent_value) + BASE64_STANDARD.encode(&single_value), + BASE64_STANDARD.encode(&concurrent_value) ]) ); @@ -550,8 +554,8 @@ async fn test_item_return_format() { assert_json_eq!( res_body, json!([ - base64::encode(&single_value), - base64::encode(&concurrent_value) + BASE64_STANDARD.encode(&single_value), + BASE64_STANDARD.encode(&concurrent_value) ]) ); @@ -587,7 +591,10 @@ async fn test_item_return_format() { "application/json" ); let res_body = json_body(res).await; - assert_json_eq!(res_body, json!([base64::encode(&concurrent_value), null])); + assert_json_eq!( + res_body, + json!([BASE64_STANDARD.encode(&concurrent_value), null]) + ); // f1: not specified let res = ctx @@ -612,7 +619,10 @@ async fn test_item_return_format() { .unwrap() .to_string(); let res_body = json_body(res).await; - assert_json_eq!(res_body, json!([base64::encode(&concurrent_value), null])); + assert_json_eq!( + res_body, + json!([BASE64_STANDARD.encode(&concurrent_value), null]) + ); // f2: binary let res = ctx @@ -644,7 +654,10 @@ async fn test_item_return_format() { "application/json" ); let res_body = json_body(res).await; - assert_json_eq!(res_body, json!([base64::encode(&concurrent_value), null])); + assert_json_eq!( + res_body, + json!([BASE64_STANDARD.encode(&concurrent_value), null]) + ); // -- Delete everything -- let res = ctx diff --git a/src/garage/tests/k2v/poll.rs b/src/garage/tests/k2v/poll.rs index e56705ae..dd44aed9 100644 --- a/src/garage/tests/k2v/poll.rs +++ b/src/garage/tests/k2v/poll.rs @@ -1,12 +1,17 @@ +use base64::prelude::*; use hyper::{Method, StatusCode}; use std::time::Duration; +use assert_json_diff::assert_json_eq; +use serde_json::json; + +use 
super::json_body; use crate::common; #[tokio::test] -async fn test_poll() { +async fn test_poll_item() { let ctx = common::context(); - let bucket = ctx.create_bucket("test-k2v-poll"); + let bucket = ctx.create_bucket("test-k2v-poll-item"); // Write initial value let res = ctx @@ -52,8 +57,8 @@ async fn test_poll() { let poll = { let bucket = bucket.clone(); let ct = ct.clone(); + let ctx = ctx.clone(); tokio::spawn(async move { - let ctx = common::context(); ctx.k2v .request .builder(bucket.clone()) @@ -96,3 +101,165 @@ async fn test_poll() { .to_vec(); assert_eq!(poll_res_body, b"New value"); } + +#[tokio::test] +async fn test_poll_range() { + let ctx = common::context(); + let bucket = ctx.create_bucket("test-k2v-poll-range"); + + // Write initial value + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .method(Method::PUT) + .path("root") + .query_param("sort_key", Some("test1")) + .body(b"Initial value".to_vec()) + .send() + .await + .unwrap(); + assert_eq!(res.status(), StatusCode::NO_CONTENT); + + // Retrieve initial value to get its causality token + let res2 = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some("test1")) + .signed_header("accept", "application/octet-stream") + .send() + .await + .unwrap(); + assert_eq!(res2.status(), StatusCode::OK); + let ct = res2 + .headers() + .get("x-garage-causality-token") + .unwrap() + .to_str() + .unwrap() + .to_string(); + + // Initial poll range, retrieve single item and first seen_marker + let res2 = ctx + .k2v + .request + .builder(bucket.clone()) + .method(Method::POST) + .path("root") + .query_param("poll_range", None::<String>) + .body(b"{}".to_vec()) + .send() + .await + .unwrap(); + assert_eq!(res2.status(), StatusCode::OK); + let json_res = json_body(res2).await; + let seen_marker = json_res["seenMarker"].as_str().unwrap().to_string(); + assert_json_eq!( + json_res, + json!( + { + "items": [ + {"sk": "test1", "ct": ct, "v": [BASE64_STANDARD.encode(b"Initial value")]}, + ], + "seenMarker": seen_marker, + } + ) + ); + + // Second poll range, which will complete later + let poll = { + let bucket = bucket.clone(); + let ctx = ctx.clone(); + tokio::spawn(async move { + ctx.k2v + .request + .builder(bucket.clone()) + .method(Method::POST) + .path("root") + .query_param("poll_range", None::<String>) + .body(format!(r#"{{"seenMarker": "{}"}}"#, seen_marker).into_bytes()) + .send() + .await + }) + }; + + // Write new value that supersedes initial one + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .method(Method::PUT) + .path("root") + .query_param("sort_key", Some("test1")) + .signed_header("x-garage-causality-token", ct) + .body(b"New value".to_vec()) + .send() + .await + .unwrap(); + assert_eq!(res.status(), StatusCode::NO_CONTENT); + + // Check poll finishes with correct value + let poll_res = tokio::select! 
{ + _ = tokio::time::sleep(Duration::from_secs(10)) => panic!("poll did not terminate in time"), + res = poll => res.unwrap().unwrap(), + }; + + assert_eq!(poll_res.status(), StatusCode::OK); + let json_res = json_body(poll_res).await; + let seen_marker = json_res["seenMarker"].as_str().unwrap().to_string(); + assert_eq!(json_res["items"].as_array().unwrap().len(), 1); + assert_json_eq!(&json_res["items"][0]["sk"], json!("test1")); + assert_json_eq!( + &json_res["items"][0]["v"], + json!([BASE64_STANDARD.encode(b"New value")]) + ); + + // Now we will add a value on a different key + // Start a new poll operation + let poll = { + let bucket = bucket.clone(); + let ctx = ctx.clone(); + tokio::spawn(async move { + ctx.k2v + .request + .builder(bucket.clone()) + .method(Method::POST) + .path("root") + .query_param("poll_range", None::<String>) + .body(format!(r#"{{"seenMarker": "{}"}}"#, seen_marker).into_bytes()) + .send() + .await + }) + }; + + // Write value on different key + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .method(Method::PUT) + .path("root") + .query_param("sort_key", Some("test2")) + .body(b"Other value".to_vec()) + .send() + .await + .unwrap(); + assert_eq!(res.status(), StatusCode::NO_CONTENT); + + // Check poll finishes with correct value + let poll_res = tokio::select! { + _ = tokio::time::sleep(Duration::from_secs(10)) => panic!("poll did not terminate in time"), + res = poll => res.unwrap().unwrap(), + }; + + assert_eq!(poll_res.status(), StatusCode::OK); + let json_res = json_body(poll_res).await; + assert_eq!(json_res["items"].as_array().unwrap().len(), 1); + assert_json_eq!(&json_res["items"][0]["sk"], json!("test2")); + assert_json_eq!( + &json_res["items"][0]["v"], + json!([BASE64_STANDARD.encode(b"Other value")]) + ); +} diff --git a/src/garage/tests/s3/streaming_signature.rs b/src/garage/tests/s3/streaming_signature.rs index 48da7607..b7a1acae 100644 --- a/src/garage/tests/s3/streaming_signature.rs +++ b/src/garage/tests/s3/streaming_signature.rs @@ -109,7 +109,7 @@ async fn test_create_bucket_streaming() { ctx.garage .command() .args(["key", "allow"]) - .args(["--create-bucket", &ctx.garage.key.id]) + .args(["--create-bucket", &ctx.key.id]) .quiet() .expect_success_output("Could not allow key to create buckets"); diff --git a/src/garage/tests/s3/website.rs b/src/garage/tests/s3/website.rs index 244a2fa0..f61838e4 100644 --- a/src/garage/tests/s3/website.rs +++ b/src/garage/tests/s3/website.rs @@ -1,5 +1,8 @@ use crate::common; use crate::common::ext::*; +use crate::k2v::json_body; + +use assert_json_diff::assert_json_eq; use aws_sdk_s3::{ model::{CorsConfiguration, CorsRule, ErrorDocument, IndexDocument, WebsiteConfiguration}, types::ByteStream, @@ -9,6 +12,7 @@ use hyper::{ body::{to_bytes, Body}, Client, }; +use serde_json::json; const BODY: &[u8; 16] = b"<h1>bonjour</h1>"; const BODY_ERR: &[u8; 6] = b"erreur"; @@ -49,6 +53,31 @@ async fn test_website() { BODY.as_ref() ); /* check that we do not leak body */ + let admin_req = || { + Request::builder() + .method("GET") + .uri(format!( + "http://127.0.0.1:{0}/check?domain={1}", + ctx.garage.admin_port, + BCKT_NAME.to_string() + )) + .body(Body::empty()) + .unwrap() + }; + + let admin_resp = client.request(admin_req()).await.unwrap(); + assert_eq!(admin_resp.status(), StatusCode::BAD_REQUEST); + let res_body = json_body(admin_resp).await; + assert_json_eq!( + res_body, + json!({ + "code": "InvalidRequest", + "message": "Bad request: Bucket 'my-website' is not authorized for website hosting", + 
"region": "garage-integ-test", + "path": "/check", + }) + ); + ctx.garage .command() .args(["bucket", "website", "--allow", BCKT_NAME]) @@ -62,6 +91,25 @@ async fn test_website() { BODY.as_ref() ); + let admin_req = || { + Request::builder() + .method("GET") + .uri(format!( + "http://127.0.0.1:{0}/check?domain={1}", + ctx.garage.admin_port, + BCKT_NAME.to_string() + )) + .body(Body::empty()) + .unwrap() + }; + + let mut admin_resp = client.request(admin_req()).await.unwrap(); + assert_eq!(admin_resp.status(), StatusCode::OK); + assert_eq!( + to_bytes(admin_resp.body_mut()).await.unwrap().as_ref(), + format!("Bucket '{BCKT_NAME}' is authorized for website hosting").as_bytes() + ); + ctx.garage .command() .args(["bucket", "website", "--deny", BCKT_NAME]) @@ -74,6 +122,31 @@ async fn test_website() { to_bytes(resp.body_mut()).await.unwrap().as_ref(), BODY.as_ref() ); /* check that we do not leak body */ + + let admin_req = || { + Request::builder() + .method("GET") + .uri(format!( + "http://127.0.0.1:{0}/check?domain={1}", + ctx.garage.admin_port, + BCKT_NAME.to_string() + )) + .body(Body::empty()) + .unwrap() + }; + + let admin_resp = client.request(admin_req()).await.unwrap(); + assert_eq!(admin_resp.status(), StatusCode::BAD_REQUEST); + let res_body = json_body(admin_resp).await; + assert_json_eq!( + res_body, + json!({ + "code": "InvalidRequest", + "message": "Bad request: Bucket 'my-website' is not authorized for website hosting", + "region": "garage-integ-test", + "path": "/check", + }) + ); } #[tokio::test] @@ -322,3 +395,103 @@ async fn test_website_s3_api() { ); } } + +#[tokio::test] +async fn test_website_check_website_enabled() { + let ctx = common::context(); + + let client = Client::new(); + + let admin_req = || { + Request::builder() + .method("GET") + .uri(format!("http://127.0.0.1:{}/check", ctx.garage.admin_port)) + .body(Body::empty()) + .unwrap() + }; + + let admin_resp = client.request(admin_req()).await.unwrap(); + assert_eq!(admin_resp.status(), StatusCode::BAD_REQUEST); + let res_body = json_body(admin_resp).await; + assert_json_eq!( + res_body, + json!({ + "code": "InvalidRequest", + "message": "Bad request: No domain query string found", + "region": "garage-integ-test", + "path": "/check", + }) + ); + + let admin_req = || { + Request::builder() + .method("GET") + .uri(format!( + "http://127.0.0.1:{}/check?domain=", + ctx.garage.admin_port + )) + .body(Body::empty()) + .unwrap() + }; + + let admin_resp = client.request(admin_req()).await.unwrap(); + assert_eq!(admin_resp.status(), StatusCode::NOT_FOUND); + let res_body = json_body(admin_resp).await; + assert_json_eq!( + res_body, + json!({ + "code": "NoSuchBucket", + "message": "Bucket not found: ", + "region": "garage-integ-test", + "path": "/check", + }) + ); + + let admin_req = || { + Request::builder() + .method("GET") + .uri(format!( + "http://127.0.0.1:{}/check?domain=foobar", + ctx.garage.admin_port + )) + .body(Body::empty()) + .unwrap() + }; + + let admin_resp = client.request(admin_req()).await.unwrap(); + assert_eq!(admin_resp.status(), StatusCode::NOT_FOUND); + let res_body = json_body(admin_resp).await; + assert_json_eq!( + res_body, + json!({ + "code": "NoSuchBucket", + "message": "Bucket not found: foobar", + "region": "garage-integ-test", + "path": "/check", + }) + ); + + let admin_req = || { + Request::builder() + .method("GET") + .uri(format!( + "http://127.0.0.1:{}/check?domain=%E2%98%B9", + ctx.garage.admin_port + )) + .body(Body::empty()) + .unwrap() + }; + + let admin_resp = 
client.request(admin_req()).await.unwrap(); + assert_eq!(admin_resp.status(), StatusCode::NOT_FOUND); + let res_body = json_body(admin_resp).await; + assert_json_eq!( + res_body, + json!({ + "code": "NoSuchBucket", + "message": "Bucket not found: ☹", + "region": "garage-integ-test", + "path": "/check", + }) + ); +} |
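The other notable admin change in this merge replaces the fixed `WorkerSetCmd` variants with generic `worker get` / `worker set` commands backed by a string-keyed variable store (`bg_vars` in the diff). The sketch below illustrates that pattern only; the struct, method signatures, and variable name are assumptions for the example, not Garage's actual `bg_vars` API:

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Illustrative string-keyed variable registry, in the spirit of the store
/// that `worker get`/`worker set` talk to after this merge.
struct BgVars {
    vars: Mutex<HashMap<String, String>>,
}

impl BgVars {
    fn new() -> Self {
        Self { vars: Mutex::new(HashMap::new()) }
    }

    fn set(&self, name: &str, value: &str) -> Result<(), String> {
        // A real implementation would validate the value (e.g. parse a
        // tranquility integer) and notify the worker owning the variable.
        self.vars
            .lock()
            .unwrap()
            .insert(name.to_string(), value.to_string());
        Ok(())
    }

    fn get(&self, name: &str) -> Result<String, String> {
        self.vars
            .lock()
            .unwrap()
            .get(name)
            .cloned()
            .ok_or_else(|| format!("unknown variable: {}", name))
    }

    fn get_all(&self) -> Vec<(String, String)> {
        self.vars
            .lock()
            .unwrap()
            .iter()
            .map(|(k, v)| (k.clone(), v.clone()))
            .collect()
    }
}

fn main() {
    let vars = BgVars::new();
    vars.set("resync-tranquility", "2").unwrap();
    println!("{}", vars.get("resync-tranquility").unwrap());
    println!("{:?}", vars.get_all());
}
```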