aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/block/lib.rs1
-rw-r--r--src/block/manager.rs160
-rw-r--r--src/block/repair.rs204
-rw-r--r--src/garage/Cargo.toml1
-rw-r--r--src/garage/admin.rs2
-rw-r--r--src/garage/repair/online.rs49
-rw-r--r--src/util/background/worker.rs3
7 files changed, 236 insertions, 184 deletions
diff --git a/src/block/lib.rs b/src/block/lib.rs
index dc685657..ebdb95d8 100644
--- a/src/block/lib.rs
+++ b/src/block/lib.rs
@@ -2,6 +2,7 @@
extern crate tracing;
pub mod manager;
+pub mod repair;
mod block;
mod metrics;
diff --git a/src/block/manager.rs b/src/block/manager.rs
index 8a131270..54368faf 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -1,7 +1,5 @@
-use core::ops::Bound;
-
use std::convert::TryInto;
-use std::path::{Path, PathBuf};
+use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
@@ -94,7 +92,7 @@ pub struct BlockManager {
mutation_lock: Mutex<BlockManagerLocked>,
- rc: BlockRc,
+ pub(crate) rc: BlockRc,
resync_queue: CountedTree,
resync_notify: Notify,
@@ -225,90 +223,6 @@ impl BlockManager {
Ok(())
}
- /// Launch the repair procedure on the data store
- ///
- /// This will list all blocks locally present, as well as those
- /// that are required because of refcount > 0, and will try
- /// to fix any mismatch between the two.
- pub async fn repair_data_store(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
- // 1. Repair blocks from RC table.
- let mut next_start: Option<Hash> = None;
- loop {
- // We have to do this complicated two-step process where we first read a bunch
- // of hashes from the RC table, and then insert them in the to-resync queue,
- // because of SQLite. Basically, as long as we have an iterator on a DB table,
- // we can't do anything else on the DB. The naive approach (which we had previously)
- // of just iterating on the RC table and inserting items one to one in the resync
- // queue can't work here, it would just provoke a deadlock in the SQLite adapter code.
- // This is mostly because the Rust bindings for SQLite assume a worst-case scenario
- // where SQLite is not compiled in thread-safe mode, so we have to wrap everything
- // in a mutex (see db/sqlite_adapter.rs and discussion in PR #322).
- let mut batch_of_hashes = vec![];
- let start_bound = match next_start.as_ref() {
- None => Bound::Unbounded,
- Some(x) => Bound::Excluded(x.as_slice()),
- };
- for entry in self
- .rc
- .rc
- .range::<&[u8], _>((start_bound, Bound::Unbounded))?
- {
- let (hash, _) = entry?;
- let hash = Hash::try_from(&hash[..]).unwrap();
- batch_of_hashes.push(hash);
- if batch_of_hashes.len() >= 1000 {
- break;
- }
- }
- if batch_of_hashes.is_empty() {
- break;
- }
-
- for hash in batch_of_hashes.into_iter() {
- self.put_to_resync(&hash, Duration::from_secs(0))?;
- next_start = Some(hash)
- }
-
- if *must_exit.borrow() {
- return Ok(());
- }
- }
-
- // 2. Repair blocks actually on disk
- // Lists all blocks on disk and adds them to the resync queue.
- // This allows us to find blocks we are storing but don't actually need,
- // so that we can offload them if necessary and then delete them locally.
- self.for_each_file(
- (),
- move |_, hash| async move {
- self.put_to_resync(&hash, Duration::from_secs(0))
- .map_err(Into::into)
- },
- must_exit,
- )
- .await
- }
-
- /// Verify integrity of each block on disk. Use `speed_limit` to limit the load generated by
- /// this function.
- pub async fn scrub_data_store(
- &self,
- must_exit: &watch::Receiver<bool>,
- tranquility: u32,
- ) -> Result<(), Error> {
- let tranquilizer = Tranquilizer::new(30);
- self.for_each_file(
- tranquilizer,
- move |mut tranquilizer, hash| async move {
- let _ = self.read_block(&hash).await;
- tranquilizer.tranquilize(tranquility).await;
- Ok(tranquilizer)
- },
- must_exit,
- )
- .await
- }
-
/// Get lenght of resync queue
pub fn resync_queue_len(&self) -> Result<usize, Error> {
// This currently can't return an error because the CountedTree hack
@@ -397,7 +311,7 @@ impl BlockManager {
}
/// Read block from disk, verifying it's integrity
- async fn read_block(&self, hash: &Hash) -> Result<BlockRpc, Error> {
+ pub(crate) async fn read_block(&self, hash: &Hash) -> Result<BlockRpc, Error> {
let data = self
.read_block_internal(hash)
.bound_record_duration(&self.metrics.block_read_duration)
@@ -575,7 +489,7 @@ impl BlockManager {
});
}
- fn put_to_resync(&self, hash: &Hash, delay: Duration) -> db::Result<()> {
+ pub(crate) fn put_to_resync(&self, hash: &Hash, delay: Duration) -> db::Result<()> {
let when = now_msec() + delay.as_millis() as u64;
self.put_to_resync_at(hash, when)
}
@@ -784,72 +698,6 @@ impl BlockManager {
Ok(())
}
-
- // ---- Utility: iteration on files in the data directory ----
-
- async fn for_each_file<F, Fut, State>(
- &self,
- state: State,
- mut f: F,
- must_exit: &watch::Receiver<bool>,
- ) -> Result<(), Error>
- where
- F: FnMut(State, Hash) -> Fut + Send,
- Fut: Future<Output = Result<State, Error>> + Send,
- State: Send,
- {
- self.for_each_file_rec(&self.data_dir, state, &mut f, must_exit)
- .await
- .map(|_| ())
- }
-
- fn for_each_file_rec<'a, F, Fut, State>(
- &'a self,
- path: &'a Path,
- mut state: State,
- f: &'a mut F,
- must_exit: &'a watch::Receiver<bool>,
- ) -> BoxFuture<'a, Result<State, Error>>
- where
- F: FnMut(State, Hash) -> Fut + Send,
- Fut: Future<Output = Result<State, Error>> + Send,
- State: Send + 'a,
- {
- async move {
- let mut ls_data_dir = fs::read_dir(path).await?;
- while let Some(data_dir_ent) = ls_data_dir.next_entry().await? {
- if *must_exit.borrow() {
- break;
- }
-
- let name = data_dir_ent.file_name();
- let name = if let Ok(n) = name.into_string() {
- n
- } else {
- continue;
- };
- let ent_type = data_dir_ent.file_type().await?;
-
- let name = name.strip_suffix(".zst").unwrap_or(&name);
- if name.len() == 2 && hex::decode(&name).is_ok() && ent_type.is_dir() {
- state = self
- .for_each_file_rec(&data_dir_ent.path(), state, f, must_exit)
- .await?;
- } else if name.len() == 64 {
- let hash_bytes = if let Ok(h) = hex::decode(&name) {
- h
- } else {
- continue;
- };
- let mut hash = [0u8; 32];
- hash.copy_from_slice(&hash_bytes[..]);
- state = f(state, hash.into()).await?;
- }
- }
- Ok(state)
- }
- .boxed()
- }
}
#[async_trait]
diff --git a/src/block/repair.rs b/src/block/repair.rs
new file mode 100644
index 00000000..0445527c
--- /dev/null
+++ b/src/block/repair.rs
@@ -0,0 +1,204 @@
+use core::ops::Bound;
+
+use std::sync::Arc;
+use std::time::Duration;
+
+use async_trait::async_trait;
+use tokio::fs;
+use tokio::sync::watch;
+
+use garage_util::background::*;
+use garage_util::data::*;
+use garage_util::error::*;
+use garage_util::tranquilizer::Tranquilizer;
+
+use crate::manager::*;
+
+pub struct RepairWorker {
+ manager: Arc<BlockManager>,
+ next_start: Option<Hash>,
+ block_iter: Option<BlockStoreIterator>,
+}
+
+impl RepairWorker {
+ pub fn new(manager: Arc<BlockManager>) -> Self {
+ Self {
+ manager,
+ next_start: None,
+ block_iter: None,
+ }
+ }
+}
+
+#[async_trait]
+impl Worker for RepairWorker {
+ fn name(&self) -> String {
+ "Block repair worker".into()
+ }
+
+ async fn work(
+ &mut self,
+ _must_exit: &mut watch::Receiver<bool>,
+ ) -> Result<WorkerStatus, Error> {
+ match self.block_iter.as_mut() {
+ None => {
+ // Phase 1: Repair blocks from RC table.
+
+ // We have to do this complicated two-step process where we first read a bunch
+ // of hashes from the RC table, and then insert them in the to-resync queue,
+ // because of SQLite. Basically, as long as we have an iterator on a DB table,
+ // we can't do anything else on the DB. The naive approach (which we had previously)
+ // of just iterating on the RC table and inserting items one to one in the resync
+ // queue can't work here, it would just provoke a deadlock in the SQLite adapter code.
+ // This is mostly because the Rust bindings for SQLite assume a worst-case scenario
+ // where SQLite is not compiled in thread-safe mode, so we have to wrap everything
+ // in a mutex (see db/sqlite_adapter.rs and discussion in PR #322).
+ let mut batch_of_hashes = vec![];
+ let start_bound = match self.next_start.as_ref() {
+ None => Bound::Unbounded,
+ Some(x) => Bound::Excluded(x.as_slice()),
+ };
+ for entry in self
+ .manager
+ .rc
+ .rc
+ .range::<&[u8], _>((start_bound, Bound::Unbounded))?
+ {
+ let (hash, _) = entry?;
+ let hash = Hash::try_from(&hash[..]).unwrap();
+ batch_of_hashes.push(hash);
+ if batch_of_hashes.len() >= 1000 {
+ break;
+ }
+ }
+ if batch_of_hashes.is_empty() {
+ // move on to phase 2
+ self.block_iter = Some(BlockStoreIterator::new(&self.manager).await?);
+ return Ok(WorkerStatus::Busy);
+ }
+
+ for hash in batch_of_hashes.into_iter() {
+ self.manager.put_to_resync(&hash, Duration::from_secs(0))?;
+ self.next_start = Some(hash)
+ }
+
+ Ok(WorkerStatus::Busy)
+ }
+ Some(bi) => {
+ // Phase 2: Repair blocks actually on disk
+ // Lists all blocks on disk and adds them to the resync queue.
+ // This allows us to find blocks we are storing but don't actually need,
+ // so that we can offload them if necessary and then delete them locally.
+ if let Some(hash) = bi.next().await? {
+ self.manager.put_to_resync(&hash, Duration::from_secs(0))?;
+ Ok(WorkerStatus::Busy)
+ } else {
+ Ok(WorkerStatus::Done)
+ }
+ }
+ }
+ }
+
+ async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerStatus {
+ unreachable!()
+ }
+}
+
+// ----
+
+pub struct ScrubWorker {
+ manager: Arc<BlockManager>,
+ iterator: BlockStoreIterator,
+ tranquilizer: Tranquilizer,
+ tranquility: u32,
+}
+
+impl ScrubWorker {
+ pub async fn new(manager: Arc<BlockManager>, tranquility: u32) -> Result<Self, Error> {
+ let iterator = BlockStoreIterator::new(&manager).await?;
+ Ok(Self {
+ manager,
+ iterator,
+ tranquilizer: Tranquilizer::new(30),
+ tranquility,
+ })
+ }
+}
+
+#[async_trait]
+impl Worker for ScrubWorker {
+ fn name(&self) -> String {
+ "Block scrub worker".into()
+ }
+
+ async fn work(
+ &mut self,
+ _must_exit: &mut watch::Receiver<bool>,
+ ) -> Result<WorkerStatus, Error> {
+ self.tranquilizer.reset();
+ if let Some(hash) = self.iterator.next().await? {
+ let _ = self.manager.read_block(&hash).await;
+ self.tranquilizer.tranquilize(self.tranquility).await;
+ Ok(WorkerStatus::Busy)
+ } else {
+ Ok(WorkerStatus::Done)
+ }
+ }
+
+ async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerStatus {
+ unreachable!()
+ }
+}
+
+// ----
+
+struct BlockStoreIterator {
+ path: Vec<fs::ReadDir>,
+}
+
+impl BlockStoreIterator {
+ async fn new(manager: &BlockManager) -> Result<Self, Error> {
+ let root_dir = manager.data_dir.clone();
+ let read_root_dir = fs::read_dir(&root_dir).await?;
+ Ok(Self {
+ path: vec![read_root_dir],
+ })
+ }
+
+ async fn next(&mut self) -> Result<Option<Hash>, Error> {
+ loop {
+ if let Some(reader) = self.path.last_mut() {
+ if let Some(data_dir_ent) = reader.next_entry().await? {
+ let name = data_dir_ent.file_name();
+ let name = if let Ok(n) = name.into_string() {
+ n
+ } else {
+ continue;
+ };
+ let ent_type = data_dir_ent.file_type().await?;
+
+ let name = name.strip_suffix(".zst").unwrap_or(&name);
+ if name.len() == 2 && hex::decode(&name).is_ok() && ent_type.is_dir() {
+ let read_child_dir = fs::read_dir(&data_dir_ent.path()).await?;
+ self.path.push(read_child_dir);
+ continue;
+ } else if name.len() == 64 {
+ let hash_bytes = if let Ok(h) = hex::decode(&name) {
+ h
+ } else {
+ continue;
+ };
+ let mut hash = [0u8; 32];
+ hash.copy_from_slice(&hash_bytes[..]);
+ return Ok(Some(hash.into()));
+ }
+ } else {
+ self.path.pop();
+ continue;
+ }
+ } else {
+ return Ok(None);
+ }
+ }
+ }
+}
diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml
index 640e6975..7678ea26 100644
--- a/src/garage/Cargo.toml
+++ b/src/garage/Cargo.toml
@@ -23,6 +23,7 @@ path = "tests/lib.rs"
[dependencies]
garage_db = { version = "0.8.0", path = "../db" }
garage_api = { version = "0.7.0", path = "../api" }
+garage_block = { version = "0.7.0", path = "../block" }
garage_model = { version = "0.7.0", path = "../model" }
garage_rpc = { version = "0.7.0", path = "../rpc" }
garage_table = { version = "0.7.0", path = "../table" }
diff --git a/src/garage/admin.rs b/src/garage/admin.rs
index c9783e54..8a984cfb 100644
--- a/src/garage/admin.rs
+++ b/src/garage/admin.rs
@@ -693,7 +693,7 @@ impl AdminRpcHandler {
)))
}
} else {
- launch_online_repair(self.garage.clone(), opt)?;
+ launch_online_repair(self.garage.clone(), opt).await?;
Ok(AdminRpc::Ok(format!(
"Repair launched on {:?}",
self.garage.system.id
diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs
index e6fcd705..a5ccfa02 100644
--- a/src/garage/repair/online.rs
+++ b/src/garage/repair/online.rs
@@ -13,7 +13,7 @@ use garage_util::error::Error;
use crate::*;
-pub fn launch_online_repair(garage: Arc<Garage>, opt: RepairOpt) -> Result<(), Error> {
+pub async fn launch_online_repair(garage: Arc<Garage>, opt: RepairOpt) -> Result<(), Error> {
match opt.what {
RepairWhat::Tables => {
info!("Launching a full sync of tables");
@@ -36,24 +36,19 @@ pub fn launch_online_repair(garage: Arc<Garage>, opt: RepairOpt) -> Result<(), E
.spawn_worker(RepairBlockrefsWorker::new(garage.clone()));
}
RepairWhat::Blocks => {
- unimplemented!()
- /*
info!("Repairing the stored blocks");
- self.garage
- .block_manager
- .repair_data_store(&must_exit)
- .await?;
- */
+ garage
+ .background
+ .spawn_worker(garage_block::repair::RepairWorker::new(
+ garage.block_manager.clone(),
+ ));
}
RepairWhat::Scrub { tranquility } => {
- unimplemented!()
- /*
info!("Verifying integrity of stored blocks");
- self.garage
- .block_manager
- .scrub_data_store(&must_exit, tranquility)
- .await?;
- */
+ garage.background.spawn_worker(
+ garage_block::repair::ScrubWorker::new(garage.block_manager.clone(), tranquility)
+ .await?,
+ );
}
}
Ok(())
@@ -64,7 +59,7 @@ pub fn launch_online_repair(garage: Arc<Garage>, opt: RepairOpt) -> Result<(), E
struct RepairVersionsWorker {
garage: Arc<Garage>,
pos: Vec<u8>,
- iter: usize,
+ counter: usize,
}
impl RepairVersionsWorker {
@@ -72,7 +67,7 @@ impl RepairVersionsWorker {
Self {
garage,
pos: vec![],
- iter: 0,
+ counter: 0,
}
}
}
@@ -93,14 +88,14 @@ impl Worker for RepairVersionsWorker {
v
}
None => {
- info!("repair_versions: finished, done {}", self.iter);
+ info!("repair_versions: finished, done {}", self.counter);
return Ok(WorkerStatus::Done);
}
};
- self.iter += 1;
- if self.iter % 1000 == 0 {
- info!("repair_versions: {}", self.iter);
+ self.counter += 1;
+ if self.counter % 1000 == 0 {
+ info!("repair_versions: {}", self.counter);
}
let version = rmp_serde::decode::from_read_ref::<_, Version>(&item_bytes)?;
@@ -144,7 +139,7 @@ impl Worker for RepairVersionsWorker {
struct RepairBlockrefsWorker {
garage: Arc<Garage>,
pos: Vec<u8>,
- iter: usize,
+ counter: usize,
}
impl RepairBlockrefsWorker {
@@ -152,7 +147,7 @@ impl RepairBlockrefsWorker {
Self {
garage,
pos: vec![],
- iter: 0,
+ counter: 0,
}
}
}
@@ -173,14 +168,14 @@ impl Worker for RepairBlockrefsWorker {
v
}
None => {
- info!("repair_block_ref: finished, done {}", self.iter);
+ info!("repair_block_ref: finished, done {}", self.counter);
return Ok(WorkerStatus::Done);
}
};
- self.iter += 1;
- if self.iter % 1000 == 0 {
- info!("repair_block_ref: {}", self.iter);
+ self.counter += 1;
+ if self.counter % 1000 == 0 {
+ info!("repair_block_ref: {}", self.counter);
}
let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(&item_bytes)?;
diff --git a/src/util/background/worker.rs b/src/util/background/worker.rs
index 92f7990c..e30fecd7 100644
--- a/src/util/background/worker.rs
+++ b/src/util/background/worker.rs
@@ -159,6 +159,9 @@ impl WorkerHandler {
self.task_id,
e
);
+ // Sleep a bit so that error won't repeat immediately
+ // (TODO good way to handle errors)
+ tokio::time::sleep(Duration::from_secs(10)).await;
}
},
WorkerStatus::Idle => {