author    Alex <alex@adnab.me>  2023-01-02 12:42:24 +0000
committer Alex <alex@adnab.me>  2023-01-02 12:42:24 +0000
commit    7f7d53cfa991054afcd2940cc43a4d7f1a6668e7 (patch)
tree      15c60c9ecf583532eaf62d492beb63bd8ae7e3b2 /src/block
parent    1af4a5ed569e42f77dd4ecc9364a27f7ed43df63 (diff)
parent    d1279e04f3550eae2eb5e0f25efbdf69b42fbeb9 (diff)
Merge pull request 'improvements to CLI and new debug features' (#448) from cli-improvements into main
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/448
Diffstat (limited to 'src/block')
-rw-r--r--  src/block/manager.rs  42
-rw-r--r--  src/block/metrics.rs  12
-rw-r--r--  src/block/rc.rs        7
-rw-r--r--  src/block/repair.rs   58
-rw-r--r--  src/block/resync.rs   57
5 files changed, 129 insertions(+), 47 deletions(-)
diff --git a/src/block/manager.rs b/src/block/manager.rs
index 7f439b96..28523a93 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -90,6 +90,15 @@ pub struct BlockManager {
tx_scrub_command: mpsc::Sender<ScrubWorkerCommand>,
}
+#[derive(Serialize, Deserialize, Clone, Debug)]
+pub struct BlockResyncErrorInfo {
+ pub hash: Hash,
+ pub refcount: u64,
+ pub error_count: u64,
+ pub last_try: u64,
+ pub next_try: u64,
+}
+
// This custom struct contains functions that must only be run
// when the lock is held. We ensure that it is the case by storing
// it INSIDE a Mutex.
@@ -114,7 +123,8 @@ impl BlockManager {
.netapp
.endpoint("garage_block/manager.rs/Rpc".to_string());
- let metrics = BlockManagerMetrics::new(resync.queue.clone(), resync.errors.clone());
+ let metrics =
+ BlockManagerMetrics::new(rc.rc.clone(), resync.queue.clone(), resync.errors.clone());
let (scrub_tx, scrub_rx) = mpsc::channel(1);
@@ -309,11 +319,41 @@ impl BlockManager {
Ok(self.rc.rc.len()?)
}
+ /// Get number of items in the refcount table
+ pub fn rc_fast_len(&self) -> Result<Option<usize>, Error> {
+ Ok(self.rc.rc.fast_len()?)
+ }
+
/// Send a command to start/stop/manage the scrub worker
pub async fn send_scrub_command(&self, cmd: ScrubWorkerCommand) {
let _ = self.tx_scrub_command.send(cmd).await;
}
+ /// Get the reference count of a block
+ pub fn get_block_rc(&self, hash: &Hash) -> Result<u64, Error> {
+ Ok(self.rc.get_block_rc(hash)?.as_u64())
+ }
+
+ /// List all resync errors
+ pub fn list_resync_errors(&self) -> Result<Vec<BlockResyncErrorInfo>, Error> {
+ let mut blocks = Vec::with_capacity(self.resync.errors.len());
+ for ent in self.resync.errors.iter()? {
+ let (hash, cnt) = ent?;
+ let cnt = ErrorCounter::decode(&cnt);
+ blocks.push(BlockResyncErrorInfo {
+ hash: Hash::try_from(&hash).unwrap(),
+ refcount: 0,
+ error_count: cnt.errors,
+ last_try: cnt.last_try,
+ next_try: cnt.next_try(),
+ });
+ }
+ for block in blocks.iter_mut() {
+ block.refcount = self.get_block_rc(&block.hash)?;
+ }
+ Ok(blocks)
+ }
+
// ---- Managing the reference counter ----
/// Increment the number of times a block is used, putting it to resynchronization if it is
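Aside: the new list_resync_errors() helper above deliberately works in two passes — decode every ErrorCounter entry first, then fill in each block's refcount with separate lookups — rather than interleaving rc reads with the iteration over the errors tree. A minimal, self-contained sketch of that pattern, using in-memory maps as stand-ins for Garage's on-disk trees (all types here are simplified stand-ins, not Garage's actual definitions):

```rust
use std::collections::BTreeMap;

// Stand-in for Garage's 32-byte block hash.
type Hash = [u8; 32];

pub struct ErrorCounter {
    pub errors: u64,
    pub last_try: u64,
}

pub struct BlockResyncErrorInfo {
    pub hash: Hash,
    pub refcount: u64,
    pub error_count: u64,
    pub last_try: u64,
    pub next_try: u64,
}

pub fn list_resync_errors(
    errors: &BTreeMap<Hash, ErrorCounter>,
    rc: &BTreeMap<Hash, u64>,
) -> Vec<BlockResyncErrorInfo> {
    // Pass 1: decode every error entry, leaving refcount at 0 for now.
    let mut blocks: Vec<BlockResyncErrorInfo> = errors
        .iter()
        .map(|(hash, ec)| BlockResyncErrorInfo {
            hash: *hash,
            refcount: 0,
            error_count: ec.errors,
            last_try: ec.last_try,
            next_try: ec.last_try + 60_000, // placeholder delay; see ErrorCounter::next_try
        })
        .collect();
    // Pass 2: fill in the reference counts, one lookup per errored block.
    for b in blocks.iter_mut() {
        b.refcount = rc.get(&b.hash).copied().unwrap_or(0);
    }
    blocks
}
```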
diff --git a/src/block/metrics.rs b/src/block/metrics.rs
index 477add66..fbef95af 100644
--- a/src/block/metrics.rs
+++ b/src/block/metrics.rs
@@ -1,9 +1,11 @@
use opentelemetry::{global, metrics::*};
+use garage_db as db;
use garage_db::counted_tree_hack::CountedTree;
/// BlockManagerMetrics references all counters used for metrics
pub struct BlockManagerMetrics {
+ pub(crate) _rc_size: ValueObserver<u64>,
pub(crate) _resync_queue_len: ValueObserver<u64>,
pub(crate) _resync_errored_blocks: ValueObserver<u64>,
@@ -23,9 +25,17 @@ pub struct BlockManagerMetrics {
}
impl BlockManagerMetrics {
- pub fn new(resync_queue: CountedTree, resync_errors: CountedTree) -> Self {
+ pub fn new(rc_tree: db::Tree, resync_queue: CountedTree, resync_errors: CountedTree) -> Self {
let meter = global::meter("garage_model/block");
Self {
+ _rc_size: meter
+ .u64_value_observer("block.rc_size", move |observer| {
+ if let Ok(Some(v)) = rc_tree.fast_len() {
+ observer.observe(v as u64, &[])
+ }
+ })
+ .with_description("Number of blocks known to the reference counter")
+ .init(),
_resync_queue_len: meter
.u64_value_observer("block.resync_queue_length", move |observer| {
observer.observe(resync_queue.len() as u64, &[])
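The new _rc_size gauge above follows the same pattern as the existing observers: the closure captures a handle to its data source and reports the current value each time metrics are collected. A sketch of that pattern against the pre-1.0 opentelemetry API used in this diff, with an AtomicU64 standing in for the rc tree's fast_len():

```rust
use std::sync::{
    atomic::{AtomicU64, Ordering},
    Arc,
};
use opentelemetry::{global, metrics::ValueObserver};

// The AtomicU64 is a stand-in for a cheap length query on the rc tree.
fn register_rc_size_gauge(rc_len: Arc<AtomicU64>) -> ValueObserver<u64> {
    let meter = global::meter("garage_model/block");
    meter
        .u64_value_observer("block.rc_size", move |observer| {
            // Invoked on every metrics collection; the closure owns a
            // handle to the data source and reports its current value.
            observer.observe(rc_len.load(Ordering::Relaxed), &[])
        })
        .with_description("Number of blocks known to the reference counter")
        .init()
}
```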
diff --git a/src/block/rc.rs b/src/block/rc.rs
index ce6defad..8dae3960 100644
--- a/src/block/rc.rs
+++ b/src/block/rc.rs
@@ -169,4 +169,11 @@ impl RcEntry {
pub(crate) fn is_needed(&self) -> bool {
!self.is_deletable()
}
+
+ pub(crate) fn as_u64(&self) -> u64 {
+ match self {
+ RcEntry::Present { count } => *count,
+ _ => 0,
+ }
+ }
}
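For context, as_u64() flattens the refcount entry into a plain number for display. A hypothetical, trimmed-down reconstruction of RcEntry (the real enum is defined earlier in rc.rs and has more variants) shows why everything but a live count reads as zero:

```rust
// Hypothetical reconstruction, inferred from how as_u64() matches on it.
pub(crate) enum RcEntry {
    /// The block is currently referenced `count` times.
    Present { count: u64 },
    /// The block is unreferenced (or pending deletion).
    Absent,
}

impl RcEntry {
    /// Anything that is not a live reference counts as zero, which is
    /// what get_block_rc() in manager.rs reports for such blocks.
    pub(crate) fn as_u64(&self) -> u64 {
        match self {
            RcEntry::Present { count } => *count,
            _ => 0,
        }
    }
}
```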
diff --git a/src/block/repair.rs b/src/block/repair.rs
index e2884b69..1878027e 100644
--- a/src/block/repair.rs
+++ b/src/block/repair.rs
@@ -53,7 +53,7 @@ impl Worker for RepairWorker {
"Block repair worker".into()
}
- fn info(&self) -> Option<String> {
+ fn status(&self) -> WorkerStatus {
match self.block_iter.as_ref() {
None => {
let idx_bytes = self
@@ -66,9 +66,20 @@ impl Worker for RepairWorker {
} else {
idx_bytes
};
- Some(format!("Phase 1: {}", hex::encode(idx_bytes)))
+ WorkerStatus {
+ progress: Some("0.00%".into()),
+ freeform: vec![format!(
+ "Currently in phase 1, iterator position: {}",
+ hex::encode(idx_bytes)
+ )],
+ ..Default::default()
+ }
}
- Some(bi) => Some(format!("Phase 2: {:.2}% done", bi.progress() * 100.)),
+ Some(bi) => WorkerStatus {
+ progress: Some(format!("{:.2}%", bi.progress() * 100.)),
+ freeform: vec!["Currently in phase 2".into()],
+ ..Default::default()
+ },
}
}
@@ -271,29 +282,28 @@ impl Worker for ScrubWorker {
"Block scrub worker".into()
}
- fn info(&self) -> Option<String> {
- let s = match &self.work {
- ScrubWorkerState::Running(bsi) => format!(
- "{:.2}% done (tranquility = {})",
- bsi.progress() * 100.,
- self.persisted.tranquility
- ),
+ fn status(&self) -> WorkerStatus {
+ let mut s = WorkerStatus {
+ persistent_errors: Some(self.persisted.corruptions_detected),
+ tranquility: Some(self.persisted.tranquility),
+ ..Default::default()
+ };
+ match &self.work {
+ ScrubWorkerState::Running(bsi) => {
+ s.progress = Some(format!("{:.2}%", bsi.progress() * 100.));
+ }
ScrubWorkerState::Paused(bsi, rt) => {
- format!(
- "Paused, {:.2}% done, resumes at {}",
- bsi.progress() * 100.,
- msec_to_rfc3339(*rt)
- )
+ s.progress = Some(format!("{:.2}%", bsi.progress() * 100.));
+ s.freeform = vec![format!("Scrub paused, resumes at {}", msec_to_rfc3339(*rt))];
}
- ScrubWorkerState::Finished => format!(
- "Last completed scrub: {}",
- msec_to_rfc3339(self.persisted.time_last_complete_scrub)
- ),
- };
- Some(format!(
- "{} ; corruptions detected: {}",
- s, self.persisted.corruptions_detected
- ))
+ ScrubWorkerState::Finished => {
+ s.freeform = vec![format!(
+ "Last scrub completed at {}",
+ msec_to_rfc3339(self.persisted.time_last_complete_scrub)
+ )];
+ }
+ }
+ s
}
async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
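The repair and scrub workers now return a structured WorkerStatus instead of a free-form info string. The struct's definition is not part of this diff; the following is a hypothetical reconstruction inferred from the fields assigned above (field types are assumptions):

```rust
// Hypothetical reconstruction of WorkerStatus; the real definition lives
// in the worker framework and may differ in field types and order.
#[derive(Default)]
pub struct WorkerStatus {
    pub tranquility: Option<u32>,     // type assumed
    pub progress: Option<String>,     // e.g. "42.17%"
    pub queue_length: Option<u64>,
    pub persistent_errors: Option<u64>,
    pub freeform: Vec<String>,        // human-readable extra lines
}
```

Presumably this is what lets the improved CLI render progress, tranquility, and error counts as aligned columns instead of parsing ad-hoc strings.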
diff --git a/src/block/resync.rs b/src/block/resync.rs
index ada3ac54..8231b55d 100644
--- a/src/block/resync.rs
+++ b/src/block/resync.rs
@@ -123,6 +123,24 @@ impl BlockResyncManager {
Ok(self.errors.len())
}
+ /// Clear the error counter for a block and put it in queue immediately
+ pub fn clear_backoff(&self, hash: &Hash) -> Result<(), Error> {
+ let now = now_msec();
+ if let Some(ec) = self.errors.get(hash)? {
+ let mut ec = ErrorCounter::decode(&ec);
+ if ec.errors > 0 {
+ ec.last_try = now - ec.delay_msec();
+ self.errors.insert(hash, ec.encode())?;
+ self.put_to_resync_at(hash, now)?;
+ return Ok(());
+ }
+ }
+ Err(Error::Message(format!(
+ "Block {:?} was not in an errored state",
+ hash
+ )))
+ }
+
// ---- Resync loop ----
// This part manages a queue of blocks that need to be
@@ -257,7 +275,7 @@ impl BlockResyncManager {
if let Err(e) = &res {
manager.metrics.resync_error_counter.add(1);
- warn!("Error when resyncing {:?}: {}", hash, e);
+ error!("Error when resyncing {:?}: {}", hash, e);
let err_counter = match self.errors.get(hash.as_slice())? {
Some(ec) => ErrorCounter::decode(&ec).add1(now + 1),
@@ -477,27 +495,22 @@ impl Worker for ResyncWorker {
format!("Block resync worker #{}", self.index + 1)
}
- fn info(&self) -> Option<String> {
+ fn status(&self) -> WorkerStatus {
let persisted = self.manager.resync.persisted.load();
if self.index >= persisted.n_workers {
- return Some("(unused)".into());
- }
-
- let mut ret = vec![];
- ret.push(format!("tranquility = {}", persisted.tranquility));
-
- let qlen = self.manager.resync.queue_len().unwrap_or(0);
- if qlen > 0 {
- ret.push(format!("{} blocks in queue", qlen));
+ return WorkerStatus {
+ freeform: vec!["This worker is currently disabled".into()],
+ ..Default::default()
+ };
}
- let elen = self.manager.resync.errors_len().unwrap_or(0);
- if elen > 0 {
- ret.push(format!("{} blocks in error state", elen));
+ WorkerStatus {
+ queue_length: Some(self.manager.resync.queue_len().unwrap_or(0) as u64),
+ tranquility: Some(persisted.tranquility),
+ persistent_errors: Some(self.manager.resync.errors_len().unwrap_or(0) as u64),
+ ..Default::default()
}
-
- Some(ret.join(", "))
}
async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
@@ -545,9 +558,9 @@ impl Worker for ResyncWorker {
/// and the time of the last try.
/// Used to implement exponential backoff.
#[derive(Clone, Copy, Debug)]
-struct ErrorCounter {
- errors: u64,
- last_try: u64,
+pub(crate) struct ErrorCounter {
+ pub(crate) errors: u64,
+ pub(crate) last_try: u64,
}
impl ErrorCounter {
@@ -558,12 +571,13 @@ impl ErrorCounter {
}
}
- fn decode(data: &[u8]) -> Self {
+ pub(crate) fn decode(data: &[u8]) -> Self {
Self {
errors: u64::from_be_bytes(data[0..8].try_into().unwrap()),
last_try: u64::from_be_bytes(data[8..16].try_into().unwrap()),
}
}
+
fn encode(&self) -> Vec<u8> {
[
u64::to_be_bytes(self.errors),
@@ -583,7 +597,8 @@ impl ErrorCounter {
(RESYNC_RETRY_DELAY.as_millis() as u64)
<< std::cmp::min(self.errors - 1, RESYNC_RETRY_DELAY_MAX_BACKOFF_POWER)
}
- fn next_try(&self) -> u64 {
+
+ pub(crate) fn next_try(&self) -> u64 {
self.last_try + self.delay_msec()
}
}
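To make the backoff schedule concrete: delay_msec() doubles the retry delay with each consecutive error, capped at the base delay shifted by RESYNC_RETRY_DELAY_MAX_BACKOFF_POWER, and next_try() is simply last_try plus that delay. This is also why clear_backoff() above rewinds last_try by exactly delay_msec(): next_try() then evaluates to now, making the block immediately eligible for resync. A worked example with assumed constant values (the real constants are defined elsewhere in resync.rs and are not shown in this diff):

```rust
use std::time::Duration;

// Assumed values for illustration only.
const RESYNC_RETRY_DELAY: Duration = Duration::from_secs(60);
const RESYNC_RETRY_DELAY_MAX_BACKOFF_POWER: u64 = 6;

fn delay_msec(errors: u64) -> u64 {
    // Same shift as ErrorCounter::delay_msec above: double the delay per
    // failure, capped at base << MAX_BACKOFF_POWER.
    (RESYNC_RETRY_DELAY.as_millis() as u64)
        << std::cmp::min(errors - 1, RESYNC_RETRY_DELAY_MAX_BACKOFF_POWER)
}

fn main() {
    // With a 60 s base: 1, 2, 4, 8, 16, 32, 64, 64 minutes.
    for errors in 1..=8 {
        println!("{} error(s) -> retry in {} min", errors, delay_msec(errors) / 60_000);
    }
}
```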