Diffstat (limited to 'src/garage')
-rw-r--r-- | src/garage/Cargo.toml | 19
-rw-r--r-- | src/garage/admin/block.rs | 235
-rw-r--r-- | src/garage/admin/bucket.rs | 53
-rw-r--r-- | src/garage/admin/mod.rs | 545
-rw-r--r-- | src/garage/cli/cmd.rs | 60
-rw-r--r-- | src/garage/cli/layout.rs | 15
-rw-r--r-- | src/garage/cli/mod.rs | 11
-rw-r--r-- | src/garage/cli/repair.rs (renamed from src/garage/repair/offline.rs) | 0
-rw-r--r-- | src/garage/cli/structs.rs | 64
-rw-r--r-- | src/garage/cli/util.rs | 216
-rw-r--r-- | src/garage/cli_v2/block.rs | 145
-rw-r--r-- | src/garage/cli_v2/bucket.rs | 48
-rw-r--r-- | src/garage/cli_v2/cluster.rs | 2
-rw-r--r-- | src/garage/cli_v2/key.rs | 2
-rw-r--r-- | src/garage/cli_v2/layout.rs | 2
-rw-r--r-- | src/garage/cli_v2/mod.rs | 104
-rw-r--r-- | src/garage/cli_v2/node.rs | 113
-rw-r--r-- | src/garage/cli_v2/worker.rs | 213
-rw-r--r-- | src/garage/main.rs | 17
-rw-r--r-- | src/garage/repair/mod.rs | 2
-rw-r--r-- | src/garage/repair/online.rs | 390
-rw-r--r-- | src/garage/server.rs | 11
-rw-r--r-- | src/garage/tests/common/custom_requester.rs | 2
23 files changed, 628 insertions, 1641 deletions
diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml
index 483e33c0..c566c3e0 100644
--- a/src/garage/Cargo.toml
+++ b/src/garage/Cargo.toml
@@ -23,7 +23,10 @@ path = "tests/lib.rs"
[dependencies]
format_table.workspace = true
garage_db.workspace = true
-garage_api.workspace = true
+garage_api_admin.workspace = true
+garage_api_s3.workspace = true
+garage_api_k2v = { workspace = true, optional = true }
+garage_api_common.workspace = true
garage_block.workspace = true
garage_model.workspace = true
garage_net.workspace = true
@@ -40,29 +43,23 @@ parse_duration.workspace = true
hex.workspace = true
tracing.workspace = true
tracing-subscriber.workspace = true
-rand.workspace = true
async-trait.workspace = true
sha1.workspace = true
sodiumoxide.workspace = true
structopt.workspace = true
git-version.workspace = true

-serde.workspace = true
-serde_bytes.workspace = true
-toml.workspace = true
-
futures.workspace = true
-futures-util.workspace = true
tokio.workspace = true

opentelemetry.workspace = true
opentelemetry-prometheus = { workspace = true, optional = true }
opentelemetry-otlp = { workspace = true, optional = true }
-prometheus = { workspace = true, optional = true }
syslog-tracing = { workspace = true, optional = true }

[dev-dependencies]
-aws-config.workspace = true
+garage_api_common.workspace = true
+
aws-sdk-s3.workspace = true
chrono.workspace = true
http.workspace = true
@@ -84,7 +81,7 @@ k2v-client.workspace = true

[features]
default = [ "bundled-libs", "metrics", "lmdb", "sqlite", "k2v" ]
-k2v = [ "garage_util/k2v", "garage_api/k2v" ]
+k2v = [ "garage_util/k2v", "garage_api_k2v" ]

# Database engines
lmdb = [ "garage_model/lmdb" ]
@@ -95,7 +92,7 @@ consul-discovery = [ "garage_rpc/consul-discovery" ]
# Automatic registration and discovery via Kubernetes API
kubernetes-discovery = [ "garage_rpc/kubernetes-discovery" ]
# Prometheus exporter (/metrics endpoint).
-metrics = [ "garage_api/metrics", "opentelemetry-prometheus", "prometheus" ]
+metrics = [ "garage_api_admin/metrics", "opentelemetry-prometheus" ]
# Exporter for the OpenTelemetry Collector.
telemetry-otlp = [ "opentelemetry-otlp" ] # Logging to syslog diff --git a/src/garage/admin/block.rs b/src/garage/admin/block.rs deleted file mode 100644 index edeb88c0..00000000 --- a/src/garage/admin/block.rs +++ /dev/null @@ -1,235 +0,0 @@ -use garage_util::data::*; - -use garage_table::*; - -use garage_model::helper::error::{Error, OkOrBadRequest}; -use garage_model::s3::object_table::*; -use garage_model::s3::version_table::*; - -use crate::cli::*; - -use super::*; - -impl AdminRpcHandler { - pub(super) async fn handle_block_cmd(&self, cmd: &BlockOperation) -> Result<AdminRpc, Error> { - match cmd { - BlockOperation::ListErrors => Ok(AdminRpc::BlockErrorList( - self.garage.block_manager.list_resync_errors()?, - )), - BlockOperation::Info { hash } => self.handle_block_info(hash).await, - BlockOperation::RetryNow { all, blocks } => { - self.handle_block_retry_now(*all, blocks).await - } - BlockOperation::Purge { yes, blocks } => self.handle_block_purge(*yes, blocks).await, - } - } - - async fn handle_block_info(&self, hash: &String) -> Result<AdminRpc, Error> { - let hash = self.find_block_hash_by_prefix(hash)?; - let refcount = self.garage.block_manager.get_block_rc(&hash)?; - let block_refs = self - .garage - .block_ref_table - .get_range(&hash, None, None, 10000, Default::default()) - .await?; - let mut versions = vec![]; - let mut uploads = vec![]; - for br in block_refs { - if let Some(v) = self - .garage - .version_table - .get(&br.version, &EmptyKey) - .await? - { - if let VersionBacklink::MultipartUpload { upload_id } = &v.backlink { - if let Some(u) = self.garage.mpu_table.get(upload_id, &EmptyKey).await? { - uploads.push(u); - } - } - versions.push(Ok(v)); - } else { - versions.push(Err(br.version)); - } - } - Ok(AdminRpc::BlockInfo { - hash, - refcount, - versions, - uploads, - }) - } - - async fn handle_block_retry_now( - &self, - all: bool, - blocks: &[String], - ) -> Result<AdminRpc, Error> { - if all { - if !blocks.is_empty() { - return Err(Error::BadRequest( - "--all was specified, cannot also specify blocks".into(), - )); - } - let blocks = self.garage.block_manager.list_resync_errors()?; - for b in blocks.iter() { - self.garage.block_manager.resync.clear_backoff(&b.hash)?; - } - Ok(AdminRpc::Ok(format!( - "{} blocks returned in queue for a retry now (check logs to see results)", - blocks.len() - ))) - } else { - for hash in blocks { - let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?; - let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?; - self.garage.block_manager.resync.clear_backoff(&hash)?; - } - Ok(AdminRpc::Ok(format!( - "{} blocks returned in queue for a retry now (check logs to see results)", - blocks.len() - ))) - } - } - - async fn handle_block_purge(&self, yes: bool, blocks: &[String]) -> Result<AdminRpc, Error> { - if !yes { - return Err(Error::BadRequest( - "Pass the --yes flag to confirm block purge operation.".into(), - )); - } - - let mut obj_dels = 0; - let mut mpu_dels = 0; - let mut ver_dels = 0; - - for hash in blocks { - let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?; - let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?; - let block_refs = self - .garage - .block_ref_table - .get_range(&hash, None, None, 10000, Default::default()) - .await?; - - for br in block_refs { - if let Some(version) = self - .garage - .version_table - .get(&br.version, &EmptyKey) - .await? 
- { - self.handle_block_purge_version_backlink( - &version, - &mut obj_dels, - &mut mpu_dels, - ) - .await?; - - if !version.deleted.get() { - let deleted_version = Version::new(version.uuid, version.backlink, true); - self.garage.version_table.insert(&deleted_version).await?; - ver_dels += 1; - } - } - } - } - - Ok(AdminRpc::Ok(format!( - "Purged {} blocks, {} versions, {} objects, {} multipart uploads", - blocks.len(), - ver_dels, - obj_dels, - mpu_dels, - ))) - } - - async fn handle_block_purge_version_backlink( - &self, - version: &Version, - obj_dels: &mut usize, - mpu_dels: &mut usize, - ) -> Result<(), Error> { - let (bucket_id, key, ov_id) = match &version.backlink { - VersionBacklink::Object { bucket_id, key } => (*bucket_id, key.clone(), version.uuid), - VersionBacklink::MultipartUpload { upload_id } => { - if let Some(mut mpu) = self.garage.mpu_table.get(upload_id, &EmptyKey).await? { - if !mpu.deleted.get() { - mpu.parts.clear(); - mpu.deleted.set(); - self.garage.mpu_table.insert(&mpu).await?; - *mpu_dels += 1; - } - (mpu.bucket_id, mpu.key.clone(), *upload_id) - } else { - return Ok(()); - } - } - }; - - if let Some(object) = self.garage.object_table.get(&bucket_id, &key).await? { - let ov = object.versions().iter().rev().find(|v| v.is_complete()); - if let Some(ov) = ov { - if ov.uuid == ov_id { - let del_uuid = gen_uuid(); - let deleted_object = Object::new( - bucket_id, - key, - vec![ObjectVersion { - uuid: del_uuid, - timestamp: ov.timestamp + 1, - state: ObjectVersionState::Complete(ObjectVersionData::DeleteMarker), - }], - ); - self.garage.object_table.insert(&deleted_object).await?; - *obj_dels += 1; - } - } - } - - Ok(()) - } - - // ---- helper function ---- - fn find_block_hash_by_prefix(&self, prefix: &str) -> Result<Hash, Error> { - if prefix.len() < 4 { - return Err(Error::BadRequest( - "Please specify at least 4 characters of the block hash".into(), - )); - } - - let prefix_bin = - hex::decode(&prefix[..prefix.len() & !1]).ok_or_bad_request("invalid hash")?; - - let iter = self - .garage - .block_ref_table - .data - .store - .range(&prefix_bin[..]..) 
- .map_err(GarageError::from)?; - let mut found = None; - for item in iter { - let (k, _v) = item.map_err(GarageError::from)?; - let hash = Hash::try_from(&k[..32]).unwrap(); - if &hash.as_slice()[..prefix_bin.len()] != prefix_bin { - break; - } - if hex::encode(hash.as_slice()).starts_with(prefix) { - match &found { - Some(x) if *x == hash => (), - Some(_) => { - return Err(Error::BadRequest(format!( - "Several blocks match prefix `{}`", - prefix - ))); - } - None => { - found = Some(hash); - } - } - } - } - - found.ok_or_else(|| Error::BadRequest("No matching block found".into())) - } -} diff --git a/src/garage/admin/bucket.rs b/src/garage/admin/bucket.rs deleted file mode 100644 index 26d54084..00000000 --- a/src/garage/admin/bucket.rs +++ /dev/null @@ -1,53 +0,0 @@ -use std::fmt::Write; - -use garage_model::helper::error::{Error, OkOrBadRequest}; - -use crate::cli::*; - -use super::*; - -impl AdminRpcHandler { - pub(super) async fn handle_bucket_cmd(&self, cmd: &BucketOperation) -> Result<AdminRpc, Error> { - match cmd { - BucketOperation::CleanupIncompleteUploads(query) => { - self.handle_bucket_cleanup_incomplete_uploads(query).await - } - _ => unreachable!(), - } - } - - async fn handle_bucket_cleanup_incomplete_uploads( - &self, - query: &CleanupIncompleteUploadsOpt, - ) -> Result<AdminRpc, Error> { - let mut bucket_ids = vec![]; - for b in query.buckets.iter() { - bucket_ids.push( - self.garage - .bucket_helper() - .admin_get_existing_matching_bucket(b) - .await?, - ); - } - - let duration = parse_duration::parse::parse(&query.older_than) - .ok_or_bad_request("Invalid duration passed for --older-than parameter")?; - - let mut ret = String::new(); - for bucket in bucket_ids { - let count = self - .garage - .bucket_helper() - .cleanup_incomplete_uploads(&bucket, duration) - .await?; - writeln!( - &mut ret, - "Bucket {:?}: {} incomplete uploads aborted", - bucket, count - ) - .unwrap(); - } - - Ok(AdminRpc::Ok(ret)) - } -} diff --git a/src/garage/admin/mod.rs b/src/garage/admin/mod.rs deleted file mode 100644 index 1888a208..00000000 --- a/src/garage/admin/mod.rs +++ /dev/null @@ -1,545 +0,0 @@ -mod block; -mod bucket; - -use std::collections::HashMap; -use std::fmt::Write; -use std::sync::Arc; - -use async_trait::async_trait; -use serde::{Deserialize, Serialize}; - -use format_table::format_table_to_string; - -use garage_util::background::BackgroundRunner; -use garage_util::data::*; -use garage_util::error::Error as GarageError; - -use garage_table::replication::*; -use garage_table::*; - -use garage_rpc::layout::PARTITION_BITS; -use garage_rpc::*; - -use garage_block::manager::BlockResyncErrorInfo; - -use garage_model::garage::Garage; -use garage_model::helper::error::{Error, OkOrBadRequest}; -use garage_model::s3::mpu_table::MultipartUpload; -use garage_model::s3::version_table::Version; - -use garage_api::admin::api::{AdminApiRequest, TaggedAdminApiResponse}; -use garage_api::admin::EndpointHandler as AdminApiEndpoint; -use garage_api::generic_server::ApiError; - -use crate::cli::*; -use crate::repair::online::launch_online_repair; - -pub const ADMIN_RPC_PATH: &str = "garage/admin_rpc.rs/Rpc"; - -#[derive(Debug, Serialize, Deserialize)] -#[allow(clippy::large_enum_variant)] -pub enum AdminRpc { - BucketOperation(BucketOperation), - LaunchRepair(RepairOpt), - Stats(StatsOpt), - Worker(WorkerOperation), - BlockOperation(BlockOperation), - MetaOperation(MetaOperation), - - // Replies - Ok(String), - WorkerList( - HashMap<usize, garage_util::background::WorkerInfo>, - WorkerListOpt, 
- ), - WorkerVars(Vec<(Uuid, String, String)>), - WorkerInfo(usize, garage_util::background::WorkerInfo), - BlockErrorList(Vec<BlockResyncErrorInfo>), - BlockInfo { - hash: Hash, - refcount: u64, - versions: Vec<Result<Version, Uuid>>, - uploads: Vec<MultipartUpload>, - }, - - // Proxying HTTP Admin API endpoints - ApiRequest(AdminApiRequest), - ApiOkResponse(TaggedAdminApiResponse), - ApiErrorResponse { - http_code: u16, - error_code: String, - message: String, - }, -} - -impl Rpc for AdminRpc { - type Response = Result<AdminRpc, Error>; -} - -pub struct AdminRpcHandler { - garage: Arc<Garage>, - background: Arc<BackgroundRunner>, - endpoint: Arc<Endpoint<AdminRpc, Self>>, -} - -impl AdminRpcHandler { - pub fn new(garage: Arc<Garage>, background: Arc<BackgroundRunner>) -> Arc<Self> { - let endpoint = garage.system.netapp.endpoint(ADMIN_RPC_PATH.into()); - let admin = Arc::new(Self { - garage, - background, - endpoint, - }); - admin.endpoint.set_handler(admin.clone()); - admin - } - - // ================ REPAIR COMMANDS ==================== - - async fn handle_launch_repair(self: &Arc<Self>, opt: RepairOpt) -> Result<AdminRpc, Error> { - if !opt.yes { - return Err(Error::BadRequest( - "Please provide the --yes flag to initiate repair operations.".to_string(), - )); - } - if opt.all_nodes { - let mut opt_to_send = opt.clone(); - opt_to_send.all_nodes = false; - - let mut failures = vec![]; - let all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec(); - for node in all_nodes.iter() { - let node = (*node).into(); - let resp = self - .endpoint - .call( - &node, - AdminRpc::LaunchRepair(opt_to_send.clone()), - PRIO_NORMAL, - ) - .await; - if !matches!(resp, Ok(Ok(_))) { - failures.push(node); - } - } - if failures.is_empty() { - Ok(AdminRpc::Ok("Repair launched on all nodes".to_string())) - } else { - Err(Error::BadRequest(format!( - "Could not launch repair on nodes: {:?} (launched successfully on other nodes)", - failures - ))) - } - } else { - launch_online_repair(&self.garage, &self.background, opt).await?; - Ok(AdminRpc::Ok(format!( - "Repair launched on {:?}", - self.garage.system.id - ))) - } - } - - // ================ STATS COMMANDS ==================== - - async fn handle_stats(&self, opt: StatsOpt) -> Result<AdminRpc, Error> { - if opt.all_nodes { - let mut ret = String::new(); - let all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec(); - - for node in all_nodes.iter() { - let mut opt = opt.clone(); - opt.all_nodes = false; - opt.skip_global = true; - - writeln!(&mut ret, "\n======================").unwrap(); - writeln!(&mut ret, "Stats for node {:?}:", node).unwrap(); - - let node_id = (*node).into(); - match self - .endpoint - .call(&node_id, AdminRpc::Stats(opt), PRIO_NORMAL) - .await - { - Ok(Ok(AdminRpc::Ok(s))) => writeln!(&mut ret, "{}", s).unwrap(), - Ok(Ok(x)) => writeln!(&mut ret, "Bad answer: {:?}", x).unwrap(), - Ok(Err(e)) => writeln!(&mut ret, "Remote error: {}", e).unwrap(), - Err(e) => writeln!(&mut ret, "Network error: {}", e).unwrap(), - } - } - - writeln!(&mut ret, "\n======================").unwrap(); - write!( - &mut ret, - "Cluster statistics:\n\n{}", - self.gather_cluster_stats() - ) - .unwrap(); - - Ok(AdminRpc::Ok(ret)) - } else { - Ok(AdminRpc::Ok(self.gather_stats_local(opt)?)) - } - } - - fn gather_stats_local(&self, opt: StatsOpt) -> Result<String, Error> { - let mut ret = String::new(); - writeln!( - &mut ret, - "\nGarage version: {} [features: {}]\nRust compiler version: {}", - garage_util::version::garage_version(), - 
garage_util::version::garage_features() - .map(|list| list.join(", ")) - .unwrap_or_else(|| "(unknown)".into()), - garage_util::version::rust_version(), - ) - .unwrap(); - - writeln!(&mut ret, "\nDatabase engine: {}", self.garage.db.engine()).unwrap(); - - // Gather table statistics - let mut table = vec![" Table\tItems\tMklItems\tMklTodo\tGcTodo".into()]; - table.push(self.gather_table_stats(&self.garage.bucket_table)?); - table.push(self.gather_table_stats(&self.garage.key_table)?); - table.push(self.gather_table_stats(&self.garage.object_table)?); - table.push(self.gather_table_stats(&self.garage.version_table)?); - table.push(self.gather_table_stats(&self.garage.block_ref_table)?); - write!( - &mut ret, - "\nTable stats:\n{}", - format_table_to_string(table) - ) - .unwrap(); - - // Gather block manager statistics - writeln!(&mut ret, "\nBlock manager stats:").unwrap(); - let rc_len = self.garage.block_manager.rc_len()?.to_string(); - - writeln!( - &mut ret, - " number of RC entries (~= number of blocks): {}", - rc_len - ) - .unwrap(); - writeln!( - &mut ret, - " resync queue length: {}", - self.garage.block_manager.resync.queue_len()? - ) - .unwrap(); - writeln!( - &mut ret, - " blocks with resync errors: {}", - self.garage.block_manager.resync.errors_len()? - ) - .unwrap(); - - if !opt.skip_global { - write!(&mut ret, "\n{}", self.gather_cluster_stats()).unwrap(); - } - - Ok(ret) - } - - fn gather_cluster_stats(&self) -> String { - let mut ret = String::new(); - - // Gather storage node and free space statistics for current nodes - let layout = &self.garage.system.cluster_layout(); - let mut node_partition_count = HashMap::<Uuid, u64>::new(); - for short_id in layout.current().ring_assignment_data.iter() { - let id = layout.current().node_id_vec[*short_id as usize]; - *node_partition_count.entry(id).or_default() += 1; - } - let node_info = self - .garage - .system - .get_known_nodes() - .into_iter() - .map(|n| (n.id, n)) - .collect::<HashMap<_, _>>(); - - let mut table = vec![" ID\tHostname\tZone\tCapacity\tPart.\tDataAvail\tMetaAvail".into()]; - for (id, parts) in node_partition_count.iter() { - let info = node_info.get(id); - let status = info.map(|x| &x.status); - let role = layout.current().roles.get(id).and_then(|x| x.0.as_ref()); - let hostname = status.and_then(|x| x.hostname.as_deref()).unwrap_or("?"); - let zone = role.map(|x| x.zone.as_str()).unwrap_or("?"); - let capacity = role - .map(|x| x.capacity_string()) - .unwrap_or_else(|| "?".into()); - let avail_str = |x| match x { - Some((avail, total)) => { - let pct = (avail as f64) / (total as f64) * 100.; - let avail = bytesize::ByteSize::b(avail); - let total = bytesize::ByteSize::b(total); - format!("{}/{} ({:.1}%)", avail, total, pct) - } - None => "?".into(), - }; - let data_avail = avail_str(status.and_then(|x| x.data_disk_avail)); - let meta_avail = avail_str(status.and_then(|x| x.meta_disk_avail)); - table.push(format!( - " {:?}\t{}\t{}\t{}\t{}\t{}\t{}", - id, hostname, zone, capacity, parts, data_avail, meta_avail - )); - } - write!( - &mut ret, - "Storage nodes:\n{}", - format_table_to_string(table) - ) - .unwrap(); - - let meta_part_avail = node_partition_count - .iter() - .filter_map(|(id, parts)| { - node_info - .get(id) - .and_then(|x| x.status.meta_disk_avail) - .map(|c| c.0 / *parts) - }) - .collect::<Vec<_>>(); - let data_part_avail = node_partition_count - .iter() - .filter_map(|(id, parts)| { - node_info - .get(id) - .and_then(|x| x.status.data_disk_avail) - .map(|c| c.0 / *parts) - }) - 
.collect::<Vec<_>>(); - if !meta_part_avail.is_empty() && !data_part_avail.is_empty() { - let meta_avail = - bytesize::ByteSize(meta_part_avail.iter().min().unwrap() * (1 << PARTITION_BITS)); - let data_avail = - bytesize::ByteSize(data_part_avail.iter().min().unwrap() * (1 << PARTITION_BITS)); - writeln!( - &mut ret, - "\nEstimated available storage space cluster-wide (might be lower in practice):" - ) - .unwrap(); - if meta_part_avail.len() < node_partition_count.len() - || data_part_avail.len() < node_partition_count.len() - { - writeln!(&mut ret, " data: < {}", data_avail).unwrap(); - writeln!(&mut ret, " metadata: < {}", meta_avail).unwrap(); - writeln!(&mut ret, "A precise estimate could not be given as information is missing for some storage nodes.").unwrap(); - } else { - writeln!(&mut ret, " data: {}", data_avail).unwrap(); - writeln!(&mut ret, " metadata: {}", meta_avail).unwrap(); - } - } - - ret - } - - fn gather_table_stats<F, R>(&self, t: &Arc<Table<F, R>>) -> Result<String, Error> - where - F: TableSchema + 'static, - R: TableReplication + 'static, - { - let data_len = t.data.store.len().map_err(GarageError::from)?.to_string(); - let mkl_len = t.merkle_updater.merkle_tree_len()?.to_string(); - - Ok(format!( - " {}\t{}\t{}\t{}\t{}", - F::TABLE_NAME, - data_len, - mkl_len, - t.merkle_updater.todo_len()?, - t.data.gc_todo_len()? - )) - } - - // ================ WORKER COMMANDS ==================== - - async fn handle_worker_cmd(&self, cmd: &WorkerOperation) -> Result<AdminRpc, Error> { - match cmd { - WorkerOperation::List { opt } => { - let workers = self.background.get_worker_info(); - Ok(AdminRpc::WorkerList(workers, *opt)) - } - WorkerOperation::Info { tid } => { - let info = self - .background - .get_worker_info() - .get(tid) - .ok_or_bad_request(format!("No worker with TID {}", tid))? - .clone(); - Ok(AdminRpc::WorkerInfo(*tid, info)) - } - WorkerOperation::Get { - all_nodes, - variable, - } => self.handle_get_var(*all_nodes, variable).await, - WorkerOperation::Set { - all_nodes, - variable, - value, - } => self.handle_set_var(*all_nodes, variable, value).await, - } - } - - async fn handle_get_var( - &self, - all_nodes: bool, - variable: &Option<String>, - ) -> Result<AdminRpc, Error> { - if all_nodes { - let mut ret = vec![]; - let all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec(); - for node in all_nodes.iter() { - let node = (*node).into(); - match self - .endpoint - .call( - &node, - AdminRpc::Worker(WorkerOperation::Get { - all_nodes: false, - variable: variable.clone(), - }), - PRIO_NORMAL, - ) - .await?? 
- { - AdminRpc::WorkerVars(v) => ret.extend(v), - m => return Err(GarageError::unexpected_rpc_message(m).into()), - } - } - Ok(AdminRpc::WorkerVars(ret)) - } else { - #[allow(clippy::collapsible_else_if)] - if let Some(v) = variable { - Ok(AdminRpc::WorkerVars(vec![( - self.garage.system.id, - v.clone(), - self.garage.bg_vars.get(v)?, - )])) - } else { - let mut vars = self.garage.bg_vars.get_all(); - vars.sort(); - Ok(AdminRpc::WorkerVars( - vars.into_iter() - .map(|(k, v)| (self.garage.system.id, k.to_string(), v)) - .collect(), - )) - } - } - } - - async fn handle_set_var( - &self, - all_nodes: bool, - variable: &str, - value: &str, - ) -> Result<AdminRpc, Error> { - if all_nodes { - let mut ret = vec![]; - let all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec(); - for node in all_nodes.iter() { - let node = (*node).into(); - match self - .endpoint - .call( - &node, - AdminRpc::Worker(WorkerOperation::Set { - all_nodes: false, - variable: variable.to_string(), - value: value.to_string(), - }), - PRIO_NORMAL, - ) - .await?? - { - AdminRpc::WorkerVars(v) => ret.extend(v), - m => return Err(GarageError::unexpected_rpc_message(m).into()), - } - } - Ok(AdminRpc::WorkerVars(ret)) - } else { - self.garage.bg_vars.set(variable, value)?; - Ok(AdminRpc::WorkerVars(vec![( - self.garage.system.id, - variable.to_string(), - value.to_string(), - )])) - } - } - - // ================ META DB COMMANDS ==================== - - async fn handle_meta_cmd(self: &Arc<Self>, mo: &MetaOperation) -> Result<AdminRpc, Error> { - match mo { - MetaOperation::Snapshot { all: true } => { - let to = self.garage.system.cluster_layout().all_nodes().to_vec(); - - let resps = futures::future::join_all(to.iter().map(|to| async move { - let to = (*to).into(); - self.endpoint - .call( - &to, - AdminRpc::MetaOperation(MetaOperation::Snapshot { all: false }), - PRIO_NORMAL, - ) - .await - })) - .await; - - let mut ret = vec![]; - for (to, resp) in to.iter().zip(resps.iter()) { - let res_str = match resp { - Ok(_) => "ok".to_string(), - Err(e) => format!("error: {}", e), - }; - ret.push(format!("{:?}\t{}", to, res_str)); - } - - Ok(AdminRpc::Ok(format_table_to_string(ret))) - } - MetaOperation::Snapshot { all: false } => { - garage_model::snapshot::async_snapshot_metadata(&self.garage).await?; - Ok(AdminRpc::Ok("Snapshot has been saved.".into())) - } - } - } - - // ================== PROXYING ADMIN API REQUESTS =================== - - async fn handle_api_request( - self: &Arc<Self>, - req: &AdminApiRequest, - ) -> Result<AdminRpc, Error> { - let req = req.clone(); - info!("Proxied admin API request: {}", req.name()); - let res = req.handle(&self.garage).await; - match res { - Ok(res) => Ok(AdminRpc::ApiOkResponse(res.tagged())), - Err(e) => Ok(AdminRpc::ApiErrorResponse { - http_code: e.http_status_code().as_u16(), - error_code: e.code().to_string(), - message: e.to_string(), - }), - } - } -} - -#[async_trait] -impl EndpointHandler<AdminRpc> for AdminRpcHandler { - async fn handle( - self: &Arc<Self>, - message: &AdminRpc, - _from: NodeID, - ) -> Result<AdminRpc, Error> { - match message { - AdminRpc::BucketOperation(bo) => self.handle_bucket_cmd(bo).await, - AdminRpc::LaunchRepair(opt) => self.handle_launch_repair(opt.clone()).await, - AdminRpc::Stats(opt) => self.handle_stats(opt.clone()).await, - AdminRpc::Worker(wo) => self.handle_worker_cmd(wo).await, - AdminRpc::BlockOperation(bo) => self.handle_block_cmd(bo).await, - AdminRpc::MetaOperation(mo) => self.handle_meta_cmd(mo).await, - AdminRpc::ApiRequest(r) 
=> self.handle_api_request(r).await, - m => Err(GarageError::unexpected_rpc_message(m).into()), - } - } -} diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs deleted file mode 100644 index a6540c65..00000000 --- a/src/garage/cli/cmd.rs +++ /dev/null @@ -1,60 +0,0 @@ -use garage_util::error::*; - -use garage_rpc::system::*; -use garage_rpc::*; - -use garage_model::helper::error::Error as HelperError; - -use crate::admin::*; -use crate::cli::*; - -pub async fn cmd_admin( - rpc_cli: &Endpoint<AdminRpc, ()>, - rpc_host: NodeID, - args: AdminRpc, -) -> Result<(), HelperError> { - match rpc_cli.call(&rpc_host, args, PRIO_NORMAL).await?? { - AdminRpc::Ok(msg) => { - println!("{}", msg); - } - AdminRpc::WorkerList(wi, wlo) => { - print_worker_list(wi, wlo); - } - AdminRpc::WorkerVars(wv) => { - print_worker_vars(wv); - } - AdminRpc::WorkerInfo(tid, wi) => { - print_worker_info(tid, wi); - } - AdminRpc::BlockErrorList(el) => { - print_block_error_list(el); - } - AdminRpc::BlockInfo { - hash, - refcount, - versions, - uploads, - } => { - print_block_info(hash, refcount, versions, uploads); - } - r => { - error!("Unexpected response: {:?}", r); - } - } - Ok(()) -} - -// ---- utility ---- - -pub async fn fetch_status( - rpc_cli: &Endpoint<SystemRpc, ()>, - rpc_host: NodeID, -) -> Result<Vec<KnownNodeInfo>, Error> { - match rpc_cli - .call(&rpc_host, SystemRpc::GetKnownNodes, PRIO_NORMAL) - .await?? - { - SystemRpc::ReturnKnownNodes(nodes) => Ok(nodes), - resp => Err(Error::unexpected_rpc_message(resp)), - } -} diff --git a/src/garage/cli/layout.rs b/src/garage/cli/layout.rs index bb81d144..bb77cc2a 100644 --- a/src/garage/cli/layout.rs +++ b/src/garage/cli/layout.rs @@ -7,7 +7,7 @@ use garage_rpc::layout::*; use garage_rpc::system::*; use garage_rpc::*; -use crate::cli::*; +use crate::cli::structs::*; pub async fn cmd_show_layout( rpc_cli: &Endpoint<SystemRpc, ()>, @@ -260,6 +260,19 @@ pub async fn cmd_layout_skip_dead_nodes( // --- utility --- +pub async fn fetch_status( + rpc_cli: &Endpoint<SystemRpc, ()>, + rpc_host: NodeID, +) -> Result<Vec<KnownNodeInfo>, Error> { + match rpc_cli + .call(&rpc_host, SystemRpc::GetKnownNodes, PRIO_NORMAL) + .await?? 
+ { + SystemRpc::ReturnKnownNodes(nodes) => Ok(nodes), + resp => Err(Error::unexpected_rpc_message(resp)), + } +} + pub async fn fetch_layout( rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID, diff --git a/src/garage/cli/mod.rs b/src/garage/cli/mod.rs index 30f566e2..e007808b 100644 --- a/src/garage/cli/mod.rs +++ b/src/garage/cli/mod.rs @@ -1,12 +1,7 @@ -pub(crate) mod cmd; -pub(crate) mod init; -pub(crate) mod layout; pub(crate) mod structs; -pub(crate) mod util; pub(crate) mod convert_db; +pub(crate) mod init; +pub(crate) mod repair; -pub(crate) use cmd::*; -pub(crate) use init::*; -pub(crate) use structs::*; -pub(crate) use util::*; +pub(crate) mod layout; diff --git a/src/garage/repair/offline.rs b/src/garage/cli/repair.rs index 45024e71..45024e71 100644 --- a/src/garage/repair/offline.rs +++ b/src/garage/cli/repair.rs diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs index 4ec35e68..c6471515 100644 --- a/src/garage/cli/structs.rs +++ b/src/garage/cli/structs.rs @@ -1,4 +1,3 @@ -use serde::{Deserialize, Serialize}; use structopt::StructOpt; use garage_util::version::garage_version; @@ -190,7 +189,7 @@ pub struct SkipDeadNodesOpt { pub(crate) allow_missing_data: bool, } -#[derive(Serialize, Deserialize, StructOpt, Debug)] +#[derive(StructOpt, Debug)] pub enum BucketOperation { /// List buckets #[structopt(name = "list", version = garage_version())] @@ -237,7 +236,7 @@ pub enum BucketOperation { CleanupIncompleteUploads(CleanupIncompleteUploadsOpt), } -#[derive(Serialize, Deserialize, StructOpt, Debug)] +#[derive(StructOpt, Debug)] pub struct WebsiteOpt { /// Create #[structopt(long = "allow")] @@ -259,13 +258,13 @@ pub struct WebsiteOpt { pub error_document: Option<String>, } -#[derive(Serialize, Deserialize, StructOpt, Debug)] +#[derive(StructOpt, Debug)] pub struct BucketOpt { /// Bucket name pub name: String, } -#[derive(Serialize, Deserialize, StructOpt, Debug)] +#[derive(StructOpt, Debug)] pub struct DeleteBucketOpt { /// Bucket name pub name: String, @@ -275,7 +274,7 @@ pub struct DeleteBucketOpt { pub yes: bool, } -#[derive(Serialize, Deserialize, StructOpt, Debug)] +#[derive(StructOpt, Debug)] pub struct AliasBucketOpt { /// Existing bucket name (its alias in global namespace or its full hex uuid) pub existing_bucket: String, @@ -288,7 +287,7 @@ pub struct AliasBucketOpt { pub local: Option<String>, } -#[derive(Serialize, Deserialize, StructOpt, Debug)] +#[derive(StructOpt, Debug)] pub struct UnaliasBucketOpt { /// Bucket name pub name: String, @@ -298,7 +297,7 @@ pub struct UnaliasBucketOpt { pub local: Option<String>, } -#[derive(Serialize, Deserialize, StructOpt, Debug)] +#[derive(StructOpt, Debug)] pub struct PermBucketOpt { /// Access key name or ID #[structopt(long = "key")] @@ -321,7 +320,7 @@ pub struct PermBucketOpt { pub bucket: String, } -#[derive(Serialize, Deserialize, StructOpt, Debug)] +#[derive(StructOpt, Debug)] pub struct SetQuotasOpt { /// Bucket name pub bucket: String, @@ -336,7 +335,7 @@ pub struct SetQuotasOpt { pub max_objects: Option<String>, } -#[derive(Serialize, Deserialize, StructOpt, Debug)] +#[derive(StructOpt, Debug)] pub struct CleanupIncompleteUploadsOpt { /// Abort multipart uploads older than this value #[structopt(long = "older-than", default_value = "1d")] @@ -347,7 +346,7 @@ pub struct CleanupIncompleteUploadsOpt { pub buckets: Vec<String>, } -#[derive(Serialize, Deserialize, StructOpt, Debug)] +#[derive(StructOpt, Debug)] pub enum KeyOperation { /// List keys #[structopt(name = "list", version = garage_version())] @@ 
-382,7 +381,7 @@ pub enum KeyOperation { Import(KeyImportOpt), } -#[derive(Serialize, Deserialize, StructOpt, Debug)] +#[derive(StructOpt, Debug)] pub struct KeyInfoOpt { /// ID or name of the key pub key_pattern: String, @@ -391,14 +390,14 @@ pub struct KeyInfoOpt { pub show_secret: bool, } -#[derive(Serialize, Deserialize, StructOpt, Debug)] +#[derive(StructOpt, Debug)] pub struct KeyNewOpt { /// Name of the key #[structopt(default_value = "Unnamed key")] pub name: String, } -#[derive(Serialize, Deserialize, StructOpt, Debug)] +#[derive(StructOpt, Debug)] pub struct KeyRenameOpt { /// ID or name of the key pub key_pattern: String, @@ -407,7 +406,7 @@ pub struct KeyRenameOpt { pub new_name: String, } -#[derive(Serialize, Deserialize, StructOpt, Debug)] +#[derive(StructOpt, Debug)] pub struct KeyDeleteOpt { /// ID or name of the key pub key_pattern: String, @@ -417,7 +416,7 @@ pub struct KeyDeleteOpt { pub yes: bool, } -#[derive(Serialize, Deserialize, StructOpt, Debug)] +#[derive(StructOpt, Debug)] pub struct KeyPermOpt { /// ID or name of the key pub key_pattern: String, @@ -427,7 +426,7 @@ pub struct KeyPermOpt { pub create_bucket: bool, } -#[derive(Serialize, Deserialize, StructOpt, Debug)] +#[derive(StructOpt, Debug)] pub struct KeyImportOpt { /// Access key ID pub key_id: String, @@ -444,7 +443,7 @@ pub struct KeyImportOpt { pub yes: bool, } -#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)] +#[derive(StructOpt, Debug, Clone)] pub struct RepairOpt { /// Launch repair operation on all nodes #[structopt(short = "a", long = "all-nodes")] @@ -458,7 +457,7 @@ pub struct RepairOpt { pub what: RepairWhat, } -#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)] +#[derive(StructOpt, Debug, Eq, PartialEq, Clone)] pub enum RepairWhat { /// Do a full sync of metadata tables #[structopt(name = "tables", version = garage_version())] @@ -489,7 +488,7 @@ pub enum RepairWhat { Rebalance, } -#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)] +#[derive(StructOpt, Debug, Eq, PartialEq, Clone)] pub enum ScrubCmd { /// Start scrub #[structopt(name = "start", version = garage_version())] @@ -503,15 +502,9 @@ pub enum ScrubCmd { /// Cancel scrub in progress #[structopt(name = "cancel", version = garage_version())] Cancel, - /// Set tranquility level for in-progress and future scrubs - #[structopt(name = "set-tranquility", version = garage_version())] - SetTranquility { - #[structopt()] - tranquility: u32, - }, } -#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)] +#[derive(StructOpt, Debug, Clone)] pub struct OfflineRepairOpt { /// Confirm the launch of the repair operation #[structopt(long = "yes")] @@ -521,7 +514,7 @@ pub struct OfflineRepairOpt { pub what: OfflineRepairWhat, } -#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)] +#[derive(StructOpt, Debug, Eq, PartialEq, Clone)] pub enum OfflineRepairWhat { /// Repair K2V item counters #[cfg(feature = "k2v")] @@ -532,19 +525,14 @@ pub enum OfflineRepairWhat { ObjectCounters, } -#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)] +#[derive(StructOpt, Debug, Clone)] pub struct StatsOpt { /// Gather statistics from all nodes #[structopt(short = "a", long = "all-nodes")] pub all_nodes: bool, - - /// Don't show global cluster stats (internal use in RPC) - #[structopt(skip)] - #[serde(default)] - pub skip_global: bool, } -#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)] +#[derive(StructOpt, Debug, Eq, PartialEq, Clone)] pub enum WorkerOperation { 
/// List all workers on Garage node #[structopt(name = "list", version = garage_version())] @@ -577,7 +565,7 @@ pub enum WorkerOperation { }, } -#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone, Copy)] +#[derive(StructOpt, Debug, Eq, PartialEq, Clone, Copy)] pub struct WorkerListOpt { /// Show only busy workers #[structopt(short = "b", long = "busy")] @@ -587,7 +575,7 @@ pub struct WorkerListOpt { pub errors: bool, } -#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)] +#[derive(StructOpt, Debug, Eq, PartialEq, Clone)] pub enum BlockOperation { /// List all blocks that currently have a resync error #[structopt(name = "list-errors", version = garage_version())] @@ -619,7 +607,7 @@ pub enum BlockOperation { }, } -#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone, Copy)] +#[derive(StructOpt, Debug, Eq, PartialEq, Clone, Copy)] pub enum MetaOperation { /// Save a snapshot of the metadata db file #[structopt(name = "snapshot", version = garage_version())] diff --git a/src/garage/cli/util.rs b/src/garage/cli/util.rs deleted file mode 100644 index a3a1480e..00000000 --- a/src/garage/cli/util.rs +++ /dev/null @@ -1,216 +0,0 @@ -use std::collections::HashMap; -use std::time::Duration; - -use format_table::format_table; -use garage_util::background::*; -use garage_util::data::*; -use garage_util::time::*; - -use garage_block::manager::BlockResyncErrorInfo; - -use garage_model::s3::mpu_table::MultipartUpload; -use garage_model::s3::version_table::*; - -use crate::cli::structs::WorkerListOpt; - -pub fn print_worker_list(wi: HashMap<usize, WorkerInfo>, wlo: WorkerListOpt) { - let mut wi = wi.into_iter().collect::<Vec<_>>(); - wi.sort_by_key(|(tid, info)| { - ( - match info.state { - WorkerState::Busy | WorkerState::Throttled(_) => 0, - WorkerState::Idle => 1, - WorkerState::Done => 2, - }, - *tid, - ) - }); - - let mut table = vec!["TID\tState\tName\tTranq\tDone\tQueue\tErrors\tConsec\tLast".to_string()]; - for (tid, info) in wi.iter() { - if wlo.busy && !matches!(info.state, WorkerState::Busy | WorkerState::Throttled(_)) { - continue; - } - if wlo.errors && info.errors == 0 { - continue; - } - - let tf = timeago::Formatter::new(); - let err_ago = info - .last_error - .as_ref() - .map(|(_, t)| tf.convert(Duration::from_millis(now_msec() - t))) - .unwrap_or_default(); - let (total_err, consec_err) = if info.errors > 0 { - (info.errors.to_string(), info.consecutive_errors.to_string()) - } else { - ("-".into(), "-".into()) - }; - - table.push(format!( - "{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}", - tid, - info.state, - info.name, - info.status - .tranquility - .as_ref() - .map(ToString::to_string) - .unwrap_or_else(|| "-".into()), - info.status.progress.as_deref().unwrap_or("-"), - info.status - .queue_length - .as_ref() - .map(ToString::to_string) - .unwrap_or_else(|| "-".into()), - total_err, - consec_err, - err_ago, - )); - } - format_table(table); -} - -pub fn print_worker_info(tid: usize, info: WorkerInfo) { - let mut table = vec![]; - table.push(format!("Task id:\t{}", tid)); - table.push(format!("Worker name:\t{}", info.name)); - match info.state { - WorkerState::Throttled(t) => { - table.push(format!( - "Worker state:\tBusy (throttled, paused for {:.3}s)", - t - )); - } - s => { - table.push(format!("Worker state:\t{}", s)); - } - }; - if let Some(tql) = info.status.tranquility { - table.push(format!("Tranquility:\t{}", tql)); - } - - table.push("".into()); - table.push(format!("Total errors:\t{}", info.errors)); - 
table.push(format!("Consecutive errs:\t{}", info.consecutive_errors)); - if let Some((s, t)) = info.last_error { - table.push(format!("Last error:\t{}", s)); - let tf = timeago::Formatter::new(); - table.push(format!( - "Last error time:\t{}", - tf.convert(Duration::from_millis(now_msec() - t)) - )); - } - - table.push("".into()); - if let Some(p) = info.status.progress { - table.push(format!("Progress:\t{}", p)); - } - if let Some(ql) = info.status.queue_length { - table.push(format!("Queue length:\t{}", ql)); - } - if let Some(pe) = info.status.persistent_errors { - table.push(format!("Persistent errors:\t{}", pe)); - } - - for (i, s) in info.status.freeform.iter().enumerate() { - if i == 0 { - if table.last() != Some(&"".into()) { - table.push("".into()); - } - table.push(format!("Message:\t{}", s)); - } else { - table.push(format!("\t{}", s)); - } - } - format_table(table); -} - -pub fn print_worker_vars(wv: Vec<(Uuid, String, String)>) { - let table = wv - .into_iter() - .map(|(n, k, v)| format!("{:?}\t{}\t{}", n, k, v)) - .collect::<Vec<_>>(); - format_table(table); -} - -pub fn print_block_error_list(el: Vec<BlockResyncErrorInfo>) { - let now = now_msec(); - let tf = timeago::Formatter::new(); - let mut tf2 = timeago::Formatter::new(); - tf2.ago(""); - - let mut table = vec!["Hash\tRC\tErrors\tLast error\tNext try".into()]; - for e in el { - let next_try = if e.next_try > now { - tf2.convert(Duration::from_millis(e.next_try - now)) - } else { - "asap".to_string() - }; - table.push(format!( - "{}\t{}\t{}\t{}\tin {}", - hex::encode(e.hash.as_slice()), - e.refcount, - e.error_count, - tf.convert(Duration::from_millis(now - e.last_try)), - next_try - )); - } - format_table(table); -} - -pub fn print_block_info( - hash: Hash, - refcount: u64, - versions: Vec<Result<Version, Uuid>>, - uploads: Vec<MultipartUpload>, -) { - println!("Block hash: {}", hex::encode(hash.as_slice())); - println!("Refcount: {}", refcount); - println!(); - - let mut table = vec!["Version\tBucket\tKey\tMPU\tDeleted".into()]; - let mut nondeleted_count = 0; - for v in versions.iter() { - match v { - Ok(ver) => { - match &ver.backlink { - VersionBacklink::Object { bucket_id, key } => { - table.push(format!( - "{:?}\t{:?}\t{}\t\t{:?}", - ver.uuid, - bucket_id, - key, - ver.deleted.get() - )); - } - VersionBacklink::MultipartUpload { upload_id } => { - let upload = uploads.iter().find(|x| x.upload_id == *upload_id); - table.push(format!( - "{:?}\t{:?}\t{}\t{:?}\t{:?}", - ver.uuid, - upload.map(|u| u.bucket_id).unwrap_or_default(), - upload.map(|u| u.key.as_str()).unwrap_or_default(), - upload_id, - ver.deleted.get() - )); - } - } - if !ver.deleted.get() { - nondeleted_count += 1; - } - } - Err(vh) => { - table.push(format!("{:?}\t\t\t\tyes", vh)); - } - } - } - format_table(table); - - if refcount != nondeleted_count { - println!(); - println!( - "Warning: refcount does not match number of non-deleted versions, you should try `garage repair block-rc`." 
- ); - } -} diff --git a/src/garage/cli_v2/block.rs b/src/garage/cli_v2/block.rs new file mode 100644 index 00000000..bfc0db4a --- /dev/null +++ b/src/garage/cli_v2/block.rs @@ -0,0 +1,145 @@ +//use bytesize::ByteSize; +use format_table::format_table; + +use garage_util::error::*; + +use garage_api_admin::api::*; + +use crate::cli::structs::*; +use crate::cli_v2::*; + +impl Cli { + pub async fn cmd_block(&self, cmd: BlockOperation) -> Result<(), Error> { + match cmd { + BlockOperation::ListErrors => self.cmd_list_block_errors().await, + BlockOperation::Info { hash } => self.cmd_get_block_info(hash).await, + BlockOperation::RetryNow { all, blocks } => self.cmd_block_retry_now(all, blocks).await, + BlockOperation::Purge { yes, blocks } => self.cmd_block_purge(yes, blocks).await, + } + } + + pub async fn cmd_list_block_errors(&self) -> Result<(), Error> { + let errors = self.local_api_request(LocalListBlockErrorsRequest).await?.0; + + let tf = timeago::Formatter::new(); + let mut tf2 = timeago::Formatter::new(); + tf2.ago(""); + + let mut table = vec!["Hash\tRC\tErrors\tLast error\tNext try".into()]; + for e in errors { + let next_try = if e.next_try_in_secs > 0 { + tf2.convert(Duration::from_secs(e.next_try_in_secs)) + } else { + "asap".to_string() + }; + table.push(format!( + "{}\t{}\t{}\t{}\tin {}", + e.block_hash, + e.refcount, + e.error_count, + tf.convert(Duration::from_secs(e.last_try_secs_ago)), + next_try + )); + } + format_table(table); + + Ok(()) + } + + pub async fn cmd_get_block_info(&self, hash: String) -> Result<(), Error> { + let info = self + .local_api_request(LocalGetBlockInfoRequest { block_hash: hash }) + .await?; + + println!("Block hash: {}", info.block_hash); + println!("Refcount: {}", info.refcount); + println!(); + + let mut table = vec!["Version\tBucket\tKey\tMPU\tDeleted".into()]; + let mut nondeleted_count = 0; + for ver in info.versions.iter() { + match &ver.backlink { + Some(BlockVersionBacklink::Object { bucket_id, key }) => { + table.push(format!( + "{:.16}\t{:.16}\t{}\t\t{:?}", + ver.version_id, bucket_id, key, ver.deleted + )); + } + Some(BlockVersionBacklink::Upload { + upload_id, + upload_deleted: _, + upload_garbage_collected: _, + bucket_id, + key, + }) => { + table.push(format!( + "{:.16}\t{:.16}\t{}\t{:.16}\t{:.16}", + ver.version_id, + bucket_id.as_deref().unwrap_or(""), + key.as_deref().unwrap_or(""), + upload_id, + ver.deleted + )); + } + None => { + table.push(format!("{:.16}\t\t\tyes", ver.version_id)); + } + } + if !ver.deleted { + nondeleted_count += 1; + } + } + format_table(table); + + if info.refcount != nondeleted_count { + println!(); + println!( + "Warning: refcount does not match number of non-deleted versions, you should try `garage repair block-rc`." 
+ ); + } + + Ok(()) + } + + pub async fn cmd_block_retry_now(&self, all: bool, blocks: Vec<String>) -> Result<(), Error> { + let req = match (all, blocks.len()) { + (true, 0) => LocalRetryBlockResyncRequest::All { all: true }, + (false, n) if n > 0 => LocalRetryBlockResyncRequest::Blocks { + block_hashes: blocks, + }, + _ => { + return Err(Error::Message( + "Please specify block hashes or --all (not both)".into(), + )) + } + }; + + let res = self.local_api_request(req).await?; + + println!( + "{} blocks returned in queue for a retry now (check logs to see results)", + res.count + ); + + Ok(()) + } + + pub async fn cmd_block_purge(&self, yes: bool, blocks: Vec<String>) -> Result<(), Error> { + if !yes { + return Err(Error::Message( + "Pass the --yes flag to confirm block purge operation.".into(), + )); + } + + let res = self + .local_api_request(LocalPurgeBlocksRequest(blocks)) + .await?; + + println!( + "Purged {} blocks: deleted {} versions, {} objects, {} multipart uploads", + res.blocks_purged, res.versions_deleted, res.objects_deleted, res.uploads_deleted, + ); + + Ok(()) + } +} diff --git a/src/garage/cli_v2/bucket.rs b/src/garage/cli_v2/bucket.rs index 837ce783..c25c2c3e 100644 --- a/src/garage/cli_v2/bucket.rs +++ b/src/garage/cli_v2/bucket.rs @@ -3,9 +3,8 @@ use format_table::format_table; use garage_util::error::*; -use garage_api::admin::api::*; +use garage_api_admin::api::*; -use crate::cli as cli_v1; use crate::cli::structs::*; use crate::cli_v2::*; @@ -22,15 +21,9 @@ impl Cli { BucketOperation::Deny(query) => self.cmd_bucket_deny(query).await, BucketOperation::Website(query) => self.cmd_bucket_website(query).await, BucketOperation::SetQuotas(query) => self.cmd_bucket_set_quotas(query).await, - - // TODO - x => cli_v1::cmd_admin( - &self.admin_rpc_endpoint, - self.rpc_host, - AdminRpc::BucketOperation(x), - ) - .await - .ok_or_message("old error"), + BucketOperation::CleanupIncompleteUploads(query) => { + self.cmd_cleanup_incomplete_uploads(query).await + } } } @@ -520,4 +513,37 @@ impl Cli { Ok(()) } + + pub async fn cmd_cleanup_incomplete_uploads( + &self, + opt: CleanupIncompleteUploadsOpt, + ) -> Result<(), Error> { + let older_than = parse_duration::parse::parse(&opt.older_than) + .ok_or_message("Invalid duration passed for --older-than parameter")?; + + for b in opt.buckets.iter() { + let bucket = self + .api_request(GetBucketInfoRequest { + id: None, + global_alias: None, + search: Some(b.clone()), + }) + .await?; + + let res = self + .api_request(CleanupIncompleteUploadsRequest { + bucket_id: bucket.id.clone(), + older_than_secs: older_than.as_secs(), + }) + .await?; + + if res.uploads_deleted > 0 { + println!("{:.16}: {} uploads deleted", bucket.id, res.uploads_deleted); + } else { + println!("{:.16}: no uploads deleted", bucket.id); + } + } + + Ok(()) + } } diff --git a/src/garage/cli_v2/cluster.rs b/src/garage/cli_v2/cluster.rs index 34a28674..6eb65d12 100644 --- a/src/garage/cli_v2/cluster.rs +++ b/src/garage/cli_v2/cluster.rs @@ -2,7 +2,7 @@ use format_table::format_table; use garage_util::error::*; -use garage_api::admin::api::*; +use garage_api_admin::api::*; use crate::cli::structs::*; use crate::cli_v2::layout::*; diff --git a/src/garage/cli_v2/key.rs b/src/garage/cli_v2/key.rs index ff403a9a..b956906d 100644 --- a/src/garage/cli_v2/key.rs +++ b/src/garage/cli_v2/key.rs @@ -2,7 +2,7 @@ use format_table::format_table; use garage_util::error::*; -use garage_api::admin::api::*; +use garage_api_admin::api::*; use crate::cli::structs::*; use crate::cli_v2::*; diff 
--git a/src/garage/cli_v2/layout.rs b/src/garage/cli_v2/layout.rs index d44771c7..2f14b332 100644 --- a/src/garage/cli_v2/layout.rs +++ b/src/garage/cli_v2/layout.rs @@ -3,7 +3,7 @@ use format_table::format_table; use garage_util::error::*; -use garage_api::admin::api::*; +use garage_api_admin::api::*; use crate::cli::layout as cli_v1; use crate::cli::structs::*; diff --git a/src/garage/cli_v2/mod.rs b/src/garage/cli_v2/mod.rs index 692b7c1c..28c7c824 100644 --- a/src/garage/cli_v2/mod.rs +++ b/src/garage/cli_v2/mod.rs @@ -3,6 +3,10 @@ pub mod cluster; pub mod key; pub mod layout; +pub mod block; +pub mod node; +pub mod worker; + use std::convert::TryFrom; use std::sync::Arc; use std::time::Duration; @@ -12,17 +16,15 @@ use garage_util::error::*; use garage_rpc::system::*; use garage_rpc::*; -use garage_api::admin::api::*; -use garage_api::admin::EndpointHandler as AdminApiEndpoint; +use garage_api_admin::api::*; +use garage_api_admin::api_server::{AdminRpc as ProxyRpc, AdminRpcResponse as ProxyRpcResponse}; +use garage_api_admin::RequestHandler; -use crate::admin::*; -use crate::cli as cli_v1; use crate::cli::structs::*; -use crate::cli::Command; pub struct Cli { pub system_rpc_endpoint: Arc<Endpoint<SystemRpc, ()>>, - pub admin_rpc_endpoint: Arc<Endpoint<AdminRpc, ()>>, + pub proxy_rpc_endpoint: Arc<Endpoint<ProxyRpc, ()>>, pub rpc_host: NodeID, } @@ -36,63 +38,35 @@ impl Cli { Command::Layout(layout_opt) => self.layout_command_dispatch(layout_opt).await, Command::Bucket(bo) => self.cmd_bucket(bo).await, Command::Key(ko) => self.cmd_key(ko).await, - - // TODO - Command::Repair(ro) => cli_v1::cmd_admin( - &self.admin_rpc_endpoint, - self.rpc_host, - AdminRpc::LaunchRepair(ro), - ) - .await - .ok_or_message("cli_v1"), - Command::Stats(so) => { - cli_v1::cmd_admin(&self.admin_rpc_endpoint, self.rpc_host, AdminRpc::Stats(so)) - .await - .ok_or_message("cli_v1") - } - Command::Worker(wo) => cli_v1::cmd_admin( - &self.admin_rpc_endpoint, - self.rpc_host, - AdminRpc::Worker(wo), - ) - .await - .ok_or_message("cli_v1"), - Command::Block(bo) => cli_v1::cmd_admin( - &self.admin_rpc_endpoint, - self.rpc_host, - AdminRpc::BlockOperation(bo), - ) - .await - .ok_or_message("cli_v1"), - Command::Meta(mo) => cli_v1::cmd_admin( - &self.admin_rpc_endpoint, - self.rpc_host, - AdminRpc::MetaOperation(mo), - ) - .await - .ok_or_message("cli_v1"), + Command::Worker(wo) => self.cmd_worker(wo).await, + Command::Block(bo) => self.cmd_block(bo).await, + Command::Meta(mo) => self.cmd_meta(mo).await, + Command::Stats(so) => self.cmd_stats(so).await, + Command::Repair(ro) => self.cmd_repair(ro).await, _ => unreachable!(), } } - pub async fn api_request<T>(&self, req: T) -> Result<<T as AdminApiEndpoint>::Response, Error> + pub async fn api_request<T>(&self, req: T) -> Result<<T as RequestHandler>::Response, Error> where - T: AdminApiEndpoint, + T: RequestHandler, AdminApiRequest: From<T>, - <T as AdminApiEndpoint>::Response: TryFrom<TaggedAdminApiResponse>, + <T as RequestHandler>::Response: TryFrom<TaggedAdminApiResponse>, { let req = AdminApiRequest::from(req); let req_name = req.name(); match self - .admin_rpc_endpoint - .call(&self.rpc_host, AdminRpc::ApiRequest(req), PRIO_NORMAL) - .await? - .ok_or_message("rpc")? + .proxy_rpc_endpoint + .call(&self.rpc_host, ProxyRpc::Proxy(req), PRIO_NORMAL) + .await?? 
{
-			AdminRpc::ApiOkResponse(resp) => <T as AdminApiEndpoint>::Response::try_from(resp)
-				.map_err(|_| Error::Message(format!("{} returned unexpected response", req_name))),
-			AdminRpc::ApiErrorResponse {
+			ProxyRpcResponse::ProxyApiOkResponse(resp) => {
+				<T as RequestHandler>::Response::try_from(resp).map_err(|_| {
+					Error::Message(format!("{} returned unexpected response", req_name))
+				})
+			}
+			ProxyRpcResponse::ApiErrorResponse {
 				http_code,
 				error_code,
 				message,
@@ -103,4 +77,32 @@ impl Cli {
 			m => Err(Error::unexpected_rpc_message(m)),
 		}
 	}
+
+	pub async fn local_api_request<T>(
+		&self,
+		req: T,
+	) -> Result<<T as RequestHandler>::Response, Error>
+	where
+		T: RequestHandler,
+		MultiRequest<T>: RequestHandler<Response = MultiResponse<<T as RequestHandler>::Response>>,
+		AdminApiRequest: From<MultiRequest<T>>,
+		<MultiRequest<T> as RequestHandler>::Response: TryFrom<TaggedAdminApiResponse>,
+	{
+		let req = MultiRequest {
+			node: hex::encode(self.rpc_host),
+			body: req,
+		};
+		let resp = self.api_request(req).await?;
+
+		if let Some((_, e)) = resp.error.into_iter().next() {
+			return Err(Error::Message(e));
+		}
+		if resp.success.len() != 1 {
+			return Err(Error::Message(format!(
+				"{} responses returned, expected 1",
+				resp.success.len()
+			)));
+		}
+		Ok(resp.success.into_iter().next().unwrap().1)
+	}
 }
diff --git a/src/garage/cli_v2/node.rs b/src/garage/cli_v2/node.rs
new file mode 100644
index 00000000..c5d0cdea
--- /dev/null
+++ b/src/garage/cli_v2/node.rs
@@ -0,0 +1,113 @@
+use format_table::format_table;
+
+use garage_util::error::*;
+
+use garage_api_admin::api::*;
+
+use crate::cli::structs::*;
+use crate::cli_v2::*;
+
+impl Cli {
+	pub async fn cmd_meta(&self, cmd: MetaOperation) -> Result<(), Error> {
+		let MetaOperation::Snapshot { all } = cmd;
+
+		let res = self
+			.api_request(CreateMetadataSnapshotRequest {
+				node: if all {
+					"*".to_string()
+				} else {
+					hex::encode(self.rpc_host)
+				},
+				body: LocalCreateMetadataSnapshotRequest,
+			})
+			.await?;
+
+		let mut table = vec![];
+		for (node, err) in res.error.iter() {
+			table.push(format!("{:.16}\tError: {}", node, err));
+		}
+		for (node, _) in res.success.iter() {
+			table.push(format!("{:.16}\tSnapshot created", node));
+		}
+		format_table(table);
+
+		Ok(())
+	}
+
+	pub async fn cmd_stats(&self, cmd: StatsOpt) -> Result<(), Error> {
+		let res = self
+			.api_request(GetNodeStatisticsRequest {
+				node: if cmd.all_nodes {
+					"*".to_string()
+				} else {
+					hex::encode(self.rpc_host)
+				},
+				body: LocalGetNodeStatisticsRequest,
+			})
+			.await?;
+
+		for (node, res) in res.success.iter() {
+			println!("======================");
+			println!("Stats for node {:.16}:\n", node);
+			println!("{}\n", res.freeform);
+		}
+
+		for (node, err) in res.error.iter() {
+			println!("======================");
+			println!("Node {:.16}: error: {}\n", node, err);
+		}
+
+		let res = self.api_request(GetClusterStatisticsRequest).await?;
+		println!("======================");
+		println!("Cluster statistics:\n");
+		println!("{}\n", res.freeform);
+
+		Ok(())
+	}
+
+	pub async fn cmd_repair(&self, cmd: RepairOpt) -> Result<(), Error> {
+		if !cmd.yes {
+			return Err(Error::Message(
+				"Please add --yes to start the repair operation".into(),
+			));
+		}
+
+		let repair_type = match cmd.what {
+			RepairWhat::Tables => RepairType::Tables,
+			RepairWhat::Blocks => RepairType::Blocks,
+			RepairWhat::Versions => RepairType::Versions,
+			RepairWhat::MultipartUploads => RepairType::MultipartUploads,
+			RepairWhat::BlockRefs => RepairType::BlockRefs,
+			RepairWhat::BlockRc => RepairType::BlockRc,
+			RepairWhat::Rebalance => RepairType::Rebalance,
+			RepairWhat::Scrub { cmd } => RepairType::Scrub(match cmd {
+				ScrubCmd::Start => ScrubCommand::Start,
+				ScrubCmd::Cancel => ScrubCommand::Cancel,
+				ScrubCmd::Pause => ScrubCommand::Pause,
+				ScrubCmd::Resume => ScrubCommand::Resume,
+			}),
+		};
+
+		let res = self
+			.api_request(LaunchRepairOperationRequest {
+				node: if cmd.all_nodes {
+					"*".to_string()
+				} else {
+					hex::encode(self.rpc_host)
+				},
+				body: LocalLaunchRepairOperationRequest { repair_type },
+			})
+			.await?;
+
+		let mut table = vec![];
+		for (node, err) in res.error.iter() {
+			table.push(format!("{:.16}\tError: {}", node, err));
+		}
+		for (node, _) in res.success.iter() {
+			table.push(format!("{:.16}\tRepair launched", node));
+		}
+		format_table(table);
+
+		Ok(())
+	}
+}
diff --git a/src/garage/cli_v2/worker.rs b/src/garage/cli_v2/worker.rs
new file mode 100644
index 00000000..9c248a39
--- /dev/null
+++ b/src/garage/cli_v2/worker.rs
@@ -0,0 +1,213 @@
+use format_table::format_table;
+
+use garage_util::error::*;
+
+use garage_api_admin::api::*;
+
+use crate::cli::structs::*;
+use crate::cli_v2::*;
+
+impl Cli {
+	pub async fn cmd_worker(&self, cmd: WorkerOperation) -> Result<(), Error> {
+		match cmd {
+			WorkerOperation::List { opt } => self.cmd_list_workers(opt).await,
+			WorkerOperation::Info { tid } => self.cmd_worker_info(tid).await,
+			WorkerOperation::Get {
+				all_nodes,
+				variable,
+			} => self.cmd_get_var(all_nodes, variable).await,
+			WorkerOperation::Set {
+				all_nodes,
+				variable,
+				value,
+			} => self.cmd_set_var(all_nodes, variable, value).await,
+		}
+	}
+
+	pub async fn cmd_list_workers(&self, opt: WorkerListOpt) -> Result<(), Error> {
+		let mut list = self
+			.local_api_request(LocalListWorkersRequest {
+				busy_only: opt.busy,
+				error_only: opt.errors,
+			})
+			.await?
+			.0;
+
+		list.sort_by_key(|info| {
+			(
+				match info.state {
+					WorkerStateResp::Busy | WorkerStateResp::Throttled { .. } => 0,
+					WorkerStateResp::Idle => 1,
+					WorkerStateResp::Done => 2,
+				},
+				info.id,
+			)
+		});
+
+		let mut table =
+			vec!["TID\tState\tName\tTranq\tDone\tQueue\tErrors\tConsec\tLast".to_string()];
+		let tf = timeago::Formatter::new();
+		for info in list.iter() {
+			let err_ago = info
+				.last_error
+				.as_ref()
+				.map(|x| tf.convert(Duration::from_secs(x.secs_ago)))
+				.unwrap_or_default();
+			let (total_err, consec_err) = if info.errors > 0 {
+				(info.errors.to_string(), info.consecutive_errors.to_string())
+			} else {
+				("-".into(), "-".into())
+			};
+
+			table.push(format!(
+				"{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}",
+				info.id,
+				format_worker_state(&info.state),
+				info.name,
+				info.tranquility
+					.as_ref()
+					.map(ToString::to_string)
+					.unwrap_or_else(|| "-".into()),
+				info.progress.as_deref().unwrap_or("-"),
+				info.queue_length
+					.as_ref()
+					.map(ToString::to_string)
+					.unwrap_or_else(|| "-".into()),
+				total_err,
+				consec_err,
+				err_ago,
+			));
+		}
+		format_table(table);
+
+		Ok(())
+	}
+
+	pub async fn cmd_worker_info(&self, tid: usize) -> Result<(), Error> {
+		let info = self
+			.local_api_request(LocalGetWorkerInfoRequest { id: tid as u64 })
+			.await?
+			.0;
+
+		let mut table = vec![];
+		table.push(format!("Task id:\t{}", info.id));
+		table.push(format!("Worker name:\t{}", info.name));
+		match &info.state {
+			WorkerStateResp::Throttled { duration_secs } => {
+				table.push(format!(
+					"Worker state:\tBusy (throttled, paused for {:.3}s)",
+					duration_secs
+				));
+			}
+			s => {
+				table.push(format!("Worker state:\t{}", format_worker_state(s)));
+			}
+		};
+		if let Some(tql) = info.tranquility {
+			table.push(format!("Tranquility:\t{}", tql));
+		}
+
+		table.push("".into());
+		table.push(format!("Total errors:\t{}", info.errors));
+		table.push(format!("Consecutive errs:\t{}", info.consecutive_errors));
+		if let Some(err) = info.last_error {
+			table.push(format!("Last error:\t{}", err.message));
+			let tf = timeago::Formatter::new();
+			table.push(format!(
+				"Last error time:\t{}",
+				tf.convert(Duration::from_secs(err.secs_ago))
+			));
+		}
+
+		table.push("".into());
+		if let Some(p) = info.progress {
+			table.push(format!("Progress:\t{}", p));
+		}
+		if let Some(ql) = info.queue_length {
+			table.push(format!("Queue length:\t{}", ql));
+		}
+		if let Some(pe) = info.persistent_errors {
+			table.push(format!("Persistent errors:\t{}", pe));
+		}
+
+		for (i, s) in info.freeform.iter().enumerate() {
+			if i == 0 {
+				if table.last() != Some(&"".into()) {
+					table.push("".into());
+				}
+				table.push(format!("Message:\t{}", s));
+			} else {
+				table.push(format!("\t{}", s));
+			}
+		}
+		format_table(table);
+
+		Ok(())
+	}
+
+	pub async fn cmd_get_var(&self, all: bool, var: Option<String>) -> Result<(), Error> {
+		let res = self
+			.api_request(GetWorkerVariableRequest {
+				node: if all {
+					"*".to_string()
+				} else {
+					hex::encode(self.rpc_host)
+				},
+				body: LocalGetWorkerVariableRequest { variable: var },
+			})
+			.await?;
+
+		let mut table = vec![];
+		for (node, vars) in res.success.iter() {
+			for (key, val) in vars.0.iter() {
+				table.push(format!("{:.16}\t{}\t{}", node, key, val));
+			}
+		}
+		format_table(table);
+
+		for (node, err) in res.error.iter() {
+			eprintln!("{:.16}: error: {}", node, err);
+		}
+
+		Ok(())
+	}
+
+	pub async fn cmd_set_var(
+		&self,
+		all: bool,
+		variable: String,
+		value: String,
+	) -> Result<(), Error> {
+		let res = self
+			.api_request(SetWorkerVariableRequest {
+				node: if all {
+					"*".to_string()
+				} else {
+					hex::encode(self.rpc_host)
+				},
+				body: LocalSetWorkerVariableRequest { variable, value },
+			})
+			.await?;
+
+		let mut table = vec![];
+		for (node, kv) in res.success.iter() {
+			table.push(format!("{:.16}\t{}\t{}", node, kv.variable, kv.value));
+		}
+		format_table(table);
+
+		for (node, err) in res.error.iter() {
+			eprintln!("{:.16}: error: {}", node, err);
+		}
+
+		Ok(())
+	}
+}
+
+fn format_worker_state(s: &WorkerStateResp) -> &'static str {
+	match s {
+		WorkerStateResp::Busy => "Busy",
+		WorkerStateResp::Throttled { .. } => "Busy*",
+		WorkerStateResp::Idle => "Idle",
+		WorkerStateResp::Done => "Done",
+	}
+}
diff --git a/src/garage/main.rs b/src/garage/main.rs
index 08c7cee7..2a88d760 100644
--- a/src/garage/main.rs
+++ b/src/garage/main.rs
@@ -4,10 +4,8 @@
 #[macro_use]
 extern crate tracing;
 
-mod admin;
 mod cli;
 mod cli_v2;
-mod repair;
 mod secrets;
 mod server;
 #[cfg(feature = "telemetry-otlp")]
@@ -35,8 +33,9 @@ use garage_util::error::*;
 use garage_rpc::system::*;
 use garage_rpc::*;
 
-use admin::*;
-use cli::*;
+use garage_api_admin::api_server::{AdminRpc as ProxyRpc, ADMIN_RPC_PATH as PROXY_RPC_PATH};
+
+use cli::structs::*;
 use secrets::Secrets;
 
 #[derive(StructOpt, Debug)]
@@ -144,13 +143,13 @@ async fn main() {
 	let res = match opt.cmd {
 		Command::Server => server::run_server(opt.config_file, opt.secrets).await,
 		Command::OfflineRepair(repair_opt) => {
-			repair::offline::offline_repair(opt.config_file, opt.secrets, repair_opt).await
+			cli::repair::offline_repair(opt.config_file, opt.secrets, repair_opt).await
 		}
 		Command::ConvertDb(conv_opt) => {
 			cli::convert_db::do_conversion(conv_opt).map_err(From::from)
 		}
 		Command::Node(NodeOperation::NodeId(node_id_opt)) => {
-			node_id_command(opt.config_file, node_id_opt.quiet)
+			cli::init::node_id_command(opt.config_file, node_id_opt.quiet)
 		}
 		_ => cli_command(opt).await,
 	};
@@ -251,7 +250,7 @@ async fn cli_command(opt: Opt) -> Result<(), Error> {
 		(id, addrs[0], false)
 	} else {
 		let node_id = garage_rpc::system::read_node_id(&config.as_ref().unwrap().metadata_dir)
-			.err_context(READ_KEY_ERROR)?;
+			.err_context(cli::init::READ_KEY_ERROR)?;
 		if let Some(a) = config.as_ref().and_then(|c| c.rpc_public_addr.as_ref()) {
 			use std::net::ToSocketAddrs;
 			let a = a
@@ -281,11 +280,11 @@ async fn cli_command(opt: Opt) -> Result<(), Error> {
 	}
 
 	let system_rpc_endpoint = netapp.endpoint::<SystemRpc, ()>(SYSTEM_RPC_PATH.into());
-	let admin_rpc_endpoint = netapp.endpoint::<AdminRpc, ()>(ADMIN_RPC_PATH.into());
+	let proxy_rpc_endpoint = netapp.endpoint::<ProxyRpc, ()>(PROXY_RPC_PATH.into());
 
 	let cli = cli_v2::Cli {
 		system_rpc_endpoint,
-		admin_rpc_endpoint,
+		proxy_rpc_endpoint,
 		rpc_host: id,
 	};
 
diff --git a/src/garage/repair/mod.rs b/src/garage/repair/mod.rs
deleted file mode 100644
index 4699ace5..00000000
--- a/src/garage/repair/mod.rs
+++ /dev/null
@@ -1,2 +0,0 @@
-pub mod offline;
-pub mod online;
diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs
deleted file mode 100644
index 2c5227d2..00000000
--- a/src/garage/repair/online.rs
+++ /dev/null
@@ -1,390 +0,0 @@
-use std::sync::Arc;
-use std::time::Duration;
-
-use async_trait::async_trait;
-use tokio::sync::watch;
-
-use garage_block::manager::BlockManager;
-use garage_block::repair::ScrubWorkerCommand;
-
-use garage_model::garage::Garage;
-use garage_model::s3::block_ref_table::*;
-use garage_model::s3::mpu_table::*;
-use garage_model::s3::object_table::*;
-use garage_model::s3::version_table::*;
-
-use garage_table::replication::*;
-use garage_table::*;
-
-use garage_util::background::*;
-use garage_util::data::*;
-use garage_util::error::Error;
-use garage_util::migrate::Migrate;
-
-use crate::*;
-
-const RC_REPAIR_ITER_COUNT: usize = 64;
-
-pub async fn launch_online_repair(
-	garage: &Arc<Garage>,
-	bg: &BackgroundRunner,
-	opt: RepairOpt,
-) -> Result<(), Error> {
-	match opt.what {
-		RepairWhat::Tables => {
-			info!("Launching a full sync of tables");
-			garage.bucket_table.syncer.add_full_sync()?;
-			garage.object_table.syncer.add_full_sync()?;
-			garage.version_table.syncer.add_full_sync()?;
-			garage.block_ref_table.syncer.add_full_sync()?;
-			garage.key_table.syncer.add_full_sync()?;
-		}
-		RepairWhat::Versions => {
-			info!("Repairing the versions table");
-			bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairVersions));
-		}
-		RepairWhat::MultipartUploads => {
-			info!("Repairing the multipart uploads table");
-			bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairMpu));
-		}
-		RepairWhat::BlockRefs => {
-			info!("Repairing the block refs table");
-			bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairBlockRefs));
-		}
-		RepairWhat::BlockRc => {
-			info!("Repairing the block reference counters");
-			bg.spawn_worker(BlockRcRepair::new(
-				garage.block_manager.clone(),
-				garage.block_ref_table.clone(),
-			));
-		}
-		RepairWhat::Blocks => {
-			info!("Repairing the stored blocks");
-			bg.spawn_worker(garage_block::repair::RepairWorker::new(
-				garage.block_manager.clone(),
-			));
-		}
-		RepairWhat::Scrub { cmd } => {
-			let cmd = match cmd {
-				ScrubCmd::Start => ScrubWorkerCommand::Start,
-				ScrubCmd::Pause => ScrubWorkerCommand::Pause(Duration::from_secs(3600 * 24)),
-				ScrubCmd::Resume => ScrubWorkerCommand::Resume,
-				ScrubCmd::Cancel => ScrubWorkerCommand::Cancel,
-				ScrubCmd::SetTranquility { tranquility } => {
-					garage
-						.block_manager
-						.scrub_persister
-						.set_with(|x| x.tranquility = tranquility)?;
-					return Ok(());
-				}
-			};
-			info!("Sending command to scrub worker: {:?}", cmd);
-			garage.block_manager.send_scrub_command(cmd).await?;
-		}
-		RepairWhat::Rebalance => {
-			info!("Rebalancing the stored blocks among storage locations");
-			bg.spawn_worker(garage_block::repair::RebalanceWorker::new(
-				garage.block_manager.clone(),
-			));
-		}
-	}
-	Ok(())
-}
-
-// ----
-
-#[async_trait]
-trait TableRepair: Send + Sync + 'static {
-	type T: TableSchema;
-
-	fn table(garage: &Garage) -> &Table<Self::T, TableShardedReplication>;
-
-	async fn process(
-		&mut self,
-		garage: &Garage,
-		entry: <<Self as TableRepair>::T as TableSchema>::E,
-	) -> Result<bool, Error>;
-}
-
-struct TableRepairWorker<T: TableRepair> {
-	garage: Arc<Garage>,
-	pos: Vec<u8>,
-	counter: usize,
-	repairs: usize,
-	inner: T,
-}
-
-impl<R: TableRepair> TableRepairWorker<R> {
-	fn new(garage: Arc<Garage>, inner: R) -> Self {
-		Self {
-			garage,
-			inner,
-			pos: vec![],
-			counter: 0,
-			repairs: 0,
-		}
-	}
-}
-
-#[async_trait]
-impl<R: TableRepair> Worker for TableRepairWorker<R> {
-	fn name(&self) -> String {
-		format!("{} repair worker", R::T::TABLE_NAME)
-	}
-
-	fn status(&self) -> WorkerStatus {
-		WorkerStatus {
-			progress: Some(format!("{} ({})", self.counter, self.repairs)),
-			..Default::default()
-		}
-	}
-
-	async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
-		let (item_bytes, next_pos) = match R::table(&self.garage).data.store.get_gt(&self.pos)? {
-			Some((k, v)) => (v, k),
-			None => {
-				info!(
-					"{}: finished, done {}, fixed {}",
-					self.name(),
-					self.counter,
-					self.repairs
-				);
-				return Ok(WorkerState::Done);
-			}
-		};
-
-		let entry = <R::T as TableSchema>::E::decode(&item_bytes)
-			.ok_or_message("Cannot decode table entry")?;
-		if self.inner.process(&self.garage, entry).await? {
-			self.repairs += 1;
-		}
-
-		self.counter += 1;
-		self.pos = next_pos;
-
-		Ok(WorkerState::Busy)
-	}
-
-	async fn wait_for_work(&mut self) -> WorkerState {
-		unreachable!()
-	}
-}
-
-// ----
-
-struct RepairVersions;
-
-#[async_trait]
-impl TableRepair for RepairVersions {
-	type T = VersionTable;
-
-	fn table(garage: &Garage) -> &Table<Self::T, TableShardedReplication> {
-		&garage.version_table
-	}
-
-	async fn process(&mut self, garage: &Garage, version: Version) -> Result<bool, Error> {
-		if !version.deleted.get() {
-			let ref_exists = match &version.backlink {
-				VersionBacklink::Object { bucket_id, key } => garage
-					.object_table
-					.get(bucket_id, key)
-					.await?
-					.map(|o| {
-						o.versions().iter().any(|x| {
-							x.uuid == version.uuid && x.state != ObjectVersionState::Aborted
-						})
-					})
-					.unwrap_or(false),
-				VersionBacklink::MultipartUpload { upload_id } => garage
-					.mpu_table
-					.get(upload_id, &EmptyKey)
-					.await?
-					.map(|u| !u.deleted.get())
-					.unwrap_or(false),
-			};
-
-			if !ref_exists {
-				info!("Repair versions: marking version as deleted: {:?}", version);
-				garage
-					.version_table
-					.insert(&Version::new(version.uuid, version.backlink, true))
-					.await?;
-				return Ok(true);
-			}
-		}
-
-		Ok(false)
-	}
-}
-
-// ----
-
-struct RepairBlockRefs;
-
-#[async_trait]
-impl TableRepair for RepairBlockRefs {
-	type T = BlockRefTable;
-
-	fn table(garage: &Garage) -> &Table<Self::T, TableShardedReplication> {
-		&garage.block_ref_table
-	}
-
-	async fn process(&mut self, garage: &Garage, mut block_ref: BlockRef) -> Result<bool, Error> {
-		if !block_ref.deleted.get() {
-			let ref_exists = garage
-				.version_table
-				.get(&block_ref.version, &EmptyKey)
-				.await?
-				.map(|v| !v.deleted.get())
-				.unwrap_or(false);
-
-			if !ref_exists {
-				info!(
-					"Repair block ref: marking block_ref as deleted: {:?}",
-					block_ref
-				);
-				block_ref.deleted.set();
-				garage.block_ref_table.insert(&block_ref).await?;
-				return Ok(true);
-			}
-		}
-
-		Ok(false)
-	}
-}
-
-// ----
-
-struct RepairMpu;
-
-#[async_trait]
-impl TableRepair for RepairMpu {
-	type T = MultipartUploadTable;
-
-	fn table(garage: &Garage) -> &Table<Self::T, TableShardedReplication> {
-		&garage.mpu_table
-	}
-
-	async fn process(&mut self, garage: &Garage, mut mpu: MultipartUpload) -> Result<bool, Error> {
-		if !mpu.deleted.get() {
-			let ref_exists = garage
-				.object_table
-				.get(&mpu.bucket_id, &mpu.key)
-				.await?
-				.map(|o| {
-					o.versions()
-						.iter()
-						.any(|x| x.uuid == mpu.upload_id && x.is_uploading(Some(true)))
-				})
-				.unwrap_or(false);
-
-			if !ref_exists {
-				info!(
-					"Repair multipart uploads: marking mpu as deleted: {:?}",
-					mpu
-				);
-				mpu.parts.clear();
-				mpu.deleted.set();
-				garage.mpu_table.insert(&mpu).await?;
-				return Ok(true);
-			}
-		}
-
-		Ok(false)
-	}
-}
-
-// ===== block reference counter repair =====
-
-pub struct BlockRcRepair {
-	block_manager: Arc<BlockManager>,
-	block_ref_table: Arc<Table<BlockRefTable, TableShardedReplication>>,
-	cursor: Hash,
-	counter: u64,
-	repairs: u64,
-}
-
-impl BlockRcRepair {
-	fn new(
-		block_manager: Arc<BlockManager>,
-		block_ref_table: Arc<Table<BlockRefTable, TableShardedReplication>>,
-	) -> Self {
-		Self {
-			block_manager,
-			block_ref_table,
-			cursor: [0u8; 32].into(),
-			counter: 0,
-			repairs: 0,
-		}
-	}
-}
-
-#[async_trait]
-impl Worker for BlockRcRepair {
-	fn name(&self) -> String {
-		format!("Block refcount repair worker")
-	}
-
-	fn status(&self) -> WorkerStatus {
-		WorkerStatus {
-			progress: Some(format!("{} ({})", self.counter, self.repairs)),
-			..Default::default()
-		}
-	}
-
-	async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
-		for _i in 0..RC_REPAIR_ITER_COUNT {
-			let next1 = self
-				.block_manager
-				.rc
-				.rc_table
-				.range(self.cursor.as_slice()..)?
-				.next()
-				.transpose()?
-				.map(|(k, _)| Hash::try_from(k.as_slice()).unwrap());
-			let next2 = self
-				.block_ref_table
-				.data
-				.store
-				.range(self.cursor.as_slice()..)?
-				.next()
-				.transpose()?
-				.map(|(k, _)| Hash::try_from(&k[..32]).unwrap());
-			let next = match (next1, next2) {
-				(Some(k1), Some(k2)) => std::cmp::min(k1, k2),
-				(Some(k), None) | (None, Some(k)) => k,
-				(None, None) => {
-					info!(
-						"{}: finished, done {}, fixed {}",
-						self.name(),
-						self.counter,
-						self.repairs
-					);
-					return Ok(WorkerState::Done);
-				}
-			};
-
-			if self.block_manager.rc.recalculate_rc(&next)?.1 {
-				self.repairs += 1;
-			}
-			self.counter += 1;
-			if let Some(next_incr) = next.increment() {
-				self.cursor = next_incr;
-			} else {
-				info!(
-					"{}: finished, done {}, fixed {}",
-					self.name(),
-					self.counter,
-					self.repairs
-				);
-				return Ok(WorkerState::Done);
-			}
-		}
-
-		Ok(WorkerState::Busy)
-	}
-
-	async fn wait_for_work(&mut self) -> WorkerState {
-		unreachable!()
-	}
-}
diff --git a/src/garage/server.rs b/src/garage/server.rs
index 65bf34db..131cc8aa 100644
--- a/src/garage/server.rs
+++ b/src/garage/server.rs
@@ -6,15 +6,14 @@ use garage_util::background::*;
 use garage_util::config::*;
 use garage_util::error::Error;
 
-use garage_api::admin::api_server::AdminApiServer;
-use garage_api::s3::api_server::S3ApiServer;
+use garage_api_admin::api_server::AdminApiServer;
+use garage_api_s3::api_server::S3ApiServer;
 use garage_model::garage::Garage;
 use garage_web::WebServer;
 
 #[cfg(feature = "k2v")]
-use garage_api::k2v::api_server::K2VApiServer;
+use garage_api_k2v::api_server::K2VApiServer;
 
-use crate::admin::*;
 use crate::secrets::{fill_secrets, Secrets};
 #[cfg(feature = "telemetry-otlp")]
 use crate::tracing_setup::*;
@@ -66,6 +65,7 @@ pub async fn run_server(config_file: PathBuf, secrets: Secrets) -> Result<(), Er
 	info!("Initialize Admin API server and metrics collector...");
 	let admin_server = AdminApiServer::new(
 		garage.clone(),
+		background.clone(),
 		#[cfg(feature = "metrics")]
 		metrics_exporter,
 	);
@@ -73,9 +73,6 @@ pub async fn run_server(config_file: PathBuf, secrets: Secrets) -> Result<(), Er
 	info!("Launching internal Garage cluster communications...");
 	let run_system = tokio::spawn(garage.system.clone().run(watch_cancel.clone()));
 
-	info!("Create admin RPC handler...");
-	AdminRpcHandler::new(garage.clone(), background.clone());
-
 	// ---- Launch public-facing API servers ----
 
 	let mut servers = vec![];
diff --git a/src/garage/tests/common/custom_requester.rs b/src/garage/tests/common/custom_requester.rs
index 42368976..2db72e9f 100644
--- a/src/garage/tests/common/custom_requester.rs
+++ b/src/garage/tests/common/custom_requester.rs
@@ -15,7 +15,7 @@ use hyper_util::client::legacy::{connect::HttpConnector, Client};
 use hyper_util::rt::TokioExecutor;
 
 use super::garage::{Instance, Key};
-use garage_api::signature;
+use garage_api_common::signature;
 
 pub type Body = FullBody<hyper::body::Bytes>;
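
Usage sketch (not part of the patch): the new Cli::local_api_request helper wraps a single-node admin API call in a MultiRequest addressed to self.rpc_host, then unwraps the one expected success entry or surfaces the node's error. Assuming the request and response types shown in the cli_v2/worker.rs diff above (LocalListWorkersRequest and the tuple-struct response), a caller could look like the following; cmd_count_busy_workers is a hypothetical name used only for illustration:

	// Hypothetical caller of the new helper; not part of the patch.
	impl Cli {
		pub async fn cmd_count_busy_workers(&self) -> Result<(), Error> {
			// local_api_request targets only this node (self.rpc_host) and
			// returns the single success payload, erroring out otherwise.
			let workers = self
				.local_api_request(LocalListWorkersRequest {
					busy_only: true,
					error_only: false,
				})
				.await?
				.0;
			println!("{} busy workers", workers.len());
			Ok(())
		}
	}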
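A possible follow-up, sketched under the assumption that nothing else constructs the node field directly: the selector expression repeated in cmd_meta, cmd_stats, cmd_repair, cmd_get_var and cmd_set_var ("*" to broadcast, hex-encoded rpc_host to target the local node) could be factored into one method on Cli. node_selector is a hypothetical name, not present in the patch:

	// Hypothetical refactor of the repeated node-selector pattern; not in the patch.
	impl Cli {
		fn node_selector(&self, all_nodes: bool) -> String {
			if all_nodes {
				// "*" asks the admin API proxy to broadcast to every cluster node.
				"*".to_string()
			} else {
				// Otherwise address only this node, identified by its public key.
				hex::encode(self.rpc_host)
			}
		}
	}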