diff options
Diffstat (limited to 'src/admin_rpc.rs')
-rw-r--r-- | src/admin_rpc.rs | 112 |
1 files changed, 110 insertions, 2 deletions
diff --git a/src/admin_rpc.rs b/src/admin_rpc.rs index 8e278dbe..0318e799 100644 --- a/src/admin_rpc.rs +++ b/src/admin_rpc.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use serde::{Deserialize, Serialize}; +use tokio::sync::watch; use crate::data::*; use crate::error::Error; @@ -10,7 +11,9 @@ use crate::server::Garage; use crate::table::*; use crate::*; +use crate::block_ref_table::*; use crate::bucket_table::*; +use crate::version_table::*; pub const ADMIN_RPC_TIMEOUT: Duration = Duration::from_secs(30); pub const ADMIN_RPC_PATH: &str = "_admin"; @@ -152,7 +155,7 @@ impl AdminRpcHandler { } } - async fn handle_launch_repair(&self, repair_all: bool) -> Result<AdminRPC, Error> { + async fn handle_launch_repair(self: &Arc<Self>, repair_all: bool) -> Result<AdminRPC, Error> { if repair_all { let mut failures = vec![]; let ring = self.garage.system.ring.borrow().clone(); @@ -175,11 +178,116 @@ impl AdminRpcHandler { ))) } } else { - self.garage.block_manager.launch_repair().await?; + let self2 = self.clone(); + self.garage + .system + .background + .spawn_worker(move |must_exit| async move { self2.repair_worker(must_exit).await }) + .await; Ok(AdminRPC::Ok(format!( "Repair launched on {:?}", self.garage.system.id ))) } } + + async fn repair_worker(self: Arc<Self>, must_exit: watch::Receiver<bool>) -> Result<(), Error> { + self.repair_versions(&must_exit).await?; + self.repair_block_ref(&must_exit).await?; + self.repair_rc(&must_exit).await?; + self.garage + .block_manager + .repair_data_store(&must_exit) + .await?; + Ok(()) + } + + async fn repair_versions(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> { + let mut pos = vec![]; + + while let Some((item_key, item_bytes)) = self.garage.version_table.store.get_gt(&pos)? { + pos = item_key.to_vec(); + + let version = rmp_serde::decode::from_read_ref::<_, Version>(item_bytes.as_ref())?; + if version.deleted { + continue; + } + let object = self + .garage + .object_table + .get(&version.bucket, &version.key) + .await?; + let version_exists = match object { + Some(o) => o.versions.iter().any(|x| x.uuid == version.uuid), + None => { + eprintln!("No object entry found for version {:?}", version); + false + } + }; + if !version_exists { + eprintln!("Marking deleted version: {:?}", version); + self.garage + .version_table + .insert(&Version { + uuid: version.uuid, + deleted: true, + blocks: vec![], + bucket: version.bucket, + key: version.key, + }) + .await?; + } + + if *must_exit.borrow() { + break; + } + } + Ok(()) + } + + async fn repair_block_ref(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> { + let mut pos = vec![]; + + while let Some((item_key, item_bytes)) = self.garage.block_ref_table.store.get_gt(&pos)? { + pos = item_key.to_vec(); + + let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(item_bytes.as_ref())?; + if block_ref.deleted { + continue; + } + let version = self + .garage + .version_table + .get(&block_ref.version, &EmptyKey) + .await?; + let ref_exists = match version { + Some(v) => !v.deleted, + None => { + eprintln!("No version found for block ref {:?}", block_ref); + false + } + }; + if !ref_exists { + eprintln!("Marking deleted block_ref: {:?}", block_ref); + self.garage + .block_ref_table + .insert(&BlockRef { + block: block_ref.block, + version: block_ref.version, + deleted: true, + }) + .await?; + } + + if *must_exit.borrow() { + break; + } + } + Ok(()) + } + + async fn repair_rc(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> { + // TODO + Ok(()) + } } |