aboutsummaryrefslogtreecommitdiff
path: root/src/garage/admin.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/garage/admin.rs')
-rw-r--r--src/garage/admin.rs92
1 files changed, 78 insertions, 14 deletions
diff --git a/src/garage/admin.rs b/src/garage/admin.rs
index e5bf5601..c0b0b3c9 100644
--- a/src/garage/admin.rs
+++ b/src/garage/admin.rs
@@ -15,6 +15,7 @@ use garage_table::*;
use garage_rpc::*;
+use garage_block::manager::BlockResyncErrorInfo;
use garage_block::repair::ScrubWorkerCommand;
use garage_model::bucket_alias_table::*;
@@ -24,6 +25,7 @@ use garage_model::helper::error::{Error, OkOrBadRequest};
use garage_model::key_table::*;
use garage_model::migrate::Migrate;
use garage_model::permission::*;
+use garage_model::s3::version_table::Version;
use crate::cli::*;
use crate::repair::online::launch_online_repair;
@@ -38,7 +40,8 @@ pub enum AdminRpc {
LaunchRepair(RepairOpt),
Migrate(MigrateOpt),
Stats(StatsOpt),
- Worker(WorkerOpt),
+ Worker(WorkerOperation),
+ BlockOperation(BlockOperation),
// Replies
Ok(String),
@@ -55,6 +58,12 @@ pub enum AdminRpc {
WorkerListOpt,
),
WorkerInfo(usize, garage_util::background::WorkerInfo),
+ BlockErrorList(Vec<BlockResyncErrorInfo>),
+ BlockInfo {
+ hash: Hash,
+ refcount: u64,
+ versions: Vec<Result<Version, Uuid>>,
+ },
}
impl Rpc for AdminRpc {
@@ -74,6 +83,8 @@ impl AdminRpcHandler {
admin
}
+ // ================ BUCKET COMMANDS ====================
+
async fn handle_bucket_cmd(&self, cmd: &BucketOperation) -> Result<AdminRpc, Error> {
match cmd {
BucketOperation::List => self.handle_list_buckets().await,
@@ -552,6 +563,8 @@ impl AdminRpcHandler {
Ok(AdminRpc::Ok(ret))
}
+ // ================ KEY COMMANDS ====================
+
async fn handle_key_cmd(&self, cmd: &KeyOperation) -> Result<AdminRpc, Error> {
match cmd {
KeyOperation::List => self.handle_list_keys().await,
@@ -689,6 +702,8 @@ impl AdminRpcHandler {
Ok(AdminRpc::KeyInfo(key, relevant_buckets))
}
+ // ================ MIGRATION COMMANDS ====================
+
async fn handle_migrate(self: &Arc<Self>, opt: MigrateOpt) -> Result<AdminRpc, Error> {
if !opt.yes {
return Err(Error::BadRequest(
@@ -705,6 +720,8 @@ impl AdminRpcHandler {
Ok(AdminRpc::Ok("Migration successfull.".into()))
}
+ // ================ REPAIR COMMANDS ====================
+
async fn handle_launch_repair(self: &Arc<Self>, opt: RepairOpt) -> Result<AdminRpc, Error> {
if !opt.yes {
return Err(Error::BadRequest(
@@ -748,6 +765,8 @@ impl AdminRpcHandler {
}
}
+ // ================ STATS COMMANDS ====================
+
async fn handle_stats(&self, opt: StatsOpt) -> Result<AdminRpc, Error> {
if opt.all_nodes {
let mut ret = String::new();
@@ -873,27 +892,27 @@ impl AdminRpcHandler {
Ok(())
}
- // ----
+ // ================ WORKER COMMANDS ====================
- async fn handle_worker_cmd(&self, opt: WorkerOpt) -> Result<AdminRpc, Error> {
- match opt.cmd {
- WorkerCmd::List { opt } => {
+ async fn handle_worker_cmd(&self, cmd: &WorkerOperation) -> Result<AdminRpc, Error> {
+ match cmd {
+ WorkerOperation::List { opt } => {
let workers = self.garage.background.get_worker_info();
- Ok(AdminRpc::WorkerList(workers, opt))
+ Ok(AdminRpc::WorkerList(workers, *opt))
}
- WorkerCmd::Info { tid } => {
+ WorkerOperation::Info { tid } => {
let info = self
.garage
.background
.get_worker_info()
- .get(&tid)
+ .get(tid)
.ok_or_bad_request(format!("No worker with TID {}", tid))?
.clone();
- Ok(AdminRpc::WorkerInfo(tid, info))
+ Ok(AdminRpc::WorkerInfo(*tid, info))
}
- WorkerCmd::Set { opt } => match opt {
+ WorkerOperation::Set { opt } => match opt {
WorkerSetCmd::ScrubTranquility { tranquility } => {
- let scrub_command = ScrubWorkerCommand::SetTranquility(tranquility);
+ let scrub_command = ScrubWorkerCommand::SetTranquility(*tranquility);
self.garage
.block_manager
.send_scrub_command(scrub_command)
@@ -904,7 +923,7 @@ impl AdminRpcHandler {
self.garage
.block_manager
.resync
- .set_n_workers(worker_count)
+ .set_n_workers(*worker_count)
.await?;
Ok(AdminRpc::Ok("Number of resync workers updated".into()))
}
@@ -912,13 +931,57 @@ impl AdminRpcHandler {
self.garage
.block_manager
.resync
- .set_tranquility(tranquility)
+ .set_tranquility(*tranquility)
.await?;
Ok(AdminRpc::Ok("Resync tranquility updated".into()))
}
},
}
}
+
+ // ================ BLOCK COMMANDS ====================
+
+ async fn handle_block_cmd(&self, cmd: &BlockOperation) -> Result<AdminRpc, Error> {
+ match cmd {
+ BlockOperation::ListErrors => Ok(AdminRpc::BlockErrorList(
+ self.garage.block_manager.list_resync_errors()?,
+ )),
+ BlockOperation::Info { hash } => {
+ let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?;
+ let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?;
+ let refcount = self.garage.block_manager.get_block_rc(&hash)?;
+ let block_refs = self
+ .garage
+ .block_ref_table
+ .get_range(&hash, None, None, 10000, Default::default())
+ .await?;
+ let mut versions = vec![];
+ for br in block_refs {
+ if let Some(v) = self
+ .garage
+ .version_table
+ .get(&br.version, &EmptyKey)
+ .await?
+ {
+ versions.push(Ok(v));
+ } else {
+ versions.push(Err(br.version));
+ }
+ }
+ Ok(AdminRpc::BlockInfo {
+ hash,
+ refcount,
+ versions,
+ })
+ }
+ BlockOperation::RetryNow { .. } => {
+ Err(GarageError::Message("not implemented".into()).into())
+ }
+ BlockOperation::Purge { .. } => {
+ Err(GarageError::Message("not implemented".into()).into())
+ }
+ }
+ }
}
#[async_trait]
@@ -934,7 +997,8 @@ impl EndpointHandler<AdminRpc> for AdminRpcHandler {
AdminRpc::Migrate(opt) => self.handle_migrate(opt.clone()).await,
AdminRpc::LaunchRepair(opt) => self.handle_launch_repair(opt.clone()).await,
AdminRpc::Stats(opt) => self.handle_stats(opt.clone()).await,
- AdminRpc::Worker(opt) => self.handle_worker_cmd(opt.clone()).await,
+ AdminRpc::Worker(wo) => self.handle_worker_cmd(wo).await,
+ AdminRpc::BlockOperation(bo) => self.handle_block_cmd(bo).await,
m => Err(GarageError::unexpected_rpc_message(m).into()),
}
}