aboutsummaryrefslogtreecommitdiff
path: root/src/admin_rpc.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2020-04-19 20:36:36 +0000
committerAlex Auvolat <alex@adnab.me>2020-04-19 20:36:36 +0000
commit5ae32972efaba357ecc0027fe852d710b16b6d0e (patch)
treec49938469ac6f01c1501a79f96260269a410b739 /src/admin_rpc.rs
parenta54f3158f12cbc69107b0a65af6c2e56fda5b2d7 (diff)
downloadgarage-5ae32972efaba357ecc0027fe852d710b16b6d0e.tar.gz
garage-5ae32972efaba357ecc0027fe852d710b16b6d0e.zip
Implement repair command
Diffstat (limited to 'src/admin_rpc.rs')
-rw-r--r--src/admin_rpc.rs41
1 files changed, 40 insertions, 1 deletions
diff --git a/src/admin_rpc.rs b/src/admin_rpc.rs
index 29037c6c..8e278dbe 100644
--- a/src/admin_rpc.rs
+++ b/src/admin_rpc.rs
@@ -4,6 +4,7 @@ use serde::{Deserialize, Serialize};
use crate::data::*;
use crate::error::Error;
+use crate::rpc_client::*;
use crate::rpc_server::*;
use crate::server::Garage;
use crate::table::*;
@@ -11,11 +12,13 @@ use crate::*;
use crate::bucket_table::*;
+pub const ADMIN_RPC_TIMEOUT: Duration = Duration::from_secs(30);
pub const ADMIN_RPC_PATH: &str = "_admin";
#[derive(Debug, Serialize, Deserialize)]
pub enum AdminRPC {
BucketOperation(BucketOperation),
+ LaunchRepair(bool),
// Replies
Ok(String),
@@ -27,11 +30,13 @@ impl RpcMessage for AdminRPC {}
pub struct AdminRpcHandler {
garage: Arc<Garage>,
+ rpc_client: Arc<RpcClient<AdminRPC>>,
}
impl AdminRpcHandler {
pub fn new(garage: Arc<Garage>) -> Arc<Self> {
- Arc::new(Self { garage })
+ let rpc_client = garage.system.clone().rpc_client::<AdminRPC>(ADMIN_RPC_PATH);
+ Arc::new(Self { garage, rpc_client })
}
pub fn register_handler(self: Arc<Self>, rpc_server: &mut RpcServer) {
@@ -40,6 +45,9 @@ impl AdminRpcHandler {
async move {
match msg {
AdminRPC::BucketOperation(bo) => self2.handle_bucket_cmd(bo).await,
+ AdminRPC::LaunchRepair(repair_all) => {
+ self2.handle_launch_repair(repair_all).await
+ }
_ => Err(Error::Message(format!("Invalid RPC"))),
}
}
@@ -143,4 +151,35 @@ impl AdminRpcHandler {
}
}
}
+
+ async fn handle_launch_repair(&self, repair_all: bool) -> Result<AdminRPC, Error> {
+ if repair_all {
+ let mut failures = vec![];
+ let ring = self.garage.system.ring.borrow().clone();
+ for node in ring.config.members.keys() {
+ if self
+ .rpc_client
+ .call(node, AdminRPC::LaunchRepair(false), ADMIN_RPC_TIMEOUT)
+ .await
+ .is_err()
+ {
+ failures.push(node.clone());
+ }
+ }
+ if failures.is_empty() {
+ Ok(AdminRPC::Ok(format!("Repair launched on all nodes")))
+ } else {
+ Err(Error::Message(format!(
+ "Could not launch repair on nodes: {:?} (launched successfully on other nodes)",
+ failures
+ )))
+ }
+ } else {
+ self.garage.block_manager.launch_repair().await?;
+ Ok(AdminRPC::Ok(format!(
+ "Repair launched on {:?}",
+ self.garage.system.id
+ )))
+ }
+ }
}