author     Alex Auvolat <lx@deuxfleurs.fr>  2025-02-05 15:36:47 +0100
committer  Alex Auvolat <lx@deuxfleurs.fr>  2025-02-05 15:36:47 +0100
commit     f914db057a85e0fa70f319ee3af85998a551af96 (patch)
tree       d5a805dbc0615544d3fe1b1b538fbf2a3642a74a /src/garage
parent     406b6da1634a38c1b8176ff468d964e42ce5ce5d (diff)
cli_v2: implement LaunchRepairOperation and remove old stuff
Diffstat (limited to 'src/garage')
-rw-r--r--  src/garage/Cargo.toml                                                |   2
-rw-r--r--  src/garage/admin/mod.rs                                              | 108
-rw-r--r--  src/garage/cli/cmd.rs                                                |  21
-rw-r--r--  src/garage/cli/layout.rs                                             |   2
-rw-r--r--  src/garage/cli/mod.rs                                                |   9
-rw-r--r--  src/garage/cli/repair.rs (renamed from src/garage/repair/offline.rs) |   0
-rw-r--r--  src/garage/cli/structs.rs                                            |  64
-rw-r--r--  src/garage/cli_v2/mod.rs                                             |  14
-rw-r--r--  src/garage/cli_v2/node.rs                                            |  48
-rw-r--r--  src/garage/main.rs                                                   |  13
-rw-r--r--  src/garage/repair/mod.rs                                             |   2
-rw-r--r--  src/garage/repair/online.rs                                          | 390
-rw-r--r--  src/garage/server.rs                                                 |   4
13 files changed, 82 insertions, 595 deletions
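Note on the change: after this commit, "garage repair" is served through the HTTP admin API proxy (a LaunchRepairOperationRequest built in src/garage/cli_v2/node.rs) instead of the old AdminRpc endpoint that the rest of this diff deletes. A minimal usage sketch, assuming the subcommand and flag names visible in src/garage/cli/structs.rs ("tables", "scrub", -a/--all-nodes, --yes); exact CLI spellings may differ:

    # full sync of metadata tables, on the local node only
    garage repair --yes tables

    # start a scrub on every node: -a/--all-nodes is translated to
    # node = "*" in LaunchRepairOperationRequest (see cli_v2/node.rs below)
    garage repair -a --yes scrub start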
diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml
index 4f823fc6..c566c3e0 100644
--- a/src/garage/Cargo.toml
+++ b/src/garage/Cargo.toml
@@ -49,8 +49,6 @@ sodiumoxide.workspace = true
 structopt.workspace = true
 git-version.workspace = true
 
-serde.workspace = true
-
 futures.workspace = true
 tokio.workspace = true
 
diff --git a/src/garage/admin/mod.rs b/src/garage/admin/mod.rs
deleted file mode 100644
index c4ab2810..00000000
--- a/src/garage/admin/mod.rs
+++ /dev/null
@@ -1,108 +0,0 @@
-use std::sync::Arc;
-
-use async_trait::async_trait;
-use serde::{Deserialize, Serialize};
-
-use garage_util::background::BackgroundRunner;
-use garage_util::error::Error as GarageError;
-
-use garage_rpc::*;
-
-use garage_model::garage::Garage;
-use garage_model::helper::error::Error;
-
-use crate::cli::*;
-use crate::repair::online::launch_online_repair;
-
-pub const ADMIN_RPC_PATH: &str = "garage/admin_rpc.rs/Rpc";
-
-#[derive(Debug, Serialize, Deserialize)]
-#[allow(clippy::large_enum_variant)]
-pub enum AdminRpc {
-	LaunchRepair(RepairOpt),
-
-	// Replies
-	Ok(String),
-}
-
-impl Rpc for AdminRpc {
-	type Response = Result<AdminRpc, Error>;
-}
-
-pub struct AdminRpcHandler {
-	garage: Arc<Garage>,
-	background: Arc<BackgroundRunner>,
-	endpoint: Arc<Endpoint<AdminRpc, Self>>,
-}
-
-impl AdminRpcHandler {
-	pub fn new(garage: Arc<Garage>, background: Arc<BackgroundRunner>) -> Arc<Self> {
-		let endpoint = garage.system.netapp.endpoint(ADMIN_RPC_PATH.into());
-		let admin = Arc::new(Self {
-			garage,
-			background,
-			endpoint,
-		});
-		admin.endpoint.set_handler(admin.clone());
-		admin
-	}
-
-	// ================ REPAIR COMMANDS ====================
-
-	async fn handle_launch_repair(self: &Arc<Self>, opt: RepairOpt) -> Result<AdminRpc, Error> {
-		if !opt.yes {
-			return Err(Error::BadRequest(
-				"Please provide the --yes flag to initiate repair operations.".to_string(),
-			));
-		}
-		if opt.all_nodes {
-			let mut opt_to_send = opt.clone();
-			opt_to_send.all_nodes = false;
-
-			let mut failures = vec![];
-			let all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec();
-			for node in all_nodes.iter() {
-				let node = (*node).into();
-				let resp = self
-					.endpoint
-					.call(
-						&node,
-						AdminRpc::LaunchRepair(opt_to_send.clone()),
-						PRIO_NORMAL,
-					)
-					.await;
-				if !matches!(resp, Ok(Ok(_))) {
-					failures.push(node);
-				}
-			}
-			if failures.is_empty() {
-				Ok(AdminRpc::Ok("Repair launched on all nodes".to_string()))
-			} else {
-				Err(Error::BadRequest(format!(
-					"Could not launch repair on nodes: {:?} (launched successfully on other nodes)",
-					failures
-				)))
-			}
-		} else {
-			launch_online_repair(&self.garage, &self.background, opt).await?;
-			Ok(AdminRpc::Ok(format!(
-				"Repair launched on {:?}",
-				self.garage.system.id
-			)))
-		}
-	}
-}
-
-#[async_trait]
-impl EndpointHandler<AdminRpc> for AdminRpcHandler {
-	async fn handle(
-		self: &Arc<Self>,
-		message: &AdminRpc,
-		_from: NodeID,
-	) -> Result<AdminRpc, Error> {
-		match message {
-			AdminRpc::LaunchRepair(opt) => self.handle_launch_repair(opt.clone()).await,
-			m => Err(GarageError::unexpected_rpc_message(m).into()),
-		}
-	}
-}
diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs
deleted file mode 100644
index 1a9c7841..00000000
--- a/src/garage/cli/cmd.rs
+++ /dev/null
@@ -1,21 +0,0 @@
-use garage_rpc::*;
-
-use garage_model::helper::error::Error as HelperError;
-
-use crate::admin::*;
-
-pub async fn cmd_admin(
-	rpc_cli: &Endpoint<AdminRpc, ()>,
-	rpc_host: NodeID,
-	args: AdminRpc,
-) -> Result<(), HelperError> {
-	match rpc_cli.call(&rpc_host, args, PRIO_NORMAL).await?? {
-		AdminRpc::Ok(msg) => {
-			println!("{}", msg);
-		}
-		r => {
-			error!("Unexpected response: {:?}", r);
-		}
-	}
-	Ok(())
-}
diff --git a/src/garage/cli/layout.rs b/src/garage/cli/layout.rs
index 15040aaa..bb77cc2a 100644
--- a/src/garage/cli/layout.rs
+++ b/src/garage/cli/layout.rs
@@ -7,7 +7,7 @@ use garage_rpc::layout::*;
 use garage_rpc::system::*;
 use garage_rpc::*;
 
-use crate::cli::*;
+use crate::cli::structs::*;
 
 pub async fn cmd_show_layout(
 	rpc_cli: &Endpoint<SystemRpc, ()>,
diff --git a/src/garage/cli/mod.rs b/src/garage/cli/mod.rs
index c15afda1..e007808b 100644
--- a/src/garage/cli/mod.rs
+++ b/src/garage/cli/mod.rs
@@ -1,10 +1,7 @@
-pub(crate) mod cmd;
-pub(crate) mod init;
-pub(crate) mod layout;
 pub(crate) mod structs;
 
 pub(crate) mod convert_db;
+pub(crate) mod init;
+pub(crate) mod repair;
 
-pub(crate) use cmd::*;
-pub(crate) use init::*;
-pub(crate) use structs::*;
+pub(crate) mod layout;
diff --git a/src/garage/repair/offline.rs b/src/garage/cli/repair.rs
index 45024e71..45024e71 100644
--- a/src/garage/repair/offline.rs
+++ b/src/garage/cli/repair.rs
diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs
index 4ec35e68..c6471515 100644
--- a/src/garage/cli/structs.rs
+++ b/src/garage/cli/structs.rs
@@ -1,4 +1,3 @@
-use serde::{Deserialize, Serialize};
 use structopt::StructOpt;
 
 use garage_util::version::garage_version;
@@ -190,7 +189,7 @@ pub struct SkipDeadNodesOpt {
 	pub(crate) allow_missing_data: bool,
 }
 
-#[derive(Serialize, Deserialize, StructOpt, Debug)]
+#[derive(StructOpt, Debug)]
 pub enum BucketOperation {
 	/// List buckets
 	#[structopt(name = "list", version = garage_version())]
@@ -237,7 +236,7 @@ pub enum BucketOperation {
 	CleanupIncompleteUploads(CleanupIncompleteUploadsOpt),
 }
 
-#[derive(Serialize, Deserialize, StructOpt, Debug)]
+#[derive(StructOpt, Debug)]
 pub struct WebsiteOpt {
 	/// Create
 	#[structopt(long = "allow")]
@@ -259,13 +258,13 @@ pub struct WebsiteOpt {
 	pub error_document: Option<String>,
 }
 
-#[derive(Serialize, Deserialize, StructOpt, Debug)]
+#[derive(StructOpt, Debug)]
 pub struct BucketOpt {
 	/// Bucket name
 	pub name: String,
 }
 
-#[derive(Serialize, Deserialize, StructOpt, Debug)]
+#[derive(StructOpt, Debug)]
 pub struct DeleteBucketOpt {
 	/// Bucket name
 	pub name: String,
@@ -275,7 +274,7 @@ pub struct DeleteBucketOpt {
 	pub yes: bool,
 }
 
-#[derive(Serialize, Deserialize, StructOpt, Debug)]
+#[derive(StructOpt, Debug)]
 pub struct AliasBucketOpt {
 	/// Existing bucket name (its alias in global namespace or its full hex uuid)
 	pub existing_bucket: String,
@@ -288,7 +287,7 @@ pub struct AliasBucketOpt {
 	pub local: Option<String>,
 }
 
-#[derive(Serialize, Deserialize, StructOpt, Debug)]
+#[derive(StructOpt, Debug)]
 pub struct UnaliasBucketOpt {
 	/// Bucket name
 	pub name: String,
@@ -298,7 +297,7 @@ pub struct UnaliasBucketOpt {
 	pub local: Option<String>,
 }
 
-#[derive(Serialize, Deserialize, StructOpt, Debug)]
+#[derive(StructOpt, Debug)]
 pub struct PermBucketOpt {
 	/// Access key name or ID
 	#[structopt(long = "key")]
@@ -321,7 +320,7 @@ pub struct PermBucketOpt {
 	pub bucket: String,
 }
 
-#[derive(Serialize, Deserialize, StructOpt, Debug)]
+#[derive(StructOpt, Debug)]
 pub struct SetQuotasOpt {
 	/// Bucket name
 	pub bucket: String,
@@ -336,7 +335,7 @@ pub struct SetQuotasOpt {
 	pub max_objects: Option<String>,
 }
 
-#[derive(Serialize, Deserialize, StructOpt, Debug)]
+#[derive(StructOpt, Debug)]
 pub struct CleanupIncompleteUploadsOpt {
 	/// Abort multipart uploads older than this value
 	#[structopt(long = "older-than", default_value = "1d")]
@@ -347,7 +346,7 @@ pub struct CleanupIncompleteUploadsOpt {
 	pub buckets: Vec<String>,
 }
 
-#[derive(Serialize, Deserialize, StructOpt, Debug)]
+#[derive(StructOpt, Debug)]
 pub enum KeyOperation {
 	/// List keys
 	#[structopt(name = "list", version = garage_version())]
@@ -382,7 +381,7 @@ pub enum KeyOperation {
 	Import(KeyImportOpt),
 }
 
-#[derive(Serialize, Deserialize, StructOpt, Debug)]
+#[derive(StructOpt, Debug)]
 pub struct KeyInfoOpt {
 	/// ID or name of the key
 	pub key_pattern: String,
@@ -391,14 +390,14 @@ pub struct KeyInfoOpt {
 	pub show_secret: bool,
 }
 
-#[derive(Serialize, Deserialize, StructOpt, Debug)]
+#[derive(StructOpt, Debug)]
 pub struct KeyNewOpt {
 	/// Name of the key
 	#[structopt(default_value = "Unnamed key")]
 	pub name: String,
 }
 
-#[derive(Serialize, Deserialize, StructOpt, Debug)]
+#[derive(StructOpt, Debug)]
 pub struct KeyRenameOpt {
 	/// ID or name of the key
 	pub key_pattern: String,
@@ -407,7 +406,7 @@ pub struct KeyRenameOpt {
 	pub new_name: String,
 }
 
-#[derive(Serialize, Deserialize, StructOpt, Debug)]
+#[derive(StructOpt, Debug)]
 pub struct KeyDeleteOpt {
 	/// ID or name of the key
 	pub key_pattern: String,
@@ -417,7 +416,7 @@ pub struct KeyDeleteOpt {
 	pub yes: bool,
 }
 
-#[derive(Serialize, Deserialize, StructOpt, Debug)]
+#[derive(StructOpt, Debug)]
 pub struct KeyPermOpt {
 	/// ID or name of the key
 	pub key_pattern: String,
@@ -427,7 +426,7 @@ pub struct KeyPermOpt {
 	pub create_bucket: bool,
 }
 
-#[derive(Serialize, Deserialize, StructOpt, Debug)]
+#[derive(StructOpt, Debug)]
 pub struct KeyImportOpt {
 	/// Access key ID
 	pub key_id: String,
@@ -444,7 +443,7 @@ pub struct KeyImportOpt {
 	pub yes: bool,
 }
 
-#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
+#[derive(StructOpt, Debug, Clone)]
 pub struct RepairOpt {
 	/// Launch repair operation on all nodes
 	#[structopt(short = "a", long = "all-nodes")]
@@ -458,7 +457,7 @@ pub struct RepairOpt {
 	pub what: RepairWhat,
 }
 
-#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
+#[derive(StructOpt, Debug, Eq, PartialEq, Clone)]
 pub enum RepairWhat {
 	/// Do a full sync of metadata tables
 	#[structopt(name = "tables", version = garage_version())]
@@ -489,7 +488,7 @@ pub enum RepairWhat {
 	Rebalance,
 }
 
-#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
+#[derive(StructOpt, Debug, Eq, PartialEq, Clone)]
 pub enum ScrubCmd {
 	/// Start scrub
 	#[structopt(name = "start", version = garage_version())]
@@ -503,15 +502,9 @@ pub enum ScrubCmd {
 	/// Cancel scrub in progress
 	#[structopt(name = "cancel", version = garage_version())]
 	Cancel,
-	/// Set tranquility level for in-progress and future scrubs
-	#[structopt(name = "set-tranquility", version = garage_version())]
-	SetTranquility {
-		#[structopt()]
-		tranquility: u32,
-	},
 }
 
-#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
+#[derive(StructOpt, Debug, Clone)]
 pub struct OfflineRepairOpt {
 	/// Confirm the launch of the repair operation
 	#[structopt(long = "yes")]
@@ -521,7 +514,7 @@ pub struct OfflineRepairOpt {
 	pub what: OfflineRepairWhat,
 }
 
-#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
+#[derive(StructOpt, Debug, Eq, PartialEq, Clone)]
 pub enum OfflineRepairWhat {
 	/// Repair K2V item counters
 	#[cfg(feature = "k2v")]
@@ -532,19 +525,14 @@ pub enum OfflineRepairWhat {
 	ObjectCounters,
 }
 
-#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
+#[derive(StructOpt, Debug, Clone)]
 pub struct StatsOpt {
 	/// Gather statistics from all nodes
 	#[structopt(short = "a", long = "all-nodes")]
 	pub all_nodes: bool,
-
-	/// Don't show global cluster stats (internal use in RPC)
-	#[structopt(skip)]
-	#[serde(default)]
-	pub skip_global: bool,
 }
 
-#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
+#[derive(StructOpt, Debug, Eq, PartialEq, Clone)]
 pub enum WorkerOperation {
 	/// List all workers on Garage node
 	#[structopt(name = "list", version = garage_version())]
@@ -577,7 +565,7 @@ pub enum WorkerOperation {
 	},
 }
 
-#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone, Copy)]
+#[derive(StructOpt, Debug, Eq, PartialEq, Clone, Copy)]
 pub struct WorkerListOpt {
 	/// Show only busy workers
 	#[structopt(short = "b", long = "busy")]
@@ -587,7 +575,7 @@ pub struct WorkerListOpt {
 	pub errors: bool,
 }
 
-#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
+#[derive(StructOpt, Debug, Eq, PartialEq, Clone)]
 pub enum BlockOperation {
 	/// List all blocks that currently have a resync error
 	#[structopt(name = "list-errors", version = garage_version())]
@@ -619,7 +607,7 @@ pub enum BlockOperation {
 	},
 }
 
-#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone, Copy)]
+#[derive(StructOpt, Debug, Eq, PartialEq, Clone, Copy)]
 pub enum MetaOperation {
 	/// Save a snapshot of the metadata db file
 	#[structopt(name = "snapshot", version = garage_version())]
diff --git a/src/garage/cli_v2/mod.rs b/src/garage/cli_v2/mod.rs
index dccdc295..28c7c824 100644
--- a/src/garage/cli_v2/mod.rs
+++ b/src/garage/cli_v2/mod.rs
@@ -20,14 +20,10 @@ use garage_api_admin::api::*;
 use garage_api_admin::api_server::{AdminRpc as ProxyRpc, AdminRpcResponse as ProxyRpcResponse};
 use garage_api_admin::RequestHandler;
 
-use crate::admin::*;
-use crate::cli as cli_v1;
 use crate::cli::structs::*;
-use crate::cli::Command;
 
 pub struct Cli {
 	pub system_rpc_endpoint: Arc<Endpoint<SystemRpc, ()>>,
-	pub admin_rpc_endpoint: Arc<Endpoint<AdminRpc, ()>>,
 	pub proxy_rpc_endpoint: Arc<Endpoint<ProxyRpc, ()>>,
 	pub rpc_host: NodeID,
 }
@@ -46,15 +42,7 @@ impl Cli {
 			Command::Block(bo) => self.cmd_block(bo).await,
 			Command::Meta(mo) => self.cmd_meta(mo).await,
 			Command::Stats(so) => self.cmd_stats(so).await,
-
-			// TODO
-			Command::Repair(ro) => cli_v1::cmd_admin(
-				&self.admin_rpc_endpoint,
-				self.rpc_host,
-				AdminRpc::LaunchRepair(ro),
-			)
-			.await
-			.ok_or_message("cli_v1"),
+			Command::Repair(ro) => self.cmd_repair(ro).await,
 
 			_ => unreachable!(),
 		}
diff --git a/src/garage/cli_v2/node.rs b/src/garage/cli_v2/node.rs
index b1915dc4..c5d0cdea 100644
--- a/src/garage/cli_v2/node.rs
+++ b/src/garage/cli_v2/node.rs
@@ -27,7 +27,7 @@ impl Cli {
 			table.push(format!("{:.16}\tError: {}", node, err));
 		}
 		for (node, _) in res.success.iter() {
-			table.push(format!("{:.16}\tOk", node));
+			table.push(format!("{:.16}\tSnapshot created", node));
 		}
 		format_table(table);
 
@@ -64,4 +64,50 @@ impl Cli {
 
 		Ok(())
 	}
+
+	pub async fn cmd_repair(&self, cmd: RepairOpt) -> Result<(), Error> {
+		if !cmd.yes {
+			return Err(Error::Message(
+				"Please add --yes to start the repair operation".into(),
+			));
+		}
+
+		let repair_type = match cmd.what {
+			RepairWhat::Tables => RepairType::Tables,
+			RepairWhat::Blocks => RepairType::Blocks,
+			RepairWhat::Versions => RepairType::Versions,
+			RepairWhat::MultipartUploads => RepairType::MultipartUploads,
+			RepairWhat::BlockRefs => RepairType::BlockRefs,
+			RepairWhat::BlockRc => RepairType::BlockRc,
+			RepairWhat::Rebalance => RepairType::Rebalance,
+			RepairWhat::Scrub { cmd } => RepairType::Scrub(match cmd {
+				ScrubCmd::Start => ScrubCommand::Start,
+				ScrubCmd::Cancel => ScrubCommand::Cancel,
+				ScrubCmd::Pause => ScrubCommand::Pause,
+				ScrubCmd::Resume => ScrubCommand::Resume,
+			}),
+		};
+
+		let res = self
+			.api_request(LaunchRepairOperationRequest {
+				node: if cmd.all_nodes {
+					"*".to_string()
+				} else {
+					hex::encode(self.rpc_host)
+				},
+				body: LocalLaunchRepairOperationRequest { repair_type },
+			})
+			.await?;
+
+		let mut table = vec![];
+		for (node, err) in res.error.iter() {
+			table.push(format!("{:.16}\tError: {}", node, err));
+		}
+		for (node, _) in res.success.iter() {
+			table.push(format!("{:.16}\tRepair launched", node));
+		}
+		format_table(table);
+
+		Ok(())
+	}
 }
diff --git a/src/garage/main.rs b/src/garage/main.rs
index 022841f5..2a88d760 100644
--- a/src/garage/main.rs
+++ b/src/garage/main.rs
@@ -4,10 +4,8 @@
 #[macro_use]
 extern crate tracing;
 
-mod admin;
 mod cli;
 mod cli_v2;
-mod repair;
 mod secrets;
 mod server;
 #[cfg(feature = "telemetry-otlp")]
@@ -37,8 +35,7 @@ use garage_rpc::*;
 
 use garage_api_admin::api_server::{AdminRpc as ProxyRpc, ADMIN_RPC_PATH as PROXY_RPC_PATH};
 
-use admin::*;
-use cli::*;
+use cli::structs::*;
 use secrets::Secrets;
 
 #[derive(StructOpt, Debug)]
@@ -146,13 +143,13 @@ async fn main() {
 	let res = match opt.cmd {
 		Command::Server => server::run_server(opt.config_file, opt.secrets).await,
 		Command::OfflineRepair(repair_opt) => {
-			repair::offline::offline_repair(opt.config_file, opt.secrets, repair_opt).await
+			cli::repair::offline_repair(opt.config_file, opt.secrets, repair_opt).await
 		}
 		Command::ConvertDb(conv_opt) => {
 			cli::convert_db::do_conversion(conv_opt).map_err(From::from)
 		}
 		Command::Node(NodeOperation::NodeId(node_id_opt)) => {
-			node_id_command(opt.config_file, node_id_opt.quiet)
+			cli::init::node_id_command(opt.config_file, node_id_opt.quiet)
 		}
 		_ => cli_command(opt).await,
 	};
@@ -253,7 +250,7 @@ async fn cli_command(opt: Opt) -> Result<(), Error> {
 		(id, addrs[0], false)
 	} else {
 		let node_id = garage_rpc::system::read_node_id(&config.as_ref().unwrap().metadata_dir)
-			.err_context(READ_KEY_ERROR)?;
+			.err_context(cli::init::READ_KEY_ERROR)?;
 		if let Some(a) = config.as_ref().and_then(|c| c.rpc_public_addr.as_ref()) {
 			use std::net::ToSocketAddrs;
 			let a = a
@@ -283,12 +280,10 @@ async fn cli_command(opt: Opt) -> Result<(), Error> {
 	}
 
 	let system_rpc_endpoint = netapp.endpoint::<SystemRpc, ()>(SYSTEM_RPC_PATH.into());
-	let admin_rpc_endpoint = netapp.endpoint::<AdminRpc, ()>(ADMIN_RPC_PATH.into());
 	let proxy_rpc_endpoint = netapp.endpoint::<ProxyRpc, ()>(PROXY_RPC_PATH.into());
 
 	let cli = cli_v2::Cli {
 		system_rpc_endpoint,
-		admin_rpc_endpoint,
 		proxy_rpc_endpoint,
 		rpc_host: id,
 	};
diff --git a/src/garage/repair/mod.rs b/src/garage/repair/mod.rs
deleted file mode 100644
index 4699ace5..00000000
--- a/src/garage/repair/mod.rs
+++ /dev/null
@@ -1,2 +0,0 @@
-pub mod offline;
-pub mod online;
diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs
deleted file mode 100644
index 2c5227d2..00000000
--- a/src/garage/repair/online.rs
+++ /dev/null
@@ -1,390 +0,0 @@
-use std::sync::Arc;
-use std::time::Duration;
-
-use async_trait::async_trait;
-use tokio::sync::watch;
-
-use garage_block::manager::BlockManager;
-use garage_block::repair::ScrubWorkerCommand;
-
-use garage_model::garage::Garage;
-use garage_model::s3::block_ref_table::*;
-use garage_model::s3::mpu_table::*;
-use garage_model::s3::object_table::*;
-use garage_model::s3::version_table::*;
-
-use garage_table::replication::*;
-use garage_table::*;
-
-use garage_util::background::*;
-use garage_util::data::*;
-use garage_util::error::Error;
-use garage_util::migrate::Migrate;
-
-use crate::*;
-
-const RC_REPAIR_ITER_COUNT: usize = 64;
-
-pub async fn launch_online_repair(
-	garage: &Arc<Garage>,
-	bg: &BackgroundRunner,
-	opt: RepairOpt,
-) -> Result<(), Error> {
-	match opt.what {
-		RepairWhat::Tables => {
-			info!("Launching a full sync of tables");
-			garage.bucket_table.syncer.add_full_sync()?;
-			garage.object_table.syncer.add_full_sync()?;
-			garage.version_table.syncer.add_full_sync()?;
-			garage.block_ref_table.syncer.add_full_sync()?;
-			garage.key_table.syncer.add_full_sync()?;
-		}
-		RepairWhat::Versions => {
-			info!("Repairing the versions table");
-			bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairVersions));
-		}
-		RepairWhat::MultipartUploads => {
-			info!("Repairing the multipart uploads table");
-			bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairMpu));
-		}
-		RepairWhat::BlockRefs => {
-			info!("Repairing the block refs table");
-			bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairBlockRefs));
-		}
-		RepairWhat::BlockRc => {
-			info!("Repairing the block reference counters");
-			bg.spawn_worker(BlockRcRepair::new(
-				garage.block_manager.clone(),
-				garage.block_ref_table.clone(),
-			));
-		}
-		RepairWhat::Blocks => {
-			info!("Repairing the stored blocks");
-			bg.spawn_worker(garage_block::repair::RepairWorker::new(
-				garage.block_manager.clone(),
-			));
-		}
-		RepairWhat::Scrub { cmd } => {
-			let cmd = match cmd {
-				ScrubCmd::Start => ScrubWorkerCommand::Start,
-				ScrubCmd::Pause => ScrubWorkerCommand::Pause(Duration::from_secs(3600 * 24)),
-				ScrubCmd::Resume => ScrubWorkerCommand::Resume,
-				ScrubCmd::Cancel => ScrubWorkerCommand::Cancel,
-				ScrubCmd::SetTranquility { tranquility } => {
-					garage
-						.block_manager
-						.scrub_persister
-						.set_with(|x| x.tranquility = tranquility)?;
-					return Ok(());
-				}
-			};
-			info!("Sending command to scrub worker: {:?}", cmd);
-			garage.block_manager.send_scrub_command(cmd).await?;
-		}
-		RepairWhat::Rebalance => {
-			info!("Rebalancing the stored blocks among storage locations");
-			bg.spawn_worker(garage_block::repair::RebalanceWorker::new(
-				garage.block_manager.clone(),
-			));
-		}
-	}
-	Ok(())
-}
-
-// ----
-
-#[async_trait]
-trait TableRepair: Send + Sync + 'static {
-	type T: TableSchema;
-
-	fn table(garage: &Garage) -> &Table<Self::T, TableShardedReplication>;
-
-	async fn process(
-		&mut self,
-		garage: &Garage,
-		entry: <<Self as TableRepair>::T as TableSchema>::E,
-	) -> Result<bool, Error>;
-}
-
-struct TableRepairWorker<T: TableRepair> {
-	garage: Arc<Garage>,
-	pos: Vec<u8>,
-	counter: usize,
-	repairs: usize,
-	inner: T,
-}
-
-impl<R: TableRepair> TableRepairWorker<R> {
-	fn new(garage: Arc<Garage>, inner: R) -> Self {
-		Self {
-			garage,
-			inner,
-			pos: vec![],
-			counter: 0,
-			repairs: 0,
-		}
-	}
-}
-
-#[async_trait]
-impl<R: TableRepair> Worker for TableRepairWorker<R> {
-	fn name(&self) -> String {
-		format!("{} repair worker", R::T::TABLE_NAME)
-	}
-
-	fn status(&self) -> WorkerStatus {
-		WorkerStatus {
-			progress: Some(format!("{} ({})", self.counter, self.repairs)),
-			..Default::default()
-		}
-	}
-
-	async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
-		let (item_bytes, next_pos) = match R::table(&self.garage).data.store.get_gt(&self.pos)? {
-			Some((k, v)) => (v, k),
-			None => {
-				info!(
-					"{}: finished, done {}, fixed {}",
-					self.name(),
-					self.counter,
-					self.repairs
-				);
-				return Ok(WorkerState::Done);
-			}
-		};
-
-		let entry = <R::T as TableSchema>::E::decode(&item_bytes)
-			.ok_or_message("Cannot decode table entry")?;
-		if self.inner.process(&self.garage, entry).await? {
-			self.repairs += 1;
-		}
-
-		self.counter += 1;
-		self.pos = next_pos;
-
-		Ok(WorkerState::Busy)
-	}
-
-	async fn wait_for_work(&mut self) -> WorkerState {
-		unreachable!()
-	}
-}
-
-// ----
-
-struct RepairVersions;
-
-#[async_trait]
-impl TableRepair for RepairVersions {
-	type T = VersionTable;
-
-	fn table(garage: &Garage) -> &Table<Self::T, TableShardedReplication> {
-		&garage.version_table
-	}
-
-	async fn process(&mut self, garage: &Garage, version: Version) -> Result<bool, Error> {
-		if !version.deleted.get() {
-			let ref_exists = match &version.backlink {
-				VersionBacklink::Object { bucket_id, key } => garage
-					.object_table
-					.get(bucket_id, key)
-					.await?
-					.map(|o| {
-						o.versions().iter().any(|x| {
-							x.uuid == version.uuid && x.state != ObjectVersionState::Aborted
-						})
-					})
-					.unwrap_or(false),
-				VersionBacklink::MultipartUpload { upload_id } => garage
-					.mpu_table
-					.get(upload_id, &EmptyKey)
-					.await?
-					.map(|u| !u.deleted.get())
-					.unwrap_or(false),
-			};
-
-			if !ref_exists {
-				info!("Repair versions: marking version as deleted: {:?}", version);
-				garage
-					.version_table
-					.insert(&Version::new(version.uuid, version.backlink, true))
-					.await?;
-				return Ok(true);
-			}
-		}
-
-		Ok(false)
-	}
-}
-
-// ----
-
-struct RepairBlockRefs;
-
-#[async_trait]
-impl TableRepair for RepairBlockRefs {
-	type T = BlockRefTable;
-
-	fn table(garage: &Garage) -> &Table<Self::T, TableShardedReplication> {
-		&garage.block_ref_table
-	}
-
-	async fn process(&mut self, garage: &Garage, mut block_ref: BlockRef) -> Result<bool, Error> {
-		if !block_ref.deleted.get() {
-			let ref_exists = garage
-				.version_table
-				.get(&block_ref.version, &EmptyKey)
-				.await?
-				.map(|v| !v.deleted.get())
-				.unwrap_or(false);
-
-			if !ref_exists {
-				info!(
-					"Repair block ref: marking block_ref as deleted: {:?}",
-					block_ref
-				);
-				block_ref.deleted.set();
-				garage.block_ref_table.insert(&block_ref).await?;
-				return Ok(true);
-			}
-		}
-
-		Ok(false)
-	}
-}
-
-// ----
-
-struct RepairMpu;
-
-#[async_trait]
-impl TableRepair for RepairMpu {
-	type T = MultipartUploadTable;
-
-	fn table(garage: &Garage) -> &Table<Self::T, TableShardedReplication> {
-		&garage.mpu_table
-	}
-
-	async fn process(&mut self, garage: &Garage, mut mpu: MultipartUpload) -> Result<bool, Error> {
-		if !mpu.deleted.get() {
-			let ref_exists = garage
-				.object_table
-				.get(&mpu.bucket_id, &mpu.key)
-				.await?
-				.map(|o| {
-					o.versions()
-						.iter()
-						.any(|x| x.uuid == mpu.upload_id && x.is_uploading(Some(true)))
-				})
-				.unwrap_or(false);
-
-			if !ref_exists {
-				info!(
-					"Repair multipart uploads: marking mpu as deleted: {:?}",
-					mpu
-				);
-				mpu.parts.clear();
-				mpu.deleted.set();
-				garage.mpu_table.insert(&mpu).await?;
-				return Ok(true);
-			}
-		}
-
-		Ok(false)
-	}
-}
-
-// ===== block reference counter repair =====
-
-pub struct BlockRcRepair {
-	block_manager: Arc<BlockManager>,
-	block_ref_table: Arc<Table<BlockRefTable, TableShardedReplication>>,
-	cursor: Hash,
-	counter: u64,
-	repairs: u64,
-}
-
-impl BlockRcRepair {
-	fn new(
-		block_manager: Arc<BlockManager>,
-		block_ref_table: Arc<Table<BlockRefTable, TableShardedReplication>>,
-	) -> Self {
-		Self {
-			block_manager,
-			block_ref_table,
-			cursor: [0u8; 32].into(),
-			counter: 0,
-			repairs: 0,
-		}
-	}
-}
-
-#[async_trait]
-impl Worker for BlockRcRepair {
-	fn name(&self) -> String {
-		format!("Block refcount repair worker")
-	}
-
-	fn status(&self) -> WorkerStatus {
-		WorkerStatus {
-			progress: Some(format!("{} ({})", self.counter, self.repairs)),
-			..Default::default()
-		}
-	}
-
-	async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
-		for _i in 0..RC_REPAIR_ITER_COUNT {
-			let next1 = self
-				.block_manager
-				.rc
-				.rc_table
-				.range(self.cursor.as_slice()..)?
-				.next()
-				.transpose()?
-				.map(|(k, _)| Hash::try_from(k.as_slice()).unwrap());
-			let next2 = self
-				.block_ref_table
-				.data
-				.store
-				.range(self.cursor.as_slice()..)?
-				.next()
-				.transpose()?
-				.map(|(k, _)| Hash::try_from(&k[..32]).unwrap());
-			let next = match (next1, next2) {
-				(Some(k1), Some(k2)) => std::cmp::min(k1, k2),
-				(Some(k), None) | (None, Some(k)) => k,
-				(None, None) => {
-					info!(
-						"{}: finished, done {}, fixed {}",
-						self.name(),
-						self.counter,
-						self.repairs
-					);
-					return Ok(WorkerState::Done);
-				}
-			};
-
-			if self.block_manager.rc.recalculate_rc(&next)?.1 {
-				self.repairs += 1;
-			}
-			self.counter += 1;
-			if let Some(next_incr) = next.increment() {
-				self.cursor = next_incr;
-			} else {
-				info!(
-					"{}: finished, done {}, fixed {}",
-					self.name(),
-					self.counter,
-					self.repairs
-				);
-				return Ok(WorkerState::Done);
-			}
-		}
-
-		Ok(WorkerState::Busy)
-	}
-
-	async fn wait_for_work(&mut self) -> WorkerState {
-		unreachable!()
-	}
-}
diff --git a/src/garage/server.rs b/src/garage/server.rs
index e629041c..131cc8aa 100644
--- a/src/garage/server.rs
+++ b/src/garage/server.rs
@@ -14,7 +14,6 @@ use garage_web::WebServer;
 #[cfg(feature = "k2v")]
 use garage_api_k2v::api_server::K2VApiServer;
 
-use crate::admin::*;
 use crate::secrets::{fill_secrets, Secrets};
 #[cfg(feature = "telemetry-otlp")]
 use crate::tracing_setup::*;
@@ -74,9 +73,6 @@ pub async fn run_server(config_file: PathBuf, secrets: Secrets) -> Result<(), Er
 	info!("Launching internal Garage cluster communications...");
 	let run_system = tokio::spawn(garage.system.clone().run(watch_cancel.clone()));
 
-	info!("Create admin RPC handler...");
-	AdminRpcHandler::new(garage.clone(), background.clone());
-
 	// ---- Launch public-facing API servers ----
 
 	let mut servers = vec![];