aboutsummaryrefslogtreecommitdiff
path: root/src/garage/admin.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/garage/admin.rs')
-rw-r--r--src/garage/admin.rs306
1 files changed, 253 insertions, 53 deletions
diff --git a/src/garage/admin.rs b/src/garage/admin.rs
index e973cfe7..1ca3698a 100644
--- a/src/garage/admin.rs
+++ b/src/garage/admin.rs
@@ -8,6 +8,7 @@ use serde::{Deserialize, Serialize};
use garage_util::crdt::*;
use garage_util::data::*;
use garage_util::error::Error as GarageError;
+use garage_util::formater::format_table_to_string;
use garage_util::time::*;
use garage_table::replication::*;
@@ -15,6 +16,7 @@ use garage_table::*;
use garage_rpc::*;
+use garage_block::manager::BlockResyncErrorInfo;
use garage_block::repair::ScrubWorkerCommand;
use garage_model::bucket_alias_table::*;
@@ -24,6 +26,8 @@ use garage_model::helper::error::{Error, OkOrBadRequest};
use garage_model::key_table::*;
use garage_model::migrate::Migrate;
use garage_model::permission::*;
+use garage_model::s3::object_table::*;
+use garage_model::s3::version_table::Version;
use crate::cli::*;
use crate::repair::online::launch_online_repair;
@@ -38,7 +42,8 @@ pub enum AdminRpc {
LaunchRepair(RepairOpt),
Migrate(MigrateOpt),
Stats(StatsOpt),
- Worker(WorkerOpt),
+ Worker(WorkerOperation),
+ BlockOperation(BlockOperation),
// Replies
Ok(String),
@@ -54,6 +59,13 @@ pub enum AdminRpc {
HashMap<usize, garage_util::background::WorkerInfo>,
WorkerListOpt,
),
+ WorkerInfo(usize, garage_util::background::WorkerInfo),
+ BlockErrorList(Vec<BlockResyncErrorInfo>),
+ BlockInfo {
+ hash: Hash,
+ refcount: u64,
+ versions: Vec<Result<Version, Uuid>>,
+ },
}
impl Rpc for AdminRpc {
@@ -73,6 +85,8 @@ impl AdminRpcHandler {
admin
}
+ // ================ BUCKET COMMANDS ====================
+
async fn handle_bucket_cmd(&self, cmd: &BucketOperation) -> Result<AdminRpc, Error> {
match cmd {
BucketOperation::List => self.handle_list_buckets().await,
@@ -551,6 +565,8 @@ impl AdminRpcHandler {
Ok(AdminRpc::Ok(ret))
}
+ // ================ KEY COMMANDS ====================
+
async fn handle_key_cmd(&self, cmd: &KeyOperation) -> Result<AdminRpc, Error> {
match cmd {
KeyOperation::List => self.handle_list_keys().await,
@@ -688,6 +704,8 @@ impl AdminRpcHandler {
Ok(AdminRpc::KeyInfo(key, relevant_buckets))
}
+ // ================ MIGRATION COMMANDS ====================
+
async fn handle_migrate(self: &Arc<Self>, opt: MigrateOpt) -> Result<AdminRpc, Error> {
if !opt.yes {
return Err(Error::BadRequest(
@@ -704,6 +722,8 @@ impl AdminRpcHandler {
Ok(AdminRpc::Ok("Migration successfull.".into()))
}
+ // ================ REPAIR COMMANDS ====================
+
async fn handle_launch_repair(self: &Arc<Self>, opt: RepairOpt) -> Result<AdminRpc, Error> {
if !opt.yes {
return Err(Error::BadRequest(
@@ -747,6 +767,8 @@ impl AdminRpcHandler {
}
}
+ // ================ STATS COMMANDS ====================
+
async fn handle_stats(&self, opt: StatsOpt) -> Result<AdminRpc, Error> {
if opt.all_nodes {
let mut ret = String::new();
@@ -763,11 +785,12 @@ impl AdminRpcHandler {
match self
.endpoint
.call(&node_id, AdminRpc::Stats(opt), PRIO_NORMAL)
- .await?
+ .await
{
- Ok(AdminRpc::Ok(s)) => writeln!(&mut ret, "{}", s).unwrap(),
- Ok(x) => writeln!(&mut ret, "Bad answer: {:?}", x).unwrap(),
- Err(e) => writeln!(&mut ret, "Error: {}", e).unwrap(),
+ Ok(Ok(AdminRpc::Ok(s))) => writeln!(&mut ret, "{}", s).unwrap(),
+ Ok(Ok(x)) => writeln!(&mut ret, "Bad answer: {:?}", x).unwrap(),
+ Ok(Err(e)) => writeln!(&mut ret, "Remote error: {}", e).unwrap(),
+ Err(e) => writeln!(&mut ret, "Network error: {}", e).unwrap(),
}
}
Ok(AdminRpc::Ok(ret))
@@ -787,6 +810,7 @@ impl AdminRpcHandler {
.unwrap_or_else(|| "(unknown)".into()),
)
.unwrap();
+
writeln!(&mut ret, "\nDatabase engine: {}", self.garage.db.engine()).unwrap();
// Gather ring statistics
@@ -805,21 +829,38 @@ impl AdminRpcHandler {
writeln!(&mut ret, " {:?} {}", n, c).unwrap();
}
- self.gather_table_stats(&mut ret, &self.garage.bucket_table, &opt)?;
- self.gather_table_stats(&mut ret, &self.garage.key_table, &opt)?;
- self.gather_table_stats(&mut ret, &self.garage.object_table, &opt)?;
- self.gather_table_stats(&mut ret, &self.garage.version_table, &opt)?;
- self.gather_table_stats(&mut ret, &self.garage.block_ref_table, &opt)?;
+ // Gather table statistics
+ let mut table = vec![" Table\tItems\tMklItems\tMklTodo\tGcTodo".into()];
+ table.push(self.gather_table_stats(&self.garage.bucket_table, opt.detailed)?);
+ table.push(self.gather_table_stats(&self.garage.key_table, opt.detailed)?);
+ table.push(self.gather_table_stats(&self.garage.object_table, opt.detailed)?);
+ table.push(self.gather_table_stats(&self.garage.version_table, opt.detailed)?);
+ table.push(self.gather_table_stats(&self.garage.block_ref_table, opt.detailed)?);
+ write!(
+ &mut ret,
+ "\nTable stats:\n{}",
+ format_table_to_string(table)
+ )
+ .unwrap();
+ // Gather block manager statistics
writeln!(&mut ret, "\nBlock manager stats:").unwrap();
- if opt.detailed {
- writeln!(
- &mut ret,
- " number of RC entries (~= number of blocks): {}",
- self.garage.block_manager.rc_len()?
- )
- .unwrap();
- }
+ let rc_len = if opt.detailed {
+ self.garage.block_manager.rc_len()?.to_string()
+ } else {
+ self.garage
+ .block_manager
+ .rc_fast_len()?
+ .map(|x| x.to_string())
+ .unwrap_or_else(|| "NC".into())
+ };
+
+ writeln!(
+ &mut ret,
+ " number of RC entries (~= number of blocks): {}",
+ rc_len
+ )
+ .unwrap();
writeln!(
&mut ret,
" resync queue length: {}",
@@ -833,67 +874,84 @@ impl AdminRpcHandler {
)
.unwrap();
+ if !opt.detailed {
+ writeln!(&mut ret, "\nIf values are missing (marked as NC), consider adding the --detailed flag - this will be slow.").unwrap();
+ }
+
Ok(ret)
}
fn gather_table_stats<F, R>(
&self,
- to: &mut String,
t: &Arc<Table<F, R>>,
- opt: &StatsOpt,
- ) -> Result<(), Error>
+ detailed: bool,
+ ) -> Result<String, Error>
where
F: TableSchema + 'static,
R: TableReplication + 'static,
{
- writeln!(to, "\nTable stats for {}", F::TABLE_NAME).unwrap();
- if opt.detailed {
- writeln!(
- to,
- " number of items: {}",
- t.data.store.len().map_err(GarageError::from)?
+ let (data_len, mkl_len) = if detailed {
+ (
+ t.data.store.len().map_err(GarageError::from)?.to_string(),
+ t.merkle_updater.merkle_tree_len()?.to_string(),
)
- .unwrap();
- writeln!(
- to,
- " Merkle tree size: {}",
- t.merkle_updater.merkle_tree_len()?
+ } else {
+ (
+ t.data
+ .store
+ .fast_len()
+ .map_err(GarageError::from)?
+ .map(|x| x.to_string())
+ .unwrap_or_else(|| "NC".into()),
+ t.merkle_updater
+ .merkle_tree_fast_len()?
+ .map(|x| x.to_string())
+ .unwrap_or_else(|| "NC".into()),
)
- .unwrap();
- }
- writeln!(
- to,
- " Merkle updater todo queue length: {}",
- t.merkle_updater.todo_len()?
- )
- .unwrap();
- writeln!(to, " GC todo queue length: {}", t.data.gc_todo_len()?).unwrap();
+ };
- Ok(())
+ Ok(format!(
+ " {}\t{}\t{}\t{}\t{}",
+ F::TABLE_NAME,
+ data_len,
+ mkl_len,
+ t.merkle_updater.todo_len()?,
+ t.data.gc_todo_len()?
+ ))
}
- // ----
+ // ================ WORKER COMMANDS ====================
- async fn handle_worker_cmd(&self, opt: WorkerOpt) -> Result<AdminRpc, Error> {
- match opt.cmd {
- WorkerCmd::List { opt } => {
+ async fn handle_worker_cmd(&self, cmd: &WorkerOperation) -> Result<AdminRpc, Error> {
+ match cmd {
+ WorkerOperation::List { opt } => {
let workers = self.garage.background.get_worker_info();
- Ok(AdminRpc::WorkerList(workers, opt))
+ Ok(AdminRpc::WorkerList(workers, *opt))
}
- WorkerCmd::Set { opt } => match opt {
+ WorkerOperation::Info { tid } => {
+ let info = self
+ .garage
+ .background
+ .get_worker_info()
+ .get(tid)
+ .ok_or_bad_request(format!("No worker with TID {}", tid))?
+ .clone();
+ Ok(AdminRpc::WorkerInfo(*tid, info))
+ }
+ WorkerOperation::Set { opt } => match opt {
WorkerSetCmd::ScrubTranquility { tranquility } => {
- let scrub_command = ScrubWorkerCommand::SetTranquility(tranquility);
+ let scrub_command = ScrubWorkerCommand::SetTranquility(*tranquility);
self.garage
.block_manager
.send_scrub_command(scrub_command)
.await;
Ok(AdminRpc::Ok("Scrub tranquility updated".into()))
}
- WorkerSetCmd::ResyncNWorkers { n_workers } => {
+ WorkerSetCmd::ResyncWorkerCount { worker_count } => {
self.garage
.block_manager
.resync
- .set_n_workers(n_workers)
+ .set_n_workers(*worker_count)
.await?;
Ok(AdminRpc::Ok("Number of resync workers updated".into()))
}
@@ -901,13 +959,154 @@ impl AdminRpcHandler {
self.garage
.block_manager
.resync
- .set_tranquility(tranquility)
+ .set_tranquility(*tranquility)
.await?;
Ok(AdminRpc::Ok("Resync tranquility updated".into()))
}
},
}
}
+
+ // ================ BLOCK COMMANDS ====================
+
+ async fn handle_block_cmd(&self, cmd: &BlockOperation) -> Result<AdminRpc, Error> {
+ match cmd {
+ BlockOperation::ListErrors => Ok(AdminRpc::BlockErrorList(
+ self.garage.block_manager.list_resync_errors()?,
+ )),
+ BlockOperation::Info { hash } => {
+ let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?;
+ let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?;
+ let refcount = self.garage.block_manager.get_block_rc(&hash)?;
+ let block_refs = self
+ .garage
+ .block_ref_table
+ .get_range(&hash, None, None, 10000, Default::default())
+ .await?;
+ let mut versions = vec![];
+ for br in block_refs {
+ if let Some(v) = self
+ .garage
+ .version_table
+ .get(&br.version, &EmptyKey)
+ .await?
+ {
+ versions.push(Ok(v));
+ } else {
+ versions.push(Err(br.version));
+ }
+ }
+ Ok(AdminRpc::BlockInfo {
+ hash,
+ refcount,
+ versions,
+ })
+ }
+ BlockOperation::RetryNow { all, blocks } => {
+ if *all {
+ if !blocks.is_empty() {
+ return Err(Error::BadRequest(
+ "--all was specified, cannot also specify blocks".into(),
+ ));
+ }
+ let blocks = self.garage.block_manager.list_resync_errors()?;
+ for b in blocks.iter() {
+ self.garage.block_manager.resync.clear_backoff(&b.hash)?;
+ }
+ Ok(AdminRpc::Ok(format!(
+ "{} blocks returned in queue for a retry now (check logs to see results)",
+ blocks.len()
+ )))
+ } else {
+ for hash in blocks {
+ let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?;
+ let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?;
+ self.garage.block_manager.resync.clear_backoff(&hash)?;
+ }
+ Ok(AdminRpc::Ok(format!(
+ "{} blocks returned in queue for a retry now (check logs to see results)",
+ blocks.len()
+ )))
+ }
+ }
+ BlockOperation::Purge { yes, blocks } => {
+ if !yes {
+ return Err(Error::BadRequest(
+ "Pass the --yes flag to confirm block purge operation.".into(),
+ ));
+ }
+
+ let mut obj_dels = 0;
+ let mut ver_dels = 0;
+
+ for hash in blocks {
+ let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?;
+ let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?;
+ let block_refs = self
+ .garage
+ .block_ref_table
+ .get_range(&hash, None, None, 10000, Default::default())
+ .await?;
+
+ for br in block_refs {
+ let version = match self
+ .garage
+ .version_table
+ .get(&br.version, &EmptyKey)
+ .await?
+ {
+ Some(v) => v,
+ None => continue,
+ };
+
+ if let Some(object) = self
+ .garage
+ .object_table
+ .get(&version.bucket_id, &version.key)
+ .await?
+ {
+ let ov = object.versions().iter().rev().find(|v| v.is_complete());
+ if let Some(ov) = ov {
+ if ov.uuid == br.version {
+ let del_uuid = gen_uuid();
+ let deleted_object = Object::new(
+ version.bucket_id,
+ version.key.clone(),
+ vec![ObjectVersion {
+ uuid: del_uuid,
+ timestamp: ov.timestamp + 1,
+ state: ObjectVersionState::Complete(
+ ObjectVersionData::DeleteMarker,
+ ),
+ }],
+ );
+ self.garage.object_table.insert(&deleted_object).await?;
+ obj_dels += 1;
+ }
+ }
+ }
+
+ if !version.deleted.get() {
+ let deleted_version = Version::new(
+ version.uuid,
+ version.bucket_id,
+ version.key.clone(),
+ true,
+ );
+ self.garage.version_table.insert(&deleted_version).await?;
+ ver_dels += 1;
+ }
+ }
+ }
+ Ok(AdminRpc::Ok(format!(
+ "{} blocks were purged: {} object deletion markers added, {} versions marked deleted",
+ blocks.len(),
+ obj_dels,
+ ver_dels
+ )))
+ }
+ }
+ }
}
#[async_trait]
@@ -923,7 +1122,8 @@ impl EndpointHandler<AdminRpc> for AdminRpcHandler {
AdminRpc::Migrate(opt) => self.handle_migrate(opt.clone()).await,
AdminRpc::LaunchRepair(opt) => self.handle_launch_repair(opt.clone()).await,
AdminRpc::Stats(opt) => self.handle_stats(opt.clone()).await,
- AdminRpc::Worker(opt) => self.handle_worker_cmd(opt.clone()).await,
+ AdminRpc::Worker(wo) => self.handle_worker_cmd(wo).await,
+ AdminRpc::BlockOperation(bo) => self.handle_block_cmd(bo).await,
m => Err(GarageError::unexpected_rpc_message(m).into()),
}
}