aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2020-04-19 20:36:36 +0000
committerAlex Auvolat <alex@adnab.me>2020-04-19 20:36:36 +0000
commit5ae32972efaba357ecc0027fe852d710b16b6d0e (patch)
treec49938469ac6f01c1501a79f96260269a410b739
parenta54f3158f12cbc69107b0a65af6c2e56fda5b2d7 (diff)
downloadgarage-5ae32972efaba357ecc0027fe852d710b16b6d0e.tar.gz
garage-5ae32972efaba357ecc0027fe852d710b16b6d0e.zip
Implement repair command
-rw-r--r--src/admin_rpc.rs41
-rw-r--r--src/block.rs84
-rw-r--r--src/main.rs32
-rw-r--r--src/table_sync.rs6
4 files changed, 152 insertions, 11 deletions
diff --git a/src/admin_rpc.rs b/src/admin_rpc.rs
index 29037c6c..8e278dbe 100644
--- a/src/admin_rpc.rs
+++ b/src/admin_rpc.rs
@@ -4,6 +4,7 @@ use serde::{Deserialize, Serialize};
use crate::data::*;
use crate::error::Error;
+use crate::rpc_client::*;
use crate::rpc_server::*;
use crate::server::Garage;
use crate::table::*;
@@ -11,11 +12,13 @@ use crate::*;
use crate::bucket_table::*;
+pub const ADMIN_RPC_TIMEOUT: Duration = Duration::from_secs(30);
pub const ADMIN_RPC_PATH: &str = "_admin";
#[derive(Debug, Serialize, Deserialize)]
pub enum AdminRPC {
BucketOperation(BucketOperation),
+ LaunchRepair(bool),
// Replies
Ok(String),
@@ -27,11 +30,13 @@ impl RpcMessage for AdminRPC {}
pub struct AdminRpcHandler {
garage: Arc<Garage>,
+ rpc_client: Arc<RpcClient<AdminRPC>>,
}
impl AdminRpcHandler {
pub fn new(garage: Arc<Garage>) -> Arc<Self> {
- Arc::new(Self { garage })
+ let rpc_client = garage.system.clone().rpc_client::<AdminRPC>(ADMIN_RPC_PATH);
+ Arc::new(Self { garage, rpc_client })
}
pub fn register_handler(self: Arc<Self>, rpc_server: &mut RpcServer) {
@@ -40,6 +45,9 @@ impl AdminRpcHandler {
async move {
match msg {
AdminRPC::BucketOperation(bo) => self2.handle_bucket_cmd(bo).await,
+ AdminRPC::LaunchRepair(repair_all) => {
+ self2.handle_launch_repair(repair_all).await
+ }
_ => Err(Error::Message(format!("Invalid RPC"))),
}
}
@@ -143,4 +151,35 @@ impl AdminRpcHandler {
}
}
}
+
+ async fn handle_launch_repair(&self, repair_all: bool) -> Result<AdminRPC, Error> {
+ if repair_all {
+ let mut failures = vec![];
+ let ring = self.garage.system.ring.borrow().clone();
+ for node in ring.config.members.keys() {
+ if self
+ .rpc_client
+ .call(node, AdminRPC::LaunchRepair(false), ADMIN_RPC_TIMEOUT)
+ .await
+ .is_err()
+ {
+ failures.push(node.clone());
+ }
+ }
+ if failures.is_empty() {
+ Ok(AdminRPC::Ok(format!("Repair launched on all nodes")))
+ } else {
+ Err(Error::Message(format!(
+ "Could not launch repair on nodes: {:?} (launched successfully on other nodes)",
+ failures
+ )))
+ }
+ } else {
+ self.garage.block_manager.launch_repair().await?;
+ Ok(AdminRPC::Ok(format!(
+ "Repair launched on {:?}",
+ self.garage.system.id
+ )))
+ }
+ }
}
diff --git a/src/block.rs b/src/block.rs
index 489dc33e..4ad74d76 100644
--- a/src/block.rs
+++ b/src/block.rs
@@ -17,6 +17,7 @@ use crate::membership::System;
use crate::rpc_client::*;
use crate::rpc_server::*;
+use crate::block_ref_table::*;
use crate::server::Garage;
pub const INLINE_THRESHOLD: usize = 3072;
@@ -356,6 +357,89 @@ impl BlockManager {
.await?;
Ok(())
}
+
+ pub async fn launch_repair(self: &Arc<Self>) -> Result<(), Error> {
+ let self2 = self.clone();
+ self.system
+ .background
+ .spawn_worker(move |must_exit| async move { self2.repair_worker(must_exit).await })
+ .await;
+ Ok(())
+ }
+
+ pub async fn repair_worker(
+ self: Arc<Self>,
+ must_exit: watch::Receiver<bool>,
+ ) -> Result<(), Error> {
+ // 1. Repair blocks from RC table
+ let garage = self.garage.load_full().unwrap();
+ let mut last_hash = None;
+ let mut i = 0usize;
+ for entry in garage.block_ref_table.store.iter() {
+ let (_k, v_bytes) = entry?;
+ let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(v_bytes.as_ref())?;
+ if Some(&block_ref.block) == last_hash.as_ref() {
+ continue;
+ }
+ if !block_ref.deleted {
+ last_hash = Some(block_ref.block.clone());
+ self.put_to_resync(&block_ref.block, 0)?;
+ }
+ i += 1;
+ if i & 0xFF == 0 && *must_exit.borrow() {
+ return Ok(());
+ }
+ }
+
+ // 2. Repair blocks actually on disk
+ let mut ls_data_dir = fs::read_dir(&self.data_dir).await?;
+ while let Some(data_dir_ent) = ls_data_dir.next().await {
+ let data_dir_ent = data_dir_ent?;
+ let dir_name = data_dir_ent.file_name();
+ let dir_name = match dir_name.into_string() {
+ Ok(x) => x,
+ Err(_) => continue,
+ };
+ if dir_name.len() != 2 || hex::decode(&dir_name).is_err() {
+ continue;
+ }
+
+ let mut ls_data_dir_2 = match fs::read_dir(data_dir_ent.path()).await {
+ Err(e) => {
+ eprintln!(
+ "Warning: could not list dir {:?}: {}",
+ data_dir_ent.path().to_str(),
+ e
+ );
+ continue;
+ }
+ Ok(x) => x,
+ };
+ while let Some(file) = ls_data_dir_2.next().await {
+ let file = file?;
+ let file_name = file.file_name();
+ let file_name = match file_name.into_string() {
+ Ok(x) => x,
+ Err(_) => continue,
+ };
+ if file_name.len() != 64 {
+ continue;
+ }
+ let hash_bytes = match hex::decode(&file_name) {
+ Ok(h) => h,
+ Err(_) => continue,
+ };
+ let mut hash = [0u8; 32];
+ hash.copy_from_slice(&hash_bytes[..]);
+ self.put_to_resync(&hash.into(), 0)?;
+
+ if *must_exit.borrow() {
+ return Ok(());
+ }
+ }
+ }
+ Ok(())
+ }
}
fn u64_from_bytes(bytes: &[u8]) -> u64 {
diff --git a/src/main.rs b/src/main.rs
index 1d582c25..11890e57 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -39,8 +39,6 @@ use server::TlsConfig;
use admin_rpc::*;
-const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
-
#[derive(StructOpt, Debug)]
#[structopt(name = "garage")]
pub struct Opt {
@@ -76,6 +74,10 @@ pub enum Command {
/// Bucket operations
#[structopt(name = "bucket")]
Bucket(BucketOperation),
+
+ /// Start repair of node data
+ #[structopt(name = "repair")]
+ Repair(RepairOpt),
}
#[derive(StructOpt, Debug)]
@@ -179,6 +181,13 @@ pub struct PermBucketOpt {
pub bucket: String,
}
+#[derive(Serialize, Deserialize, StructOpt, Debug)]
+pub struct RepairOpt {
+ /// Launch repair operation on all nodes
+ #[structopt(long = "all")]
+ pub all: bool,
+}
+
#[tokio::main]
async fn main() {
let opt = Opt::from_args();
@@ -222,6 +231,9 @@ async fn main() {
Command::Bucket(bo) => {
cmd_admin(admin_rpc_cli, opt.rpc_host, AdminRPC::BucketOperation(bo)).await
}
+ Command::Repair(ro) => {
+ cmd_admin(admin_rpc_cli, opt.rpc_host, AdminRPC::LaunchRepair(ro.all)).await
+ }
};
if let Err(e) = resp {
@@ -231,14 +243,14 @@ async fn main() {
async fn cmd_status(rpc_cli: RpcAddrClient<Message>, rpc_host: SocketAddr) -> Result<(), Error> {
let status = match rpc_cli
- .call(&rpc_host, &Message::PullStatus, DEFAULT_TIMEOUT)
+ .call(&rpc_host, &Message::PullStatus, ADMIN_RPC_TIMEOUT)
.await?
{
Message::AdvertiseNodesUp(nodes) => nodes,
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
};
let config = match rpc_cli
- .call(&rpc_host, &Message::PullConfig, DEFAULT_TIMEOUT)
+ .call(&rpc_host, &Message::PullConfig, ADMIN_RPC_TIMEOUT)
.await?
{
Message::AdvertiseConfig(cfg) => cfg,
@@ -290,7 +302,7 @@ async fn cmd_configure(
args: ConfigureNodeOpt,
) -> Result<(), Error> {
let status = match rpc_cli
- .call(&rpc_host, &Message::PullStatus, DEFAULT_TIMEOUT)
+ .call(&rpc_host, &Message::PullStatus, ADMIN_RPC_TIMEOUT)
.await?
{
Message::AdvertiseNodesUp(nodes) => nodes,
@@ -311,7 +323,7 @@ async fn cmd_configure(
}
let mut config = match rpc_cli
- .call(&rpc_host, &Message::PullConfig, DEFAULT_TIMEOUT)
+ .call(&rpc_host, &Message::PullConfig, ADMIN_RPC_TIMEOUT)
.await?
{
Message::AdvertiseConfig(cfg) => cfg,
@@ -331,7 +343,7 @@ async fn cmd_configure(
.call(
&rpc_host,
&Message::AdvertiseConfig(config),
- DEFAULT_TIMEOUT,
+ ADMIN_RPC_TIMEOUT,
)
.await?;
Ok(())
@@ -343,7 +355,7 @@ async fn cmd_remove(
args: RemoveNodeOpt,
) -> Result<(), Error> {
let mut config = match rpc_cli
- .call(&rpc_host, &Message::PullConfig, DEFAULT_TIMEOUT)
+ .call(&rpc_host, &Message::PullConfig, ADMIN_RPC_TIMEOUT)
.await?
{
Message::AdvertiseConfig(cfg) => cfg,
@@ -377,7 +389,7 @@ async fn cmd_remove(
.call(
&rpc_host,
&Message::AdvertiseConfig(config),
- DEFAULT_TIMEOUT,
+ ADMIN_RPC_TIMEOUT,
)
.await?;
Ok(())
@@ -388,7 +400,7 @@ async fn cmd_admin(
rpc_host: SocketAddr,
args: AdminRPC,
) -> Result<(), Error> {
- match rpc_cli.call(&rpc_host, args, DEFAULT_TIMEOUT).await? {
+ match rpc_cli.call(&rpc_host, args, ADMIN_RPC_TIMEOUT).await? {
AdminRPC::Ok(msg) => {
println!("{}", msg);
}
diff --git a/src/table_sync.rs b/src/table_sync.rs
index e394ba0d..2fb5de77 100644
--- a/src/table_sync.rs
+++ b/src/table_sync.rs
@@ -220,11 +220,17 @@ where
})
.collect::<FuturesUnordered<_>>();
+ let mut n_errors = 0;
while let Some(r) = sync_futures.next().await {
if let Err(e) = r {
+ n_errors += 1;
eprintln!("({}) Sync error: {}", self.table.name, e);
}
}
+ if n_errors > self.table.replication.max_write_errors() {
+ return Err(Error::Message(format!("Sync failed with too many nodes.")));
+ }
+
if !partition.retain {
self.table
.delete_range(&partition.begin, &partition.end)