aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/block/manager.rs34
-rw-r--r--src/block/rc.rs7
-rw-r--r--src/block/resync.rs12
-rw-r--r--src/garage/admin.rs92
-rw-r--r--src/garage/cli/cmd.rs13
-rw-r--r--src/garage/cli/structs.rs45
-rw-r--r--src/garage/cli/util.rs59
-rw-r--r--src/table/util.rs6
8 files changed, 240 insertions, 28 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs
index 7f439b96..26e15bf5 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -90,6 +90,15 @@ pub struct BlockManager {
tx_scrub_command: mpsc::Sender<ScrubWorkerCommand>,
}
+#[derive(Serialize, Deserialize, Clone, Debug)]
+pub struct BlockResyncErrorInfo {
+ pub hash: Hash,
+ pub refcount: u64,
+ pub error_count: u64,
+ pub last_try: u64,
+ pub next_try: u64,
+}
+
// This custom struct contains functions that must only be ran
// when the lock is held. We ensure that it is the case by storing
// it INSIDE a Mutex.
@@ -314,6 +323,31 @@ impl BlockManager {
let _ = self.tx_scrub_command.send(cmd).await;
}
+ /// Get the reference count of a block
+ pub fn get_block_rc(&self, hash: &Hash) -> Result<u64, Error> {
+ Ok(self.rc.get_block_rc(hash)?.as_u64())
+ }
+
+ /// List all resync errors
+ pub fn list_resync_errors(&self) -> Result<Vec<BlockResyncErrorInfo>, Error> {
+ let mut blocks = Vec::with_capacity(self.resync.errors.len());
+ for ent in self.resync.errors.iter()? {
+ let (hash, cnt) = ent?;
+ let cnt = ErrorCounter::decode(&cnt);
+ blocks.push(BlockResyncErrorInfo {
+ hash: Hash::try_from(&hash).unwrap(),
+ refcount: 0,
+ error_count: cnt.errors,
+ last_try: cnt.last_try,
+ next_try: cnt.next_try(),
+ });
+ }
+ for block in blocks.iter_mut() {
+ block.refcount = self.get_block_rc(&block.hash)?;
+ }
+ Ok(blocks)
+ }
+
//// ----- Managing the reference counter ----
/// Increment the number of time a block is used, putting it to resynchronization if it is
diff --git a/src/block/rc.rs b/src/block/rc.rs
index ce6defad..8dae3960 100644
--- a/src/block/rc.rs
+++ b/src/block/rc.rs
@@ -169,4 +169,11 @@ impl RcEntry {
pub(crate) fn is_needed(&self) -> bool {
!self.is_deletable()
}
+
+ pub(crate) fn as_u64(&self) -> u64 {
+ match self {
+ RcEntry::Present { count } => *count,
+ _ => 0,
+ }
+ }
}
diff --git a/src/block/resync.rs b/src/block/resync.rs
index 55d28c14..53b44774 100644
--- a/src/block/resync.rs
+++ b/src/block/resync.rs
@@ -540,9 +540,9 @@ impl Worker for ResyncWorker {
/// and the time of the last try.
/// Used to implement exponential backoff.
#[derive(Clone, Copy, Debug)]
-struct ErrorCounter {
- errors: u64,
- last_try: u64,
+pub(crate) struct ErrorCounter {
+ pub(crate) errors: u64,
+ pub(crate) last_try: u64,
}
impl ErrorCounter {
@@ -553,12 +553,13 @@ impl ErrorCounter {
}
}
- fn decode(data: &[u8]) -> Self {
+ pub(crate) fn decode(data: &[u8]) -> Self {
Self {
errors: u64::from_be_bytes(data[0..8].try_into().unwrap()),
last_try: u64::from_be_bytes(data[8..16].try_into().unwrap()),
}
}
+
fn encode(&self) -> Vec<u8> {
[
u64::to_be_bytes(self.errors),
@@ -578,7 +579,8 @@ impl ErrorCounter {
(RESYNC_RETRY_DELAY.as_millis() as u64)
<< std::cmp::min(self.errors - 1, RESYNC_RETRY_DELAY_MAX_BACKOFF_POWER)
}
- fn next_try(&self) -> u64 {
+
+ pub(crate) fn next_try(&self) -> u64 {
self.last_try + self.delay_msec()
}
}
diff --git a/src/garage/admin.rs b/src/garage/admin.rs
index e5bf5601..c0b0b3c9 100644
--- a/src/garage/admin.rs
+++ b/src/garage/admin.rs
@@ -15,6 +15,7 @@ use garage_table::*;
use garage_rpc::*;
+use garage_block::manager::BlockResyncErrorInfo;
use garage_block::repair::ScrubWorkerCommand;
use garage_model::bucket_alias_table::*;
@@ -24,6 +25,7 @@ use garage_model::helper::error::{Error, OkOrBadRequest};
use garage_model::key_table::*;
use garage_model::migrate::Migrate;
use garage_model::permission::*;
+use garage_model::s3::version_table::Version;
use crate::cli::*;
use crate::repair::online::launch_online_repair;
@@ -38,7 +40,8 @@ pub enum AdminRpc {
LaunchRepair(RepairOpt),
Migrate(MigrateOpt),
Stats(StatsOpt),
- Worker(WorkerOpt),
+ Worker(WorkerOperation),
+ BlockOperation(BlockOperation),
// Replies
Ok(String),
@@ -55,6 +58,12 @@ pub enum AdminRpc {
WorkerListOpt,
),
WorkerInfo(usize, garage_util::background::WorkerInfo),
+ BlockErrorList(Vec<BlockResyncErrorInfo>),
+ BlockInfo {
+ hash: Hash,
+ refcount: u64,
+ versions: Vec<Result<Version, Uuid>>,
+ },
}
impl Rpc for AdminRpc {
@@ -74,6 +83,8 @@ impl AdminRpcHandler {
admin
}
+ // ================ BUCKET COMMANDS ====================
+
async fn handle_bucket_cmd(&self, cmd: &BucketOperation) -> Result<AdminRpc, Error> {
match cmd {
BucketOperation::List => self.handle_list_buckets().await,
@@ -552,6 +563,8 @@ impl AdminRpcHandler {
Ok(AdminRpc::Ok(ret))
}
+ // ================ KEY COMMANDS ====================
+
async fn handle_key_cmd(&self, cmd: &KeyOperation) -> Result<AdminRpc, Error> {
match cmd {
KeyOperation::List => self.handle_list_keys().await,
@@ -689,6 +702,8 @@ impl AdminRpcHandler {
Ok(AdminRpc::KeyInfo(key, relevant_buckets))
}
+ // ================ MIGRATION COMMANDS ====================
+
async fn handle_migrate(self: &Arc<Self>, opt: MigrateOpt) -> Result<AdminRpc, Error> {
if !opt.yes {
return Err(Error::BadRequest(
@@ -705,6 +720,8 @@ impl AdminRpcHandler {
Ok(AdminRpc::Ok("Migration successfull.".into()))
}
+ // ================ REPAIR COMMANDS ====================
+
async fn handle_launch_repair(self: &Arc<Self>, opt: RepairOpt) -> Result<AdminRpc, Error> {
if !opt.yes {
return Err(Error::BadRequest(
@@ -748,6 +765,8 @@ impl AdminRpcHandler {
}
}
+ // ================ STATS COMMANDS ====================
+
async fn handle_stats(&self, opt: StatsOpt) -> Result<AdminRpc, Error> {
if opt.all_nodes {
let mut ret = String::new();
@@ -873,27 +892,27 @@ impl AdminRpcHandler {
Ok(())
}
- // ----
+ // ================ WORKER COMMANDS ====================
- async fn handle_worker_cmd(&self, opt: WorkerOpt) -> Result<AdminRpc, Error> {
- match opt.cmd {
- WorkerCmd::List { opt } => {
+ async fn handle_worker_cmd(&self, cmd: &WorkerOperation) -> Result<AdminRpc, Error> {
+ match cmd {
+ WorkerOperation::List { opt } => {
let workers = self.garage.background.get_worker_info();
- Ok(AdminRpc::WorkerList(workers, opt))
+ Ok(AdminRpc::WorkerList(workers, *opt))
}
- WorkerCmd::Info { tid } => {
+ WorkerOperation::Info { tid } => {
let info = self
.garage
.background
.get_worker_info()
- .get(&tid)
+ .get(tid)
.ok_or_bad_request(format!("No worker with TID {}", tid))?
.clone();
- Ok(AdminRpc::WorkerInfo(tid, info))
+ Ok(AdminRpc::WorkerInfo(*tid, info))
}
- WorkerCmd::Set { opt } => match opt {
+ WorkerOperation::Set { opt } => match opt {
WorkerSetCmd::ScrubTranquility { tranquility } => {
- let scrub_command = ScrubWorkerCommand::SetTranquility(tranquility);
+ let scrub_command = ScrubWorkerCommand::SetTranquility(*tranquility);
self.garage
.block_manager
.send_scrub_command(scrub_command)
@@ -904,7 +923,7 @@ impl AdminRpcHandler {
self.garage
.block_manager
.resync
- .set_n_workers(worker_count)
+ .set_n_workers(*worker_count)
.await?;
Ok(AdminRpc::Ok("Number of resync workers updated".into()))
}
@@ -912,13 +931,57 @@ impl AdminRpcHandler {
self.garage
.block_manager
.resync
- .set_tranquility(tranquility)
+ .set_tranquility(*tranquility)
.await?;
Ok(AdminRpc::Ok("Resync tranquility updated".into()))
}
},
}
}
+
+ // ================ BLOCK COMMANDS ====================
+
+ async fn handle_block_cmd(&self, cmd: &BlockOperation) -> Result<AdminRpc, Error> {
+ match cmd {
+ BlockOperation::ListErrors => Ok(AdminRpc::BlockErrorList(
+ self.garage.block_manager.list_resync_errors()?,
+ )),
+ BlockOperation::Info { hash } => {
+ let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?;
+ let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?;
+ let refcount = self.garage.block_manager.get_block_rc(&hash)?;
+ let block_refs = self
+ .garage
+ .block_ref_table
+ .get_range(&hash, None, None, 10000, Default::default())
+ .await?;
+ let mut versions = vec![];
+ for br in block_refs {
+ if let Some(v) = self
+ .garage
+ .version_table
+ .get(&br.version, &EmptyKey)
+ .await?
+ {
+ versions.push(Ok(v));
+ } else {
+ versions.push(Err(br.version));
+ }
+ }
+ Ok(AdminRpc::BlockInfo {
+ hash,
+ refcount,
+ versions,
+ })
+ }
+ BlockOperation::RetryNow { .. } => {
+ Err(GarageError::Message("not implemented".into()).into())
+ }
+ BlockOperation::Purge { .. } => {
+ Err(GarageError::Message("not implemented".into()).into())
+ }
+ }
+ }
}
#[async_trait]
@@ -934,7 +997,8 @@ impl EndpointHandler<AdminRpc> for AdminRpcHandler {
AdminRpc::Migrate(opt) => self.handle_migrate(opt.clone()).await,
AdminRpc::LaunchRepair(opt) => self.handle_launch_repair(opt.clone()).await,
AdminRpc::Stats(opt) => self.handle_stats(opt.clone()).await,
- AdminRpc::Worker(opt) => self.handle_worker_cmd(opt.clone()).await,
+ AdminRpc::Worker(wo) => self.handle_worker_cmd(wo).await,
+ AdminRpc::BlockOperation(bo) => self.handle_block_cmd(bo).await,
m => Err(GarageError::unexpected_rpc_message(m).into()),
}
}
diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs
index 6df15a48..6c5598b1 100644
--- a/src/garage/cli/cmd.rs
+++ b/src/garage/cli/cmd.rs
@@ -41,6 +41,9 @@ pub async fn cli_command_dispatch(
}
Command::Stats(so) => cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::Stats(so)).await,
Command::Worker(wo) => cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::Worker(wo)).await,
+ Command::Block(bo) => {
+ cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::BlockOperation(bo)).await
+ }
_ => unreachable!(),
}
}
@@ -191,6 +194,16 @@ pub async fn cmd_admin(
AdminRpc::WorkerInfo(tid, wi) => {
print_worker_info(tid, wi);
}
+ AdminRpc::BlockErrorList(el) => {
+ print_block_error_list(el);
+ }
+ AdminRpc::BlockInfo {
+ hash,
+ refcount,
+ versions,
+ } => {
+ print_block_info(hash, refcount, versions);
+ }
r => {
error!("Unexpected response: {:?}", r);
}
diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs
index 9334564b..6d74b1a4 100644
--- a/src/garage/cli/structs.rs
+++ b/src/garage/cli/structs.rs
@@ -49,7 +49,11 @@ pub enum Command {
/// Manage background workers
#[structopt(name = "worker", version = garage_version())]
- Worker(WorkerOpt),
+ Worker(WorkerOperation),
+
+ /// Low-level debug operations on data blocks
+ #[structopt(name = "block", version = garage_version())]
+ Block(BlockOperation),
}
#[derive(StructOpt, Debug)]
@@ -502,14 +506,8 @@ pub struct StatsOpt {
pub detailed: bool,
}
-#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
-pub struct WorkerOpt {
- #[structopt(subcommand)]
- pub cmd: WorkerCmd,
-}
-
#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
-pub enum WorkerCmd {
+pub enum WorkerOperation {
/// List all workers on Garage node
#[structopt(name = "list", version = garage_version())]
List {
@@ -549,3 +547,34 @@ pub enum WorkerSetCmd {
#[structopt(name = "resync-tranquility", version = garage_version())]
ResyncTranquility { tranquility: u32 },
}
+
+#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
+pub enum BlockOperation {
+ /// List all blocks that currently have a resync error
+ #[structopt(name = "list-errors", version = garage_version())]
+ ListErrors,
+ /// Get detailed information about a single block
+ #[structopt(name = "info", version = garage_version())]
+ Info {
+ /// Hash of the block for which to retrieve information
+ hash: String,
+ },
+ /// Retry now the resync of one or many blocks
+ #[structopt(name = "retry-now", version = garage_version())]
+ RetryNow {
+ /// Retry all blocks that have a resync error
+ #[structopt(long = "all")]
+ all: bool,
+ /// Hashes of the block to retry to resync now
+ blocks: Vec<String>,
+ },
+ /// Delete all objects referencing a missing block
+ #[structopt(name = "purge", version = garage_version())]
+ Purge {
+ /// Mandatory to confirm this operation
+ #[structopt(long = "yes")]
+ yes: bool,
+ /// Hashes of the block to purge
+ blocks: Vec<String>,
+ },
+}
diff --git a/src/garage/cli/util.rs b/src/garage/cli/util.rs
index c1d03b8d..737b54b2 100644
--- a/src/garage/cli/util.rs
+++ b/src/garage/cli/util.rs
@@ -3,14 +3,17 @@ use std::time::Duration;
use garage_util::background::*;
use garage_util::crdt::*;
-use garage_util::data::Uuid;
+use garage_util::data::*;
use garage_util::error::*;
use garage_util::formater::format_table;
use garage_util::time::*;
+use garage_block::manager::BlockResyncErrorInfo;
+
use garage_model::bucket_table::*;
use garage_model::key_table::*;
use garage_model::s3::object_table::{BYTES, OBJECTS, UNFINISHED_UPLOADS};
+use garage_model::s3::version_table::Version;
use crate::cli::structs::WorkerListOpt;
@@ -353,3 +356,57 @@ pub fn print_worker_info(tid: usize, info: WorkerInfo) {
}
format_table(table);
}
+
+pub fn print_block_error_list(el: Vec<BlockResyncErrorInfo>) {
+ let now = now_msec();
+ let tf = timeago::Formatter::new();
+ let mut tf2 = timeago::Formatter::new();
+ tf2.ago("");
+
+ let mut table = vec!["Hash\tRC\tErrors\tLast error\tNext try".into()];
+ for e in el {
+ table.push(format!(
+ "{}\t{}\t{}\t{}\tin {}",
+ hex::encode(e.hash.as_slice()),
+ e.refcount,
+ e.error_count,
+ tf.convert(Duration::from_millis(now - e.last_try)),
+ tf2.convert(Duration::from_millis(e.next_try - now))
+ ));
+ }
+ format_table(table);
+}
+
+pub fn print_block_info(hash: Hash, refcount: u64, versions: Vec<Result<Version, Uuid>>) {
+ println!("Block hash: {}", hex::encode(hash.as_slice()));
+ println!("Refcount: {}", refcount);
+ println!();
+
+ let mut table = vec!["Version\tBucket\tPath\tDeleted".into()];
+ let mut nondeleted_count = 0;
+ for v in versions.iter() {
+ match v {
+ Ok(ver) => {
+ table.push(format!(
+ "{:?}\t{:?}\t{}\t{:?}",
+ ver.uuid,
+ ver.bucket_id,
+ ver.key,
+ ver.deleted.get()
+ ));
+ if !ver.deleted.get() {
+ nondeleted_count += 1;
+ }
+ }
+ Err(vh) => {
+ table.push(format!("{:?}\t\t\tyes", vh));
+ }
+ }
+ }
+ format_table(table);
+
+ if refcount != nondeleted_count {
+ println!();
+ println!("Warning: refcount does not match number of non-deleted versions");
+ }
+}
diff --git a/src/table/util.rs b/src/table/util.rs
index 20595a94..0b10cf3f 100644
--- a/src/table/util.rs
+++ b/src/table/util.rs
@@ -49,3 +49,9 @@ impl EnumerationOrder {
}
}
}
+
+impl Default for EnumerationOrder {
+ fn default() -> Self {
+ EnumerationOrder::Forward
+ }
+}