author     Alex Auvolat <lx@deuxfleurs.fr>    2025-02-05 15:36:47 +0100
committer  Alex Auvolat <lx@deuxfleurs.fr>    2025-02-05 15:36:47 +0100
commit     f914db057a85e0fa70f319ee3af85998a551af96 (patch)
tree       d5a805dbc0615544d3fe1b1b538fbf2a3642a74a /src/garage
parent     406b6da1634a38c1b8176ff468d964e42ce5ce5d (diff)
cli_v2: implement LaunchRepairOperation and remove old stuff
Diffstat (limited to 'src/garage')
-rw-r--r--  src/garage/Cargo.toml                                                    2
-rw-r--r--  src/garage/admin/mod.rs                                                108
-rw-r--r--  src/garage/cli/cmd.rs                                                   21
-rw-r--r--  src/garage/cli/layout.rs                                                 2
-rw-r--r--  src/garage/cli/mod.rs                                                    9
-rw-r--r--  src/garage/cli/repair.rs (renamed from src/garage/repair/offline.rs)     0
-rw-r--r--  src/garage/cli/structs.rs                                               64
-rw-r--r--  src/garage/cli_v2/mod.rs                                                14
-rw-r--r--  src/garage/cli_v2/node.rs                                               48
-rw-r--r--  src/garage/main.rs                                                      13
-rw-r--r--  src/garage/repair/mod.rs                                                 2
-rw-r--r--  src/garage/repair/online.rs                                            390
-rw-r--r--  src/garage/server.rs                                                     4
13 files changed, 82 insertions, 595 deletions
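
In short: launching repairs no longer goes through the dedicated AdminRpc endpoint (src/garage/admin/ and src/garage/repair/online.rs, both deleted below). The `garage repair` command is now translated into a LaunchRepairOperation call on the admin API proxy RPC, and the offline repair code moves to cli/repair.rs. A condensed sketch of the new code path, using only names that appear in the src/garage/cli_v2/node.rs hunk of this commit (not a standalone implementation):

    // Sketch mirroring the new cmd_repair() added below.
    // RepairOpt/RepairWhat/ScrubCmd come from crate::cli::structs;
    // RepairType, ScrubCommand, LaunchRepairOperationRequest and
    // LocalLaunchRepairOperationRequest come from garage_api_admin::api.
    async fn launch_repair(cli: &Cli, cmd: RepairOpt) -> Result<(), Error> {
        // Map the CLI enum onto the admin API enum (1:1 for every variant).
        let repair_type = match cmd.what {
            RepairWhat::Tables => RepairType::Tables,
            RepairWhat::Blocks => RepairType::Blocks,
            RepairWhat::Scrub { cmd } => RepairType::Scrub(match cmd {
                ScrubCmd::Start => ScrubCommand::Start,
                ScrubCmd::Pause => ScrubCommand::Pause,
                ScrubCmd::Resume => ScrubCommand::Resume,
                ScrubCmd::Cancel => ScrubCommand::Cancel,
            }),
            // Versions, MultipartUploads, BlockRefs, BlockRc, Rebalance map likewise.
            _ => unimplemented!(),
        };

        // node = "*" fans the operation out to every cluster node; otherwise
        // only the node the CLI is connected to runs the repair.
        let node = if cmd.all_nodes {
            "*".to_string()
        } else {
            hex::encode(cli.rpc_host)
        };

        // The real cmd_repair() also prints a per-node success/error table
        // from the response; omitted here.
        cli.api_request(LaunchRepairOperationRequest {
            node,
            body: LocalLaunchRepairOperationRequest { repair_type },
        })
        .await?;
        Ok(())
    }

The repair workers themselves are presumably driven from the admin API side now (the equivalent of the deleted launch_online_repair()), which lives outside the src/garage scope of this diffstat.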
diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml
index 4f823fc6..c566c3e0 100644
--- a/src/garage/Cargo.toml
+++ b/src/garage/Cargo.toml
@@ -49,8 +49,6 @@ sodiumoxide.workspace = true
structopt.workspace = true
git-version.workspace = true
-serde.workspace = true
-
futures.workspace = true
tokio.workspace = true
diff --git a/src/garage/admin/mod.rs b/src/garage/admin/mod.rs
deleted file mode 100644
index c4ab2810..00000000
--- a/src/garage/admin/mod.rs
+++ /dev/null
@@ -1,108 +0,0 @@
-use std::sync::Arc;
-
-use async_trait::async_trait;
-use serde::{Deserialize, Serialize};
-
-use garage_util::background::BackgroundRunner;
-use garage_util::error::Error as GarageError;
-
-use garage_rpc::*;
-
-use garage_model::garage::Garage;
-use garage_model::helper::error::Error;
-
-use crate::cli::*;
-use crate::repair::online::launch_online_repair;
-
-pub const ADMIN_RPC_PATH: &str = "garage/admin_rpc.rs/Rpc";
-
-#[derive(Debug, Serialize, Deserialize)]
-#[allow(clippy::large_enum_variant)]
-pub enum AdminRpc {
- LaunchRepair(RepairOpt),
-
- // Replies
- Ok(String),
-}
-
-impl Rpc for AdminRpc {
- type Response = Result<AdminRpc, Error>;
-}
-
-pub struct AdminRpcHandler {
- garage: Arc<Garage>,
- background: Arc<BackgroundRunner>,
- endpoint: Arc<Endpoint<AdminRpc, Self>>,
-}
-
-impl AdminRpcHandler {
- pub fn new(garage: Arc<Garage>, background: Arc<BackgroundRunner>) -> Arc<Self> {
- let endpoint = garage.system.netapp.endpoint(ADMIN_RPC_PATH.into());
- let admin = Arc::new(Self {
- garage,
- background,
- endpoint,
- });
- admin.endpoint.set_handler(admin.clone());
- admin
- }
-
- // ================ REPAIR COMMANDS ====================
-
- async fn handle_launch_repair(self: &Arc<Self>, opt: RepairOpt) -> Result<AdminRpc, Error> {
- if !opt.yes {
- return Err(Error::BadRequest(
- "Please provide the --yes flag to initiate repair operations.".to_string(),
- ));
- }
- if opt.all_nodes {
- let mut opt_to_send = opt.clone();
- opt_to_send.all_nodes = false;
-
- let mut failures = vec![];
- let all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec();
- for node in all_nodes.iter() {
- let node = (*node).into();
- let resp = self
- .endpoint
- .call(
- &node,
- AdminRpc::LaunchRepair(opt_to_send.clone()),
- PRIO_NORMAL,
- )
- .await;
- if !matches!(resp, Ok(Ok(_))) {
- failures.push(node);
- }
- }
- if failures.is_empty() {
- Ok(AdminRpc::Ok("Repair launched on all nodes".to_string()))
- } else {
- Err(Error::BadRequest(format!(
- "Could not launch repair on nodes: {:?} (launched successfully on other nodes)",
- failures
- )))
- }
- } else {
- launch_online_repair(&self.garage, &self.background, opt).await?;
- Ok(AdminRpc::Ok(format!(
- "Repair launched on {:?}",
- self.garage.system.id
- )))
- }
- }
-}
-
-#[async_trait]
-impl EndpointHandler<AdminRpc> for AdminRpcHandler {
- async fn handle(
- self: &Arc<Self>,
- message: &AdminRpc,
- _from: NodeID,
- ) -> Result<AdminRpc, Error> {
- match message {
- AdminRpc::LaunchRepair(opt) => self.handle_launch_repair(opt.clone()).await,
- m => Err(GarageError::unexpected_rpc_message(m).into()),
- }
- }
-}
diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs
deleted file mode 100644
index 1a9c7841..00000000
--- a/src/garage/cli/cmd.rs
+++ /dev/null
@@ -1,21 +0,0 @@
-use garage_rpc::*;
-
-use garage_model::helper::error::Error as HelperError;
-
-use crate::admin::*;
-
-pub async fn cmd_admin(
- rpc_cli: &Endpoint<AdminRpc, ()>,
- rpc_host: NodeID,
- args: AdminRpc,
-) -> Result<(), HelperError> {
- match rpc_cli.call(&rpc_host, args, PRIO_NORMAL).await?? {
- AdminRpc::Ok(msg) => {
- println!("{}", msg);
- }
- r => {
- error!("Unexpected response: {:?}", r);
- }
- }
- Ok(())
-}
diff --git a/src/garage/cli/layout.rs b/src/garage/cli/layout.rs
index 15040aaa..bb77cc2a 100644
--- a/src/garage/cli/layout.rs
+++ b/src/garage/cli/layout.rs
@@ -7,7 +7,7 @@ use garage_rpc::layout::*;
use garage_rpc::system::*;
use garage_rpc::*;
-use crate::cli::*;
+use crate::cli::structs::*;
pub async fn cmd_show_layout(
rpc_cli: &Endpoint<SystemRpc, ()>,
diff --git a/src/garage/cli/mod.rs b/src/garage/cli/mod.rs
index c15afda1..e007808b 100644
--- a/src/garage/cli/mod.rs
+++ b/src/garage/cli/mod.rs
@@ -1,10 +1,7 @@
-pub(crate) mod cmd;
-pub(crate) mod init;
-pub(crate) mod layout;
pub(crate) mod structs;
pub(crate) mod convert_db;
+pub(crate) mod init;
+pub(crate) mod repair;
-pub(crate) use cmd::*;
-pub(crate) use init::*;
-pub(crate) use structs::*;
+pub(crate) mod layout;
diff --git a/src/garage/repair/offline.rs b/src/garage/cli/repair.rs
index 45024e71..45024e71 100644
--- a/src/garage/repair/offline.rs
+++ b/src/garage/cli/repair.rs
diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs
index 4ec35e68..c6471515 100644
--- a/src/garage/cli/structs.rs
+++ b/src/garage/cli/structs.rs
@@ -1,4 +1,3 @@
-use serde::{Deserialize, Serialize};
use structopt::StructOpt;
use garage_util::version::garage_version;
@@ -190,7 +189,7 @@ pub struct SkipDeadNodesOpt {
pub(crate) allow_missing_data: bool,
}
-#[derive(Serialize, Deserialize, StructOpt, Debug)]
+#[derive(StructOpt, Debug)]
pub enum BucketOperation {
/// List buckets
#[structopt(name = "list", version = garage_version())]
@@ -237,7 +236,7 @@ pub enum BucketOperation {
CleanupIncompleteUploads(CleanupIncompleteUploadsOpt),
}
-#[derive(Serialize, Deserialize, StructOpt, Debug)]
+#[derive(StructOpt, Debug)]
pub struct WebsiteOpt {
/// Create
#[structopt(long = "allow")]
@@ -259,13 +258,13 @@ pub struct WebsiteOpt {
pub error_document: Option<String>,
}
-#[derive(Serialize, Deserialize, StructOpt, Debug)]
+#[derive(StructOpt, Debug)]
pub struct BucketOpt {
/// Bucket name
pub name: String,
}
-#[derive(Serialize, Deserialize, StructOpt, Debug)]
+#[derive(StructOpt, Debug)]
pub struct DeleteBucketOpt {
/// Bucket name
pub name: String,
@@ -275,7 +274,7 @@ pub struct DeleteBucketOpt {
pub yes: bool,
}
-#[derive(Serialize, Deserialize, StructOpt, Debug)]
+#[derive(StructOpt, Debug)]
pub struct AliasBucketOpt {
/// Existing bucket name (its alias in global namespace or its full hex uuid)
pub existing_bucket: String,
@@ -288,7 +287,7 @@ pub struct AliasBucketOpt {
pub local: Option<String>,
}
-#[derive(Serialize, Deserialize, StructOpt, Debug)]
+#[derive(StructOpt, Debug)]
pub struct UnaliasBucketOpt {
/// Bucket name
pub name: String,
@@ -298,7 +297,7 @@ pub struct UnaliasBucketOpt {
pub local: Option<String>,
}
-#[derive(Serialize, Deserialize, StructOpt, Debug)]
+#[derive(StructOpt, Debug)]
pub struct PermBucketOpt {
/// Access key name or ID
#[structopt(long = "key")]
@@ -321,7 +320,7 @@ pub struct PermBucketOpt {
pub bucket: String,
}
-#[derive(Serialize, Deserialize, StructOpt, Debug)]
+#[derive(StructOpt, Debug)]
pub struct SetQuotasOpt {
/// Bucket name
pub bucket: String,
@@ -336,7 +335,7 @@ pub struct SetQuotasOpt {
pub max_objects: Option<String>,
}
-#[derive(Serialize, Deserialize, StructOpt, Debug)]
+#[derive(StructOpt, Debug)]
pub struct CleanupIncompleteUploadsOpt {
/// Abort multipart uploads older than this value
#[structopt(long = "older-than", default_value = "1d")]
@@ -347,7 +346,7 @@ pub struct CleanupIncompleteUploadsOpt {
pub buckets: Vec<String>,
}
-#[derive(Serialize, Deserialize, StructOpt, Debug)]
+#[derive(StructOpt, Debug)]
pub enum KeyOperation {
/// List keys
#[structopt(name = "list", version = garage_version())]
@@ -382,7 +381,7 @@ pub enum KeyOperation {
Import(KeyImportOpt),
}
-#[derive(Serialize, Deserialize, StructOpt, Debug)]
+#[derive(StructOpt, Debug)]
pub struct KeyInfoOpt {
/// ID or name of the key
pub key_pattern: String,
@@ -391,14 +390,14 @@ pub struct KeyInfoOpt {
pub show_secret: bool,
}
-#[derive(Serialize, Deserialize, StructOpt, Debug)]
+#[derive(StructOpt, Debug)]
pub struct KeyNewOpt {
/// Name of the key
#[structopt(default_value = "Unnamed key")]
pub name: String,
}
-#[derive(Serialize, Deserialize, StructOpt, Debug)]
+#[derive(StructOpt, Debug)]
pub struct KeyRenameOpt {
/// ID or name of the key
pub key_pattern: String,
@@ -407,7 +406,7 @@ pub struct KeyRenameOpt {
pub new_name: String,
}
-#[derive(Serialize, Deserialize, StructOpt, Debug)]
+#[derive(StructOpt, Debug)]
pub struct KeyDeleteOpt {
/// ID or name of the key
pub key_pattern: String,
@@ -417,7 +416,7 @@ pub struct KeyDeleteOpt {
pub yes: bool,
}
-#[derive(Serialize, Deserialize, StructOpt, Debug)]
+#[derive(StructOpt, Debug)]
pub struct KeyPermOpt {
/// ID or name of the key
pub key_pattern: String,
@@ -427,7 +426,7 @@ pub struct KeyPermOpt {
pub create_bucket: bool,
}
-#[derive(Serialize, Deserialize, StructOpt, Debug)]
+#[derive(StructOpt, Debug)]
pub struct KeyImportOpt {
/// Access key ID
pub key_id: String,
@@ -444,7 +443,7 @@ pub struct KeyImportOpt {
pub yes: bool,
}
-#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
+#[derive(StructOpt, Debug, Clone)]
pub struct RepairOpt {
/// Launch repair operation on all nodes
#[structopt(short = "a", long = "all-nodes")]
@@ -458,7 +457,7 @@ pub struct RepairOpt {
pub what: RepairWhat,
}
-#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
+#[derive(StructOpt, Debug, Eq, PartialEq, Clone)]
pub enum RepairWhat {
/// Do a full sync of metadata tables
#[structopt(name = "tables", version = garage_version())]
@@ -489,7 +488,7 @@ pub enum RepairWhat {
Rebalance,
}
-#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
+#[derive(StructOpt, Debug, Eq, PartialEq, Clone)]
pub enum ScrubCmd {
/// Start scrub
#[structopt(name = "start", version = garage_version())]
@@ -503,15 +502,9 @@ pub enum ScrubCmd {
/// Cancel scrub in progress
#[structopt(name = "cancel", version = garage_version())]
Cancel,
- /// Set tranquility level for in-progress and future scrubs
- #[structopt(name = "set-tranquility", version = garage_version())]
- SetTranquility {
- #[structopt()]
- tranquility: u32,
- },
}
-#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
+#[derive(StructOpt, Debug, Clone)]
pub struct OfflineRepairOpt {
/// Confirm the launch of the repair operation
#[structopt(long = "yes")]
@@ -521,7 +514,7 @@ pub struct OfflineRepairOpt {
pub what: OfflineRepairWhat,
}
-#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
+#[derive(StructOpt, Debug, Eq, PartialEq, Clone)]
pub enum OfflineRepairWhat {
/// Repair K2V item counters
#[cfg(feature = "k2v")]
@@ -532,19 +525,14 @@ pub enum OfflineRepairWhat {
ObjectCounters,
}
-#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
+#[derive(StructOpt, Debug, Clone)]
pub struct StatsOpt {
/// Gather statistics from all nodes
#[structopt(short = "a", long = "all-nodes")]
pub all_nodes: bool,
-
- /// Don't show global cluster stats (internal use in RPC)
- #[structopt(skip)]
- #[serde(default)]
- pub skip_global: bool,
}
-#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
+#[derive(StructOpt, Debug, Eq, PartialEq, Clone)]
pub enum WorkerOperation {
/// List all workers on Garage node
#[structopt(name = "list", version = garage_version())]
@@ -577,7 +565,7 @@ pub enum WorkerOperation {
},
}
-#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone, Copy)]
+#[derive(StructOpt, Debug, Eq, PartialEq, Clone, Copy)]
pub struct WorkerListOpt {
/// Show only busy workers
#[structopt(short = "b", long = "busy")]
@@ -587,7 +575,7 @@ pub struct WorkerListOpt {
pub errors: bool,
}
-#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
+#[derive(StructOpt, Debug, Eq, PartialEq, Clone)]
pub enum BlockOperation {
/// List all blocks that currently have a resync error
#[structopt(name = "list-errors", version = garage_version())]
@@ -619,7 +607,7 @@ pub enum BlockOperation {
},
}
-#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone, Copy)]
+#[derive(StructOpt, Debug, Eq, PartialEq, Clone, Copy)]
pub enum MetaOperation {
/// Save a snapshot of the metadata db file
#[structopt(name = "snapshot", version = garage_version())]
diff --git a/src/garage/cli_v2/mod.rs b/src/garage/cli_v2/mod.rs
index dccdc295..28c7c824 100644
--- a/src/garage/cli_v2/mod.rs
+++ b/src/garage/cli_v2/mod.rs
@@ -20,14 +20,10 @@ use garage_api_admin::api::*;
use garage_api_admin::api_server::{AdminRpc as ProxyRpc, AdminRpcResponse as ProxyRpcResponse};
use garage_api_admin::RequestHandler;
-use crate::admin::*;
-use crate::cli as cli_v1;
use crate::cli::structs::*;
-use crate::cli::Command;
pub struct Cli {
pub system_rpc_endpoint: Arc<Endpoint<SystemRpc, ()>>,
- pub admin_rpc_endpoint: Arc<Endpoint<AdminRpc, ()>>,
pub proxy_rpc_endpoint: Arc<Endpoint<ProxyRpc, ()>>,
pub rpc_host: NodeID,
}
@@ -46,15 +42,7 @@ impl Cli {
Command::Block(bo) => self.cmd_block(bo).await,
Command::Meta(mo) => self.cmd_meta(mo).await,
Command::Stats(so) => self.cmd_stats(so).await,
-
- // TODO
- Command::Repair(ro) => cli_v1::cmd_admin(
- &self.admin_rpc_endpoint,
- self.rpc_host,
- AdminRpc::LaunchRepair(ro),
- )
- .await
- .ok_or_message("cli_v1"),
+ Command::Repair(ro) => self.cmd_repair(ro).await,
_ => unreachable!(),
}
diff --git a/src/garage/cli_v2/node.rs b/src/garage/cli_v2/node.rs
index b1915dc4..c5d0cdea 100644
--- a/src/garage/cli_v2/node.rs
+++ b/src/garage/cli_v2/node.rs
@@ -27,7 +27,7 @@ impl Cli {
table.push(format!("{:.16}\tError: {}", node, err));
}
for (node, _) in res.success.iter() {
- table.push(format!("{:.16}\tOk", node));
+ table.push(format!("{:.16}\tSnapshot created", node));
}
format_table(table);
@@ -64,4 +64,50 @@ impl Cli {
Ok(())
}
+
+ pub async fn cmd_repair(&self, cmd: RepairOpt) -> Result<(), Error> {
+ if !cmd.yes {
+ return Err(Error::Message(
+ "Please add --yes to start the repair operation".into(),
+ ));
+ }
+
+ let repair_type = match cmd.what {
+ RepairWhat::Tables => RepairType::Tables,
+ RepairWhat::Blocks => RepairType::Blocks,
+ RepairWhat::Versions => RepairType::Versions,
+ RepairWhat::MultipartUploads => RepairType::MultipartUploads,
+ RepairWhat::BlockRefs => RepairType::BlockRefs,
+ RepairWhat::BlockRc => RepairType::BlockRc,
+ RepairWhat::Rebalance => RepairType::Rebalance,
+ RepairWhat::Scrub { cmd } => RepairType::Scrub(match cmd {
+ ScrubCmd::Start => ScrubCommand::Start,
+ ScrubCmd::Cancel => ScrubCommand::Cancel,
+ ScrubCmd::Pause => ScrubCommand::Pause,
+ ScrubCmd::Resume => ScrubCommand::Resume,
+ }),
+ };
+
+ let res = self
+ .api_request(LaunchRepairOperationRequest {
+ node: if cmd.all_nodes {
+ "*".to_string()
+ } else {
+ hex::encode(self.rpc_host)
+ },
+ body: LocalLaunchRepairOperationRequest { repair_type },
+ })
+ .await?;
+
+ let mut table = vec![];
+ for (node, err) in res.error.iter() {
+ table.push(format!("{:.16}\tError: {}", node, err));
+ }
+ for (node, _) in res.success.iter() {
+ table.push(format!("{:.16}\tRepair launched", node));
+ }
+ format_table(table);
+
+ Ok(())
+ }
}
diff --git a/src/garage/main.rs b/src/garage/main.rs
index 022841f5..2a88d760 100644
--- a/src/garage/main.rs
+++ b/src/garage/main.rs
@@ -4,10 +4,8 @@
#[macro_use]
extern crate tracing;
-mod admin;
mod cli;
mod cli_v2;
-mod repair;
mod secrets;
mod server;
#[cfg(feature = "telemetry-otlp")]
@@ -37,8 +35,7 @@ use garage_rpc::*;
use garage_api_admin::api_server::{AdminRpc as ProxyRpc, ADMIN_RPC_PATH as PROXY_RPC_PATH};
-use admin::*;
-use cli::*;
+use cli::structs::*;
use secrets::Secrets;
#[derive(StructOpt, Debug)]
@@ -146,13 +143,13 @@ async fn main() {
let res = match opt.cmd {
Command::Server => server::run_server(opt.config_file, opt.secrets).await,
Command::OfflineRepair(repair_opt) => {
- repair::offline::offline_repair(opt.config_file, opt.secrets, repair_opt).await
+ cli::repair::offline_repair(opt.config_file, opt.secrets, repair_opt).await
}
Command::ConvertDb(conv_opt) => {
cli::convert_db::do_conversion(conv_opt).map_err(From::from)
}
Command::Node(NodeOperation::NodeId(node_id_opt)) => {
- node_id_command(opt.config_file, node_id_opt.quiet)
+ cli::init::node_id_command(opt.config_file, node_id_opt.quiet)
}
_ => cli_command(opt).await,
};
@@ -253,7 +250,7 @@ async fn cli_command(opt: Opt) -> Result<(), Error> {
(id, addrs[0], false)
} else {
let node_id = garage_rpc::system::read_node_id(&config.as_ref().unwrap().metadata_dir)
- .err_context(READ_KEY_ERROR)?;
+ .err_context(cli::init::READ_KEY_ERROR)?;
if let Some(a) = config.as_ref().and_then(|c| c.rpc_public_addr.as_ref()) {
use std::net::ToSocketAddrs;
let a = a
@@ -283,12 +280,10 @@ async fn cli_command(opt: Opt) -> Result<(), Error> {
}
let system_rpc_endpoint = netapp.endpoint::<SystemRpc, ()>(SYSTEM_RPC_PATH.into());
- let admin_rpc_endpoint = netapp.endpoint::<AdminRpc, ()>(ADMIN_RPC_PATH.into());
let proxy_rpc_endpoint = netapp.endpoint::<ProxyRpc, ()>(PROXY_RPC_PATH.into());
let cli = cli_v2::Cli {
system_rpc_endpoint,
- admin_rpc_endpoint,
proxy_rpc_endpoint,
rpc_host: id,
};
diff --git a/src/garage/repair/mod.rs b/src/garage/repair/mod.rs
deleted file mode 100644
index 4699ace5..00000000
--- a/src/garage/repair/mod.rs
+++ /dev/null
@@ -1,2 +0,0 @@
-pub mod offline;
-pub mod online;
diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs
deleted file mode 100644
index 2c5227d2..00000000
--- a/src/garage/repair/online.rs
+++ /dev/null
@@ -1,390 +0,0 @@
-use std::sync::Arc;
-use std::time::Duration;
-
-use async_trait::async_trait;
-use tokio::sync::watch;
-
-use garage_block::manager::BlockManager;
-use garage_block::repair::ScrubWorkerCommand;
-
-use garage_model::garage::Garage;
-use garage_model::s3::block_ref_table::*;
-use garage_model::s3::mpu_table::*;
-use garage_model::s3::object_table::*;
-use garage_model::s3::version_table::*;
-
-use garage_table::replication::*;
-use garage_table::*;
-
-use garage_util::background::*;
-use garage_util::data::*;
-use garage_util::error::Error;
-use garage_util::migrate::Migrate;
-
-use crate::*;
-
-const RC_REPAIR_ITER_COUNT: usize = 64;
-
-pub async fn launch_online_repair(
- garage: &Arc<Garage>,
- bg: &BackgroundRunner,
- opt: RepairOpt,
-) -> Result<(), Error> {
- match opt.what {
- RepairWhat::Tables => {
- info!("Launching a full sync of tables");
- garage.bucket_table.syncer.add_full_sync()?;
- garage.object_table.syncer.add_full_sync()?;
- garage.version_table.syncer.add_full_sync()?;
- garage.block_ref_table.syncer.add_full_sync()?;
- garage.key_table.syncer.add_full_sync()?;
- }
- RepairWhat::Versions => {
- info!("Repairing the versions table");
- bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairVersions));
- }
- RepairWhat::MultipartUploads => {
- info!("Repairing the multipart uploads table");
- bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairMpu));
- }
- RepairWhat::BlockRefs => {
- info!("Repairing the block refs table");
- bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairBlockRefs));
- }
- RepairWhat::BlockRc => {
- info!("Repairing the block reference counters");
- bg.spawn_worker(BlockRcRepair::new(
- garage.block_manager.clone(),
- garage.block_ref_table.clone(),
- ));
- }
- RepairWhat::Blocks => {
- info!("Repairing the stored blocks");
- bg.spawn_worker(garage_block::repair::RepairWorker::new(
- garage.block_manager.clone(),
- ));
- }
- RepairWhat::Scrub { cmd } => {
- let cmd = match cmd {
- ScrubCmd::Start => ScrubWorkerCommand::Start,
- ScrubCmd::Pause => ScrubWorkerCommand::Pause(Duration::from_secs(3600 * 24)),
- ScrubCmd::Resume => ScrubWorkerCommand::Resume,
- ScrubCmd::Cancel => ScrubWorkerCommand::Cancel,
- ScrubCmd::SetTranquility { tranquility } => {
- garage
- .block_manager
- .scrub_persister
- .set_with(|x| x.tranquility = tranquility)?;
- return Ok(());
- }
- };
- info!("Sending command to scrub worker: {:?}", cmd);
- garage.block_manager.send_scrub_command(cmd).await?;
- }
- RepairWhat::Rebalance => {
- info!("Rebalancing the stored blocks among storage locations");
- bg.spawn_worker(garage_block::repair::RebalanceWorker::new(
- garage.block_manager.clone(),
- ));
- }
- }
- Ok(())
-}
-
-// ----
-
-#[async_trait]
-trait TableRepair: Send + Sync + 'static {
- type T: TableSchema;
-
- fn table(garage: &Garage) -> &Table<Self::T, TableShardedReplication>;
-
- async fn process(
- &mut self,
- garage: &Garage,
- entry: <<Self as TableRepair>::T as TableSchema>::E,
- ) -> Result<bool, Error>;
-}
-
-struct TableRepairWorker<T: TableRepair> {
- garage: Arc<Garage>,
- pos: Vec<u8>,
- counter: usize,
- repairs: usize,
- inner: T,
-}
-
-impl<R: TableRepair> TableRepairWorker<R> {
- fn new(garage: Arc<Garage>, inner: R) -> Self {
- Self {
- garage,
- inner,
- pos: vec![],
- counter: 0,
- repairs: 0,
- }
- }
-}
-
-#[async_trait]
-impl<R: TableRepair> Worker for TableRepairWorker<R> {
- fn name(&self) -> String {
- format!("{} repair worker", R::T::TABLE_NAME)
- }
-
- fn status(&self) -> WorkerStatus {
- WorkerStatus {
- progress: Some(format!("{} ({})", self.counter, self.repairs)),
- ..Default::default()
- }
- }
-
- async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
- let (item_bytes, next_pos) = match R::table(&self.garage).data.store.get_gt(&self.pos)? {
- Some((k, v)) => (v, k),
- None => {
- info!(
- "{}: finished, done {}, fixed {}",
- self.name(),
- self.counter,
- self.repairs
- );
- return Ok(WorkerState::Done);
- }
- };
-
- let entry = <R::T as TableSchema>::E::decode(&item_bytes)
- .ok_or_message("Cannot decode table entry")?;
- if self.inner.process(&self.garage, entry).await? {
- self.repairs += 1;
- }
-
- self.counter += 1;
- self.pos = next_pos;
-
- Ok(WorkerState::Busy)
- }
-
- async fn wait_for_work(&mut self) -> WorkerState {
- unreachable!()
- }
-}
-
-// ----
-
-struct RepairVersions;
-
-#[async_trait]
-impl TableRepair for RepairVersions {
- type T = VersionTable;
-
- fn table(garage: &Garage) -> &Table<Self::T, TableShardedReplication> {
- &garage.version_table
- }
-
- async fn process(&mut self, garage: &Garage, version: Version) -> Result<bool, Error> {
- if !version.deleted.get() {
- let ref_exists = match &version.backlink {
- VersionBacklink::Object { bucket_id, key } => garage
- .object_table
- .get(bucket_id, key)
- .await?
- .map(|o| {
- o.versions().iter().any(|x| {
- x.uuid == version.uuid && x.state != ObjectVersionState::Aborted
- })
- })
- .unwrap_or(false),
- VersionBacklink::MultipartUpload { upload_id } => garage
- .mpu_table
- .get(upload_id, &EmptyKey)
- .await?
- .map(|u| !u.deleted.get())
- .unwrap_or(false),
- };
-
- if !ref_exists {
- info!("Repair versions: marking version as deleted: {:?}", version);
- garage
- .version_table
- .insert(&Version::new(version.uuid, version.backlink, true))
- .await?;
- return Ok(true);
- }
- }
-
- Ok(false)
- }
-}
-
-// ----
-
-struct RepairBlockRefs;
-
-#[async_trait]
-impl TableRepair for RepairBlockRefs {
- type T = BlockRefTable;
-
- fn table(garage: &Garage) -> &Table<Self::T, TableShardedReplication> {
- &garage.block_ref_table
- }
-
- async fn process(&mut self, garage: &Garage, mut block_ref: BlockRef) -> Result<bool, Error> {
- if !block_ref.deleted.get() {
- let ref_exists = garage
- .version_table
- .get(&block_ref.version, &EmptyKey)
- .await?
- .map(|v| !v.deleted.get())
- .unwrap_or(false);
-
- if !ref_exists {
- info!(
- "Repair block ref: marking block_ref as deleted: {:?}",
- block_ref
- );
- block_ref.deleted.set();
- garage.block_ref_table.insert(&block_ref).await?;
- return Ok(true);
- }
- }
-
- Ok(false)
- }
-}
-
-// ----
-
-struct RepairMpu;
-
-#[async_trait]
-impl TableRepair for RepairMpu {
- type T = MultipartUploadTable;
-
- fn table(garage: &Garage) -> &Table<Self::T, TableShardedReplication> {
- &garage.mpu_table
- }
-
- async fn process(&mut self, garage: &Garage, mut mpu: MultipartUpload) -> Result<bool, Error> {
- if !mpu.deleted.get() {
- let ref_exists = garage
- .object_table
- .get(&mpu.bucket_id, &mpu.key)
- .await?
- .map(|o| {
- o.versions()
- .iter()
- .any(|x| x.uuid == mpu.upload_id && x.is_uploading(Some(true)))
- })
- .unwrap_or(false);
-
- if !ref_exists {
- info!(
- "Repair multipart uploads: marking mpu as deleted: {:?}",
- mpu
- );
- mpu.parts.clear();
- mpu.deleted.set();
- garage.mpu_table.insert(&mpu).await?;
- return Ok(true);
- }
- }
-
- Ok(false)
- }
-}
-
-// ===== block reference counter repair =====
-
-pub struct BlockRcRepair {
- block_manager: Arc<BlockManager>,
- block_ref_table: Arc<Table<BlockRefTable, TableShardedReplication>>,
- cursor: Hash,
- counter: u64,
- repairs: u64,
-}
-
-impl BlockRcRepair {
- fn new(
- block_manager: Arc<BlockManager>,
- block_ref_table: Arc<Table<BlockRefTable, TableShardedReplication>>,
- ) -> Self {
- Self {
- block_manager,
- block_ref_table,
- cursor: [0u8; 32].into(),
- counter: 0,
- repairs: 0,
- }
- }
-}
-
-#[async_trait]
-impl Worker for BlockRcRepair {
- fn name(&self) -> String {
- format!("Block refcount repair worker")
- }
-
- fn status(&self) -> WorkerStatus {
- WorkerStatus {
- progress: Some(format!("{} ({})", self.counter, self.repairs)),
- ..Default::default()
- }
- }
-
- async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
- for _i in 0..RC_REPAIR_ITER_COUNT {
- let next1 = self
- .block_manager
- .rc
- .rc_table
- .range(self.cursor.as_slice()..)?
- .next()
- .transpose()?
- .map(|(k, _)| Hash::try_from(k.as_slice()).unwrap());
- let next2 = self
- .block_ref_table
- .data
- .store
- .range(self.cursor.as_slice()..)?
- .next()
- .transpose()?
- .map(|(k, _)| Hash::try_from(&k[..32]).unwrap());
- let next = match (next1, next2) {
- (Some(k1), Some(k2)) => std::cmp::min(k1, k2),
- (Some(k), None) | (None, Some(k)) => k,
- (None, None) => {
- info!(
- "{}: finished, done {}, fixed {}",
- self.name(),
- self.counter,
- self.repairs
- );
- return Ok(WorkerState::Done);
- }
- };
-
- if self.block_manager.rc.recalculate_rc(&next)?.1 {
- self.repairs += 1;
- }
- self.counter += 1;
- if let Some(next_incr) = next.increment() {
- self.cursor = next_incr;
- } else {
- info!(
- "{}: finished, done {}, fixed {}",
- self.name(),
- self.counter,
- self.repairs
- );
- return Ok(WorkerState::Done);
- }
- }
-
- Ok(WorkerState::Busy)
- }
-
- async fn wait_for_work(&mut self) -> WorkerState {
- unreachable!()
- }
-}
diff --git a/src/garage/server.rs b/src/garage/server.rs
index e629041c..131cc8aa 100644
--- a/src/garage/server.rs
+++ b/src/garage/server.rs
@@ -14,7 +14,6 @@ use garage_web::WebServer;
#[cfg(feature = "k2v")]
use garage_api_k2v::api_server::K2VApiServer;
-use crate::admin::*;
use crate::secrets::{fill_secrets, Secrets};
#[cfg(feature = "telemetry-otlp")]
use crate::tracing_setup::*;
@@ -74,9 +73,6 @@ pub async fn run_server(config_file: PathBuf, secrets: Secrets) -> Result<(), Er
info!("Launching internal Garage cluster communications...");
let run_system = tokio::spawn(garage.system.clone().run(watch_cancel.clone()));
- info!("Create admin RPC handler...");
- AdminRpcHandler::new(garage.clone(), background.clone());
-
// ---- Launch public-facing API servers ----
let mut servers = vec![];