aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2021-10-27 10:36:04 +0200
committerAlex Auvolat <alex@adnab.me>2021-10-27 11:14:55 +0200
commit6b47c294f570b141a2349d5b6da537c0b64d165d (patch)
tree9716a373adc6b51a249af015e2a461904210d4ba
parent28c015d9ffcb3255295572465654fdca680b1964 (diff)
downloadgarage-6b47c294f570b141a2349d5b6da537c0b64d165d.tar.gz
garage-6b47c294f570b141a2349d5b6da537c0b64d165d.zip
Refactoring on repair commandscli-verify-integrity
-rw-r--r--src/garage/cli/structs.rs6
-rw-r--r--src/garage/repair.rs80
-rw-r--r--src/model/block.rs70
-rw-r--r--src/util/lib.rs1
-rw-r--r--src/util/token_bucket.rs40
5 files changed, 105 insertions, 92 deletions
diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs
index 588900a3..620be9ef 100644
--- a/src/garage/cli/structs.rs
+++ b/src/garage/cli/structs.rs
@@ -265,7 +265,7 @@ pub struct RepairOpt {
pub yes: bool,
#[structopt(subcommand)]
- pub what: Option<RepairWhat>,
+ pub what: RepairWhat,
}
#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
@@ -283,8 +283,8 @@ pub enum RepairWhat {
#[structopt(name = "block_refs")]
BlockRefs,
/// Verify integrity of all blocks on disc (extremely slow, i/o intensive)
- #[structopt(name = "blocks_integrity")]
- BlockIntegrity {
+ #[structopt(name = "scrub")]
+ Scrub {
/// Limit on i/o speed, in B/s
#[structopt(name = "limit")]
limit: Option<usize>,
diff --git a/src/garage/repair.rs b/src/garage/repair.rs
index a67bf2e5..bfe7bf84 100644
--- a/src/garage/repair.rs
+++ b/src/garage/repair.rs
@@ -27,50 +27,38 @@ impl Repair {
opt: RepairOpt,
must_exit: watch::Receiver<bool>,
) -> Result<(), Error> {
- let todo = |x| opt.what.as_ref().map(|y| *y == x).unwrap_or(true);
-
- if todo(RepairWhat::Tables) {
- info!("Launching a full sync of tables");
- self.garage.bucket_table.syncer.add_full_sync();
- self.garage.object_table.syncer.add_full_sync();
- self.garage.version_table.syncer.add_full_sync();
- self.garage.block_ref_table.syncer.add_full_sync();
- self.garage.key_table.syncer.add_full_sync();
- }
-
- // TODO: wait for full sync to finish before proceeding to the rest?
-
- if todo(RepairWhat::Versions) {
- info!("Repairing the versions table");
- self.repair_versions(&must_exit).await?;
- }
-
- if todo(RepairWhat::BlockRefs) {
- info!("Repairing the block refs table");
- self.repair_block_ref(&must_exit).await?;
- }
-
- if opt.what.is_none() {
- info!("Repairing the RC");
- self.repair_rc(&must_exit).await?;
- }
-
- if todo(RepairWhat::Blocks) {
- info!("Repairing the stored blocks");
- self.garage
- .block_manager
- .repair_data_store(&must_exit)
- .await?;
- }
-
- if let Some(RepairWhat::BlockIntegrity { limit }) = opt.what {
- info!("Verifying integrity of stored blocks");
- self.garage
- .block_manager
- .verify_data_store_integrity(&must_exit, limit)
- .await?;
+ match opt.what {
+ RepairWhat::Tables => {
+ info!("Launching a full sync of tables");
+ self.garage.bucket_table.syncer.add_full_sync();
+ self.garage.object_table.syncer.add_full_sync();
+ self.garage.version_table.syncer.add_full_sync();
+ self.garage.block_ref_table.syncer.add_full_sync();
+ self.garage.key_table.syncer.add_full_sync();
+ }
+ RepairWhat::Versions => {
+ info!("Repairing the versions table");
+ self.repair_versions(&must_exit).await?;
+ }
+ RepairWhat::BlockRefs => {
+ info!("Repairing the block refs table");
+ self.repair_block_ref(&must_exit).await?;
+ }
+ RepairWhat::Blocks => {
+ info!("Repairing the stored blocks");
+ self.garage
+ .block_manager
+ .repair_data_store(&must_exit)
+ .await?;
+ }
+ RepairWhat::Scrub { limit } => {
+ info!("Verifying integrity of stored blocks");
+ self.garage
+ .block_manager
+ .scrub_data_store(&must_exit, limit)
+ .await?;
+ }
}
-
Ok(())
}
@@ -158,10 +146,4 @@ impl Repair {
}
Ok(())
}
-
- async fn repair_rc(&self, _must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
- // TODO
- warn!("repair_rc: not implemented");
- Ok(())
- }
}
diff --git a/src/model/block.rs b/src/model/block.rs
index c43c0b97..35d3871a 100644
--- a/src/model/block.rs
+++ b/src/model/block.rs
@@ -10,11 +10,11 @@ use serde::{Deserialize, Serialize};
use tokio::fs;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::sync::{watch, Mutex, Notify};
-use tokio::time::Instant;
use garage_util::data::*;
use garage_util::error::Error;
use garage_util::time::*;
+use garage_util::token_bucket::TokenBucket;
use garage_rpc::system::System;
use garage_rpc::*;
@@ -209,6 +209,35 @@ impl BlockManager {
.await
}
+ /// Verify integrity of each block on disk. Use `speed_limit` to limit the load generated by
+ /// this function.
+ pub async fn scrub_data_store(
+ &self,
+ must_exit: &watch::Receiver<bool>,
+ speed_limit: Option<usize>,
+ ) -> Result<(), Error> {
+ let token_bucket = speed_limit.map(|rate| TokenBucket::new(rate as u64));
+ self.for_each_file(
+ token_bucket,
+ move |mut token_bucket, hash| {
+ async move {
+ let len = match self.read_block(&hash).await {
+ Ok(BlockRpc::PutBlock(PutBlockMessage { data, .. })) => data.len(),
+ Ok(_) => unreachable!(),
+ Err(_) => 0, // resync and warn message made by read_block if necessary
+ };
+
+ if let Some(tb) = &mut token_bucket {
+ tb.take(len as u64).await;
+ }
+ Ok(token_bucket)
+ }
+ },
+ must_exit,
+ )
+ .await
+ }
+
/// Get lenght of resync queue
pub fn resync_queue_len(&self) -> usize {
self.resync_queue.len()
@@ -553,45 +582,6 @@ impl BlockManager {
}
.boxed()
}
-
- /// Verify integrity of each block on disk. Use `speed_limit` to limit the load generated by
- /// this function.
- pub async fn verify_data_store_integrity(
- &self,
- must_exit: &watch::Receiver<bool>,
- speed_limit: Option<usize>,
- ) -> Result<(), Error> {
- let last_refill = Instant::now();
- let token_left = speed_limit.unwrap_or(0);
- self.for_each_file(
- (last_refill, token_left),
- move |(last_refill, token_left), hash| {
- async move {
- let len = match self.read_block(&hash).await {
- Ok(BlockRpc::PutBlock(PutBlockMessage { data, .. })) => data.len(),
- Ok(_) => unreachable!(),
- Err(_) => 0, // resync and warn message made by read_block if necessary
- };
-
- if let Some(speed_limit) = speed_limit {
- // throttling logic
- if let Some(t) = token_left.checked_sub(len) {
- // token bucket not empty yet
- Ok((last_refill, t))
- } else {
- // token bucket empty. Sleep and refill
- tokio::time::sleep_until(last_refill + Duration::from_secs(1)).await;
- Ok((Instant::now(), speed_limit))
- }
- } else {
- Ok((last_refill, token_left)) // actually not used
- }
- }
- },
- must_exit,
- )
- .await
- }
}
#[async_trait]
diff --git a/src/util/lib.rs b/src/util/lib.rs
index c080e3a3..e2e01785 100644
--- a/src/util/lib.rs
+++ b/src/util/lib.rs
@@ -9,3 +9,4 @@ pub mod data;
pub mod error;
pub mod persister;
pub mod time;
+pub mod token_bucket;
diff --git a/src/util/token_bucket.rs b/src/util/token_bucket.rs
new file mode 100644
index 00000000..cc0dfa1f
--- /dev/null
+++ b/src/util/token_bucket.rs
@@ -0,0 +1,40 @@
+use std::time::{Duration, Instant};
+
+use tokio::time::sleep;
+
+pub struct TokenBucket {
+ // Replenish rate: number of tokens per second
+ replenish_rate: u64,
+ // Current number of tokens
+ tokens: u64,
+ // Last replenish time
+ last_replenish: Instant,
+}
+
+impl TokenBucket {
+ pub fn new(replenish_rate: u64) -> Self {
+ Self {
+ replenish_rate,
+ tokens: 0,
+ last_replenish: Instant::now(),
+ }
+ }
+
+ pub async fn take(&mut self, tokens: u64) {
+ while self.tokens < tokens {
+ let needed = tokens - self.tokens;
+ let delay = (needed as f64) / (self.replenish_rate as f64);
+ sleep(Duration::from_secs_f64(delay)).await;
+ self.replenish();
+ }
+ self.tokens -= tokens;
+ }
+
+ pub fn replenish(&mut self) {
+ let now = Instant::now();
+ let new_tokens =
+ ((now - self.last_replenish).as_secs_f64() * (self.replenish_rate as f64)) as u64;
+ self.tokens += new_tokens;
+ self.last_replenish = now;
+ }
+}