aboutsummaryrefslogtreecommitdiff
path: root/src/garage/admin/mod.rs
blob: c4ab281033e2e237b36f87898d23b5fe7197885e (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
use std::sync::Arc;

use async_trait::async_trait;
use serde::{Deserialize, Serialize};

use garage_util::background::BackgroundRunner;
use garage_util::error::Error as GarageError;

use garage_rpc::*;

use garage_model::garage::Garage;
use garage_model::helper::error::Error;

use crate::cli::*;
use crate::repair::online::launch_online_repair;

/// Path string passed to `netapp.endpoint(..)` when the admin RPC endpoint is
/// created in `AdminRpcHandler::new`.
// NOTE(review): presumably this string identifies the endpoint between nodes, so
// it should not be changed even though it no longer matches this file's path — confirm.
pub const ADMIN_RPC_PATH: &str = "garage/admin_rpc.rs/Rpc";

/// Messages exchanged over the admin RPC endpoint: requests and their replies
/// share this single enum.
#[derive(Debug, Serialize, Deserialize)]
// NOTE(review): presumably `RepairOpt` is much larger than the reply variant,
// hence the lint exemption — confirm it is still needed.
#[allow(clippy::large_enum_variant)]
pub enum AdminRpc {
	/// Request: launch a repair operation with the given options.
	LaunchRepair(RepairOpt),

	// Replies
	/// Reply: the request succeeded; carries a message describing the outcome.
	Ok(String),
}

/// Declares `AdminRpc` as an RPC message type.
impl Rpc for AdminRpc {
	/// A call is answered with either another `AdminRpc` value (the reply) or
	/// an `Error`.
	type Response = Result<AdminRpc, Error>;
}

/// Handler for `AdminRpc` messages; constructed through `AdminRpcHandler::new`.
pub struct AdminRpcHandler {
	/// Shared `Garage` instance: read for the cluster layout and the local node
	/// id, and passed to `launch_online_repair`.
	garage: Arc<Garage>,
	/// Background runner passed to `launch_online_repair`.
	background: Arc<BackgroundRunner>,
	/// Endpoint on which this handler is registered and through which
	/// `AdminRpc` calls are sent to other nodes.
	endpoint: Arc<Endpoint<AdminRpc, Self>>,
}

impl AdminRpcHandler {
	pub fn new(garage: Arc<Garage>, background: Arc<BackgroundRunner>) -> Arc<Self> {
		let endpoint = garage.system.netapp.endpoint(ADMIN_RPC_PATH.into());
		let admin = Arc::new(Self {
			garage,
			background,
			endpoint,
		});
		admin.endpoint.set_handler(admin.clone());
		admin
	}

	// ================ REPAIR COMMANDS ====================

	async fn handle_launch_repair(self: &Arc<Self>, opt: RepairOpt) -> Result<AdminRpc, Error> {
		if !opt.yes {
			return Err(Error::BadRequest(
				"Please provide the --yes flag to initiate repair operations.".to_string(),
			));
		}
		if opt.all_nodes {
			let mut opt_to_send = opt.clone();
			opt_to_send.all_nodes = false;

			let mut failures = vec![];
			let all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec();
			for node in all_nodes.iter() {
				let node = (*node).into();
				let resp = self
					.endpoint
					.call(
						&node,
						AdminRpc::LaunchRepair(opt_to_send.clone()),
						PRIO_NORMAL,
					)
					.await;
				if !matches!(resp, Ok(Ok(_))) {
					failures.push(node);
				}
			}
			if failures.is_empty() {
				Ok(AdminRpc::Ok("Repair launched on all nodes".to_string()))
			} else {
				Err(Error::BadRequest(format!(
					"Could not launch repair on nodes: {:?} (launched successfully on other nodes)",
					failures
				)))
			}
		} else {
			launch_online_repair(&self.garage, &self.background, opt).await?;
			Ok(AdminRpc::Ok(format!(
				"Repair launched on {:?}",
				self.garage.system.id
			)))
		}
	}
}

#[async_trait]
impl EndpointHandler<AdminRpc> for AdminRpcHandler {
	async fn handle(
		self: &Arc<Self>,
		message: &AdminRpc,
		_from: NodeID,
	) -> Result<AdminRpc, Error> {
		match message {
			AdminRpc::LaunchRepair(opt) => self.handle_launch_repair(opt.clone()).await,
			m => Err(GarageError::unexpected_rpc_message(m).into()),
		}
	}
}